dev
zj3D 8 months ago
parent cd8186dd68
commit 5099345721

@ -2,9 +2,13 @@
使用 multiprocessing.Manager: 使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象 它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
使用 multiprocessing.Manager 来完成统计词频
需要注意
- Manager() 必须用函数包起来,不能按脚本随便放外面否则会提示freeze_support
- 工作函数需要放到外面不能做内部函数否则会提示参数错误
- 无法在 Jupyter 类似环境运行
''' '''
# 使用 multiprocessing.Manager 来完成统计词频
# 用消费者模式更好
from cppy.cp_util import * from cppy.cp_util import *
from collections import Counter from collections import Counter
@ -12,32 +16,45 @@ from multiprocessing import Manager, Process
stop_words = get_stopwords() stop_words = get_stopwords()
def count_words(chunk,word_count): def process_chunk(shared_chunks,word_count):
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ] while True:
try:
chunk = shared_chunks.pop(0) # 从共享列表中取出一个数据块
if chunk is None: break # 如果取出的是None表示所有数据块已处理完毕
words = extract_str_words(chunk)
for word in words: for word in words:
if word not in stop_words:
word_count[word] = word_count.get(word, 0) + 1 word_count[word] = word_count.get(word, 0) + 1
# word_count.update( Counter(words) ) # 类型不起作用 except Exception as e:
print(e)
break
@timing_decorator @timing_decorator
def main(): def main():
# 创建一个Manager实例
manager = Manager() manager = Manager()
shared_chunks = manager.list()
word_count = manager.dict() word_count = manager.dict()
chunks = get_chunks(testfilepath) # 读取文件并按块大小分割,将块添加到共享列表中
processes = [] chunk_size = 1024 * 10 # 假设每个块是10KB可以根据需要调整
for chunk in chunks: with open(testfilepath, 'r', encoding='utf-8') as f:
p = Process(target=count_words, args=(chunk,word_count)) while True:
processes.append(p) chunk = f.read(chunk_size)
p.start() if not chunk: break
shared_chunks.append(chunk)
shared_chunks.append(None)
print('-------------------',len(shared_chunks))
processes = [ Process( target=process_chunk,
args=(shared_chunks,word_count))
for _ in range( 4 ) ] # 假设启动4个工作进程
for p in processes: p.start()
for p in processes: p.join() for p in processes: p.join()
sorted_word_count = sorted(word_count.items(), key=lambda x: x[1], reverse=True) # 将Manager类型的字典转换为普通的字典以便使用Counter
top_10_words = sorted_word_count[:10] word_count = dict(word_count)
word_freqs = Counter(word_count).most_common(10)
print("频率最高的10个词:") print_word_freqs(word_freqs)
for word, count in top_10_words:
print(f"{word}: {count}")
if __name__ == '__main__': if __name__ == '__main__':
main() main()

@ -0,0 +1,42 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
'''
# 使用 multiprocessing.Manager 来完成统计词频
# 怎么得到最快的一个结果是一个试错过程X程创建数目多少、分片的大小 ...
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(chunk,word_count):
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
for word in words: # 非常化时间
word_count[word] = word_count.get(word, 0) + 1
# word_count.update( Counter(words) ) # 类型不起作用
@timing_decorator
def main():
manager = Manager()
word_count = manager.dict()
chunks = get_chunks(testfilepath,2800)
print('-------------------',len(chunks))
processes = []
for chunk in chunks:
p = Process(target=process_chunk,
args=(chunk,word_count) )
processes.append(p)
p.start()
for p in processes: p.join()
word_count = dict(word_count)
word_freqs = Counter(word_count).most_common(10)
print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -52,3 +52,10 @@ with sqlite3.connect(db_file_path) as connection:
c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10") c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10")
for row in c.fetchall(): for row in c.fetchall():
print(row[0], '-', row[1]) print(row[0], '-', row[1])
'''
也可以把数据库看做解决共享数据的竞争死锁的办法
不过本例中的计算太快
用数据库共享数据成本太高
'''

@ -0,0 +1,30 @@
from flask import Flask, render_template, request, redirect, url_for
from collections import Counter
from cppy.cp_util import *
import os
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
# 获取上传的文件
file = request.files['file']
# 保存临时文件并读取内容
filename = os.path.join('/temp', file.filename)
file.save(filename)
# 计算词频
words = extract_file_words(filename)
word_counts = Counter(words)
# 删除临时文件
os.remove(filename)
return render_template('result.html', word_counts=word_counts.most_common())
return render_template('index.html')
if __name__ == '__main__':
app.run(debug=True)

@ -0,0 +1,14 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Upload Text File</title>
</head>
<body>
<h1>Upload a Text File to Count Word Frequencies</h1>
<form action="/" method="post" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Submit">
</form>
</body>
</html>

@ -0,0 +1,16 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Word Frequencies</title>
</head>
<body>
<h1>Top Word Frequencies:</h1>
<ul>
{% for word, count in word_counts %}
<li>{{ word }}: {{ count }}</li>
{% endfor %}
</ul>
<a href="{{ url_for('index') }}">Back to Upload</a>
</body>
</html>

@ -0,0 +1,25 @@
import requests
from cppy.cp_util import *
def main():
# 读测试文件的内容
content = read_file()
# 抽词
tokenize_response = requests.post("http://localhost:7770/tokenize", json={"text": content})
words = tokenize_response.json()["words"]
# 计算词频
count_response = requests.post("http://localhost:7771/count", json={"words": words})
word_count = count_response.json()["word_count"]
# 排序
sort_response = requests.post("http://localhost:7772/sort", json={"word_count": word_count})
top_10_words = sort_response.json()["top_10_words"]
print("Top 10 words:")
print_word_freqs(top_10_words)
if __name__ == "__main__":
main()

@ -0,0 +1,14 @@
from fastapi import FastAPI
from collections import Counter
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/count")
async def count(words_list: dict): # {"words": ["word1", "word2", ...]}
word_count = Counter(words_list["words"])
return {"word_count": dict(word_count)}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7771)

@ -0,0 +1,13 @@
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.post("/sort")
async def sort(word_count_dict: dict):
sorted_word_count = sorted(word_count_dict["word_count"].items(), key=lambda x: x[1], reverse=True)
top_10_words = sorted_word_count[:10]
return {"top_10_words": top_10_words}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7772)

@ -0,0 +1,13 @@
from fastapi import FastAPI
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/tokenize")
async def tokenize(text: str):
words = extract_str_words(text)
return {"words": words}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7770)
Loading…
Cancel
Save