|
|
|
|
import threading, queue
|
|
|
|
|
from cppy.cp_util import *
|
|
|
|
|
from collections import Counter
|
|
|
|
|
|
|
|
|
|
# Stop words fetched once at import time; process_words() filters against them.
stop_words = get_stopwords()
|
|
|
|
|
|
|
|
|
|
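# Pending data is placed on one queue, several threads take turns counting it, and the partial counts are merged into one total at the end.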
|
|
|
|
|
class WordFrequencyCounter:
    """Count word frequencies in a file using a pool of worker threads.

    ``__init__`` puts every chunk produced by ``get_chunks`` on a work queue
    (``word_space``). Each worker thread takes chunks from that queue, counts
    the words it keeps, and puts the per-chunk counts on a result queue
    (``freq_space``). ``run`` merges the per-chunk counts and hands the
    sorted total to ``print_word_freqs``.
    """

    def __init__(self, input_file, chunk_size=3000, num_workers=5):
        """Fill the work queue with the chunks of *input_file*.

        Args:
            input_file: File argument forwarded to ``get_chunks``.
            chunk_size: Forwarded as the second argument of ``get_chunks``
                (presumably the number of words per chunk -- confirm against
                ``get_chunks``).
            num_workers: Number of worker threads that ``run`` starts.

        Raises:
            ValueError: If *num_workers* is smaller than 1; with no workers
                the queued chunks would never be counted.
        """
        if num_workers < 1:
            raise ValueError("num_workers must be at least 1")
        self.num_workers = num_workers
        self.word_space = queue.Queue()
        self.freq_space = queue.Queue()
        for chunk in get_chunks(input_file, chunk_size):
            self.word_space.put(chunk)

    @staticmethod
    def _count_chunk(chunk, excluded):
        """Return a dict of counts for the items of *chunk* that are kept.

        An item is kept when it is not in *excluded* and its length is at
        least 3.
        """
        kept = (w for w in chunk if w not in excluded and len(w) >= 3)
        return dict(Counter(kept))

    def _merge_counts(self):
        """Drain ``freq_space`` and return the summed counts as a Counter."""
        totals = Counter()
        while True:
            try:
                totals.update(self.freq_space.get_nowait())
            except queue.Empty:
                return totals

    def process_words(self):
        """Worker loop: count chunks from ``word_space`` until it is empty.

        For every chunk taken, the counts of the kept words (not in the
        module-level ``stop_words``, length >= 3) are put on ``freq_space``
        as a plain dict.
        """
        while True:
            try:
                # Non-blocking get: __init__ has already queued every chunk,
                # so an empty queue means there is nothing left to do. No
                # separate empty() pre-check is needed; another worker could
                # take the last chunk between such a check and the get anyway.
                chunk = self.word_space.get_nowait()
            except queue.Empty:
                return
            # print(f"Worker thread ID: {threading.get_ident()}", len(chunk))
            self.freq_space.put(self._count_chunk(chunk, stop_words))

    def run(self):
        """Run the worker threads, merge their counts and print the result."""
        workers = [
            threading.Thread(target=self.process_words)
            for _ in range(self.num_workers)
        ]
        for worker in workers:
            worker.start()
        # Merging starts only after every worker has finished, so the result
        # queue is complete and is read by this thread alone.
        for worker in workers:
            worker.join()

        print_word_freqs(sort_dict(self._merge_counts()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@timing_decorator
def main():
    """Build a WordFrequencyCounter for ``testfilepath`` and run it."""
    WordFrequencyCounter(testfilepath).run()
|
|
|
|
|
|
|
|
|
|
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
在多线程之间传递数据,建议使用线程安全的队列,如queue.Queue或multiprocessing.Queue(后者也适用于多进程环境)。
|
|
|
|
|
这些队列提供了线程安全的数据传输机制,可以避免竞态条件和数据损坏。
|
|
|
|
|
相比之下,多个线程直接读写全局变量而不加同步时,结果不可预测。
|
|
|
|
|
|
|
|
|
|
multiprocessing.Queue 利用了操作系统提供的进程间通信(IPC, Inter-Process Communication)机制,具体实现取决于不同操作系统的支持。
|
|
|
|
|
在Unix/Linux系统中,multiprocessing.Queue通常基于管道(pipes)、共享内存和/或消息队列等机制实现。
|
|
|
|
|
而在Windows系统上,可能使用命名管道(named pipes)或者内存映射文件(memory-mapped files),以及某些版本的Windows特有的进程间同步对象如Mutexes、Semaphores和事件。
|
|
|
|
|
'''
|