''' 使用 multiprocessing.Manager: Manager 提供了一个可以在不同进程之间共享和修改的数据类型,如 list, dict, Namespace 等。 它实际上是在背后启动了一个单独的服务器进程,其他进程通过代理来访问这些共享对象。 使用 multiprocessing.Manager 来完成统计词频 需要注意: - Manager() 必须用函数包起来,不能按脚本随便放外面,否则会提示freeze_support - 工作函数需要放到外面,不能做内部函数。否则会提示参数错误 - 无法在 Jupyter 类似环境运行 ''' from cppy.cp_util import * from collections import Counter from multiprocessing import Manager, Process stop_words = get_stopwords() def process_chunk(shared_chunks,word_count): while True: try: chunk = shared_chunks.pop(0) # 从共享列表中取出一个数据块 if chunk is None: break # 如果取出的是None,表示所有数据块已处理完毕 words = extract_str_words(chunk) for word in words: if word not in stop_words: word_count[word] = word_count.get(word, 0) + 1 except Exception as e: print(e) break @timing_decorator def main(): # 创建一个Manager实例 manager = Manager() shared_chunks = manager.list() word_count = manager.dict() # 读取文件并按块大小分割,将块添加到共享列表中 chunk_size = 1024 * 10 # 假设每个块是10KB,可以根据需要调整 with open(testfilepath, 'r', encoding='utf-8') as f: while True: chunk = f.read(chunk_size) if not chunk: break shared_chunks.append(chunk) shared_chunks.append(None) print('-------------------',len(shared_chunks)) processes = [ Process( target=process_chunk, args=(shared_chunks,word_count)) for _ in range( 4 ) ] # 假设启动4个工作进程 for p in processes: p.start() for p in processes: p.join() # 将Manager类型的字典转换为普通的字典,以便使用Counter word_count = dict(word_count) word_freqs = Counter(word_count).most_common(10) print_word_freqs(word_freqs) if __name__ == '__main__': main()