You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

125 lines
3.9 KiB

9 months ago
'''
依靠给各个不同线程组件的队列发指令来驱动所有工作比较繁琐
比较 01.py 的实现各个组件完全不能互操作仅依靠队列发消息进行协作
这是一个示例性质的原型具体分布式环境下需要调整
'''
9 months ago
from threading import Thread
from queue import Queue
from cppy.cp_util import *
class ActiveWFObject(Thread):
def __init__(self):
super().__init__()
self.queue = Queue()
self._stopMe = False
self.start()
def run(self):
while not self._stopMe:
message = self.queue.get()
self._dispatch(message)
9 months ago
if message[0] == 'over':
break
9 months ago
def send(receiver, message):
receiver.queue.put(message)
class DataStorageManager(ActiveWFObject):
""" Models the contents of the file """
_data = ''
def _dispatch(self, message):
if message[0] == 'init':
self._init(message[1:])
elif message[0] == 'send_word_freqs':
self._process_words(message[1:])
else:
# forward
send(self._stop_word_manager, message)
def _init(self, message):
9 months ago
self._data = extract_file_words(message[0])
self._stop_word_manager = message[1]
9 months ago
def _process_words(self, message):
recipient = message[0]
for w in self._data:
send(self._stop_word_manager, ['filter', w])
9 months ago
send(self._stop_word_manager, ['topWord', recipient])
9 months ago
class StopWordManager(ActiveWFObject):
""" Models the stop word filter """
_stop_words = []
def _dispatch(self, message):
if message[0] == 'init':
self._init(message[1:])
elif message[0] == 'filter':
return self._filter(message[1:])
else:
# forward
send(self._word_freqs_manager, message)
def _init(self, message):
self._stop_words = get_stopwords()
self._word_freqs_manager = message[0]
def _filter(self, message):
word = message[0]
if word not in self._stop_words:
send(self._word_freqs_manager, ['word', word])
class WordFrequencyManager(ActiveWFObject):
""" Keeps the word frequency data """
_word_freqs = {}
def _dispatch(self, message):
if message[0] == 'word':
self._increment_count(message[1:])
9 months ago
elif message[0] == 'topWord':
self._topWord(message[1:])
9 months ago
def _increment_count(self, message):
word, = message
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
9 months ago
def _topWord(self, message):
9 months ago
recipient = message[0]
freqs_sorted = sort_dict ( self._word_freqs )
9 months ago
send(recipient, ['topWord', freqs_sorted])
9 months ago
9 months ago
class MyController(ActiveWFObject):
9 months ago
def _dispatch(self, message):
if message[0] == 'run':
self._run(message[1:])
9 months ago
elif message[0] == 'topWord':
9 months ago
self._display(message[1:])
else:
raise Exception("Message not understood " + message[0])
def _run(self, message):
self._storage_manager, = message
send(self._storage_manager, ['send_word_freqs', self])
def _display(self, message):
word_freqs, = message
print_word_freqs( word_freqs)
9 months ago
send(self._storage_manager, ['over'])
9 months ago
self._stopMe = True
9 months ago
if __name__ == '__main__':
9 months ago
word_freq_manager = WordFrequencyManager()
stop_word_manager = StopWordManager()
storage_manager = DataStorageManager()
9 months ago
wfcontroller = MyController()
9 months ago
send(storage_manager, ['init', testfilepath, stop_word_manager])
9 months ago
send(stop_word_manager, ['init', word_freq_manager])
9 months ago
send(wfcontroller, ['run', storage_manager])
9 months ago
# 等待所有管理器完成工作
threads = [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]
for thread in threads: thread.join()