Reworked the threading logic; use a thread pool to speed up the whole pipeline

master
wkyuu 3 years ago
parent f391dc8362
commit d6661a23b5

@ -192,6 +192,88 @@ import requests
```
### Threads
Multithreading, manual version
```python
import threading
import time

def printTime(name):
    print(name, time.ctime())
    time.sleep(4)
    print(name, time.ctime())

threadlines = 16  # 16 threads by default; better not to go above 20

threads = []
for index in range(threadlines):
    name = "thread " + str(index)
    athread = threading.Thread(target = printTime, args = (name, ))
    athread.start()
    threads.append(athread)
for athread in threads:  # join() blocks so the main thread does not exit before the workers finish
    athread.join()
```
Thread lock
```python
import threading
import time

threadLock = threading.Lock()

def write2txt(line):
    with open('test.txt', 'a+', encoding = 'utf-8') as fd:
        fd.write(line)

def printTime(name):
    print(name, time.ctime())
    time.sleep(4)
    newline = name + " " + time.ctime() + "\n"
    threadLock.acquire()  # take exclusive access to the txt file
    write2txt(newline)
    threadLock.release()  # release the lock (give the exclusive access back)

threads = []
for index in range(4):
    name = "thread " + str(index)
    athread = threading.Thread(target = printTime, args = (name, ))
    athread.start()
    threads.append(athread)
for athread in threads:  # block until every worker thread has finished
    athread.join()
```
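The acquire/release pair can also be written as a `with` block, which guarantees the lock is released even if the write raises an exception. A minimal sketch of the same `printTime` body using the lock as a context manager (reusing `threadLock` and `write2txt` from above):
```python
def printTime(name):
    print(name, time.ctime())
    time.sleep(4)
    newline = name + " " + time.ctime() + "\n"
    with threadLock:  # acquired on entry, released automatically on exit, even on exceptions
        write2txt(newline)
```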
Thread pool, recommended
```python
from concurrent.futures import ThreadPoolExecutor
import time

def printTime(name):
    print(name, time.ctime())
    time.sleep(4)
    print(name, time.ctime())

with ThreadPoolExecutor(max_workers = 10) as pool:
    for count in range(10):
        name = "thread " + str(count)
        task = pool.submit(printTime, name)  # pass in the function and the arguments it needs
        print(task.done())    # bool: has this task finished yet?
        print(task.result())  # blocks until the task finishes and returns printTime's return value
```
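One caveat with the loop above: calling `result()` right after `submit()` waits for each task before the next one is submitted, so the tasks effectively run one at a time. A minimal sketch (reusing the `printTime` above) that submits everything first and then collects results as they complete:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers = 10) as pool:
    tasks = [pool.submit(printTime, "thread " + str(count)) for count in range(10)]
    for task in as_completed(tasks):  # yields each future as soon as it finishes
        task.result()  # re-raises any worker exception; printTime itself returns None
```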
### Redis
```python
@ -250,4 +332,6 @@ redisconn = redis.Redis(host = '127.0.0.1', port = '6379', password = 'x', db =
17. [Python string operations: splitting and joining strings](https://blog.csdn.net/seetheworld518/article/details/47346527)
18. [Python thread pool](https://www.cnblogs.com/liyuanhong/p/15767817.html)

@ -39,7 +39,7 @@ class historyPriceItem:
return str(tags[5:])
def updateTime() -> str:
string = 'timeline-text'
string = 'p3'
if check(string) == False: # used to check whether a "data last updated" record exists
return ''
reg = r"//div[@class='p3']/p[@class='tips']/text()"

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import settings
import pipelines
import downloader
import threading
import redis
import time
import os
@ -16,9 +16,6 @@ REDIS_LISTNAME = settings.REDIS_LISTNAME
BASEURL = settings.BASEURL
FILENAME_CSV = settings.FILENAME_CSV
threadLock = threading.Lock()
threadlines = 16 # 16 threads are started by default; do not go above 20
flag = 1 # flag checked by the main thread
connection_pool = redis.ConnectionPool(host = REDIS_HOST, port = REDIS_PORT, password = REDIS_PASSWORD, decode_responses = True)
redisconn = redis.Redis(connection_pool = connection_pool)
@ -67,42 +64,26 @@ def write2csv(category, response): # write to the csv file
filename_csv = os.getcwd() + "\\Catalogues\\" + FILENAME_CSV.get(category)
pipelines.write2csv(response, filename_csv)
class milkSpider(threading.Thread):
def __init__(self, name, url):
threading.Thread.__init__(self)
self.name = name
class milkSpider:
def __init__(self, url):
self.url = url
self.category = getCategory(url)
# self.response = ""
def run(self):
def go(self):
self.response = downloader.getsource(self.url)
threadLock.acquire()
print("write2csv for '{}' was started.".format(self.url))
write2csv(self.category, self.response)
print("{} is done.".format(self.name))
threadLock.release()
def mainThread(threadlines = threadlines, flag = flag): # the number of threads defaults to 3
def mainThread():
try:
threads = []
for index in range(1, threadlines + 1):
with ThreadPoolExecutor(max_workers = 8) as thread:
while True:
if isNullRedis():
print("Redis queue is empty, no more threads will be started")
flag = 0
break
name = "Thread[" + str(index) + "]"
print("{} started... {}/{}".format(name, str(index), threadlines))
url = redisconn.lpop(REDIS_LISTNAME)
athread = milkSpider(name, url)
athread.start()
threads.append(athread)
for thread in threads:
thread.join()
if flag == 1:
mainThread(threadlines, flag)
aSpider = milkSpider(url)
thread.submit(aSpider.go)
except BaseException as e:
print(e)

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from lxml import etree
from concurrent.futures import ThreadPoolExecutor
import os
import historyPrice
@ -159,11 +160,14 @@ def print2console(response): # print to the console
def write2csv(response, filename_csv): # write to a csv file
def writer(fd):
with ThreadPoolExecutor(max_workers = 8) as thread:
for id in getidlist(response):
if int(id) < 1000:
continue
aitem = item(id, gethtml(response))
itemString = aitem.getitem()
task = thread.submit(aitem.getitem)
itemString = task.result()
print(itemString)
fd.write(itemString)
try:
