diff --git a/13 多计算单元/数据共享/2 服务进程.py b/13 多计算单元/数据共享/2 服务进程.py index 138746a..1ee93eb 100644 --- a/13 多计算单元/数据共享/2 服务进程.py +++ b/13 多计算单元/数据共享/2 服务进程.py @@ -2,9 +2,13 @@ 使用 multiprocessing.Manager: Manager 提供了一个可以在不同进程之间共享和修改的数据类型,如 list, dict, Namespace 等。 它实际上是在背后启动了一个单独的服务器进程,其他进程通过代理来访问这些共享对象。 + +使用 multiprocessing.Manager 来完成统计词频 +需要注意: + - Manager() 必须用函数包起来,不能按脚本随便放外面,否则会提示freeze_support + - 工作函数需要放到外面,不能做内部函数。否则会提示参数错误 + - 无法在 Jupyter 类似环境运行 ''' -# 使用 multiprocessing.Manager 来完成统计词频 -# 用消费者模式更好 from cppy.cp_util import * from collections import Counter @@ -12,32 +16,45 @@ from multiprocessing import Manager, Process stop_words = get_stopwords() -def count_words(chunk,word_count): - words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ] - for word in words: - word_count[word] = word_count.get(word, 0) + 1 - # word_count.update( Counter(words) ) # 类型不起作用 +def process_chunk(shared_chunks,word_count): + while True: + try: + chunk = shared_chunks.pop(0) # 从共享列表中取出一个数据块 + if chunk is None: break # 如果取出的是None,表示所有数据块已处理完毕 + words = extract_str_words(chunk) + for word in words: + if word not in stop_words: + word_count[word] = word_count.get(word, 0) + 1 + except Exception as e: + print(e) + break @timing_decorator -def main(): - manager = Manager() +def main(): + # 创建一个Manager实例 + manager = Manager() + shared_chunks = manager.list() word_count = manager.dict() - chunks = get_chunks(testfilepath) - processes = [] - for chunk in chunks: - p = Process(target=count_words, args=(chunk,word_count)) - processes.append(p) - p.start() - + # 读取文件并按块大小分割,将块添加到共享列表中 + chunk_size = 1024 * 10 # 假设每个块是10KB,可以根据需要调整 + with open(testfilepath, 'r', encoding='utf-8') as f: + while True: + chunk = f.read(chunk_size) + if not chunk: break + shared_chunks.append(chunk) + shared_chunks.append(None) + print('-------------------',len(shared_chunks)) + processes = [ Process( target=process_chunk, + args=(shared_chunks,word_count)) + for _ in range( 4 ) ] # 假设启动4个工作进程 + for p in processes: p.start() for p in processes: p.join() - sorted_word_count = sorted(word_count.items(), key=lambda x: x[1], reverse=True) - top_10_words = sorted_word_count[:10] - - print("频率最高的10个词:") - for word, count in top_10_words: - print(f"{word}: {count}") + # 将Manager类型的字典转换为普通的字典,以便使用Counter + word_count = dict(word_count) + word_freqs = Counter(word_count).most_common(10) + print_word_freqs(word_freqs) if __name__ == '__main__': main() diff --git a/13 多计算单元/数据共享/3 服务进程_分包.py b/13 多计算单元/数据共享/3 服务进程_分包.py new file mode 100644 index 0000000..4d50495 --- /dev/null +++ b/13 多计算单元/数据共享/3 服务进程_分包.py @@ -0,0 +1,42 @@ +''' +使用 multiprocessing.Manager: +Manager 提供了一个可以在不同进程之间共享和修改的数据类型,如 list, dict, Namespace 等。 +它实际上是在背后启动了一个单独的服务器进程,其他进程通过代理来访问这些共享对象。 +''' +# 使用 multiprocessing.Manager 来完成统计词频 +# 怎么得到最快的一个结果,是一个试错过程:X程创建数目多少、分片的大小 ... + +from cppy.cp_util import * +from collections import Counter +from multiprocessing import Manager, Process + +stop_words = get_stopwords() + +def process_chunk(chunk,word_count): + words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ] + for word in words: # 非常化时间 + word_count[word] = word_count.get(word, 0) + 1 + # word_count.update( Counter(words) ) # 类型不起作用 + +@timing_decorator +def main(): + manager = Manager() + word_count = manager.dict() + + chunks = get_chunks(testfilepath,2800) + print('-------------------',len(chunks)) + processes = [] + for chunk in chunks: + p = Process(target=process_chunk, + args=(chunk,word_count) ) + processes.append(p) + p.start() + + for p in processes: p.join() + + word_count = dict(word_count) + word_freqs = Counter(word_count).most_common(10) + print_word_freqs(word_freqs) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/16 其它/数据库/ORM/DataQuery.py b/13 多计算单元/数据共享/4 数据库/ORM/DataQuery.py similarity index 100% rename from 16 其它/数据库/ORM/DataQuery.py rename to 13 多计算单元/数据共享/4 数据库/ORM/DataQuery.py diff --git a/16 其它/数据库/ORM/createDb.py b/13 多计算单元/数据共享/4 数据库/ORM/createDb.py similarity index 100% rename from 16 其它/数据库/ORM/createDb.py rename to 13 多计算单元/数据共享/4 数据库/ORM/createDb.py diff --git a/16 其它/数据库/ORM/processData.py b/13 多计算单元/数据共享/4 数据库/ORM/processData.py similarity index 100% rename from 16 其它/数据库/ORM/processData.py rename to 13 多计算单元/数据共享/4 数据库/ORM/processData.py diff --git a/16 其它/数据库/tf.db b/13 多计算单元/数据共享/4 数据库/tf.db similarity index 100% rename from 16 其它/数据库/tf.db rename to 13 多计算单元/数据共享/4 数据库/tf.db diff --git a/16 其它/数据库/数据库.py b/13 多计算单元/数据共享/4 数据库/数据库.py similarity index 89% rename from 16 其它/数据库/数据库.py rename to 13 多计算单元/数据共享/4 数据库/数据库.py index 34b2411..1552144 100644 --- a/16 其它/数据库/数据库.py +++ b/13 多计算单元/数据共享/4 数据库/数据库.py @@ -51,4 +51,11 @@ with sqlite3.connect(db_file_path) as connection: c = connection.cursor() c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10") for row in c.fetchall(): - print(row[0], '-', row[1]) \ No newline at end of file + print(row[0], '-', row[1]) + + +''' +也可以把数据库看做解决共享数据的竞争死锁的办法 +不过本例中的计算太快 +用数据库共享数据成本太高 +''' \ No newline at end of file diff --git a/13 多计算单元/数据分包/mapreduce.py b/13 多计算单元/数据分包/1 mapreduce.py similarity index 100% rename from 13 多计算单元/数据分包/mapreduce.py rename to 13 多计算单元/数据分包/1 mapreduce.py diff --git a/13 多计算单元/数据分包/多线程.py b/13 多计算单元/数据分包/2 多线程.py similarity index 100% rename from 13 多计算单元/数据分包/多线程.py rename to 13 多计算单元/数据分包/2 多线程.py diff --git a/13 多计算单元/数据分包/多进程.py b/13 多计算单元/数据分包/3 多进程.py similarity index 98% rename from 13 多计算单元/数据分包/多进程.py rename to 13 多计算单元/数据分包/3 多进程.py index 7809bb7..0fbbe7e 100644 --- a/13 多计算单元/数据分包/多进程.py +++ b/13 多计算单元/数据分包/3 多进程.py @@ -41,5 +41,5 @@ def main(): if __name__ == '__main__': - main() + main() diff --git a/13 多计算单元/数据分包/抽象并发.py b/13 多计算单元/数据分包/4 抽象并发.py similarity index 100% rename from 13 多计算单元/数据分包/抽象并发.py rename to 13 多计算单元/数据分包/4 抽象并发.py diff --git a/14 人机交互/Web/simpleWeb/app.py b/14 人机交互/Web/simpleWeb/app.py new file mode 100644 index 0000000..83def99 --- /dev/null +++ b/14 人机交互/Web/simpleWeb/app.py @@ -0,0 +1,30 @@ +from flask import Flask, render_template, request, redirect, url_for +from collections import Counter +from cppy.cp_util import * +import os + +app = Flask(__name__) + +@app.route('/', methods=['GET', 'POST']) +def index(): + if request.method == 'POST': + # 获取上传的文件 + file = request.files['file'] + + # 保存临时文件并读取内容 + filename = os.path.join('/temp', file.filename) + file.save(filename) + + # 计算词频 + words = extract_file_words(filename) + word_counts = Counter(words) + + # 删除临时文件 + os.remove(filename) + + return render_template('result.html', word_counts=word_counts.most_common()) + + return render_template('index.html') + +if __name__ == '__main__': + app.run(debug=True) \ No newline at end of file diff --git a/14 人机交互/Web/simpleWeb/templates/index.html b/14 人机交互/Web/simpleWeb/templates/index.html new file mode 100644 index 0000000..5a31bad --- /dev/null +++ b/14 人机交互/Web/simpleWeb/templates/index.html @@ -0,0 +1,14 @@ + + +
+ +