注册消息

dev
zj3D 8 months ago
parent 8d5c578da8
commit 850a3eb772

@ -1,94 +0,0 @@
'''
依靠给各个组件的 dispatch 调用接口发指令来驱动所有工作
这是一个示例性质的原型具体环境下需要调整
'''
from cppy.cp_util import *
class DataStorageManager():
""" Models the contents of the file """
def __init__(self):
self._data = []
def dispatch(self, message):
if message[0] == 'init':
return self._init(message[1])
elif message[0] == 'words':
return self._words()
else:
raise Exception("Message not understood " + message[0])
def _init(self, path_to_file):
self._data = re_split( read_file(path_to_file) )
def _words(self):
return self._data
class StopWordManager():
""" Models the stop word filter """
_stop_words = []
def dispatch(self, message):
if message[0] == 'init':
return self._init()
elif message[0] == 'is_stop_word':
return self._is_stop_word(message[1])
else:
raise Exception("Message not understood " + message[0])
def _init(self):
self._stop_words = get_stopwords()
def _is_stop_word(self, word):
return word in self._stop_words
class WordFrequencyManager():
""" Keeps the word frequency data """
_word_freqs = {}
def dispatch(self, message):
if message[0] == 'increment_count':
return self._increment_count(message[1])
elif message[0] == 'sorted':
return self._sorted()
else:
raise Exception("Message not understood " + message[0])
def _increment_count(self, word):
self._word_freqs[word] = self._word_freqs.get(word,0) + 1
def _sorted(self):
return sort_dict(self._word_freqs)
class WordFrequencyController():
def dispatch(self, message):
if message[0] == 'init':
return self._init(message[1])
elif message[0] == 'run':
return self._run()
else:
raise Exception("Message not understood " + message[0])
def _init(self, path_to_file):
self._storage_manager = DataStorageManager()
self._stop_word_manager = StopWordManager()
self._word_freq_manager = WordFrequencyManager()
self._storage_manager.dispatch(['init', path_to_file])
self._stop_word_manager.dispatch(['init'])
def _run(self):
for w in self._storage_manager.dispatch(['words']):
if not self._stop_word_manager.dispatch(['is_stop_word', w]):
self._word_freq_manager.dispatch(['increment_count', w])
word_freqs = self._word_freq_manager.dispatch(['sorted'])
print_word_freqs(word_freqs)
if __name__ == '__main__':
wfcontroller = WordFrequencyController()
wfcontroller.dispatch(['init', testfilepath])
wfcontroller.dispatch(['run'])

@ -1,12 +1,22 @@
'''
入门级示例是用来帮助理解其他例子
把观察者挂到自己的处理队列上
适当时机调用所有队列上的约定的观察者的 update 方法
如果观察者有多个职能参与不同的任务链不一定要统一命名update方法
这是一个示例性质的原型具体环境下需要调整
'''
import collections
from abc import ABC, abstractmethod
from cppy.cp_util import *
# 定义观察者接口 ,在 Pyhon中并不是必须
class Observer(ABC):
@abstractmethod
def update(self, word):
pass
class Observer(ABC):
@abstractmethod
def update(self, word):
pass
# 定义具体观察者类,用于统计词频
class WordFrequencyObserver(Observer):

@ -0,0 +1,69 @@
'''
本例的基本模式还是观察者
基类 Subject 提供注册和提醒注册上的对象提醒机制
因为函数和参数混杂在一起传递使得各个模块的处理结构其实是 case by case
'''
from collections import Counter
from typing import List
from cppy.cp_util import *
class Subject:
def register_handler(self, handler: callable, *args, **kwargs):
self.handler = handler
self.args = args
self.kwargs = kwargs
def notify(self, *args, **kwargs):
self.handler( self.data, *self.args, **self.kwargs)
# 组件一TextLoader - 负责读取文本并过滤停用词
class TextLoader(Subject):
def load_text(self, filename: str) -> List[str]:
return extract_file_words(filename)
def notify(self, *args, **kwargs):
filename = args[0]
self.data = self.load_text(filename)
super().notify(self.data, *args, **kwargs)
# 组件二WordCounter - 计算词频
class WordCounter(Subject):
def count_words(self, words: List[str]) -> dict:
return Counter(words)
def notify(self, *args, **kwargs ):
words = args[0]
self.data = self.count_words(words)
super().notify(self.data, *args, **kwargs)
# 组件三TopWordsPresenter - 排序并输出前10个词
class TopWordsPresenter(Subject):
def notify(self, words,*args, **kwargs):
n = args[0]
top_words = words.most_common(n)
print_word_freqs( top_words )
# 主程序逻辑
def main():
loader = TextLoader()
counter = WordCounter()
presenter = TopWordsPresenter()
# 注册事件处理器
loader.register_handler(counter.notify)
counter.register_handler( presenter.notify,10 )
# 触发加载文本并开始流程
loader.notify(testfilepath)
if __name__ == "__main__":
main()

@ -0,0 +1,85 @@
'''
注册者 = 观察者
每个组件提供注册消息接口和注册消息动作
在其它单元上注册自己对于特定事件消息的响应函数
同时负责自己的注册队列的序贯调用
Python 中有一个Callable类型可以用来判断是否是可以回调类型
from typing import Callable
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
from cppy.cp_util import *
#
# event_manager
#
class EventManager:
def __init__(self):
self.load_handlers = [] # 用于加载文件的事件处理器
self.process_handlers = [] # 用于处理数据的事件处理器
self.end_handlers = [] # 用于结束流程的事件处理器
def register_load_event(self, handler):
self.load_handlers.append(handler)
def register_process_event(self, handler):
self.process_handlers.append(handler)
def register_end_event(self, handler):
self.end_handlers.append(handler)
# 运行框架,按顺序执行注册的事件处理器
def run(self, file_path):
for handler in self.load_handlers: handler(file_path)
for handler in self.process_handlers: handler()
for handler in self.end_handlers: handler()
#
# 功能组件
#
# 定义数据存储类,用于模拟文件内容的加载和处理
class TextData:
_word_event_handlers = []
def __init__( self, event_manager ):
self._stop_words = get_stopwords()
event_manager.register_load_event(self.__load)
event_manager.register_process_event(self.__process_words)
def __load(self, path_to_file):
self._data = re_split( read_file(path_to_file) )
def __process_words(self):
for word in self._data:
if word not in self._stop_words:
for handler in self._word_event_handlers:
handler(word)
def register_word_event(self, handler):
self._word_event_handlers.append(handler)
class WordFrequencyCounter:
def __init__(self, event_manager, data_storage):
self._word_freqs = defaultdict(int) # 存储单词频率
data_storage.register_word_event(self.__increment_count) # 注册单词事件
event_manager.register_end_event(self.__print_freqs) # 注册结束事件
def __increment_count(self, word):
self._word_freqs[word] += 1
def __print_freqs(self):
print_word_freqs ( sort_dict (self._word_freqs) )
if __name__ == '__main__':
em = EventManager()
data_storage = TextData(em)
word_freq_counter = WordFrequencyCounter(em, data_storage)
em.run(testfilepath)

@ -0,0 +1,105 @@
from cppy.cp_util import *
'''
订阅者 = 注册者 = 观察者
注册回调的一个变体
要点是中心化统一化
为了简化消息订阅可能形成的复杂性
提供一个中心消息管理器统一负责消息的订阅和回调
各个功能组件只是完成自己的功能
在中心管理器上订阅消息挂到自己响应的处理函数上
总结相比较的改变
- 注册的时候通过提供一个类型字段标识不同消息
- 其它实体不做注册和做回调统一这两个功能到一个中心单元
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
#################################################
# Event Manager
#################################################
class EventManager:
def __init__(self):
self._subs = defaultdict(list)
def subscribe(self, event_type, handler):
self._subs[event_type].append(handler)
def publish(self, event):
event_type = event[0]
for handle in self._subs.get(event_type, []):
handle(event)
#################################################
# Application Entities
#################################################
class DataStorage:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('load', self._load)
self._event_manager.subscribe('start', self.produce_words)
def _load(self, event):
self._data = extract_file_words( event[1] )
def produce_words(self, _):
for word in self._data:
self._event_manager.publish(('word', word ))
self._event_manager.publish(('eof', None))
class StopWordFilter:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('load', self.load_stop_words)
self._event_manager.subscribe('word', self.filter_word)
self._stop_words = set()
def load_stop_words(self, _ ):
self._stop_words = set( get_stopwords() )
def filter_word(self, event):
word = event[1]
if word not in self._stop_words:
self._event_manager.publish(('valid_word', word))
class WordFrequencyCounter:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('valid_word', self.count_word)
self._event_manager.subscribe('print', self.print_freqs)
self._word_freqs = {}
def count_word(self, event):
word = event[1]
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
def print_freqs(self, _ ):
print_word_freqs ( sort_dict (self._word_freqs) )
class WordFrequencyApp:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('run', self.start_application)
self._event_manager.subscribe('eof', self.stop_application)
def start_application(self, event):
path_to_file = event[1]
self._event_manager.publish(('load', path_to_file))
self._event_manager.publish(('start', ))
def stop_application(self, _ ):
self._event_manager.publish(('print', ))
def main():
event_manager = EventManager()
DataStorage( event_manager )
StopWordFilter( event_manager )
WordFrequencyCounter( event_manager )
WordFrequencyApp( event_manager )
event_manager.publish(('run', testfilepath ))
if __name__ == "__main__":
main()

@ -0,0 +1,9 @@
注册
- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。
- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。
- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。
- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。
- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。

@ -1,92 +0,0 @@
'''
每个组件提供注册消息接口和注册消息动作
把顺序的流程分解到各个组件内部实现
这样避免回传到中心控制器
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
from cppy.cp_util import *
#
# Framework
#
class WordFrequencyFramework:
def __init__(self):
self._load_event_handlers = []
self._dowork_event_handlers = []
self._end_event_handlers = []
def register_for_load_event(self, handler):
self._load_event_handlers.append(handler)
def register_for_dowork_event(self, handler):
self._dowork_event_handlers.append(handler)
def register_for_end_event(self, handler):
self._end_event_handlers.append(handler)
def run(self, path_to_file):
for h in self._load_event_handlers: h(path_to_file)
for h in self._dowork_event_handlers: h()
for h in self._end_event_handlers: h()
#
# 功能组件
#
class DataStorage:
""" Models the contents of the file """
_data = ''
_stop_word_filter = None
_word_event_handlers = []
def __init__(self, wfapp, stop_word_filter):
self._stop_word_filter = stop_word_filter
wfapp.register_for_load_event(self.__load)
wfapp.register_for_dowork_event(self.__produce_words)
def __load(self, path_to_file):
self._data = re_split( read_file(path_to_file) )
def __produce_words(self):
for w in self._data:
if not self._stop_word_filter.is_stop_word(w):
for h in self._word_event_handlers:
h(w)
def register_for_word_event(self, handler):
self._word_event_handlers.append(handler)
class StopWordFilter:
""" Models the stop word filter """
_stop_words = []
def __init__(self, wfapp):
wfapp.register_for_load_event(self.__load)
def __load(self, ignore):
self._stop_words = get_stopwords()
def is_stop_word(self, word):
return word in self._stop_words
class WordFrequencyCounter:
def __init__(self, wfapp, data_storage):
self._word_freqs = defaultdict(int)
data_storage.register_for_word_event(self.__increment_count)
wfapp.register_for_end_event(self.__print_freqs)
def __increment_count(self, word):
self._word_freqs[word] += 1
def __print_freqs(self):
print_word_freqs ( sort_dict (self._word_freqs) )
if __name__ == '__main__':
wfapp = WordFrequencyFramework()
stop_word_filter = StopWordFilter(wfapp)
data_storage = DataStorage(wfapp, stop_word_filter)
word_freq_counter = WordFrequencyCounter(wfapp, data_storage)
wfapp.run(testfilepath)

@ -1,93 +0,0 @@
from cppy.cp_util import *
'''
注册回调的一个变体
提供一个中心消息管理器统一管理消息的订阅和通知
这是一个示例性质的原型具体分布式环境下需要调整
'''
#################################################
# The event management
#################################################
class EventManager:
def __init__(self):
self._subscriptions = {}
def subscribe(self, event_type, handler):
self._subscriptions.setdefault(event_type, []).append(handler)
def publish(self, event):
event_type = event[0]
for h in self._subscriptions.get(event_type, []):
h(event)
#################################################
# The application entities
#################################################
class DataStorage:
""" Models the contents of the file """
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('load', self.load)
self._event_manager.subscribe('start', self.produce_words)
def load(self, event):
self._data = extract_file_words( event[1] )
def produce_words(self, event):
for w in self._data:
self._event_manager.publish(('word', w))
self._event_manager.publish(('eof', None))
class StopWordFilter:
""" Models the stop word filter """
def __init__(self, event_manager):
self._stop_words = []
self._event_manager = event_manager
self._event_manager.subscribe('load', self.load)
self._event_manager.subscribe('word', self.is_stop_word)
def load(self, event):
self._stop_words = get_stopwords()
def is_stop_word(self, event):
word = event[1]
if word not in self._stop_words:
self._event_manager.publish(('valid_word', word))
class WordFrequencyCounter:
""" Keeps the word frequency data """
def __init__(self, event_manager):
self._word_freqs = {}
self._event_manager = event_manager
self._event_manager.subscribe('valid_word', self.increment_count)
self._event_manager.subscribe('print', self.print_freqs)
def increment_count(self, event):
word = event[1]
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
def print_freqs(self, event):
print_word_freqs ( sort_dict (self._word_freqs) )
class WordFrequencyApplication:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('run', self.run)
self._event_manager.subscribe('eof', self.stop)
def run(self, event):
path_to_file = event[1]
self._event_manager.publish(('load', path_to_file))
self._event_manager.publish(('start', None))
def stop(self, event):
self._event_manager.publish(('print', None))
if __name__ == "__main__":
em = EventManager()
DataStorage(em), StopWordFilter(em), WordFrequencyCounter(em)
WordFrequencyApplication(em)
em.publish(('run', testfilepath ))

@ -0,0 +1,97 @@
'''
应用场景针对各个组件的 notify 方法发指令来驱动所有工作
这是一个示例性质的原型具体分布式环境下需要调整
notify 用了四种写法是和本主题无关的测试
'''
from cppy.cp_util import *
from collections import defaultdict
badmsg = lambda : exec (''' raise Exception("Message not understood " , action ) ''')
class fff:
def __init__(self, d):
self._data = defaultdict( badmsg )
self._data.update(d)
def __getitem__(self, key):
return self._data[key]
class DataStorageMod():
def __init__(self):
self._data = []
def notify(self, action, *args):
return {
'init': lambda : self._init,
'words': lambda : self._words
}.get( action , badmsg )()(*args)
def _init(self, path_to_file):
self._data = re_split( read_file(path_to_file) )
def _words(self):
return self._data
class StopWordMod():
_stop_words = []
def notify(self, action, *args):
return { 'init': self._init,
'is_stop_word': self._is_stop_word
}[ action ](*args)
def _init(self):
self._stop_words = get_stopwords()
def _is_stop_word(self, wordx):
return wordx in self._stop_words
class WordFrequencyMod():
_word_freqs = {}
def notify(self, action, *args):
return fff( {
'increment_count': lambda : self._increment_count,
'sorted': lambda : self._sorted
})[ action ]()(*args)
def _increment_count(self, word):
self._word_freqs[word] = self._word_freqs.get(word,0) + 1
def _sorted(self):
return sort_dict(self._word_freqs)
class ScenarioManager():
def notify(self, action, *args):
if action == 'init':
return self._init( *args)
elif action == 'run':
return self._run()
else:
raise Exception("Message not understood " + action )
def _init(self, path_to_file):
self._storage_manager = DataStorageMod()
self._stop_word_manager = StopWordMod()
self._word_freq_manager = WordFrequencyMod()
self._storage_manager.notify('init', path_to_file)
self._stop_word_manager.notify('init')
def _run(self):
for word in self._storage_manager.notify('words'):
if not self._stop_word_manager.notify('is_stop_word', word):
self._word_freq_manager.notify('increment_count', word )
word_freqs = self._word_freq_manager.notify('sorted')
print_word_freqs(word_freqs)
if __name__ == '__main__':
sm = ScenarioManager()
sm.notify('init', testfilepath)
sm.notify('run')

@ -1,7 +1,7 @@
from cppy.cp_util import *
# 这个例子没有实际意义,是用来帮助理解下一个例子
# 主程序只需要做第一件事情,后面的顺序逻辑写到各个函数里面了
# 这个例子没有实际意义,是用来帮助理解其他例子
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
def readfile(path_to_file, func):
data = read_file(path_to_file)

@ -0,0 +1,102 @@
'''
后续组件挂载到前序组件后续链上
仅提供 self.next_observer 的抽象关系
后续组件接到指令和数据自己决定动作
理论上每个组件可以参与到多个生产队列
本例使用了类来封装消息相对于字符串理论上提供了更丰富的扩展可能
这是一个示例性质的原型具体环境下需要调整
'''
from collections import Counter
from typing import List, Dict
from cppy.cp_util import *
# 定义消息类型
class Message:
def __init__(self, data):
self.data = data
class TokenizedText(Message):
pass
class FilteredText(Message):
pass
class WordFrequency(Message):
pass
# 定义观察者接口
class Observer:
def notify(self, message: Message):
pass
# 切词订阅者
class TokenizerSubscriber(Observer):
def __init__(self, next_observer: Observer):
self.next_observer = next_observer
def notify(self, message: Message):
if not isinstance(message.data, str):
return
tokenized_text = re_split(message.data)
self.next_observer.notify(TokenizedText(tokenized_text))
# 停用词订阅者
class StopWordsRemoverSubscriber(Observer):
def __init__(self, next_observer: Observer, stop_words: List[str]):
self.next_observer = next_observer
self.stop_words = set(stop_words)
def notify(self, message: Message):
if not isinstance(message, TokenizedText):
return
filtered_text = [word for word in message.data if word not in self.stop_words and len(word)>2 ]
self.next_observer.notify(FilteredText(filtered_text))
# 词频统计订阅者
class WordFrequencyCalculatorSubscriber(Observer):
def __init__(self, next_observer: Observer):
self.next_observer = next_observer
def notify(self, message: Message):
if not isinstance(message, FilteredText):
return
word_freq = Counter(message.data)
self.next_observer.notify( WordFrequency(word_freq) )
# 输出前N个词订阅者
class TopNWordsDisplaySubscriber(Observer):
def __init__(self, n: int):
self.n = n
def notify(self, message: Message):
if not isinstance(message, WordFrequency):
return
print_word_freqs( message.data.most_common(self.n) )
# 模拟发布者
def publish_text(text: str, observers: List[Observer]):
for observer in observers:
observer.notify(Message(text))
# 主函数
def main():
text = read_file()
stop_words = get_stopwords()
# 创建订阅者链
display_subscriber = TopNWordsDisplaySubscriber( n=10 )
freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber)
stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words)
tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber)
# 发布文本
publish_text(text, [tokenizer_subscriber])
if __name__ == "__main__":
main()

@ -1,7 +1,9 @@
'''
依靠给各个不同线程组件的队列发指令来驱动所有工作比较繁琐
比较 01.py 的实现各个组件完全不能互操作仅依靠队列发消息进行协作
这是一个示例性质的原型具体分布式环境下需要调整
多线程各个模块比较乱的但是协作序贯的完成了数据处理
各个组件完全不能互操作仅依靠队列发消息进行协作
适合环节多数据可分块有IO-计算性能设计考量要求让各个模块自己适应调整
在某些情况下可以避免复杂的控制流设计使代码简洁
'''
from threading import Thread
Loading…
Cancel
Save