"""
All work is driven by sending commands to the queues of the different
threaded components, which is rather tedious.

Compared with the implementation in 01.py, the components cannot call each
other directly at all; they cooperate only by passing messages through
queues.

This is an illustrative prototype; it would need adjusting for a real
distributed environment.
"""
from threading import Thread
from queue import Queue
from cppy.cp_util import *


class ActiveWFObject(Thread):
    """Base class for a component that runs on its own thread.

    Every instance owns a message queue and starts its thread as soon as it
    is constructed. Subclasses implement ``_dispatch(message)`` to react to
    the messages taken from that queue. A message is a sequence whose first
    element is the command name; the remaining elements are its arguments.
    """

    def __init__(self):
        super().__init__()
        self.queue = Queue()
        self._stopMe = False
        # The thread is started right away; it blocks on the (still empty)
        # queue until the first message is sent.
        self.start()

    def run(self):
        """Dispatch queued messages until told to stop.

        The loop ends right after an 'over' message has been dispatched, or
        when a handler has set ``_stopMe`` to True.
        """
        while not self._stopMe:
            message = self.queue.get()
            self._dispatch(message)
            if message[0] == 'over':
                break


def send(receiver, message):
    """Put ``message`` on the queue of ``receiver``."""
    receiver.queue.put(message)


class DataStorageManager(ActiveWFObject):
    """ Models the contents of the file """

    def __init__(self):
        # Per-instance state is assigned before super().__init__() because
        # the base constructor already starts the thread.
        self._data = ''
        super().__init__()

    def _dispatch(self, message):
        """Handle 'init' and 'send_word_freqs'; forward everything else."""
        command = message[0]
        if command == 'init':
            self._init(message[1:])
        elif command == 'send_word_freqs':
            self._process_words(message[1:])
        else:
            # Unknown commands (e.g. 'over') travel on to the next stage.
            send(self._stop_word_manager, message)

    def _init(self, message):
        """Load the words and remember the next stage.

        ``message`` is ``[path_to_file, stop_word_manager]``.
        """
        self._data = extract_file_words(message[0])
        self._stop_word_manager = message[1]

    def _process_words(self, message):
        """Send every stored word downstream, then request the result.

        ``message`` is ``[recipient]``; the recipient is passed along in the
        final 'topWord' message so the last stage knows where to reply.
        """
        recipient = message[0]
        for w in self._data:
            send(self._stop_word_manager, ['filter', w])
        send(self._stop_word_manager, ['topWord', recipient])


class StopWordManager(ActiveWFObject):
    """ Models the stop word filter """

    def __init__(self):
        # A per-instance container instead of a mutable class-level default;
        # assigned before super().__init__() because that call starts the
        # thread.
        self._stop_words = []
        super().__init__()

    def _dispatch(self, message):
        """Handle 'init' and 'filter'; forward everything else."""
        command = message[0]
        if command == 'init':
            self._init(message[1:])
        elif command == 'filter':
            self._filter(message[1:])
        else:
            # Unknown commands (e.g. 'topWord', 'over') go to the next stage.
            send(self._word_freqs_manager, message)

    def _init(self, message):
        """Load the stop words and remember the next stage.

        ``message`` is ``[word_freqs_manager]``.
        """
        self._stop_words = get_stopwords()
        self._word_freqs_manager = message[0]

    def _filter(self, message):
        """Pass the word in ``message[0]`` on unless it is a stop word."""
        word = message[0]
        if word not in self._stop_words:
            send(self._word_freqs_manager, ['word', word])


class WordFrequencyManager(ActiveWFObject):
    """ Keeps the word frequency data """

    def __init__(self):
        # The counts dict is mutated in place, so it must belong to the
        # instance: a class-level dict would be shared by every
        # WordFrequencyManager in the process. It is assigned before
        # super().__init__() because that call starts the thread.
        self._word_freqs = {}
        super().__init__()

    def _dispatch(self, message):
        """Handle 'word' and 'topWord'; any other command is ignored."""
        command = message[0]
        if command == 'word':
            self._increment_count(message[1:])
        elif command == 'topWord':
            self._topWord(message[1:])

    def _increment_count(self, message):
        """Add one to the count of the single word carried by ``message``."""
        word, = message
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def _topWord(self, message):
        """Reply to ``message[0]`` with the result of ``sort_dict``."""
        recipient = message[0]
        freqs_sorted = sort_dict(self._word_freqs)
        send(recipient, ['topWord', freqs_sorted])


class MyController(ActiveWFObject):
    """Starts the pipeline, prints the result and triggers the shutdown."""

    def _dispatch(self, message):
        """Handle 'run' and 'topWord'.

        Raises:
            Exception: if the command is neither 'run' nor 'topWord'.
        """
        command = message[0]
        if command == 'run':
            self._run(message[1:])
        elif command == 'topWord':
            self._display(message[1:])
        else:
            raise Exception("Message not understood " + message[0])

    def _run(self, message):
        """Remember the storage manager and ask it for the frequencies.

        ``message`` is ``[storage_manager]``.
        """
        self._storage_manager, = message
        send(self._storage_manager, ['send_word_freqs', self])

    def _display(self, message):
        """Print the received frequencies and shut everything down.

        ``message`` is ``[word_freqs]``. The 'over' message is forwarded
        from stage to stage; this controller stops itself via ``_stopMe``.
        """
        word_freqs, = message
        print_word_freqs(word_freqs)
        send(self._storage_manager, ['over'])
        self._stopMe = True


def main():
    """Create the components, wire them together and run the pipeline."""
    word_freq_manager = WordFrequencyManager()
    stop_word_manager = StopWordManager()
    storage_manager = DataStorageManager()
    wfcontroller = MyController()

    send(storage_manager, ['init', testfilepath, stop_word_manager])
    send(stop_word_manager, ['init', word_freq_manager])
    send(wfcontroller, ['run', storage_manager])

    # Wait for all managers to finish their work
    threads = [
        word_freq_manager, stop_word_manager, storage_manager, wfcontroller
    ]
    for thread in threads:
        thread.join()


if __name__ == '__main__':
    main()