You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

125 lines
3.9 KiB

11 months ago
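'''
依靠给各个不同线程组件的队列发指令来驱动所有工作，比较繁琐。
比较 01.py 的实现，各个组件完全不能互操作，仅依靠队列发消息进行协作。
这是一个示例性质的原型，具体分布式环境下需要调整。

Driving all of the work by sending commands to the queues of the separate
thread components is rather cumbersome. Compared with the implementation in
01.py, the components here cannot call one another at all; they cooperate
solely by posting messages to each other's queues.
This is an illustrative prototype; it would need adjusting for a real
distributed environment.
'''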
11 months ago
from threading import Thread
from queue import Queue
from cppy.cp_util import *
class ActiveWFObject(Thread):
    """Base class for an active object: a thread that owns an inbox queue.

    The thread is started from the constructor. It then takes one message at
    a time off ``self.queue`` and hands it to ``self._dispatch(message)``,
    which subclasses must define (this base class does not).

    A message is a list whose first element is a tag. The loop ends:
      * right after dispatching a message tagged ``'over'``, or
      * before reading the next message, once ``self._stopMe`` is True.
    """

    def __init__(self):
        super().__init__()
        # run() reads both of these immediately, so set them before start().
        self._stopMe = False
        self.queue = Queue()
        self.start()

    def run(self):
        """Process inbox messages until stopped (see class docstring)."""
        while True:
            if self._stopMe:
                return
            incoming = self.queue.get()  # blocks until a message arrives
            self._dispatch(incoming)
            if incoming[0] == 'over':
                return
def send(receiver, message):
    """Deliver ``message`` to ``receiver`` by enqueueing it on its inbox.

    ``receiver`` is anything with a ``queue`` attribute that supports
    ``put`` (an ActiveWFObject in this module). Delivery is asynchronous:
    this returns as soon as the message is queued.
    """
    inbox = receiver.queue
    inbox.put(message)
class DataStorageManager(ActiveWFObject):
    """Active object that holds the words of the input file.

    Messages understood (first element is the tag):
      ['init', path, stop_word_manager]
          Load the words with ``extract_file_words(path)`` and remember the
          stop-word manager as the downstream peer.
      ['send_word_freqs', recipient]
          Send ``['filter', w]`` downstream for every stored word, followed
          by ``['topWord', recipient]``.
    Any other message is forwarded downstream unchanged.
    """
    _data = ''  # replaced by the loaded words on 'init'

    def _dispatch(self, message):
        handlers = (
            ('init', self._init),
            ('send_word_freqs', self._process_words),
        )
        for tag, handler in handlers:
            if message[0] == tag:
                handler(message[1:])
                return
        # Unrecognised tag: pass it down the pipeline untouched.
        send(self._stop_word_manager, message)

    def _init(self, message):
        """Load the file's words, then record the downstream peer."""
        source_path = message[0]
        self._data = extract_file_words(source_path)
        self._stop_word_manager = message[1]

    def _process_words(self, message):
        """Stream every word downstream, then the 'topWord' request."""
        reply_to = message[0]
        downstream = self._stop_word_manager
        for word in self._data:
            send(downstream, ['filter', word])
        send(downstream, ['topWord', reply_to])
class StopWordManager(ActiveWFObject):
    """Active object that drops stop words before they reach the counter.

    Messages understood (first element is the tag):
      ['init', word_freqs_manager]
          Load the stop words with ``get_stopwords()`` and remember the
          frequency manager as the downstream peer.
      ['filter', w]
          Send ``['word', w]`` downstream unless ``w`` is a stop word.
    Any other message is forwarded downstream unchanged.
    """
    _stop_words = []  # replaced (not mutated) on 'init'

    def _dispatch(self, message):
        tag = message[0]
        if tag == 'init':
            self._init(message[1:])
        elif tag == 'filter':
            self._filter(message[1:])
        else:
            # Unrecognised tag: pass it down the pipeline untouched.
            send(self._word_freqs_manager, message)

    def _init(self, message):
        """Load the stop-word collection, then record the downstream peer."""
        # NOTE(review): the cost of the membership test in _filter depends on
        # the container get_stopwords() returns; if it is a list, converting
        # it to a set here would make each lookup O(1) -- confirm its type.
        self._stop_words = get_stopwords()
        self._word_freqs_manager = message[0]

    def _filter(self, message):
        """Forward ``message[0]`` as a 'word' message unless it is a stop word."""
        candidate = message[0]
        if candidate in self._stop_words:
            return
        send(self._word_freqs_manager, ['word', candidate])
class WordFrequencyManager(ActiveWFObject):
    """Active object that keeps the word-frequency table.

    Messages understood (first element is the tag):
      ['word', w]
          Add one to the count for ``w``.
      ['topWord', recipient]
          Send ``['topWord', sort_dict(counts)]`` to ``recipient``. The
          ordering comes from ``sort_dict`` in cppy.cp_util, which is not
          visible here.
    Any other message, including ``['over']``, is ignored by this class; the
    base-class run loop stops after dispatching ``'over'``.
    """

    def __init__(self):
        # The counts must belong to the instance. A class-level ``{}`` that
        # _increment_count mutates in place would be one dict shared by every
        # WordFrequencyManager, so counts from separate instances/runs would
        # pile up together. The dict is created before super().__init__()
        # because the base constructor starts the worker thread.
        self._word_freqs = {}
        super().__init__()

    def _dispatch(self, message):
        """Route one message by its tag; unknown tags are silently ignored."""
        if message[0] == 'word':
            self._increment_count(message[1:])
        elif message[0] == 'topWord':
            self._topWord(message[1:])

    def _increment_count(self, message):
        """Count one occurrence; ``message`` must be exactly ``[word]``."""
        word, = message
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def _topWord(self, message):
        """Send ``sort_dict`` of the table to the recipient in ``message[0]``."""
        recipient = message[0]
        freqs_sorted = sort_dict(self._word_freqs)
        send(recipient, ['topWord', freqs_sorted])
class MyController(ActiveWFObject):
    """Active object that starts the run and prints the final table.

    Messages understood (first element is the tag):
      ['run', storage_manager]
          Remember the storage manager and ask it to stream its words, with
          the final reply addressed to this controller.
      ['topWord', word_freqs]
          Print the table, send ``['over']`` to the storage manager so the
          pipeline shuts down, and stop this object's own loop.
    Any other tag raises an Exception.
    """

    def _dispatch(self, message):
        tag = message[0]
        if tag == 'run':
            self._run(message[1:])
            return
        if tag == 'topWord':
            self._display(message[1:])
            return
        raise Exception("Message not understood " + tag)

    def _run(self, message):
        """Store the single storage manager in ``message`` and start it."""
        [storage] = message
        self._storage_manager = storage
        send(storage, ['send_word_freqs', self])

    def _display(self, message):
        """Print the single table in ``message`` and begin shutdown."""
        [word_freqs] = message
        print_word_freqs(word_freqs)
        send(self._storage_manager, ['over'])
        # The 'topWord' message itself is not 'over', so stop via the flag.
        self._stopMe = True
if __name__ == '__main__':
    # One active object per stage; each starts its own thread as soon as it
    # is constructed.
    word_freq_manager = WordFrequencyManager()
    stop_word_manager = StopWordManager()
    storage_manager = DataStorageManager()
    wfcontroller = MyController()

    # Wire the stages: storage -> stop-word filter -> frequency counter.
    send(storage_manager, ['init', testfilepath, stop_word_manager])
    send(stop_word_manager, ['init', word_freq_manager])

    # Start the run; the final table is sent back to the controller.
    send(wfcontroller, ['run', storage_manager])

    # Wait for all managers to finish their work.
    for worker in (word_freq_manager, stop_word_manager,
                   storage_manager, wfcontroller):
        worker.join()