Test commit

master
wkyuu 3 years ago
commit 2608924bb2

@ -0,0 +1,186 @@
# milkSpider
selenium + redis + distributed crawling + xpath + etree + visualization
Goal: crawl JD.com listings for the various milk products currently on sale, collecting product names, descriptions, price data, and review data; then chart the price trends and highlight top reviews with Python visualizations. Crawls run automatically as scheduled tasks.
![image-20220410095017421](README [Image]/image-20220410095017421.png)
![image-20220410095022817](README [Image]/image-20220410095022817.png)
## TODO
- [x] Initialize the selenium framework, write the crawling rules, and get small-scale crawling working
- [ ] Add user-agent pools, IP pools, cookies, and tokens for larger-scale crawling (a sketch follows this list)
- [ ] Use word-frequency analysis for deduplication, and plan how results are written to files
- [ ] Crawl historical prices from a price-history site, compare them, and chart the price trend
- [ ] Add a Redis-based distributed design
- [ ] Data visualization
- [ ] Scheduled, automated crawling
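A minimal sketch of the user-agent rotation planned above, assuming a hand-picked pool of UA strings (`USER_AGENT_POOL` and `pick_user_agent` are illustrative names, not part of the repo yet):
```python
# -*- coding: utf-8 -*-
# Hypothetical user-agent pool for the TODO item above.
import random

USER_AGENT_POOL = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36',
]

def pick_user_agent() -> str:
    # Rotate user agents so consecutive requests look less uniform.
    return random.choice(USER_AGENT_POOL)

# In downloader.getsource() this would be applied to the Options object:
#   init.add_argument('user-agent=' + pick_user_agent())
```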
## Project
### Project layout
> Selesium
>
> > downloader.py — the downloader: fetches page content
> >
> > middlewares.py — distributed threading and redis plumbing
> >
> > pipelines.py — processes the scraped data and writes it to the target files
> >
> > milkSpider.py — main entry point: crawl settings, automation, etc.
> >
> > items.py — placeholder, to be defined
### selenium
Configure the downloader to use selenium to mimic normal browser behavior:
```python
# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from lxml import etree

def getsource(url):
    init = Options()
    init.add_argument('--no-sandbox')
    init.add_argument('--headless')
    init.add_argument('--disable-gpu')
    init.add_argument('disable-cache')
    init.add_argument('disable-infobars')
    init.add_argument('log-level=3')  # INFO = 0, WARNING = 1, LOG_ERROR = 2, LOG_FATAL = 3; default is 0
    init.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
    driver = webdriver.Chrome(options = init)  # chrome_options= is deprecated; use options=
    driver.implicitly_wait(10)
    driver.get(url)
    response = etree.HTML(driver.page_source)
    response = etree.tostring(response, encoding = "utf-8", pretty_print = True, method = "html")
    response = response.decode('utf-8')
    driver.close()
    return response
```
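A quick smoke test, using the 牛奶 (milk) listing URL from `settings.BASEURL` further down:
```python
# Fetch one listing page and peek at the prettified page source.
html = getsource('https://list.jd.com/list.html?cat=1320,1585,9434')
print(html[:300])
```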
## Installation and setup
### Git
```powershell
# Install git
winget install --id Git.Git -e --source winget
# or download the installer from https://git-scm.com/download/win

git init
git add README.md
git remote add origin https://code.educoder.net/mf942lkca/milkSpider.git
git push -u origin master
```
### selenium
Install:
```powershell
# Install selenium
pip3 install selenium
# Show the installed package info
pip show selenium
```
Imports and a minimal usage pattern:
```python
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

chrome_options = Options()
chrome_options.add_argument('--headless')              # run without a visible browser window
chrome_options.add_argument('--no-sandbox')            # works around the "DevToolsActivePort file doesn't exist" error
chrome_options.add_argument('--disable-gpu')           # disable GPU hardware acceleration; without a software renderer the GPU process won't start
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--window-size=1920,1080') # width and height of the window
driver = webdriver.Chrome('chromedriver', options = chrome_options)  # first arg is the driver path; chrome_options= is deprecated
# driver = webdriver.Chrome()
url = ""  # fill in the page to fetch
driver.get(url)
print(driver.page_source)
driver.quit()
```
Some notes to self on lxml.etree and encodings:
```python
import html
from lxml import etree

text = """this is test content;这是测试内容。"""
html1 = etree.HTML(text)
# html1 = etree.fromstring(text)  # same as HTML()

# String input, option 1: unescape entities with html.unescape()
res = etree.tostring(html1)
print(html.unescape(res.decode('utf-8')))
# String input, option 2: serialize with utf-8 encoding
res = etree.tostring(html1, encoding = "utf-8")  # does not help with Chinese text inside tag attributes
print(res.decode('utf-8'))

# File input, option 1: open() the document and handle it as a string
with open('test.html') as f:
    html1 = etree.HTML(f.read())
# then continue with either string-handling option above
# File input, option 2: parse() the document with an explicit encoding
html1 = etree.parse('test.html', etree.HTMLParser(encoding = 'utf-8'))
# the encoding must match the file, otherwise the text comes out garbled
# then continue with either string-handling option above
```
### ChromeDriver
Download [ChromeDriver](https://chromedriver.chromium.org/home) and drop it into the python root directory; alternatively, point selenium at it explicitly, as sketched below.
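A minimal sketch for a driver that is not on PATH (selenium 3 style API; the path below is a placeholder, not a path from this repo):
```python
from selenium import webdriver

# executable_path tells selenium 3 where the driver binary lives when it is
# not on PATH; replace the placeholder with your actual chromedriver location.
driver = webdriver.Chrome(executable_path = r'C:\tools\chromedriver.exe')
driver.quit()
```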
### Redis
[Introduction and configuration](C:\Users\wkyuu\Desktop\my\SQL\Redis\Redis - NoSql高速缓存数据库.md) (local notes)
```python
# Install the redis module first:
## pip install redis
import redis

# Create a client instance
redisconn = redis.Redis(host = '127.0.0.1', port = '6379', password = 'x', db = 0)
# redis returns bytes by default; pass decode_responses = True to get strings instead
```
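The distributed design in middlewares.py below uses a Redis list as a shared URL queue. A minimal sketch of that pattern, assuming a local Redis instance (the list name matches `settings.REDIS_LISTNAME`; the URL is one of the `settings.BASEURL` pages):
```python
import redis

# Producer side: push crawl tasks onto the shared list.
conn = redis.Redis(host = '127.0.0.1', port = 6379, decode_responses = True)
conn.rpush('urlList', 'https://list.jd.com/list.html?cat=1320,1585,9434&page=1')

# Consumer side: each worker pops its next task; lpop is atomic, so several
# workers can drain the same queue without stepping on each other.
while conn.llen('urlList') > 0:
    url = conn.lpop('urlList')
    print('would crawl:', url)
```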
## Notes
Before threading was added, a complete run over five categories (30 items × 10 pages × 5 categories = 1,500 records, i.e. 50 page loads) took 365 s, roughly 7 s per page.
## References
1. [selenium + python automation 100: running headless Chrome with selenium on CentOS](https://www.cnblogs.com/yoyoketang/p/11582012.html)
2. [Fixing "'chromedriver' executable needs to be in PATH"](https://www.cnblogs.com/Neeo/articles/13949854.html)
3. [Python selenium-chrome: disabling log output](https://blog.csdn.net/wm9028/article/details/107536929)
4. [Writing a Python list to a csv file line by line](https://blog.csdn.net/weixin_41068770/article/details/103145660)
5. [pandas read_csv in depth](https://www.cnblogs.com/traditional/p/12514914.html)
6. [Defining and using cross-module globals in Python 3](https://codeantenna.com/a/9YbdOKrrSJ)
7. [Python multithreading](https://www.runoob.com/python/python-multithreading.html)
8. [Introduction to redis in Python](https://www.runoob.com/w3cnote/python-redis-intro.html)
9. [Distributed task queues with python + redis](https://cloud.tencent.com/developer/article/1697383)

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from lxml import etree

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36'
}  # note: not yet wired into the selenium options below

def getsource(url):
    init = Options()
    init.add_argument('--no-sandbox')
    init.add_argument('--headless')
    init.add_argument('--disable-gpu')
    init.add_argument('disable-cache')
    init.add_argument('disable-infobars')
    init.add_argument('log-level=3')  # INFO = 0, WARNING = 1, LOG_ERROR = 2, LOG_FATAL = 3; default is 0
    init.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
    driver = webdriver.Chrome(options = init)  # chrome_options= is deprecated; use options=
    driver.implicitly_wait(10)
    driver.get(url)
    response = etree.HTML(driver.page_source)
    response = etree.tostring(response, encoding = "utf-8", pretty_print = True, method = "html")
    response = response.decode('utf-8')
    driver.close()
    return response

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
import settings
import pipelines
import downloader
import threading
import redis
import time
import os

# Global settings
REDIS_HOST = settings.REDIS_HOST
REDIS_PORT = settings.REDIS_PORT
REDIS_PASSWORD = settings.REDIS_PASSWORD
REDIS_LISTNAME = settings.REDIS_LISTNAME
BASEURL = settings.BASEURL
FILENAME_CSV = settings.FILENAME_CSV

threadLock = threading.Lock()
flag = 1  # 1: keep scheduling batches in the main thread; 0: stop
connection_pool = redis.ConnectionPool(host = REDIS_HOST, port = REDIS_PORT, password = REDIS_PASSWORD, decode_responses = True)
redisconn = redis.Redis(connection_pool = connection_pool)

def getCategory(url) -> str:
    for urlstr in BASEURL.items():
        if urlstr[1] in url: return urlstr[0]
    print("can't get a valid baseurl! Check your settings.BASEURL.")
    exit()

def geturlList(baseurl) -> list:
    urlList = []
    for i in range(1, 20, 2):  # crawl 10 pages per category
        url = baseurl + r"&page=" + str(i)
        urlList.append(url)
    return urlList

def save2Redis():  # build the urls from settings.py and push them into redis
    for category in BASEURL.items():
        for eachurl in geturlList(category[1]):
            redisconn.rpush(REDIS_LISTNAME, eachurl)
        print("Save the urls for '{}' to Redis queue has done.".format(category[0]))

def isNullRedis() -> bool:  # true when the redis queue has no pending urls
    if redisconn.llen(REDIS_LISTNAME) == 0: return True
    else: return False

def precheck() -> bool:  # make sure the queue is populated before crawling starts
    while redisconn.llen(REDIS_LISTNAME) == 0:
        print("No queue was found!\nPush some urls to the queue using default settings.\nContinue [c] or Exit [q] ?")
        check = str(input())
        if check == 'c':
            save2Redis()
            return True
        elif check == 'q':
            print("Exit.")
            exit()
        else: print("invalid input!")
    return True

def write2csv(category, response):  # write one page of items to the category's csv file
    filename_csv = os.getcwd() + "\\Catalogues\\" + FILENAME_CSV.get(category)
    pipelines.write2csv(response, filename_csv)

class milkSpider(threading.Thread):
    def __init__(self, name, url):
        threading.Thread.__init__(self)
        self.name = name
        self.url = url
        self.category = getCategory(url)
        self.response = downloader.getsource(self.url)
        # self.response = ""

    def run(self):
        threadLock.acquire()
        print("write2csv for '{}' will be started in 3 seconds....".format(self.url))
        time.sleep(3)
        write2csv(self.category, self.response)
        print("{} is done.".format(self.name))
        threadLock.release()

def mainThread(threadlines = 5, flag = flag):  # number of worker threads per batch, 5 by default
    try:
        threads = []
        for index in range(1, threadlines + 1):
            if isNullRedis():
                print("Redis queue is empty, no more threads will be started")
                flag = 0
                break
            name = "Thread[" + str(index) + "]"
            print("{} started... {}/{}".format(name, str(index), threadlines))
            url = redisconn.lpop(REDIS_LISTNAME)
            athread = milkSpider(name, url)
            athread.start()
            threads.append(athread)
        for thread in threads:
            thread.join()
        if flag == 1:  # recurse for the next batch until the queue drains
            mainThread(threadlines, flag)
    except BaseException as e:
        print(e)
        print("sth wrong in mainThread, check your Redis queue, main thread quit.")
        exit()

if __name__ == '__main__':
    if precheck():
        mainThread()
        print("done.")

# Local testing helpers
def print2console(response):  # dump items to the console
    pipelines.print2console(response)

def localtest(category):  # run against locally cached page source
    fileList = settings.getfileList(settings.FILEPATH.get(category))
    page = 1
    for filename in fileList:
        print("↓↓↓↓↓↓↓↓↓↓\npage " + str(page) + " start at " + time.ctime())
        print("crawling page " + str(page) + ": " + filename)
        response = pipelines.gethtml(filename, gethtml_mode = "cache")  # setting the mode once here is enough
        write2csv(category, response)
        print("page " + str(page) + " sleep at " + time.ctime())
        time.sleep(10)
        print("page " + str(page) + " sleep over at " + time.ctime())
        page += 1

@ -0,0 +1,16 @@
# -*- coding: utf-8 -*-
import settings
import middlewares
import time

if __name__ == "__main__":
    # category = "冰淇淋"  # single category to crawl; the default is 牛奶 (milk), see settings.py
    start_time = time.time()
    for category in settings.BASEURL.keys():
        middlewares.singleSpider(category)
    end_time = time.time()
    print(end_time - start_time)  # elapsed time in seconds

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
from lxml import etree
import csv
import os

def gethtml(response, gethtml_mode = "url"):  # normalize the input into an lxml object
    try:
        if isElementTree(response):
            return response
        if gethtml_mode == "cache":
            html = etree.parse(response, etree.HTMLParser(encoding = 'utf-8'))
        elif gethtml_mode == "url":
            html = etree.HTML(response)
        else:
            print("sth wrong with parameters in pipelines.gethtml(gethtml_mode)")
            exit()
        return html
    except BaseException as e:
        print(e)
        print("sth wrong in pipelines.gethtml, see your settings in settings.py")
        exit()

def getidlist(response) -> list:  # collect the data-sku product ids on a listing page
    reg = r"//li/@data-sku"
    if isElementTree(response):
        html = response
    else:
        html = gethtml(response)
    idlist = html.xpath(reg)
    return idlist

def myreplace(name) -> str:  # light cleanup of extracted text
    name = name.strip()
    return name

def isElementTree(response) -> bool:  # already parsed? parse() gives _ElementTree, HTML() gives _Element
    return isinstance(response, (etree._Element, etree._ElementTree))

class item:
    def __init__(self, id, response):
        self.id = id
        self.response = response

    def getitem(self) -> list:
        reg = r"//li[@data-sku='" + str(self.id) + r"']"
        node = self.response.xpath(reg)[0]
        node = etree.tostring(node, encoding = 'utf-8', method = 'html').decode('utf-8')

        def name() -> str:
            reg = r"//div[@class='p-name p-name-type-3']/a/em/text()"
            html = etree.HTML(node)
            name = html.xpath(reg)[0]
            name = myreplace(name)
            return name

        def price() -> str:
            reg = r"//i[@data-price]/text()"
            html = etree.HTML(node)
            price = html.xpath(reg)[0]
            price = str(price)
            return price

        def attribute() -> str:
            reg = r"//span[@class='attr']/b/text()"
            html = etree.HTML(node)
            attribute = html.xpath(reg)
            attrStr = ""
            for attr in attribute:
                attrStr += attr + ' '
            return myreplace(attrStr)

        def sales() -> str:
            reg = r"//div[@class='p-icons']/i/text()"
            html = etree.HTML(node)
            sales = html.xpath(reg)
            saleStr = ""
            for sale in sales:
                saleStr += sale + ' '
            return myreplace(saleStr)

        def url() -> str:
            url = r"https://item.jd.com/" + str(self.id) + r".html"
            return url

        itemlist = [str(self.id), name(), price(), attribute(), sales(), url()]
        return itemlist

def print2console(response):  # dump the items to the console
    def output(itemlist = []):
        print("商品id:" + itemlist[0])
        print("商品名称:" + itemlist[1])
        print("价格:¥" + itemlist[2])
        print("关键词:" + itemlist[3])
        print("促销活动:" + itemlist[4])
        print("商品链接:" + itemlist[5])
        print("")
    try:
        for id in getidlist(response):
            if int(id) < 1000:  # skip implausibly small sku values
                continue
            aitem = item(id, gethtml(response))
            itemlist = aitem.getitem()
            output(itemlist)
    except BaseException as e:
        print(e)
        print("pipelines.py didn't work properly")
    print("pipelines.print2console is done")

def write2csv(response, filename_csv):  # append the items to a csv file
    def write(writer):
        for id in getidlist(response):
            if int(id) < 1000:  # skip implausibly small sku values
                continue
            aitem = item(id, gethtml(response))
            itemlist = aitem.getitem()
            writer.writerow(itemlist)
    try:
        newfile = not os.path.exists(filename_csv)  # create with a header row, else append
        with open(filename_csv, 'w+' if newfile else 'a+', encoding = 'utf-8-sig', newline = '') as fd:
            writer = csv.writer(fd)
            if newfile:
                headers = ['id', '商品名称', '价格(人民币)', '标签', '促销策略', 'url']
                writer.writerow(headers)
            write(writer)
    except BaseException as e:
        print(e)
        print("sth wrong in pipelines.write2csv")

if __name__ == "__main__":
    pass  # library module; callers pass write2csv(response, filename_csv)

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
import os
# Output filenames; these are the defaults, and each must end in .csv
FILENAME_CSV = {
    "牛奶": "milk.csv",
    "苹果": "apple.csv",
    "橙子": "orange.csv",
    "芒果": "mango.csv",
    "冰淇淋": "iceCream.csv"
}

# Default category listing urls to crawl
BASEURL = {
    '牛奶': 'https://list.jd.com/list.html?cat=1320,1585,9434',     # ok
    '苹果': 'https://list.jd.com/list.html?cat=12218,12221,13554',  # ok
    '橙子': 'https://list.jd.com/list.html?cat=12218,12221,13555',  # ok
    '芒果': 'https://list.jd.com/list.html?cat=12218,12221,13558',  # ok
    '冰淇淋': 'https://list.jd.com/list.html?cat=12218,13598,13603' # ok
}

# Local page caches used by middlewares.localtest()
FILEPATH = {
    '牛奶': os.getcwd() + '\\1320,1585,9434\\1320,1585,9434',
}

# Redis connection settings
REDIS_HOST = '159.75.135.137'
REDIS_PORT = '6379'
REDIS_PASSWORD = 'root'
REDIS_LISTNAME = "urlList"
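
# To track a new category, add the same key to both FILENAME_CSV and BASEURL;
# getCategory() and write2csv() in middlewares.py look categories up by that
# shared key. Hypothetical example (the cat= ids below are placeholders, not
# real JD category ids):
# FILENAME_CSV['香蕉'] = 'banana.csv'
# BASEURL['香蕉'] = 'https://list.jd.com/list.html?cat=0000,0000,0000'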