dev
zj3D 8 months ago
parent 850a3eb772
commit fa3e01dedc

@ -53,7 +53,7 @@ def main(testfilepath, top_n = 10 ):
wordlist = re_split( read_file(testfilepath) )
for word in wordlist:
if word not in stopwords:
subject.notify(word) # 触发
subject.notify(word) # 通知
# 打印最高的N个词频
top_words = observer.word_count.most_common(top_n)

@ -1,3 +1,4 @@
################ 待整理
'''
注册者 = 观察者

@ -1,3 +1,5 @@
################ 待整理
from cppy.cp_util import *
'''
订阅者 = 注册者 = 观察者

@ -1,3 +1,4 @@
################ 待整理
'''
应用场景针对各个组件的 notify 方法发指令来驱动所有工作
这是一个示例性质的原型具体分布式环境下需要调整

@ -3,8 +3,8 @@ from cppy.cp_util import *
# 这个例子没有实际意义,是用来帮助理解其他例子
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
def readfile(path_to_file, func):
data = read_file(path_to_file)
def readfile(file_path, func):
data = read_file(file_path)
func(data, frequencies)
def extractwords(str_data,func):
@ -17,7 +17,7 @@ def frequencies(word_list, func):
def sort(wf, func):
func(sort_dict(wf), None)
def printall(word_freqs, func):
def printall(word_freqs, _ ):
print_word_freqs(word_freqs)
if __name__ == "__main__":

@ -1,3 +1,4 @@
################ 待整理
'''
多线程各个模块比较乱的但是协作序贯的完成了数据处理
各个组件完全不能互操作仅依靠队列发消息进行协作
@ -10,15 +11,15 @@ from threading import Thread
from queue import Queue
from cppy.cp_util import *
class ActiveWFObject(Thread):
class ThreadObject(Thread):
def __init__(self):
super().__init__()
self.queue = Queue()
self._stopMe = False
self._over = False
self.start()
def run(self):
while not self._stopMe:
while not self._over:
message = self.queue.get()
self._dispatch(message)
if message[0] == 'over':
@ -27,8 +28,7 @@ class ActiveWFObject(Thread):
def send(receiver, message):
receiver.queue.put(message)
class DataStorageManager(ActiveWFObject):
""" Models the contents of the file """
class TxtManager(ThreadObject):
_data = ''
def _dispatch(self, message):
@ -37,7 +37,6 @@ class DataStorageManager(ActiveWFObject):
elif message[0] == 'send_word_freqs':
self._process_words(message[1:])
else:
# forward
send(self._stop_word_manager, message)
def _init(self, message):
@ -50,8 +49,8 @@ class DataStorageManager(ActiveWFObject):
send(self._stop_word_manager, ['filter', w])
send(self._stop_word_manager, ['topWord', recipient])
class StopWordManager(ActiveWFObject):
""" Models the stop word filter """
class FilterManager(ThreadObject):
_stop_words = []
def _dispatch(self, message):
@ -60,7 +59,6 @@ class StopWordManager(ActiveWFObject):
elif message[0] == 'filter':
return self._filter(message[1:])
else:
# forward
send(self._word_freqs_manager, message)
def _init(self, message):
@ -72,8 +70,7 @@ class StopWordManager(ActiveWFObject):
if word not in self._stop_words:
send(self._word_freqs_manager, ['word', word])
class WordFrequencyManager(ActiveWFObject):
""" Keeps the word frequency data """
class WFManager(ThreadObject):
_word_freqs = {}
def _dispatch(self, message):
@ -91,8 +88,7 @@ class WordFrequencyManager(ActiveWFObject):
freqs_sorted = sort_dict ( self._word_freqs )
send(recipient, ['topWord', freqs_sorted])
class MyController(ActiveWFObject):
class MyController(ThreadObject):
def _dispatch(self, message):
if message[0] == 'run':
self._run(message[1:])
@ -109,13 +105,13 @@ class MyController(ActiveWFObject):
word_freqs, = message
print_word_freqs( word_freqs)
send(self._storage_manager, ['over'])
self._stopMe = True
self._over = True
if __name__ == '__main__':
word_freq_manager = WordFrequencyManager()
stop_word_manager = StopWordManager()
storage_manager = DataStorageManager()
word_freq_manager = WFManager()
stop_word_manager = FilterManager()
storage_manager = TxtManager()
wfcontroller = MyController()
send(storage_manager, ['init', testfilepath, stop_word_manager])

Loading…
Cancel
Save