import threading, queue from cppy.cp_util import * from collections import Counter stop_words = get_stopwords() # 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数 class WordFrequencyCounter: def __init__(self, input_file): self.word_space = queue.Queue() self.freq_space = queue.Queue() for chunk in get_chunks(input_file,3000): self.word_space.put(chunk) def process_words(self): while not self.word_space.empty(): try: chunk = self.word_space.get_nowait() # 不使用超时,持续获取数据 except queue.Empty: break # 队列为空,退出循环 # print(f"Worker thread ID: {threading.get_ident()}",len(chunk)) words = [ w for w in chunk if w not in stop_words and len(w) >= 3 ] word_freqs = Counter(words) self.freq_space.put(dict(word_freqs)) # 将Counter对象转换为字典 def run(self): workers = [ threading.Thread(target=self.process_words) for _ in range(5)] for worker in workers: worker.start() for worker in workers: worker.join() word_freqs = Counter() # 初始化一个空的Counter对象 while not self.freq_space.empty(): freqs = self.freq_space.get() if freqs: # 确保freqs非空 word_freqs.update(freqs) print_word_freqs ( sort_dict (word_freqs) ) @timing_decorator def main(): counter = WordFrequencyCounter( testfilepath ) counter.run() if __name__ == '__main__': main() ''' 在多线程之间传递数据,建议使用线程安全的队列,如queue.Queue或multiprocessing.Queue(后者也适用于多进程环境)。 这些队列提供了线程安全的数据传输机制,可以避免竞态条件和数据损坏。 全局变量不可预测 multiprocessing.Queue在Python中的底层实现并不直接依赖于文件系统。它利用了操作系统提供的进程间通信(IPC, Inter-Process Communication)机制,具体实现取决于不同操作系统的支持。 在Unix/Linux系统中,multiprocessing.Queue通常基于管道(pipes)、共享内存和/或消息队列等机制实现。而在Windows系统上,可能使用命名管道(named pipes)或者内存映射文件(memory-mapped files),以及某些版本的Windows特有的进程间同步对象如Mutexes、Semaphores和事件。 '''