import queue
import threading
from collections import Counter

from cppy.cp_util import *

stop_words = get_stopwords()


# The data to process is placed in one queue; several threads take turns
# counting chunks, and the partial counts are merged into one total at the end.
class WordFrequencyCounter:
    """Count word frequencies of a file using a queue of chunks and threads.

    ``word_space`` holds the pending chunks produced by ``get_chunks`` and
    ``freq_space`` collects one partial frequency dict per processed chunk.
    """

    def __init__(self, input_file, chunk_size=3000, num_workers=5):
        """Queue up every chunk of ``input_file`` for later processing.

        Args:
            input_file: Passed straight through to ``get_chunks``
                (presumably a file path -- confirm against cppy.cp_util).
            chunk_size: Second argument given to ``get_chunks``; defaults to
                the previously hard-coded 3000.
            num_workers: Number of threads started by ``run``; defaults to
                the previously hard-coded 5.

        Raises:
            ValueError: If ``num_workers`` is smaller than 1, because no
                chunk would ever be processed.
        """
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        self.num_workers = num_workers
        self.word_space = queue.Queue()
        self.freq_space = queue.Queue()
        for chunk in get_chunks(input_file, chunk_size):
            self.word_space.put(chunk)

    def process_words(self):
        """Worker loop: count the words of queued chunks until none remain.

        For each chunk, words that are in ``stop_words`` or shorter than
        three characters are dropped; the remaining counts are put on
        ``freq_space`` as a plain dict.
        """
        while True:
            # Non-blocking get: queue.Empty is the only reliable signal that
            # the work has run out when several threads consume the queue.
            try:
                chunk = self.word_space.get_nowait()
            except queue.Empty:
                break
            words = [w for w in chunk if w not in stop_words and len(w) >= 3]
            # Store a plain dict rather than the Counter itself.
            self.freq_space.put(dict(Counter(words)))

    def run(self):
        """Process all chunks with worker threads, then merge and print."""
        workers = [
            threading.Thread(target=self.process_words)
            for _ in range(self.num_workers)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # All workers have finished, so the partial results can be drained
        # without blocking and summed into a single Counter.
        word_freqs = Counter()
        while True:
            try:
                freqs = self.freq_space.get_nowait()
            except queue.Empty:
                break
            word_freqs.update(freqs)  # updating with an empty dict is a no-op

        print_word_freqs(sort_dict(word_freqs))


@timing_decorator
def main():
    """Run the counter on ``testfilepath`` (a name from cppy.cp_util)."""
    counter = WordFrequencyCounter(testfilepath)
    counter.run()


if __name__ == '__main__':
    main()