from queue import Queue
from threading import Thread

from cppy.cp_util import *


class ActiveWFObject(Thread):
    """Base class for active objects: a thread that drains its own queue.

    A message is a sequence whose first element is a tag string and whose
    remaining elements are the payload.  Subclasses must implement
    ``_dispatch(message)``.  After a message tagged ``'die'`` has been
    dispatched, the thread leaves its loop and terminates.
    """

    def __init__(self):
        super().__init__()
        self.queue = Queue()
        self._stopMe = False
        # The thread starts itself here, so a subclass that needs extra state
        # must set it up *before* calling ``super().__init__()``.
        self.start()

    def run(self):
        """Take messages off the queue and dispatch them until stopped."""
        while not self._stopMe:
            message = self.queue.get()
            self._dispatch(message)
            # Dispatch first so that subclasses can forward 'die' downstream
            # before this object shuts down.
            if message[0] == 'die':
                self._stopMe = True


def send(receiver, message):
    """Put *message* on the queue of the active object *receiver*."""
    receiver.queue.put(message)


class DataStorageManager(ActiveWFObject):
    """Models the contents of the file.

    Understood messages:
      * ``['init', path_to_file, stop_word_manager]``
      * ``['send_word_freqs', recipient]``
    Any other message is forwarded to the stop word manager.
    """

    _data = ''

    def _dispatch(self, message):
        tag, payload = message[0], message[1:]
        if tag == 'init':
            self._init(payload)
        elif tag == 'send_word_freqs':
            self._process_words(payload)
        else:
            # forward
            send(self._stop_word_manager, message)

    def _init(self, message):
        """Remember the downstream manager and load the words of the file."""
        path_to_file = message[0]
        self._stop_word_manager = message[1]
        self._data = extract_file_words(path_to_file)

    def _process_words(self, message):
        """Send every word downstream, then request the result for *recipient*."""
        recipient = message[0]
        for word in self._data:
            send(self._stop_word_manager, ['filter', word])
        send(self._stop_word_manager, ['top10', recipient])


class StopWordManager(ActiveWFObject):
    """Models the stop word filter.

    Understood messages:
      * ``['init', word_freqs_manager]``
      * ``['filter', word]``
    Any other message is forwarded to the word frequency manager.
    """

    _stop_words = frozenset()

    def _dispatch(self, message):
        tag, payload = message[0], message[1:]
        if tag == 'init':
            self._init(payload)
        elif tag == 'filter':
            self._filter(payload)
        else:
            # forward
            send(self._word_freqs_manager, message)

    def _init(self, message):
        """Load the stop words and remember the downstream manager."""
        # Built once as a set so the per-word membership test is O(1).
        # NOTE(review): assumes get_stopwords() yields hashable items
        # (strings) - confirm against cppy.cp_util.
        self._stop_words = frozenset(get_stopwords())
        self._word_freqs_manager = message[0]

    def _filter(self, message):
        """Pass the word on to the frequency manager unless it is a stop word."""
        word = message[0]
        if word not in self._stop_words:
            send(self._word_freqs_manager, ['word', word])


class WordFrequencyManager(ActiveWFObject):
    """Keeps the word frequency data.

    Understood messages:
      * ``['word', word]``
      * ``['top10', recipient]``
    Any other message (including ``'die'``) is ignored by the dispatcher.
    """

    def __init__(self):
        # Per-instance counter: a class-level dict would be shared (and
        # mutated) by every instance.  It is created before the base class
        # initialiser runs because that call starts the thread.
        self._word_freqs = {}
        super().__init__()

    def _dispatch(self, message):
        tag, payload = message[0], message[1:]
        if tag == 'word':
            self._increment_count(payload)
        elif tag == 'top10':
            self._top10(payload)

    def _increment_count(self, message):
        """Add one to the count of the single word carried by *message*."""
        word, = message
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def _top10(self, message):
        """Send the sorted frequencies to the recipient named in *message*."""
        recipient = message[0]
        freqs_sorted = sort_dict(self._word_freqs)
        send(recipient, ['top10', freqs_sorted])


class WordFrequencyController(ActiveWFObject):
    """Starts the computation and prints the result.

    Understood messages:
      * ``['run', storage_manager]``
      * ``['top10', word_freqs]``
    Any other message raises ``Exception``.
    """

    def _dispatch(self, message):
        tag, payload = message[0], message[1:]
        if tag == 'run':
            self._run(payload)
        elif tag == 'top10':
            self._display(payload)
        else:
            raise Exception("Message not understood " + message[0])

    def _run(self, message):
        """Ask the storage manager to send the word frequencies back to us."""
        self._storage_manager, = message
        send(self._storage_manager, ['send_word_freqs', self])

    def _display(self, message):
        """Print the frequencies, then shut the whole pipeline down."""
        word_freqs, = message
        print_word_freqs(word_freqs)
        # 'die' is forwarded from manager to manager, stopping each in turn.
        send(self._storage_manager, ['die'])
        self._stopMe = True


if __name__ == '__main__':
    word_freq_manager = WordFrequencyManager()
    stop_word_manager = StopWordManager()
    storage_manager = DataStorageManager()

    send(stop_word_manager, ['init', word_freq_manager])
    send(storage_manager, ['init', testfilepath, stop_word_manager])

    wfcontroller = WordFrequencyController()
    send(wfcontroller, ['run', storage_manager])

    # Wait for the active objects to finish
    for worker in (word_freq_manager, stop_word_manager,
                   storage_manager, wfcontroller):
        worker.join()