class Smoking:
    """Drive an Internet Explorer session that logs in and adds an item to the cart.

    The flow is: open the configured URL, log in when the login link is
    present, then repeatedly search for the configured item id until it is in
    stock and its "add to cart" button has been clicked, or until ``stop()``
    is called.

    NOTE(review): the element lookups use the Selenium 3 ``find_element(s)_by_*``
    API, matching the ``executable_path=`` driver construction in ``startRun``.
    """

    # Browser driver; ``None`` until ``startRun`` creates a ``webdriver.Ie``.
    __driver = None
    # Configuration values (url, username, password, item_id, wait_sec).
    __cfg_info: dict = {}
    # Log sink; only its ``put(message)`` method is used by this class.
    __log_helper: "LogHelper" = None
    # Application directory that contains IEDriverServer.exe.
    __work_path: str = ''
    # True while the worker should keep polling; cleared by ``stop()``.
    __running: bool = False
    # Text shown by the site when the item is out of stock.
    __no_stock = 'Currently Out of Stock'
    # Seconds to sleep between polling attempts (never below 2 after __init__).
    __wait_sec = 2

    def __init__(self, work_path, cfg_info, log_helper: "LogHelper"):
        """Store collaborators and read the polling interval from the config.

        Args:
            work_path: Directory that contains ``IEDriverServer.exe``.
            cfg_info: Mapping with at least a ``wait_sec`` entry convertible
                to ``int``; ``startRun`` additionally reads ``url``,
                ``username``, ``password`` and ``item_id``.
            log_helper: Object exposing ``put(message)``.

        Raises:
            KeyError: If ``wait_sec`` is missing from ``cfg_info``.
            ValueError: If ``wait_sec`` cannot be converted to ``int``.
        """
        self.__cfg_info = cfg_info
        self.__log_helper = log_helper
        self.__work_path = work_path
        # Values below 2 seconds are raised to 2.
        self.__wait_sec = max(2, int(cfg_info['wait_sec']))

    def __find_with_retry(self, find_all) -> bool:
        """Poll ``find_all`` until it returns a non-empty result.

        While the instance is running, up to three attempts are made with a
        ``wait_sec`` pause after each miss, followed by one final lookup.
        When the instance is not running only that single lookup is done.

        Args:
            find_all: Zero-argument callable returning the matching elements.

        Returns:
            True if any lookup found at least one element; False otherwise,
            including when the lookup itself raised (for example because no
            driver exists yet).
        """
        try:
            attempts = 0
            while self.__running and attempts < 3:
                if find_all():
                    return True
                time.sleep(self.__wait_sec)
                attempts += 1
            return len(find_all()) > 0
        except Exception:
            # A failed lookup is reported as "not present"; KeyboardInterrupt
            # and SystemExit are deliberately allowed to propagate.
            return False

    def checkIsExistsById(self, id):
        """Return True if an element with the given id is present."""
        return self.__find_with_retry(
            lambda: self.__driver.find_elements_by_id(id))

    def checkIsExistsByName(self, name):
        """Return True if an element with the given name is present."""
        return self.__find_with_retry(
            lambda: self.__driver.find_elements_by_name(name))

    def checkIsExistsByPath(self, path):
        """Return True if an element matching the XPath is present."""
        return self.__find_with_retry(
            lambda: self.__driver.find_elements_by_xpath(path))

    def checkIsExistsByClass(self, cls):
        """Return True if an element with the given class name is present."""
        return self.__find_with_retry(
            lambda: self.__driver.find_elements_by_class_name(cls))

    def checkIsExistsByLinkText(self, link_text):
        """Return True if a link with exactly this text is present."""
        return self.__find_with_retry(
            lambda: self.__driver.find_elements_by_link_text(link_text))

    def checkIsExistsByPartialLinkText(self, link_text):
        """Return True if a link whose text contains this text is present."""
        return self.__find_with_retry(
            lambda: self.__driver.find_elements_by_partial_link_text(link_text))

    def login(self, username, password):
        """Open the login page and submit the credentials.

        The form is only filled in when the ``email`` field shows up; if it
        never appears the method returns without submitting anything.
        """
        # Follow the login link to reach the login page.
        self.__driver.find_element_by_link_text('账户登录').click()
        # The email field doubles as the "page has loaded" signal.
        if self.checkIsExistsById('email'):
            self.__driver.find_element_by_id('email').send_keys(username)
            self.__driver.find_element_by_id('password').send_keys(password)
            self.__driver.find_element_by_id('sign-in').click()

    def working(self, item_id):
        """Search for ``item_id`` until it is added to the cart or stopped.

        Each pass submits the item id through the search box, checks the
        result count and the stock label, then opens the product page and
        clicks "add to cart". Ordinary exceptions raised during a pass are
        logged and the loop carries on.
        """
        count_xpath = ("//div[@class='specialty-header search']/div[@class='specialty-description']/div["
                       "@class='gt-450']/span[2] ")
        no_stock_xpath = ("//div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div["
                          "@class='price']/span[@class='noStock'] ")
        link_xpath = "//div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a"

        while self.__running:
            try:
                # Submit the search when the search box is available.
                if self.checkIsExistsById('string'):
                    search_box = self.__driver.find_element_by_id('string')
                    search_box.clear()
                    search_box.send_keys(item_id)
                    search_box.send_keys(Keys.ENTER)

                # Result counter: missing or below 1 means nothing was found.
                if not self.checkIsExistsByPath(count_xpath):
                    time.sleep(self.__wait_sec)
                    self.__log_helper.put(f'没有查询到item id2 ={item_id}对应的信息')
                    continue
                count = int(self.__driver.find_element_by_xpath(count_xpath).text)
                if count < 1:
                    time.sleep(self.__wait_sec)
                    self.__log_helper.put(f'没有查询到item id ={item_id}对应的信息')
                    continue

                # Stock label: retry later while the item is out of stock.
                if self.checkIsExistsByPath(no_stock_xpath):
                    label = self.__driver.find_element_by_xpath(no_stock_xpath).text
                    if label == self.__no_stock:
                        time.sleep(self.__wait_sec)
                        self.__log_helper.put(f'查询一次{item_id},无货')
                        continue

                # Open the product page through the image link.
                if not self.checkIsExistsByPath(link_xpath):
                    self.__log_helper.put('没有查询到,可能是商品编码不对,或者已下架')
                    continue
                self.__driver.find_element_by_xpath(link_xpath).click()
                time.sleep(self.__wait_sec)

                # Add to cart; success ends the loop.
                if self.checkIsExistsByClass('add-to-cart'):
                    self.__driver.find_element_by_class_name('add-to-cart').click()
                    self.__log_helper.put(f'加入购物车成功,商品item-id:{item_id}')
                    break
                self.__log_helper.put('未找到加入购物车按钮')
            except Exception as e:
                # Log and keep polling; KeyboardInterrupt/SystemExit propagate.
                self.__log_helper.put(e)

    def startRun(self):
        """Validate the configuration, create the driver if needed, and run.

        Errors are reported through the log helper rather than raised: a
        generic message first, followed by ``repr`` of the exception.
        """
        try:
            self.__running = True
            url: str = self.__cfg_info['url']
            username = self.__cfg_info['username']
            password = self.__cfg_info['password']
            item_id = self.__cfg_info['item_id']
            # Every setting must be present and non-empty.
            if not all((url, username, password, item_id)):
                self.__log_helper.put('配置信息不全,请检查config.cfg文件是否为空,然后再重启')
                return
            if self.__driver is None:
                options = webdriver.IeOptions()
                options.add_argument('encoding=UTF-8')
                options.add_argument('Accept= text / css, * / *')
                options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5')
                options.add_argument('Accept - Encoding= gzip, deflate')
                options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko')
                # Create the browser driver from the bundled IEDriverServer.
                self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe',
                                             options=options)
            self.run(url, username, password, item_id)
        except Exception as e:
            self.__log_helper.put('运行过程中出错,请重新打开再试')
            # Keep the cause visible instead of discarding it.
            self.__log_helper.put(repr(e))

    def run(self, url, username, password, item_id):
        """Open the site, log in if required, and start the worker loop."""
        self.__driver.get(url)
        self.__driver.maximize_window()
        # A visible login link means the session is not logged in yet.
        if self.checkIsExistsByLinkText('账户登录'):
            self.login(username, password)
        # The welcome-back link is used as the logged-in indicator.
        if self.checkIsExistsByPartialLinkText('欢迎回来'):
            self.__log_helper.put('登录成功,下一步开始工作了')
            self.working(item_id)
        else:
            self.__log_helper.put('登录失败,请设置账号密码')

    def stop(self):
        """Stop polling and close the browser, if one was started.

        The driver reference is cleared before the browser is closed so that
        a later ``startRun`` always creates a fresh driver.
        """
        self.__running = False
        driver, self.__driver = self.__driver, None
        if driver is None:
            return
        try:
            # close_browser_nicely already quits the driver; quitting a
            # second time would only talk to a dead session.
            self.close_browser_nicely(driver)
        except Exception:
            print('Stop Failure')

    def close_browser_nicely(self, browser):
        """Quit ``browser`` after disabling its unload handlers.

        The unload handlers are cleared first, then ``quit()`` is called
        under a temporary 10-second default socket timeout. The previous
        process-wide default timeout is restored afterwards. Failures are
        printed, not raised. A ``None`` browser is ignored.
        """
        if browser is None:
            return
        try:
            browser.execute_script("window.onunload=null; window.onbeforeunload=null")
        except Exception:
            print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'")

        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(10)
        try:
            browser.quit()
            print("Close browser and firefox by calling quit()")
        except Exception as err:
            print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err)))
        finally:
            # Do not leave a changed global timeout behind for other sockets.
            socket.setdefaulttimeout(previous_timeout)