diff --git a/README.md b/README.md
index 2dbb934..8603ef4 100644
--- a/README.md
+++ b/README.md
@@ -192,6 +192,88 @@ import requests
 ```
 
+### Threading
+
+Multithreading, the manual version
+
+```python
+import threading
+import time
+
+threadlines = 16  # start 16 threads by default; do not go above 20
+
+def printTime(name):
+    print(name, "started at", time.ctime())
+    time.sleep(4)  # simulate some work
+    print(name, "finished at", time.ctime())
+
+threads = []
+for index in range(threadlines):
+    name = "thread " + str(index)
+    athread = threading.Thread(target = printTime, args = (name,))
+    athread.start()
+    threads.append(athread)
+
+for athread in threads:  # join blocks, so the main thread only exits after every worker has finished
+    athread.join()
+```
+
+Thread locks
+
+```python
+import threading
+import time
+
+threadLock = threading.Lock()
+
+def printTime(name):
+    print(name, "started at", time.ctime())
+    time.sleep(4)  # simulate some work
+    print(name, "finished at", time.ctime())
+    newtime = name + " " + time.ctime() + "\n"
+    threadLock.acquire()  # take the lock on the txt file (exclusive write access)
+    write2txt(newtime)
+    threadLock.release()  # release the lock (give the exclusive access back)
+
+def write2txt(line):
+    with open('test.txt', 'a+', encoding = 'utf-8') as fd:
+        fd.write(line)
+
+threads = []
+for index in range(4):
+    name = "thread " + str(index)
+    athread = threading.Thread(target = printTime, args = (name,))
+    athread.start()
+    threads.append(athread)
+
+for athread in threads:  # join blocks, so the main thread only exits after every worker has finished
+    athread.join()
+```
+
+Thread pool, the recommended approach
+
+```python
+from concurrent.futures import ThreadPoolExecutor
+import time
+
+def printTime(name):
+    print(name, "started at", time.ctime())
+    time.sleep(4)  # simulate some work
+    print(name, "finished at", time.ctime())
+    return name
+
+with ThreadPoolExecutor(max_workers = 10) as pool:
+    tasks = []
+    for count in range(10):
+        name = "thread " + str(count)
+        tasks.append(pool.submit(printTime, name))  # pass in the function and the arguments it needs
+    for task in tasks:
+        print(task.done())    # whether this task has finished yet, bool
+        print(task.result())  # blocks until done, then returns printTime's return value
+```
+
+
 ### Redis
 
 ```python
@@ -250,4 +332,6 @@ redisconn = redis.Redis(host = '127.0.0.1', port = '6379', password = 'x', db =
 
 17,[Python字符串操作之字符串分割与组合](https://blog.csdn.net/seetheworld518/article/details/47346527)
 
-18,
\ No newline at end of file
+18,[python线程池](https://www.cnblogs.com/liyuanhong/p/15767817.html)
+
+19,
\ No newline at end of file
diff --git a/historyPrice.py b/historyPrice.py
index da867b1..e4497b8 100644
--- a/historyPrice.py
+++ b/historyPrice.py
@@ -39,7 +39,7 @@ class historyPriceItem:
         return str(tags[5:])
 
     def updateTime() -> str:
-        string = 'timeline-text'
+        string = 'p3'
         if check(string) == False: # 用于判断有无数据更新时间记录
             return ''
         reg = r"//div[@class='p3']/p[@class='tips']/text()"
diff --git a/middlewares.py b/middlewares.py
index a0b5fa6..1057bd8 100644
--- a/middlewares.py
+++ b/middlewares.py
@@ -1,9 +1,9 @@
 # -*- coding: utf-8 -*-
 
+from concurrent.futures import ThreadPoolExecutor
 import settings
 import pipelines
 import downloader
-import threading
 import redis
 import time
 import os
@@ -16,9 +16,6 @@ REDIS_LISTNAME = settings.REDIS_LISTNAME
 BASEURL = settings.BASEURL
 FILENAME_CSV = settings.FILENAME_CSV
 
-threadLock = threading.Lock()
-threadlines = 16 # 默认调用16个线程,不要超过20
-flag = 1 # 判断主线程
 
 connection_pool = redis.ConnectionPool(host = REDIS_HOST, port = REDIS_PORT, password = REDIS_PASSWORD, decode_responses = True)
 redisconn = redis.Redis(connection_pool = connection_pool)
@@ -67,42 +64,26 @@ def write2csv(category, response): # 写入csv文件
     filename_csv = os.getcwd() + "\\Catalogues\\" + FILENAME_CSV.get(category)
     pipelines.write2csv(response, filename_csv)
 
-class milkSpider(threading.Thread):
-    def __init__(self, name, url):
-        threading.Thread.__init__(self)
-        self.name = name
+class milkSpider:
+    def __init__(self, url):
         self.url = url
         self.category = getCategory(url)
-        # self.response = ""
"" - def run(self): + def go(self): self.response = downloader.getsource(self.url) - threadLock.acquire() print("write2csv for '{}' was started.".format(self.url)) write2csv(self.category, self.response) - print("{} is done.".format(self.name)) - threadLock.release() -def mainThread(threadlines = threadlines, flag = flag): # 线程数默认为3 +def mainThread(): try: - threads = [] - for index in range(1, threadlines + 1): - if isNullRedis(): - print("Redis queue is empty, no more threads will be started") - flag = 0 - break - name = "Thread[" + str(index) + "]" - print("{} started... {}/{}".format(name, str(index), threadlines)) - url = redisconn.lpop(REDIS_LISTNAME) - athread = milkSpider(name, url) - athread.start() - threads.append(athread) - - for thread in threads: - thread.join() - - if flag == 1: - mainThread(threadlines, flag) + with ThreadPoolExecutor(max_workers = 8) as thread: + while True: + if isNullRedis(): + print("Redis queue is empty, no more threads will be started") + break + url = redisconn.lpop(REDIS_LISTNAME) + aSpider = milkSpider(url) + thread.submit(aSpider.go) except BaseException as e: print(e) diff --git a/pipelines.py b/pipelines.py index 16b1552..de5021b 100644 --- a/pipelines.py +++ b/pipelines.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- from lxml import etree +from concurrent.futures import ThreadPoolExecutor import os import historyPrice @@ -159,12 +160,15 @@ def print2console(response): # 输出到命令行 def write2csv(response, filename_csv): # 写入到csv文件 def writer(fd): - for id in getidlist(response): - if int(id) < 1000: - continue - aitem = item(id, gethtml(response)) - itemString = aitem.getitem() - fd.write(itemString) + with ThreadPoolExecutor(max_workers = 8) as thread: + for id in getidlist(response): + if int(id) < 1000: + continue + aitem = item(id, gethtml(response)) + task = thread.submit(aitem.getitem) + itemString = task.result() + print(itemString) + fd.write(itemString) try: if os.path.exists(filename_csv):