From 850a3eb772a6cff58332390a481967a3aabcd852 Mon Sep 17 00:00:00 2001 From: zj3D Date: Sat, 23 Mar 2024 15:57:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B3=A8=E5=86=8C=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{反射装饰.py => 1 反射装饰.py} | 0 .../2 装饰/{装饰器.py => 2 装饰器.py} | 0 .../2 装饰/3 参数类型检查.py | 0 .../1 消息驱动的组件/1 指令驱动.py | 94 ---------------- .../1 松耦合/1 观察者/1 观察者.py | 18 ++- .../1 松耦合/1 观察者/2 观察者.py | 69 ++++++++++++ .../1 松耦合/1 观察者/3 注册回调.py | 85 ++++++++++++++ .../1 松耦合/1 观察者/4 订阅发布.py | 105 ++++++++++++++++++ .../1 松耦合/1 观察者/readme.MD | 9 ++ .../2 注册登记/注册回调.py | 92 --------------- .../2 注册登记/消息订阅.py | 93 ---------------- .../2 消息链/1 只有消息接口.py | 97 ++++++++++++++++ .../2 调用链py} | 4 +- .../1 松耦合/2 消息链/3 消息链.py | 102 +++++++++++++++++ .../4 消息队列.py} | 8 +- 16 其它/{对象设计模式 => }/享元.py | 0 16 files changed, 488 insertions(+), 288 deletions(-) rename 12 语言特性/2 装饰/{反射装饰.py => 1 反射装饰.py} (100%) rename 12 语言特性/2 装饰/{装饰器.py => 2 装饰器.py} (100%) rename 15 工程化/3 类型申明/参数类型检查.py => 12 语言特性/2 装饰/3 参数类型检查.py (100%) delete mode 100644 15 工程化/1 松耦合/1 消息驱动的组件/1 指令驱动.py rename 16 其它/对象设计模式/观察者.py => 15 工程化/1 松耦合/1 观察者/1 观察者.py (74%) create mode 100644 15 工程化/1 松耦合/1 观察者/2 观察者.py create mode 100644 15 工程化/1 松耦合/1 观察者/3 注册回调.py create mode 100644 15 工程化/1 松耦合/1 观察者/4 订阅发布.py create mode 100644 15 工程化/1 松耦合/1 观察者/readme.MD delete mode 100644 15 工程化/1 松耦合/2 注册登记/注册回调.py delete mode 100644 15 工程化/1 松耦合/2 注册登记/消息订阅.py create mode 100644 15 工程化/1 松耦合/2 消息链/1 只有消息接口.py rename 15 工程化/1 松耦合/{1 消息驱动的组件/2 去中心化开始.py => 2 消息链/2 调用链py} (69%) create mode 100644 15 工程化/1 松耦合/2 消息链/3 消息链.py rename 15 工程化/1 松耦合/{1 消息驱动的组件/3 仅有消息接口.py => 2 消息链/4 消息队列.py} (90%) rename 16 其它/{对象设计模式 => }/享元.py (100%) diff --git a/12 语言特性/2 装饰/反射装饰.py b/12 语言特性/2 装饰/1 反射装饰.py similarity index 100% rename from 12 语言特性/2 装饰/反射装饰.py rename to 12 语言特性/2 装饰/1 反射装饰.py diff --git a/12 语言特性/2 装饰/装饰器.py b/12 语言特性/2 装饰/2 装饰器.py similarity index 100% rename from 12 语言特性/2 装饰/装饰器.py rename to 12 语言特性/2 装饰/2 装饰器.py diff --git a/15 工程化/3 类型申明/参数类型检查.py b/12 语言特性/2 装饰/3 参数类型检查.py similarity index 100% rename from 15 工程化/3 类型申明/参数类型检查.py rename to 12 语言特性/2 装饰/3 参数类型检查.py diff --git a/15 工程化/1 松耦合/1 消息驱动的组件/1 指令驱动.py b/15 工程化/1 松耦合/1 消息驱动的组件/1 指令驱动.py deleted file mode 100644 index d881a63..0000000 --- a/15 工程化/1 松耦合/1 消息驱动的组件/1 指令驱动.py +++ /dev/null @@ -1,94 +0,0 @@ -''' -依靠给各个组件的 dispatch 调用接口发指令来驱动所有工作 -这是一个示例性质的原型,具体环境下需要调整 -''' -from cppy.cp_util import * - -class DataStorageManager(): - """ Models the contents of the file """ - def __init__(self): - self._data = [] - - def dispatch(self, message): - if message[0] == 'init': - return self._init(message[1]) - elif message[0] == 'words': - return self._words() - else: - raise Exception("Message not understood " + message[0]) - - def _init(self, path_to_file): - self._data = re_split( read_file(path_to_file) ) - - def _words(self): - return self._data - - -class StopWordManager(): - """ Models the stop word filter """ - _stop_words = [] - - def dispatch(self, message): - if message[0] == 'init': - return self._init() - elif message[0] == 'is_stop_word': - return self._is_stop_word(message[1]) - else: - raise Exception("Message not understood " + message[0]) - - def _init(self): - self._stop_words = get_stopwords() - - def _is_stop_word(self, word): - return word in self._stop_words - - -class WordFrequencyManager(): - """ Keeps the word frequency data """ - _word_freqs = {} - - def dispatch(self, message): - if message[0] == 'increment_count': - return self._increment_count(message[1]) - elif message[0] == 'sorted': - return self._sorted() - else: - raise Exception("Message not understood " + message[0]) - - def _increment_count(self, word): - self._word_freqs[word] = self._word_freqs.get(word,0) + 1 - - def _sorted(self): - return sort_dict(self._word_freqs) - - -class WordFrequencyController(): - - def dispatch(self, message): - if message[0] == 'init': - return self._init(message[1]) - elif message[0] == 'run': - return self._run() - else: - raise Exception("Message not understood " + message[0]) - - def _init(self, path_to_file): - self._storage_manager = DataStorageManager() - self._stop_word_manager = StopWordManager() - self._word_freq_manager = WordFrequencyManager() - self._storage_manager.dispatch(['init', path_to_file]) - self._stop_word_manager.dispatch(['init']) - - def _run(self): - for w in self._storage_manager.dispatch(['words']): - if not self._stop_word_manager.dispatch(['is_stop_word', w]): - self._word_freq_manager.dispatch(['increment_count', w]) - - word_freqs = self._word_freq_manager.dispatch(['sorted']) - print_word_freqs(word_freqs) - - -if __name__ == '__main__': - wfcontroller = WordFrequencyController() - wfcontroller.dispatch(['init', testfilepath]) - wfcontroller.dispatch(['run']) diff --git a/16 其它/对象设计模式/观察者.py b/15 工程化/1 松耦合/1 观察者/1 观察者.py similarity index 74% rename from 16 其它/对象设计模式/观察者.py rename to 15 工程化/1 松耦合/1 观察者/1 观察者.py index 9f6c962..c360af9 100644 --- a/16 其它/对象设计模式/观察者.py +++ b/15 工程化/1 松耦合/1 观察者/1 观察者.py @@ -1,12 +1,22 @@ +''' +入门级示例,是用来帮助理解其他例子 + +把观察者挂到自己的处理队列上 +适当时机调用所有队列上的约定的观察者的 update 方法 +如果观察者有多个职能参与不同的任务链,不一定要统一命名update方法 + +这是一个示例性质的原型,具体环境下需要调整 +''' + import collections from abc import ABC, abstractmethod from cppy.cp_util import * # 定义观察者接口 ,在 Pyhon中并不是必须 -class Observer(ABC): - @abstractmethod - def update(self, word): - pass +class Observer(ABC): + @abstractmethod + def update(self, word): + pass # 定义具体观察者类,用于统计词频 class WordFrequencyObserver(Observer): diff --git a/15 工程化/1 松耦合/1 观察者/2 观察者.py b/15 工程化/1 松耦合/1 观察者/2 观察者.py new file mode 100644 index 0000000..9be5fe4 --- /dev/null +++ b/15 工程化/1 松耦合/1 观察者/2 观察者.py @@ -0,0 +1,69 @@ + +''' +本例的基本模式还是观察者 +基类 Subject 提供注册和提醒注册上的对象提醒机制 + +因为函数和参数混杂在一起传递,使得各个模块的处理结构其实是 case by case +''' + +from collections import Counter +from typing import List +from cppy.cp_util import * + +class Subject: + def register_handler(self, handler: callable, *args, **kwargs): + self.handler = handler + self.args = args + self.kwargs = kwargs + + def notify(self, *args, **kwargs): + self.handler( self.data, *self.args, **self.kwargs) + + + +# 组件一:TextLoader - 负责读取文本并过滤停用词 +class TextLoader(Subject): + def load_text(self, filename: str) -> List[str]: + return extract_file_words(filename) + + def notify(self, *args, **kwargs): + filename = args[0] + self.data = self.load_text(filename) + super().notify(self.data, *args, **kwargs) + + +# 组件二:WordCounter - 计算词频 +class WordCounter(Subject): + def count_words(self, words: List[str]) -> dict: + return Counter(words) + + def notify(self, *args, **kwargs ): + words = args[0] + self.data = self.count_words(words) + super().notify(self.data, *args, **kwargs) + + +# 组件三:TopWordsPresenter - 排序并输出前10个词 +class TopWordsPresenter(Subject): + def notify(self, words,*args, **kwargs): + n = args[0] + top_words = words.most_common(n) + print_word_freqs( top_words ) + + +# 主程序逻辑 +def main(): + loader = TextLoader() + counter = WordCounter() + presenter = TopWordsPresenter() + + # 注册事件处理器 + loader.register_handler(counter.notify) + counter.register_handler( presenter.notify,10 ) + + # 触发加载文本并开始流程 + loader.notify(testfilepath) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/15 工程化/1 松耦合/1 观察者/3 注册回调.py b/15 工程化/1 松耦合/1 观察者/3 注册回调.py new file mode 100644 index 0000000..b5854cf --- /dev/null +++ b/15 工程化/1 松耦合/1 观察者/3 注册回调.py @@ -0,0 +1,85 @@ +''' +注册者 = 观察者 + +每个组件提供注册消息接口和注册消息动作 + +在其它单元上注册自己对于特定事件(消息)的响应函数 +同时负责自己的注册队列的序贯调用 +Python 中有一个Callable类型,可以用来判断是否是可以回调类型 +from typing import Callable + +这是一个示例性质的原型,具体分布式环境下需要调整 +''' + +from collections import defaultdict +from cppy.cp_util import * + +# +# event_manager +# +class EventManager: + def __init__(self): + self.load_handlers = [] # 用于加载文件的事件处理器 + self.process_handlers = [] # 用于处理数据的事件处理器 + self.end_handlers = [] # 用于结束流程的事件处理器 + + def register_load_event(self, handler): + self.load_handlers.append(handler) + + def register_process_event(self, handler): + self.process_handlers.append(handler) + + def register_end_event(self, handler): + self.end_handlers.append(handler) + + # 运行框架,按顺序执行注册的事件处理器 + def run(self, file_path): + for handler in self.load_handlers: handler(file_path) + for handler in self.process_handlers: handler() + for handler in self.end_handlers: handler() + + +# +# 功能组件 +# +# 定义数据存储类,用于模拟文件内容的加载和处理 +class TextData: + _word_event_handlers = [] + + def __init__( self, event_manager ): + self._stop_words = get_stopwords() + event_manager.register_load_event(self.__load) + event_manager.register_process_event(self.__process_words) + + def __load(self, path_to_file): + self._data = re_split( read_file(path_to_file) ) + + def __process_words(self): + for word in self._data: + if word not in self._stop_words: + for handler in self._word_event_handlers: + handler(word) + + def register_word_event(self, handler): + self._word_event_handlers.append(handler) + + +class WordFrequencyCounter: + def __init__(self, event_manager, data_storage): + self._word_freqs = defaultdict(int) # 存储单词频率 + data_storage.register_word_event(self.__increment_count) # 注册单词事件 + event_manager.register_end_event(self.__print_freqs) # 注册结束事件 + + def __increment_count(self, word): + self._word_freqs[word] += 1 + + def __print_freqs(self): + print_word_freqs ( sort_dict (self._word_freqs) ) + + +if __name__ == '__main__': + em = EventManager() + data_storage = TextData(em) + word_freq_counter = WordFrequencyCounter(em, data_storage) + + em.run(testfilepath) \ No newline at end of file diff --git a/15 工程化/1 松耦合/1 观察者/4 订阅发布.py b/15 工程化/1 松耦合/1 观察者/4 订阅发布.py new file mode 100644 index 0000000..5e6da70 --- /dev/null +++ b/15 工程化/1 松耦合/1 观察者/4 订阅发布.py @@ -0,0 +1,105 @@ +from cppy.cp_util import * +''' +订阅者 = 注册者 = 观察者 + +注册回调的一个变体 +要点是:中心化、统一化 + +为了简化消息订阅可能形成的复杂性, +提供一个中心消息管理器统一负责消息的订阅和回调 +各个功能组件只是完成自己的功能, +在中心管理器上订阅消息挂到自己响应的处理函数上 + +总结,相比较的改变: +- 注册的时候,通过提供一个类型字段标识不同消息 +- 其它实体不做注册和做回调,统一这两个功能到一个中心单元 + +这是一个示例性质的原型,具体分布式环境下需要调整 +''' +from collections import defaultdict +################################################# +# Event Manager +################################################# +class EventManager: + def __init__(self): + self._subs = defaultdict(list) + + def subscribe(self, event_type, handler): + self._subs[event_type].append(handler) + + def publish(self, event): + event_type = event[0] + for handle in self._subs.get(event_type, []): + handle(event) + +################################################# +# Application Entities +################################################# +class DataStorage: + def __init__(self, event_manager): + self._event_manager = event_manager + self._event_manager.subscribe('load', self._load) + self._event_manager.subscribe('start', self.produce_words) + + def _load(self, event): + self._data = extract_file_words( event[1] ) + + def produce_words(self, _): + for word in self._data: + self._event_manager.publish(('word', word )) + self._event_manager.publish(('eof', None)) + +class StopWordFilter: + def __init__(self, event_manager): + self._event_manager = event_manager + self._event_manager.subscribe('load', self.load_stop_words) + self._event_manager.subscribe('word', self.filter_word) + self._stop_words = set() + + def load_stop_words(self, _ ): + self._stop_words = set( get_stopwords() ) + + def filter_word(self, event): + word = event[1] + if word not in self._stop_words: + self._event_manager.publish(('valid_word', word)) + +class WordFrequencyCounter: + def __init__(self, event_manager): + self._event_manager = event_manager + self._event_manager.subscribe('valid_word', self.count_word) + self._event_manager.subscribe('print', self.print_freqs) + self._word_freqs = {} + + def count_word(self, event): + word = event[1] + self._word_freqs[word] = self._word_freqs.get(word, 0) + 1 + + def print_freqs(self, _ ): + print_word_freqs ( sort_dict (self._word_freqs) ) + +class WordFrequencyApp: + def __init__(self, event_manager): + self._event_manager = event_manager + self._event_manager.subscribe('run', self.start_application) + self._event_manager.subscribe('eof', self.stop_application) + + def start_application(self, event): + path_to_file = event[1] + self._event_manager.publish(('load', path_to_file)) + self._event_manager.publish(('start', )) + + def stop_application(self, _ ): + self._event_manager.publish(('print', )) + + +def main(): + event_manager = EventManager() + DataStorage( event_manager ) + StopWordFilter( event_manager ) + WordFrequencyCounter( event_manager ) + WordFrequencyApp( event_manager ) + event_manager.publish(('run', testfilepath )) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/15 工程化/1 松耦合/1 观察者/readme.MD b/15 工程化/1 松耦合/1 观察者/readme.MD new file mode 100644 index 0000000..9ad044a --- /dev/null +++ b/15 工程化/1 松耦合/1 观察者/readme.MD @@ -0,0 +1,9 @@ + +注册 +- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。 +- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。 + +- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。 +- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。 + +- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。 \ No newline at end of file diff --git a/15 工程化/1 松耦合/2 注册登记/注册回调.py b/15 工程化/1 松耦合/2 注册登记/注册回调.py deleted file mode 100644 index c4095b9..0000000 --- a/15 工程化/1 松耦合/2 注册登记/注册回调.py +++ /dev/null @@ -1,92 +0,0 @@ -''' -每个组件提供注册消息接口和注册消息动作 -把顺序的流程分解到各个组件内部实现 -这样避免回传到中心控制器 -这是一个示例性质的原型,具体分布式环境下需要调整 -''' - -from collections import defaultdict -from cppy.cp_util import * - -# -# Framework -# -class WordFrequencyFramework: - def __init__(self): - self._load_event_handlers = [] - self._dowork_event_handlers = [] - self._end_event_handlers = [] - - def register_for_load_event(self, handler): - self._load_event_handlers.append(handler) - - def register_for_dowork_event(self, handler): - self._dowork_event_handlers.append(handler) - - def register_for_end_event(self, handler): - self._end_event_handlers.append(handler) - - def run(self, path_to_file): - for h in self._load_event_handlers: h(path_to_file) - for h in self._dowork_event_handlers: h() - for h in self._end_event_handlers: h() - -# -# 功能组件 -# -class DataStorage: - """ Models the contents of the file """ - _data = '' - _stop_word_filter = None - _word_event_handlers = [] - - def __init__(self, wfapp, stop_word_filter): - self._stop_word_filter = stop_word_filter - wfapp.register_for_load_event(self.__load) - wfapp.register_for_dowork_event(self.__produce_words) - - def __load(self, path_to_file): - self._data = re_split( read_file(path_to_file) ) - - def __produce_words(self): - for w in self._data: - if not self._stop_word_filter.is_stop_word(w): - for h in self._word_event_handlers: - h(w) - - def register_for_word_event(self, handler): - self._word_event_handlers.append(handler) - -class StopWordFilter: - """ Models the stop word filter """ - _stop_words = [] - def __init__(self, wfapp): - wfapp.register_for_load_event(self.__load) - - def __load(self, ignore): - self._stop_words = get_stopwords() - - def is_stop_word(self, word): - return word in self._stop_words - - -class WordFrequencyCounter: - def __init__(self, wfapp, data_storage): - self._word_freqs = defaultdict(int) - data_storage.register_for_word_event(self.__increment_count) - wfapp.register_for_end_event(self.__print_freqs) - - def __increment_count(self, word): - self._word_freqs[word] += 1 - - def __print_freqs(self): - print_word_freqs ( sort_dict (self._word_freqs) ) - - -if __name__ == '__main__': - wfapp = WordFrequencyFramework() - stop_word_filter = StopWordFilter(wfapp) - data_storage = DataStorage(wfapp, stop_word_filter) - word_freq_counter = WordFrequencyCounter(wfapp, data_storage) - - wfapp.run(testfilepath) \ No newline at end of file diff --git a/15 工程化/1 松耦合/2 注册登记/消息订阅.py b/15 工程化/1 松耦合/2 注册登记/消息订阅.py deleted file mode 100644 index f638f42..0000000 --- a/15 工程化/1 松耦合/2 注册登记/消息订阅.py +++ /dev/null @@ -1,93 +0,0 @@ -from cppy.cp_util import * -''' -注册回调的一个变体 -提供一个中心消息管理器,统一管理消息的订阅和通知 -这是一个示例性质的原型,具体分布式环境下需要调整 -''' - - -################################################# -# The event management -################################################# -class EventManager: - def __init__(self): - self._subscriptions = {} - - def subscribe(self, event_type, handler): - self._subscriptions.setdefault(event_type, []).append(handler) - - def publish(self, event): - event_type = event[0] - for h in self._subscriptions.get(event_type, []): - h(event) - -################################################# -# The application entities -################################################# -class DataStorage: - """ Models the contents of the file """ - def __init__(self, event_manager): - self._event_manager = event_manager - self._event_manager.subscribe('load', self.load) - self._event_manager.subscribe('start', self.produce_words) - - def load(self, event): - self._data = extract_file_words( event[1] ) - - def produce_words(self, event): - for w in self._data: - self._event_manager.publish(('word', w)) - self._event_manager.publish(('eof', None)) - -class StopWordFilter: - """ Models the stop word filter """ - def __init__(self, event_manager): - self._stop_words = [] - self._event_manager = event_manager - self._event_manager.subscribe('load', self.load) - self._event_manager.subscribe('word', self.is_stop_word) - - def load(self, event): - self._stop_words = get_stopwords() - - def is_stop_word(self, event): - word = event[1] - if word not in self._stop_words: - self._event_manager.publish(('valid_word', word)) - -class WordFrequencyCounter: - """ Keeps the word frequency data """ - def __init__(self, event_manager): - self._word_freqs = {} - self._event_manager = event_manager - self._event_manager.subscribe('valid_word', self.increment_count) - self._event_manager.subscribe('print', self.print_freqs) - - def increment_count(self, event): - word = event[1] - self._word_freqs[word] = self._word_freqs.get(word, 0) + 1 - - def print_freqs(self, event): - print_word_freqs ( sort_dict (self._word_freqs) ) - - -class WordFrequencyApplication: - def __init__(self, event_manager): - self._event_manager = event_manager - self._event_manager.subscribe('run', self.run) - self._event_manager.subscribe('eof', self.stop) - - def run(self, event): - path_to_file = event[1] - self._event_manager.publish(('load', path_to_file)) - self._event_manager.publish(('start', None)) - - def stop(self, event): - self._event_manager.publish(('print', None)) - - -if __name__ == "__main__": - em = EventManager() - DataStorage(em), StopWordFilter(em), WordFrequencyCounter(em) - WordFrequencyApplication(em) - em.publish(('run', testfilepath )) \ No newline at end of file diff --git a/15 工程化/1 松耦合/2 消息链/1 只有消息接口.py b/15 工程化/1 松耦合/2 消息链/1 只有消息接口.py new file mode 100644 index 0000000..a020795 --- /dev/null +++ b/15 工程化/1 松耦合/2 消息链/1 只有消息接口.py @@ -0,0 +1,97 @@ +''' +应用场景针对各个组件的 notify 方法发指令来驱动所有工作 +这是一个示例性质的原型,具体分布式环境下需要调整 + +notify 用了四种写法,是和本主题无关的测试 +''' +from cppy.cp_util import * +from collections import defaultdict + +badmsg = lambda : exec (''' raise Exception("Message not understood " , action ) ''') +class fff: + def __init__(self, d): + self._data = defaultdict( badmsg ) + self._data.update(d) + + def __getitem__(self, key): + return self._data[key] + + +class DataStorageMod(): + def __init__(self): + self._data = [] + + def notify(self, action, *args): + return { + 'init': lambda : self._init, + 'words': lambda : self._words + }.get( action , badmsg )()(*args) + + def _init(self, path_to_file): + self._data = re_split( read_file(path_to_file) ) + + def _words(self): + return self._data + + +class StopWordMod(): + _stop_words = [] + + def notify(self, action, *args): + return { 'init': self._init, + 'is_stop_word': self._is_stop_word + }[ action ](*args) + + def _init(self): + self._stop_words = get_stopwords() + + def _is_stop_word(self, wordx): + return wordx in self._stop_words + + +class WordFrequencyMod(): + _word_freqs = {} + + def notify(self, action, *args): + return fff( { + 'increment_count': lambda : self._increment_count, + 'sorted': lambda : self._sorted + })[ action ]()(*args) + + def _increment_count(self, word): + self._word_freqs[word] = self._word_freqs.get(word,0) + 1 + + def _sorted(self): + return sort_dict(self._word_freqs) + + +class ScenarioManager(): + + def notify(self, action, *args): + if action == 'init': + return self._init( *args) + elif action == 'run': + return self._run() + else: + raise Exception("Message not understood " + action ) + + def _init(self, path_to_file): + self._storage_manager = DataStorageMod() + self._stop_word_manager = StopWordMod() + self._word_freq_manager = WordFrequencyMod() + self._storage_manager.notify('init', path_to_file) + self._stop_word_manager.notify('init') + + def _run(self): + for word in self._storage_manager.notify('words'): + if not self._stop_word_manager.notify('is_stop_word', word): + self._word_freq_manager.notify('increment_count', word ) + + word_freqs = self._word_freq_manager.notify('sorted') + print_word_freqs(word_freqs) + + +if __name__ == '__main__': + sm = ScenarioManager() + sm.notify('init', testfilepath) + sm.notify('run') \ No newline at end of file diff --git a/15 工程化/1 松耦合/1 消息驱动的组件/2 去中心化开始.py b/15 工程化/1 松耦合/2 消息链/2 调用链py similarity index 69% rename from 15 工程化/1 松耦合/1 消息驱动的组件/2 去中心化开始.py rename to 15 工程化/1 松耦合/2 消息链/2 调用链py index f0e448b..8190bc6 100644 --- a/15 工程化/1 松耦合/1 消息驱动的组件/2 去中心化开始.py +++ b/15 工程化/1 松耦合/2 消息链/2 调用链py @@ -1,7 +1,7 @@ from cppy.cp_util import * -# 这个例子没有实际意义,是用来帮助理解下一个例子 -# 主程序只需要做第一件事情,后面的顺序逻辑写到各个函数里面了 +# 这个例子没有实际意义,是用来帮助理解其他例子 +# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了 def readfile(path_to_file, func): data = read_file(path_to_file) diff --git a/15 工程化/1 松耦合/2 消息链/3 消息链.py b/15 工程化/1 松耦合/2 消息链/3 消息链.py new file mode 100644 index 0000000..68c10cd --- /dev/null +++ b/15 工程化/1 松耦合/2 消息链/3 消息链.py @@ -0,0 +1,102 @@ +''' +后续组件挂载到前序组件后续链上 +仅提供 self.next_observer 的抽象关系 +后续组件接到指令和数据,自己决定动作 + +理论上每个组件可以参与到多个生产队列 + +本例使用了类来封装消息,相对于字符串理论上提供了更丰富的扩展可能 +这是一个示例性质的原型,具体环境下需要调整 +''' + +from collections import Counter +from typing import List, Dict +from cppy.cp_util import * + +# 定义消息类型 +class Message: + def __init__(self, data): + self.data = data + +class TokenizedText(Message): + pass + +class FilteredText(Message): + pass + +class WordFrequency(Message): + pass + +# 定义观察者接口 +class Observer: + def notify(self, message: Message): + pass + +# 切词订阅者 +class TokenizerSubscriber(Observer): + def __init__(self, next_observer: Observer): + self.next_observer = next_observer + + def notify(self, message: Message): + if not isinstance(message.data, str): + return + tokenized_text = re_split(message.data) + self.next_observer.notify(TokenizedText(tokenized_text)) + +# 停用词订阅者 +class StopWordsRemoverSubscriber(Observer): + def __init__(self, next_observer: Observer, stop_words: List[str]): + self.next_observer = next_observer + self.stop_words = set(stop_words) + + def notify(self, message: Message): + if not isinstance(message, TokenizedText): + return + filtered_text = [word for word in message.data if word not in self.stop_words and len(word)>2 ] + self.next_observer.notify(FilteredText(filtered_text)) + +# 词频统计订阅者 +class WordFrequencyCalculatorSubscriber(Observer): + def __init__(self, next_observer: Observer): + self.next_observer = next_observer + + def notify(self, message: Message): + if not isinstance(message, FilteredText): + return + word_freq = Counter(message.data) + self.next_observer.notify( WordFrequency(word_freq) ) + + +# 输出前N个词订阅者 +class TopNWordsDisplaySubscriber(Observer): + def __init__(self, n: int): + self.n = n + + def notify(self, message: Message): + if not isinstance(message, WordFrequency): + return + print_word_freqs( message.data.most_common(self.n) ) + + +# 模拟发布者 +def publish_text(text: str, observers: List[Observer]): + for observer in observers: + observer.notify(Message(text)) + +# 主函数 +def main(): + text = read_file() + + stop_words = get_stopwords() + + # 创建订阅者链 + display_subscriber = TopNWordsDisplaySubscriber( n=10 ) + freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber) + stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words) + tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber) + + # 发布文本 + publish_text(text, [tokenizer_subscriber]) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/15 工程化/1 松耦合/1 消息驱动的组件/3 仅有消息接口.py b/15 工程化/1 松耦合/2 消息链/4 消息队列.py similarity index 90% rename from 15 工程化/1 松耦合/1 消息驱动的组件/3 仅有消息接口.py rename to 15 工程化/1 松耦合/2 消息链/4 消息队列.py index 5530a6e..13719fd 100644 --- a/15 工程化/1 松耦合/1 消息驱动的组件/3 仅有消息接口.py +++ b/15 工程化/1 松耦合/2 消息链/4 消息队列.py @@ -1,7 +1,9 @@ ''' -依靠给各个不同线程组件的队列发指令来驱动所有工作,比较繁琐。 -比较 01.py 的实现,各个组件完全不能互操作,仅依靠队列发消息进行协作 -这是一个示例性质的原型,具体分布式环境下需要调整 +多线程各个模块比较乱的但是协作序贯的完成了数据处理 +各个组件完全不能互操作,仅依靠队列发消息进行协作 + +适合环节多,数据可分块,有IO-计算性能设计考量要求,让各个模块自己适应调整 +在某些情况下,可以避免复杂的控制流设计,使代码简洁 ''' from threading import Thread diff --git a/16 其它/对象设计模式/享元.py b/16 其它/享元.py similarity index 100% rename from 16 其它/对象设计模式/享元.py rename to 16 其它/享元.py