################ To be tidied up
'''
Word-frequency pipeline built from message-passing threads.

The modules look loosely organised, yet they cooperate to complete the data
processing stage by stage. The components never call each other directly;
they collaborate only by posting messages to one another's queues.
This style suits pipelines with many stages and chunkable data where
I/O-versus-compute performance is a design consideration, because each
module adapts at its own pace.
In some cases it avoids complicated control-flow design and keeps the code
concise.
'''
from threading import Thread
from queue import Queue
# NOTE(review): extract_file_words, get_stopwords, sort_dict,
# print_word_freqs and testfilepath are not defined in this file; presumably
# they are provided by this star import - confirm against cppy.cp_util.
from cppy.cp_util import *


class ThreadObject(Thread):
    """Thread that owns an inbox queue and handles one message at a time.

    A message is a list whose first element is a tag string and whose
    remaining elements are the payload. Subclasses implement ``_dispatch``.
    """

    def __init__(self):
        super().__init__()
        self.queue = Queue()
        self._over = False
        # The thread starts itself here, so any state a subclass needs in
        # ``run`` must already exist before this constructor is called.
        self.start()

    def run(self):
        """Process queued messages until ``_over`` is set or 'over' arrives."""
        while not self._over:
            message = self.queue.get()
            self._dispatch(message)
            # The 'over' message is dispatched first (so it can be
            # forwarded down the chain) and only then ends this thread.
            if message[0] == 'over':
                break

    def _dispatch(self, message):
        """Handle one message; every concrete manager must override this."""
        raise NotImplementedError(
            type(self).__name__ + " must implement _dispatch")


def send(receiver, message):
    """Post ``message`` onto ``receiver``'s inbox queue."""
    receiver.queue.put(message)


class TxtManager(ThreadObject):
    """Loads the words of a text file and feeds them to the stop-word filter."""

    _data = ''

    def _dispatch(self, message):
        """Route 'init' and 'send_word_freqs'; forward anything else."""
        if message[0] == 'init':
            self._init(message[1:])
        elif message[0] == 'send_word_freqs':
            self._process_words(message[1:])
        else:
            # Unknown tags (e.g. 'over') travel on to the next stage.
            send(self._stop_word_manager, message)

    def _init(self, message):
        """Payload is ``[file_path, stop_word_manager]``."""
        self._data = extract_file_words(message[0])
        self._stop_word_manager = message[1]

    def _process_words(self, message):
        """Send every loaded word to the filter, then request the result.

        Payload is ``[recipient]``: the object that finally receives the
        'topWord' reply.
        """
        recipient = message[0]
        for w in self._data:
            send(self._stop_word_manager, ['filter', w])
        # Queued after all the words, so it is handled once they are through.
        send(self._stop_word_manager, ['topWord', recipient])


class FilterManager(ThreadObject):
    """Drops stop words and passes the remaining words to the counter."""

    _stop_words = []

    def _dispatch(self, message):
        """Route 'init' and 'filter'; forward anything else."""
        if message[0] == 'init':
            self._init(message[1:])
        elif message[0] == 'filter':
            self._filter(message[1:])
        else:
            # Unknown tags (e.g. 'topWord', 'over') travel on to the counter.
            send(self._word_freqs_manager, message)

    def _init(self, message):
        """Payload is ``[word_freqs_manager]``."""
        self._stop_words = get_stopwords()
        self._word_freqs_manager = message[0]

    def _filter(self, message):
        """Payload is ``[word]``; pass it on unless it is a stop word."""
        word = message[0]
        if word not in self._stop_words:
            send(self._word_freqs_manager, ['word', word])


class WFManager(ThreadObject):
    """Counts words and reports the sorted frequencies on request."""

    def __init__(self):
        # Per-instance table: a class-level dict would be shared (and
        # mutated) by every WFManager. It is assigned before the base
        # constructor because that constructor already starts the thread.
        self._word_freqs = {}
        super().__init__()

    def _dispatch(self, message):
        """Route 'word' and 'topWord'; other tags are ignored."""
        if message[0] == 'word':
            self._increment_count(message[1:])
        elif message[0] == 'topWord':
            self._topWord(message[1:])

    def _increment_count(self, message):
        """Payload is ``[word]``; add one to that word's count."""
        word, = message
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def _topWord(self, message):
        """Payload is ``[recipient]``; reply with the sorted frequencies."""
        recipient = message[0]
        freqs_sorted = sort_dict(self._word_freqs)
        send(recipient, ['topWord', freqs_sorted])


class MyController(ThreadObject):
    """Starts the run, prints the result and triggers the shutdown."""

    def _dispatch(self, message):
        """Route 'run' and 'topWord'.

        Raises:
            Exception: for any other message tag.
        """
        if message[0] == 'run':
            self._run(message[1:])
        elif message[0] == 'topWord':
            self._display(message[1:])
        else:
            raise Exception("Message not understood " + message[0])

    def _run(self, message):
        """Payload is ``[storage_manager]``; ask it for the frequencies."""
        self._storage_manager, = message
        send(self._storage_manager, ['send_word_freqs', self])

    def _display(self, message):
        """Payload is ``[word_freqs]``; print it and stop the pipeline."""
        word_freqs, = message
        print_word_freqs(word_freqs)
        # 'over' is forwarded stage by stage, ending each manager in turn.
        send(self._storage_manager, ['over'])
        self._over = True


if __name__ == '__main__':
    word_freq_manager = WFManager()
    stop_word_manager = FilterManager()
    storage_manager = TxtManager()
    wfcontroller = MyController()

    send(storage_manager, ['init', testfilepath, stop_word_manager])
    send(stop_word_manager, ['init', word_freq_manager])
    send(wfcontroller, ['run', storage_manager])

    # Wait for all managers to finish their work
    threads = [word_freq_manager, stop_word_manager, storage_manager,
               wfcontroller]
    for thread in threads:
        thread.join()