class Smoking:
    """Drive an Internet Explorer session that logs in and adds an item to the cart.

    The instance is configured from ``cfg_info`` (keys read by this class:
    ``wait_sec``, ``url``, ``username``, ``password``, ``item_id``) and reports
    progress through ``log_helper.put(...)``.  ``startRun`` starts the
    workflow and ``stop`` ends it and closes the browser.
    """

    # Locator strategy strings accepted by ``find_element(s)(by, value)``.
    # They are the literal values of selenium's ``By.ID``, ``By.NAME`` etc.;
    # the generic lookup is used instead of the ``find_element(s)_by_*``
    # helpers, which were deprecated and later removed in Selenium 4.
    _BY_ID = 'id'
    _BY_NAME = 'name'
    _BY_XPATH = 'xpath'
    _BY_CLASS_NAME = 'class name'
    _BY_LINK_TEXT = 'link text'
    _BY_PARTIAL_LINK_TEXT = 'partial link text'

    # XPath of the element whose text is parsed as the search result count.
    _RESULT_COUNT_XPATH = (
        "//div[@class='specialty-header search']/div[@class='specialty-description']/div["
        "@class='gt-450']/span[2] "
    )
    # XPath of the out-of-stock marker shown on a product tile.
    _NO_STOCK_XPATH = (
        "//div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div["
        "@class='price']/span[@class='noStock'] "
    )
    # XPath of the link that opens the product detail page.
    _PRODUCT_LINK_XPATH = "//div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a"

    # Browser driver; None until startRun() creates it and again after stop().
    __driver: "webdriver.Ie" = None
    # Configuration mapping.
    __cfg_info: dict = {}
    # Log sink; only its put() method is used here.
    __log_helper: "LogHelper" = None
    # Main program directory (IEDriverServer.exe is looked up inside it).
    __work_path: str = ''
    # True while the workflow is supposed to keep running.
    __running: bool = False
    # Text of the out-of-stock marker.
    __no_stock = 'Currently Out of Stock'
    # Seconds to sleep between polling attempts (never below 2).
    __wait_sec = 2

    def __init__(self, work_path, cfg_info, log_helper: "LogHelper"):
        """Store the collaborators and derive the polling interval.

        Args:
            work_path: Directory that contains ``IEDriverServer.exe``.
            cfg_info: Configuration mapping; ``wait_sec`` is read here.
            log_helper: Object whose ``put`` method receives log messages.
        """
        self.__cfg_info = cfg_info
        self.__log_helper = log_helper
        self.__work_path = work_path
        try:
            configured_wait = int(cfg_info['wait_sec'])
        except (KeyError, TypeError, ValueError):
            # Missing or non-numeric setting: fall back to the minimum.
            configured_wait = 2
        # The interval is clamped so it is never shorter than 2 seconds.
        self.__wait_sec = max(2, configured_wait)

    def _element_exists(self, by, value):
        """Poll for at least one element matching the locator.

        While the instance is running, up to three lookups are made with a
        ``wait_sec`` pause after each miss, followed by one final lookup.
        When it is not running only that single lookup is made.

        Args:
            by: Locator strategy string (one of the ``_BY_*`` constants).
            value: Locator value.

        Returns:
            True if a matching element was found, False otherwise or if the
            lookup raised (for example because there is no driver).
        """
        try:
            attempts = 0
            while self.__running and attempts < 3:
                if self.__driver.find_elements(by, value):
                    return True
                time.sleep(self.__wait_sec)
                attempts += 1
            return bool(self.__driver.find_elements(by, value))
        except Exception:
            return False

    def checkIsExistsById(self, id):
        """Return True if an element with the given id exists."""
        return self._element_exists(self._BY_ID, id)

    def checkIsExistsByName(self, name):
        """Return True if an element with the given name exists."""
        return self._element_exists(self._BY_NAME, name)

    def checkIsExistsByPath(self, path):
        """Return True if an element matching the given XPath exists."""
        return self._element_exists(self._BY_XPATH, path)

    def checkIsExistsByClass(self, cls):
        """Return True if an element with the given class name exists."""
        return self._element_exists(self._BY_CLASS_NAME, cls)

    def checkIsExistsByLinkText(self, link_text):
        """Return True if a link with exactly the given text exists."""
        return self._element_exists(self._BY_LINK_TEXT, link_text)

    def checkIsExistsByPartialLinkText(self, link_text):
        """Return True if a link containing the given text exists."""
        return self._element_exists(self._BY_PARTIAL_LINK_TEXT, link_text)

    def login(self, username, password):
        """Open the login page and submit the credentials.

        Args:
            username: Value typed into the ``email`` field.
            password: Value typed into the ``password`` field.
        """
        driver = self.__driver
        # Follow the link to the login page.
        driver.find_element(self._BY_LINK_TEXT, '账户登录').click()
        # Fill in the form only once the e-mail field has appeared.
        if self.checkIsExistsById('email'):
            driver.find_element(self._BY_ID, 'email').send_keys(username)
            driver.find_element(self._BY_ID, 'password').send_keys(password)
            driver.find_element(self._BY_ID, 'sign-in').click()

    def working(self, item_id):
        """Search for the item repeatedly until it has been added to the cart.

        The loop ends when the add-to-cart button was clicked or when the
        instance is no longer running.  Errors raised during one pass are
        logged and the next pass starts.

        Args:
            item_id: Item identifier typed into the search box.
        """
        log = self.__log_helper
        while self.__running:
            try:
                driver = self.__driver
                # Submit the search when the search box is present.
                if self.checkIsExistsById('string'):
                    search_box = driver.find_element(self._BY_ID, 'string')
                    search_box.clear()
                    search_box.send_keys(item_id)
                    search_box.send_keys(Keys.ENTER)

                # No result counter at all: treat as "nothing found" and retry.
                if not self.checkIsExistsByPath(self._RESULT_COUNT_XPATH):
                    time.sleep(self.__wait_sec)
                    log.put('没有查询到item id2 =' + item_id + '对应的信息')
                    continue
                result_count = int(driver.find_element(self._BY_XPATH, self._RESULT_COUNT_XPATH).text)
                if result_count < 1:
                    time.sleep(self.__wait_sec)
                    log.put('没有查询到item id =' + item_id + '对应的信息')
                    continue

                # Retry later while the product is marked as out of stock.
                if self.checkIsExistsByPath(self._NO_STOCK_XPATH):
                    stock_text = driver.find_element(self._BY_XPATH, self._NO_STOCK_XPATH).text
                    if stock_text == self.__no_stock:
                        time.sleep(self.__wait_sec)
                        log.put('查询一次' + item_id + ',无货')
                        continue

                if not self.checkIsExistsByPath(self._PRODUCT_LINK_XPATH):
                    log.put('没有查询到,可能是商品编码不对,或者已下架')
                    continue

                # Open the product page and add the item to the cart.
                driver.find_element(self._BY_XPATH, self._PRODUCT_LINK_XPATH).click()
                time.sleep(self.__wait_sec)
                if self.checkIsExistsByClass('add-to-cart'):
                    driver.find_element(self._BY_CLASS_NAME, 'add-to-cart').click()
                    log.put('加入购物车成功,商品item-id:' + item_id)
                    break
                log.put('未找到加入购物车按钮')
            except Exception as e:
                # Exception (not BaseException) so KeyboardInterrupt/SystemExit
                # can still end the loop.
                log.put(e)

    def startRun(self):
        """Validate the configuration, create the driver if needed and run.

        Nothing is started when ``url``, ``username``, ``password`` or
        ``item_id`` is missing or empty; a message is logged instead.
        Errors raised while running are logged, not propagated.
        """
        try:
            cfg = self.__cfg_info
            url = cfg.get('url')
            username = cfg.get('username')
            password = cfg.get('password')
            item_id = cfg.get('item_id')
            if not (url and username and password and item_id):
                self.__log_helper.put('配置信息不全,请检查config.cfg文件是否为空,然后再重启')
                return
            # Only flag the instance as running once there is something to run.
            self.__running = True
            if self.__driver is None:
                options = webdriver.IeOptions()
                options.add_argument('encoding=UTF-8')
                options.add_argument('Accept= text / css, * / *')
                options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5')
                options.add_argument('Accept - Encoding= gzip, deflate')
                options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko')
                # Create the browser driver object.
                self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe', options=options)
            self.run(url, username, password, item_id)
        except Exception as err:
            self.__log_helper.put('运行过程中出错,请重新打开再试')
            # Also hand the error itself to the log, as working() does.
            self.__log_helper.put(err)

    def run(self, url, username, password, item_id):
        """Open the site, log in when required and start the work loop.

        Args:
            url: Address to open.
            username: Account name used when a login is required.
            password: Password used when a login is required.
            item_id: Item identifier passed on to ``working``.
        """
        self.__driver.get(url)
        self.__driver.maximize_window()
        # The login link is only present while not logged in.
        if self.checkIsExistsByLinkText('账户登录'):
            self.login(username, password)
        # The welcome link is taken as the sign of a successful login.
        if self.checkIsExistsByPartialLinkText('欢迎回来'):
            self.__log_helper.put('登录成功,下一步开始工作了')
            self.working(item_id)
        else:
            self.__log_helper.put('登录失败,请设置账号密码')

    def stop(self):
        """Stop the workflow and close the browser if one is open."""
        try:
            self.__running = False
            if self.__driver is not None:
                # close_browser_nicely() already calls quit(); quitting a
                # second time here would act on a closed session.
                self.close_browser_nicely(self.__driver)
        except Exception:
            print('Stop Failure')
        finally:
            # Must be None afterwards so that startRun() creates a new driver.
            self.__driver = None

    def close_browser_nicely(self, browser):
        """Quit ``browser`` after clearing its unload handlers.

        Failures are printed rather than raised.  The socket default timeout
        is lowered to 10 seconds while quitting and then put back to the
        value it had before the call.

        Args:
            browser: Driver to close; ``None`` is ignored.
        """
        if browser is None:
            return
        try:
            browser.execute_script("window.onunload=null; window.onbeforeunload=null")
        except Exception:
            print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'")
        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(10)
        try:
            browser.quit()
            print("Close browser and firefox by calling quit()")
        except Exception as err:
            print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err)))
        finally:
            socket.setdefaulttimeout(previous_timeout)