# -*- coding: utf-8 -*-
import settings
import pipelines
import downloader
import threading
import redis
import time
import os
# Global settings
REDIS_HOST = settings.REDIS_HOST
REDIS_PORT = settings.REDIS_PORT
REDIS_PASSWORD = settings.REDIS_PASSWORD
REDIS_LISTNAME = settings.REDIS_LISTNAME
BASEURL = settings.BASEURL
FILENAME_CSV = settings.FILENAME_CSV
threadLock = threading.Lock()
flag = 1 # flag used by the main thread to decide whether to keep looping
connection_pool = redis.ConnectionPool(host = REDIS_HOST, port = REDIS_PORT, password = REDIS_PASSWORD, decode_responses = True)
redisconn = redis.Redis(connection_pool = connection_pool)
def getCategory(url) -> str:
    for urlstr in BASEURL.items():
        if urlstr[1] in url: return urlstr[0]
    print("can't get a valid baseurl! Check your settings.BASEURL.")
    exit()
def geturlList(baseurl) -> list:
    urlList = []
    for i in range(1, 20, 2): # crawl 10 pages (odd page numbers 1 to 19)
        url = baseurl + r"&page=" + str(i)
        urlList.append(url)
    return urlList
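# For example, geturlList("https://example.com/list?cat=milk") (a hypothetical base url,
# the real ones come from settings.BASEURL) returns ten urls ending in
# "&page=1", "&page=3", ..., "&page=19".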
def save2Redis(): # build the urls from settings.py and push them into the Redis queue
    for category in BASEURL.items():
        for eachurl in geturlList(category[1]):
            redisconn.rpush(REDIS_LISTNAME, eachurl)
        print("Saving the urls for '{}' to the Redis queue is done.".format(category[0]))
def isNullRedis() -> bool: # check whether the Redis queue of pending urls is empty
    if redisconn.llen(REDIS_LISTNAME) == 0: return True
    else: return False
def precheck() -> bool: # check the state of the Redis queue
    while redisconn.llen(REDIS_LISTNAME) == 0:
        print("No queue was found!\nPush some urls to the queue using default settings.\nContinue [c] or Exit [q] ?")
        check = str(input())
        if check == 'c':
            save2Redis()
            return True
        elif check == 'q':
            print("Exit.")
            exit()
        else: print("Invalid input!")
    return True
def clearRedis(): # empty the Redis queue
    while not isNullRedis():
        redisconn.lpop(REDIS_LISTNAME)
    print("Redis queue has been cleared.")
def write2csv(category, response): # write the parsed response to the csv file for this category
    filename_csv = os.getcwd() + "\\Catalogues\\" + FILENAME_CSV.get(category)
    pipelines.write2csv(response, filename_csv)
class milkSpider(threading.Thread):
    def __init__(self, name, url):
        threading.Thread.__init__(self)
        self.name = name
        self.url = url
        self.category = getCategory(url)
        # self.response = ""
    def run(self):
        self.response = downloader.getsource(self.url)
        threadLock.acquire()
        # print("write2csv for '{}' will be started in 3 seconds....".format(self.url))
        print("write2csv for '{}' was started.".format(self.url))
        # time.sleep(3)
        write2csv(self.category, self.response)
        print("{} is done.".format(self.name))
        threadLock.release()
def mainThread(threadlines = 16, flag = flag): # number of worker threads per batch, 16 by default
    try:
        threads = []
        for index in range(1, threadlines + 1):
            if isNullRedis():
                print("Redis queue is empty, no more threads will be started.")
                flag = 0
                break
            name = "Thread[" + str(index) + "]"
            print("{} started... {}/{}".format(name, str(index), threadlines))
            url = redisconn.lpop(REDIS_LISTNAME)
            athread = milkSpider(name, url)
            athread.start()
            threads.append(athread)
        for thread in threads:
            thread.join()
        if flag == 1:
            mainThread(threadlines, flag)
    except BaseException as e:
        print(e)
        print("Something went wrong in mainThread; check your Redis queue. Main thread quit.")
        exit()
if __name__ == '__main__':
    clearRedis()
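    # Illustrative only (not part of the original flow): a typical distributed run
    # would verify the queue and then start the worker threads, for example:
    # precheck()
    # mainThread(threadlines = 16)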
# The functions below are for local testing only.
def print2console(response): # print the parsed result to the console
    pipelines.print2console(response)
def localtest(category): # test against locally cached page source
    fileList = settings.getfileList(settings.FILEPATH.get(category))
    page = 1
    for filename in fileList:
        print("↓↓↓↓↓↓↓↓↓↓\npage " + str(page) + " start at " + time.ctime())
        print("Crawling page " + str(page) + ": " + filename)
        response = pipelines.gethtml(filename, gethtml_mode = "cache") # only needs to be set once, here
        write2csv(category, response)
        print("page " + str(page) + " sleep at " + time.ctime())
        time.sleep(10)
        print("page " + str(page) + " sleep over at " + time.ctime())
        page += 1
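# Illustrative usage of the local test (the category key below is hypothetical,
# use an actual key from settings.FILEPATH):
# localtest("milkpowder")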