# -*- coding: utf-8 -*-
"""Parse product-listing HTML and emit the products to the console or a CSV.

A page is expected to contain ``<li data-sku="...">`` entries.  For each
entry the name, price, shop, tags and promotions are read from the markup,
and the remaining CSV columns come from ``historyPrice.historyPriceItem``.
"""
import os
import sys
from concurrent.futures import ThreadPoolExecutor

from lxml import etree

import historyPrice

# Header row written at the top of a newly created CSV file.
CSV_HEADER = "id,商品名称,价格(人民币),评论数量(条),商铺名称,商品类别,标签,促销策略,url,价格数据更新时间,历史价格趋势\n"


def gethtml(response, gethtml_mode="url"):
    """Return *response* as a parsed lxml tree.

    Args:
        response: An already parsed lxml element/tree (returned unchanged),
            a source accepted by ``etree.parse`` (``"cache"`` mode), or HTML
            markup accepted by ``etree.HTML`` (``"url"`` mode).
        gethtml_mode: ``"cache"`` or ``"url"``.

    Returns:
        The parsed tree.

    Raises:
        SystemExit: If *gethtml_mode* is unknown or parsing fails.  A
            diagnostic is printed first.
    """
    if isElementTree(response):
        return response

    # Validate outside the ``try`` so this exit is not caught and reported a
    # second time by the parse-error handler below.
    if gethtml_mode not in ("cache", "url"):
        print("sth wrong with parameters in pipelines.gethtml(gethtml_mode)")
        sys.exit(1)

    try:
        if gethtml_mode == "cache":
            return etree.parse(response, etree.HTMLParser(encoding='utf-8'))
        return etree.HTML(response)
    except Exception as e:
        print(e)
        print("sth wrong in pipelines.gethtml, see your settings in settings.py")
        sys.exit(1)


def getidlist(response) -> list:
    """Return the ``data-sku`` value of every ``<li>`` found in *response*.

    *response* may be raw markup or an already parsed tree; ``gethtml``
    passes parsed trees through unchanged.
    """
    return gethtml(response).xpath(r"//li/@data-sku")


def myreplace(text, mode='') -> str:
    """Clean up whitespace in *text*.

    Args:
        text: The string to clean.
        mode: ``'all'`` strips the ends and removes every space, ``\\r`` and
            ``\\n``; ``'strip'`` strips the ends and removes ``\\r``; ``'n'``
            only removes ``\\n``; any other value just strips the ends.

    Returns:
        The cleaned string.
    """
    if mode == 'all':
        return text.strip().replace(' ', '').replace('\r', '').replace('\n', '')
    if mode == 'strip':
        return text.strip().replace('\r', '')
    if mode == 'n':
        return text.replace('\n', '')
    return text.strip()


def isElementTree(response) -> bool:
    """Return True if *response* is already an lxml element or element tree.

    The previous implementation compared ``str(type(response))`` with an
    empty string, which can never match, so parsed trees were never
    recognised.
    """
    return bool(etree.iselement(response)) or isinstance(response, etree._ElementTree)


def _first_text(node, xpath) -> str:
    """Return the first result of *xpath* on *node* as ``str``, or ``''``."""
    hits = node.xpath(xpath)
    return str(hits[0]) if hits else ''


def _joined_text(node, xpath) -> str:
    """Return all results of *xpath* on *node*, space-joined and stripped."""
    return myreplace(' '.join(node.xpath(xpath)))


def _product_ids(html):
    """Yield the ``data-sku`` ids of *html* whose integer value is >= 1000.

    NOTE(review): smaller ids are skipped exactly as before; the reason is
    not visible in this file (presumably non-product entries) - confirm.
    """
    for sku in getidlist(html):
        if int(sku) < 1000:
            continue
        yield sku


class item:
    """One ``<li data-sku=...>`` product entry of a parsed listing page."""

    def __init__(self, id, response):
        """Remember the product *id* and the parsed page it lives in.

        Args:
            id: The ``data-sku`` value of the product.
            response: Parsed lxml tree of the page (must support ``xpath``).
        """
        self.id = id
        self.response = response

    def getpagefields(self) -> dict:
        """Read the fields that come from the page markup itself.

        Returns:
            A dict of strings with the keys ``id``, ``name``, ``price``,
            ``shop``, ``attribute``, ``sales`` and ``url``.  A field whose
            markup is missing is the empty string.

        Raises:
            IndexError: If the page has no ``<li>`` with this ``data-sku``.
        """
        # Pass the id as an XPath variable rather than splicing it into the
        # expression, so quotes in the value cannot break the query.
        matches = self.response.xpath("//li[@data-sku=$sku]", sku=str(self.id))
        if not matches:
            raise IndexError("no <li data-sku={!r}> in page".format(self.id))
        node = matches[0]

        # Relative ('.//') queries keep the search inside this <li>, which
        # replaces the former serialise-and-reparse round trip per field.
        return {
            "id": str(self.id),
            "name": myreplace(_first_text(node, r".//div[@class='p-name p-name-type-3']/a/em/text()")),
            "price": _first_text(node, r".//i[@data-price]/text()"),
            "shop": myreplace(_first_text(node, ".//a[@class='curr-shop hd-shopname']/text()")),
            "attribute": _joined_text(node, r".//span[@class='attr']/b/text()"),
            "sales": _joined_text(node, r".//div[@class='p-icons']/i/text()"),
            "url": r"https://item.jd.com/" + str(self.id) + r".html",
        }

    def getitem(self) -> str:
        """Return this product as one newline-terminated CSV row.

        The column order matches ``CSV_HEADER``.  Four of the columns are the
        first four elements returned by
        ``historyPrice.historyPriceItem(id).gethistoryPrice()``.

        NOTE(review): values are joined with plain commas and are not quoted,
        so a comma inside a value shifts the following columns.  Left as is
        because the last column may rely on that - confirm before switching
        to the ``csv`` module.
        """
        page = self.getpagefields()
        history = historyPrice.historyPriceItem(self.id).gethistoryPrice()
        return "{},{},{},{},{},{},{},{},{},{},{}\n".format(
            page["id"],
            page["name"],
            page["price"],
            history[0],
            page["shop"],
            history[1],
            page["attribute"],
            page["sales"],
            page["url"],
            history[2],
            history[3],
        )


def print2console(response):
    """Print every product of *response* to standard output.

    Only fields read from the page are printed, so no price-history lookup
    is made.  Errors are reported on the console instead of being raised.

    Args:
        response: Raw markup or an already parsed page tree.
    """
    try:
        html = gethtml(response)  # parse once, not once per product
        for sku in _product_ids(html):
            fields = item(sku, html).getpagefields()
            print("商品id:" + fields["id"])
            print("商品名称:" + fields["name"])
            print("价格:¥" + fields["price"])
            print("关键词:" + fields["attribute"])
            print("促销活动:" + fields["sales"])
            print("商品链接:" + fields["url"])
            print("")
    except Exception as e:
        print(e)
        print("pipelines.py didn't work properly")
    print("pipelines.print2console is done")


def write2csv(response, filename_csv):
    """Append every product of *response* to the CSV file *filename_csv*.

    The header row is written first when the file is missing or empty.  Rows
    are built by up to eight worker threads and written in page order; each
    row is also echoed to the console.  Errors are reported on the console
    instead of being raised.

    Args:
        response: Raw markup or an already parsed page tree.
        filename_csv: Path of the CSV file to create or extend.
    """
    try:
        # Kept from the original: the "Catalogues" directory is always
        # ensured, whether or not filename_csv points into it.
        os.makedirs("Catalogues", exist_ok=True)

        html = gethtml(response)  # parse once, not once per product
        entries = [item(sku, html) for sku in _product_ids(html)]

        need_header = (not os.path.exists(filename_csv)
                       or os.path.getsize(filename_csv) == 0)
        with open(filename_csv, 'a', encoding='utf-8-sig') as fd:
            if need_header:
                fd.write(CSV_HEADER)
            with ThreadPoolExecutor(max_workers=8) as pool:
                # Submit everything before collecting any result; waiting on
                # each task right after submitting it ran the pool serially.
                futures = [pool.submit(entry.getitem) for entry in entries]
                for future in futures:
                    row = future.result()
                    print(row)
                    fd.write(row)
    except Exception as e:
        print(e)
        print("sth wrong in pipelines.write2csv")


if __name__ == "__main__":
    pass

# The string literals below are inert: they hold manual debugging snippets
# and are never executed.

'''
# 调试
filename_csv = os.getcwd() + '\\' + "milk.csv"
response = './1320,1585,9434/1320,1585,9434&page=1.html'
res = gethtml(response, gethtml_mode = 'cache')
write2csv(res, filename_csv)
'''

'''
# 调试数据
import pipelines
from lxml import etree
response = './1320,1585,9434/index.html' # 文件名 or url
html = pipelines.gethtml(response, gethtml_mode = 'cache') # cache or url
id = '3742086'
aitem = pipelines.item(id, html)
a = aitem.getitem()

import historyPrice
bitem = historyPrice.historyPriceItem(id)
b = bitem.gethistoryPrice()
################
bitem.response # 查看requests返回的sources
item = bitem.response.xpath(reg)[0]
item = etree.tostring(item, encoding = 'utf-8', method = 'html').decode('utf-8')
itemList = a + b
'''

'''
# Xpath 调试
from pipelines import *
response = './1320,1585,9434/index.html'
response = gethtml(response, gethtml_mode = 'cache')
id = '3742086'
reg = r"//li[@data-sku='" + str(id) + r"']"
item = etree.tostring(item, encoding = 'utf-8', method = 'html').decode('utf-8')
reg = "//div[@class='p-shop']/span/a/text()"
html = etree.HTML(item)
name = html.xpath(reg)[0]
'''