From f2729b414ff7e907e70319eab6518d9bfaeb9447 Mon Sep 17 00:00:00 2001 From: zj3D Date: Sat, 24 May 2025 00:50:52 +0800 Subject: [PATCH] 0524_1 --- .../2 松耦合/1 观察者/readme.MD | 9 -- .../注册回调 1.py} | 2 +- .../注册回调 2.py} | 2 - .../观察者 1.py} | 5 +- .../观察者 2.py} | 5 +- .../{4 插件 => 2 插件}/config.ini | 0 .../{4 插件 => 2 插件}/plugin.py | 0 .../plugins-src/buildingPyc.py | 0 .../{4 插件 => 2 插件}/plugins-src/f1.py | 0 .../{4 插件 => 2 插件}/plugins-src/f2.py | 0 .../{4 插件 => 2 插件}/plugins/f1.pyc | Bin .../{4 插件 => 2 插件}/plugins/f2.pyc | Bin .../2 松耦合/2 消息驱动/3 消息链.py | 102 ------------------ .../消息驱动}/1 消息接口.py | 0 .../消息驱动}/2 消息队列.py | 0 .../3 进程独立/消息驱动/readme.md | 2 + .../1A 嵌套调用.py} | 0 .../1B 调用链.py} | 0 .../2 方法bind.py | 0 .../3 重载管道.py | 0 .../4 类方法.py | 0 .../5 类方法.py | 0 .../14 多计算单元/并发/2 多线程.py | 48 --------- .../14 多计算单元/并发/3 多进程.py | 42 -------- .../数据共享/1 数据队列.py | 52 --------- .../数据共享/2 服务进程.py | 49 --------- .../数据库做中介}/ORM/DataQuery.py | 0 .../数据库做中介}/ORM/createDb.py | 0 .../数据库做中介}/ORM/processData.py | 0 .../数据库做中介}/tf.db | Bin .../数据库做中介}/数据库.py | 0 C 高性能编程/11 队列.md | 18 ++++ .../词频统计/1 抽象并发.py | 0 C 高性能编程/词频统计/2 多线程.py | 60 +++++++++++ C 高性能编程/词频统计/3 多进程.py | 45 ++++++++ 35 files changed, 129 insertions(+), 312 deletions(-) delete mode 100644 A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/readme.MD rename A 动机与模式/13 工程化考虑/2 松耦合/{1 观察者/3 注册回调.py => 1 解耦模式/注册回调 1.py} (97%) rename A 动机与模式/13 工程化考虑/2 松耦合/{1 观察者/4 订阅发布.py => 1 解耦模式/注册回调 2.py} (98%) rename A 动机与模式/13 工程化考虑/2 松耦合/{1 观察者/1 观察者.py => 1 解耦模式/观察者 1.py} (84%) rename A 动机与模式/13 工程化考虑/2 松耦合/{1 观察者/2 观察者.py => 1 解耦模式/观察者 2.py} (91%) rename A 动机与模式/13 工程化考虑/2 松耦合/{4 插件 => 2 插件}/config.ini (100%) rename A 动机与模式/13 工程化考虑/2 松耦合/{4 插件 => 2 插件}/plugin.py (100%) rename A 动机与模式/13 工程化考虑/2 松耦合/{4 插件 => 2 插件}/plugins-src/buildingPyc.py (100%) rename A 动机与模式/13 工程化考虑/2 松耦合/{4 插件 => 2 插件}/plugins-src/f1.py (100%) rename A 动机与模式/13 工程化考虑/2 松耦合/{4 插件 => 2 插件}/plugins-src/f2.py (100%) rename A 动机与模式/13 工程化考虑/2 松耦合/{4 插件 => 2 插件}/plugins/f1.pyc (100%) rename A 动机与模式/13 工程化考虑/2 松耦合/{4 插件 => 2 插件}/plugins/f2.pyc (100%) delete mode 100644 A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/3 消息链.py rename A 动机与模式/13 工程化考虑/2 松耦合/{2 消息驱动 => 3 进程独立/消息驱动}/1 消息接口.py (100%) rename A 动机与模式/13 工程化考虑/2 松耦合/{2 消息驱动 => 3 进程独立/消息驱动}/2 消息队列.py (100%) create mode 100644 A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/readme.md rename A 动机与模式/13 工程化考虑/{4 管道封装/1 嵌套调用.py => 4 流式风格/1A 嵌套调用.py} (100%) rename A 动机与模式/13 工程化考虑/{2 松耦合/2 消息驱动/2 调用链.py => 4 流式风格/1B 调用链.py} (100%) rename A 动机与模式/13 工程化考虑/{4 管道封装 => 4 流式风格}/2 方法bind.py (100%) rename A 动机与模式/13 工程化考虑/{4 管道封装 => 4 流式风格}/3 重载管道.py (100%) rename A 动机与模式/13 工程化考虑/{4 管道封装 => 4 流式风格}/4 类方法.py (100%) rename A 动机与模式/13 工程化考虑/{4 管道封装 => 4 流式风格}/5 类方法.py (100%) delete mode 100644 A 动机与模式/14 多计算单元/并发/2 多线程.py delete mode 100644 A 动机与模式/14 多计算单元/并发/3 多进程.py delete mode 100644 A 动机与模式/14 多计算单元/数据共享/1 数据队列.py delete mode 100644 A 动机与模式/14 多计算单元/数据共享/2 服务进程.py rename A 动机与模式/{14 多计算单元/数据共享/3 数据库 => 17 其它/数据库做中介}/ORM/DataQuery.py (100%) rename A 动机与模式/{14 多计算单元/数据共享/3 数据库 => 17 其它/数据库做中介}/ORM/createDb.py (100%) rename A 动机与模式/{14 多计算单元/数据共享/3 数据库 => 17 其它/数据库做中介}/ORM/processData.py (100%) rename A 动机与模式/{14 多计算单元/数据共享/3 数据库 => 17 其它/数据库做中介}/tf.db (100%) rename A 动机与模式/{14 多计算单元/数据共享/3 数据库 => 17 其它/数据库做中介}/数据库.py (100%) create mode 100644 C 高性能编程/11 队列.md rename A 动机与模式/14 多计算单元/并发/4 抽象并发.py => C 高性能编程/词频统计/1 抽象并发.py (100%) create mode 100644 C 高性能编程/词频统计/2 多线程.py create mode 100644 C 高性能编程/词频统计/3 多进程.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/readme.MD b/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/readme.MD deleted file mode 100644 index 9ad044a..0000000 --- a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/readme.MD +++ /dev/null @@ -1,9 +0,0 @@ - -注册 -- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。 -- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。 - -- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。 -- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。 - -- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。 \ No newline at end of file diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/3 注册回调.py b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/注册回调 1.py similarity index 97% rename from A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/3 注册回调.py rename to A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/注册回调 1.py index 00d723d..6d4b308 100644 --- a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/3 注册回调.py +++ b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/注册回调 1.py @@ -1,6 +1,6 @@ -################ 待整理 ''' 注册者 = 观察者 +你也可以把它看作订阅模式 每个组件提供注册消息接口和注册消息动作 diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/4 订阅发布.py b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/注册回调 2.py similarity index 98% rename from A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/4 订阅发布.py rename to A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/注册回调 2.py index d2bed7f..5e6da70 100644 --- a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/4 订阅发布.py +++ b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/注册回调 2.py @@ -1,5 +1,3 @@ -################ 待整理 - from cppy.cp_util import * ''' 订阅者 = 注册者 = 观察者 diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/1 观察者.py b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/观察者 1.py similarity index 84% rename from A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/1 观察者.py rename to A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/观察者 1.py index c320666..601df18 100644 --- a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/1 观察者.py +++ b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/观察者 1.py @@ -2,10 +2,7 @@ 入门级示例,是用来帮助理解其他例子 把观察者挂到自己的处理队列上 -适当时机调用所有队列上的约定的观察者的 update 方法 -如果观察者有多个职能参与不同的任务链,不一定要统一命名update方法 - -这是一个示例性质的原型,具体环境下需要调整 +WordSubject 调用所有观察者的 update 方法 ''' import collections diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/2 观察者.py b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/观察者 2.py similarity index 91% rename from A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/2 观察者.py rename to A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/观察者 2.py index 9be5fe4..23544df 100644 --- a/A 动机与模式/13 工程化考虑/2 松耦合/1 观察者/2 观察者.py +++ b/A 动机与模式/13 工程化考虑/2 松耦合/1 解耦模式/观察者 2.py @@ -1,8 +1,7 @@ - ''' -本例的基本模式还是观察者 基类 Subject 提供注册和提醒注册上的对象提醒机制 - +每个派生组件做完自己的事情,调用基类的 notify 方法 +这样就可以实现一个简单的观察者模式,也可以说是订阅模式 因为函数和参数混杂在一起传递,使得各个模块的处理结构其实是 case by case ''' diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/4 插件/config.ini b/A 动机与模式/13 工程化考虑/2 松耦合/2 插件/config.ini similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/4 插件/config.ini rename to A 动机与模式/13 工程化考虑/2 松耦合/2 插件/config.ini diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugin.py b/A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugin.py similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugin.py rename to A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugin.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins-src/buildingPyc.py b/A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins-src/buildingPyc.py similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins-src/buildingPyc.py rename to A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins-src/buildingPyc.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins-src/f1.py b/A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins-src/f1.py similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins-src/f1.py rename to A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins-src/f1.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins-src/f2.py b/A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins-src/f2.py similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins-src/f2.py rename to A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins-src/f2.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins/f1.pyc b/A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins/f1.pyc similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins/f1.pyc rename to A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins/f1.pyc diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins/f2.pyc b/A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins/f2.pyc similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/4 插件/plugins/f2.pyc rename to A 动机与模式/13 工程化考虑/2 松耦合/2 插件/plugins/f2.pyc diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/3 消息链.py b/A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/3 消息链.py deleted file mode 100644 index 68c10cd..0000000 --- a/A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/3 消息链.py +++ /dev/null @@ -1,102 +0,0 @@ -''' -后续组件挂载到前序组件后续链上 -仅提供 self.next_observer 的抽象关系 -后续组件接到指令和数据,自己决定动作 - -理论上每个组件可以参与到多个生产队列 - -本例使用了类来封装消息,相对于字符串理论上提供了更丰富的扩展可能 -这是一个示例性质的原型,具体环境下需要调整 -''' - -from collections import Counter -from typing import List, Dict -from cppy.cp_util import * - -# 定义消息类型 -class Message: - def __init__(self, data): - self.data = data - -class TokenizedText(Message): - pass - -class FilteredText(Message): - pass - -class WordFrequency(Message): - pass - -# 定义观察者接口 -class Observer: - def notify(self, message: Message): - pass - -# 切词订阅者 -class TokenizerSubscriber(Observer): - def __init__(self, next_observer: Observer): - self.next_observer = next_observer - - def notify(self, message: Message): - if not isinstance(message.data, str): - return - tokenized_text = re_split(message.data) - self.next_observer.notify(TokenizedText(tokenized_text)) - -# 停用词订阅者 -class StopWordsRemoverSubscriber(Observer): - def __init__(self, next_observer: Observer, stop_words: List[str]): - self.next_observer = next_observer - self.stop_words = set(stop_words) - - def notify(self, message: Message): - if not isinstance(message, TokenizedText): - return - filtered_text = [word for word in message.data if word not in self.stop_words and len(word)>2 ] - self.next_observer.notify(FilteredText(filtered_text)) - -# 词频统计订阅者 -class WordFrequencyCalculatorSubscriber(Observer): - def __init__(self, next_observer: Observer): - self.next_observer = next_observer - - def notify(self, message: Message): - if not isinstance(message, FilteredText): - return - word_freq = Counter(message.data) - self.next_observer.notify( WordFrequency(word_freq) ) - - -# 输出前N个词订阅者 -class TopNWordsDisplaySubscriber(Observer): - def __init__(self, n: int): - self.n = n - - def notify(self, message: Message): - if not isinstance(message, WordFrequency): - return - print_word_freqs( message.data.most_common(self.n) ) - - -# 模拟发布者 -def publish_text(text: str, observers: List[Observer]): - for observer in observers: - observer.notify(Message(text)) - -# 主函数 -def main(): - text = read_file() - - stop_words = get_stopwords() - - # 创建订阅者链 - display_subscriber = TopNWordsDisplaySubscriber( n=10 ) - freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber) - stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words) - tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber) - - # 发布文本 - publish_text(text, [tokenizer_subscriber]) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/1 消息接口.py b/A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/1 消息接口.py similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/1 消息接口.py rename to A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/1 消息接口.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/2 消息队列.py b/A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/2 消息队列.py similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/2 消息队列.py rename to A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/2 消息队列.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/readme.md b/A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/readme.md new file mode 100644 index 0000000..bad4c96 --- /dev/null +++ b/A 动机与模式/13 工程化考虑/2 松耦合/3 进程独立/消息驱动/readme.md @@ -0,0 +1,2 @@ + +改造下适合跨进程系统的后台响应对象设计 \ No newline at end of file diff --git a/A 动机与模式/13 工程化考虑/4 管道封装/1 嵌套调用.py b/A 动机与模式/13 工程化考虑/4 流式风格/1A 嵌套调用.py similarity index 100% rename from A 动机与模式/13 工程化考虑/4 管道封装/1 嵌套调用.py rename to A 动机与模式/13 工程化考虑/4 流式风格/1A 嵌套调用.py diff --git a/A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/2 调用链.py b/A 动机与模式/13 工程化考虑/4 流式风格/1B 调用链.py similarity index 100% rename from A 动机与模式/13 工程化考虑/2 松耦合/2 消息驱动/2 调用链.py rename to A 动机与模式/13 工程化考虑/4 流式风格/1B 调用链.py diff --git a/A 动机与模式/13 工程化考虑/4 管道封装/2 方法bind.py b/A 动机与模式/13 工程化考虑/4 流式风格/2 方法bind.py similarity index 100% rename from A 动机与模式/13 工程化考虑/4 管道封装/2 方法bind.py rename to A 动机与模式/13 工程化考虑/4 流式风格/2 方法bind.py diff --git a/A 动机与模式/13 工程化考虑/4 管道封装/3 重载管道.py b/A 动机与模式/13 工程化考虑/4 流式风格/3 重载管道.py similarity index 100% rename from A 动机与模式/13 工程化考虑/4 管道封装/3 重载管道.py rename to A 动机与模式/13 工程化考虑/4 流式风格/3 重载管道.py diff --git a/A 动机与模式/13 工程化考虑/4 管道封装/4 类方法.py b/A 动机与模式/13 工程化考虑/4 流式风格/4 类方法.py similarity index 100% rename from A 动机与模式/13 工程化考虑/4 管道封装/4 类方法.py rename to A 动机与模式/13 工程化考虑/4 流式风格/4 类方法.py diff --git a/A 动机与模式/13 工程化考虑/4 管道封装/5 类方法.py b/A 动机与模式/13 工程化考虑/4 流式风格/5 类方法.py similarity index 100% rename from A 动机与模式/13 工程化考虑/4 管道封装/5 类方法.py rename to A 动机与模式/13 工程化考虑/4 流式风格/5 类方法.py diff --git a/A 动机与模式/14 多计算单元/并发/2 多线程.py b/A 动机与模式/14 多计算单元/并发/2 多线程.py deleted file mode 100644 index 442b3d4..0000000 --- a/A 动机与模式/14 多计算单元/并发/2 多线程.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- -from collections import Counter -from cppy.cp_util import * -from multiprocessing.pool import ThreadPool - - -# -# 多线程 -# -stop_words = get_stopwords() - -def process_chunk(chunk): - # 过滤停用词 - words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ] - return Counter(words) - - -def merge_counts(counts_list): - """合并多个Counter对象的总和""" - return sum(counts_list, Counter()) - - -def thread_function(chunk, counts_list): - word_count = process_chunk(chunk) - counts_list.append(word_count) - - -@timing_decorator -def main(): - # 读数据,按1000个词一组分片 - chunks = get_chunks(testfilepath,1000) - - # 线程池 - pool = ThreadPool(len(chunks)) # 随意指定的线程数 - - counts_list = pool.map(process_chunk, chunks) - pool.close() - pool.join() - - # 合并计数 - total_counts = merge_counts(counts_list) - - # 输出最高频的n个词 - print_word_freqs(total_counts.most_common(10)) - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/A 动机与模式/14 多计算单元/并发/3 多进程.py b/A 动机与模式/14 多计算单元/并发/3 多进程.py deleted file mode 100644 index f979182..0000000 --- a/A 动机与模式/14 多计算单元/并发/3 多进程.py +++ /dev/null @@ -1,42 +0,0 @@ -# -*- coding: utf-8 -*- -import multiprocessing -from collections import Counter -from cppy.cp_util import * - - -# -# 多进程: 因为创建进程相比计算过程开销太大,结果最慢 -# -stop_words = get_stopwords() - -def process_chunk(chunk): - # 过滤停用词 - words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ] - return Counter(words) - -def merge_counts(counts_list): - """合并多个Counter对象的总和""" - return sum(counts_list, Counter()) - - -@timing_decorator -def main(): - # 读取文件内容,分割文件内容为多个块,每个块由一个进程处理 - chunks = get_chunks(testfilepath,1000) - - # 使用多进程处理每个块 - pool = multiprocessing.Pool(processes=multiprocessing.cpu_count()) - counts_list = pool.map(process_chunk, chunks) - pool.close() - pool.join() - - # 合并计数 - total_counts = merge_counts(counts_list) - - # 输出最高频的n个词 - print_word_freqs(total_counts.most_common(10)) - - -if __name__ == '__main__': - main() - diff --git a/A 动机与模式/14 多计算单元/数据共享/1 数据队列.py b/A 动机与模式/14 多计算单元/数据共享/1 数据队列.py deleted file mode 100644 index 92eb9c5..0000000 --- a/A 动机与模式/14 多计算单元/数据共享/1 数据队列.py +++ /dev/null @@ -1,52 +0,0 @@ -import threading, queue -from cppy.cp_util import * -from collections import Counter - -stop_words = get_stopwords() - -# 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数 - -class WordFrequencyCounter: - def __init__(self, input_file): - self.word_space = queue.Queue() - self.freq_space = queue.Queue() - for chunk in get_chunks(input_file,3000): - self.word_space.put(chunk) - - def process_words(self): - while not self.word_space.empty(): - try: - chunk = self.word_space.get_nowait() # 不使用超时,持续获取数据 - except queue.Empty: - break # 队列为空,退出循环 - # print(f"Worker thread ID: {threading.get_ident()}",len(chunk)) - words = [ w for w in chunk if w not in stop_words and len(w) >= 3 ] - word_freqs = Counter(words) - self.freq_space.put(dict(word_freqs)) # 将Counter对象转换为字典 - - def run(self): - workers = [ threading.Thread(target=self.process_words) for _ in range(5)] - for worker in workers: worker.start() - for worker in workers: worker.join() - - word_freqs = Counter() # 初始化一个空的Counter对象 - while not self.freq_space.empty(): - freqs = self.freq_space.get() - if freqs: # 确保freqs非空 - word_freqs.update(freqs) - - print_word_freqs ( sort_dict (word_freqs) ) - - -@timing_decorator -def main(): - counter = WordFrequencyCounter( testfilepath ) - counter.run() - -if __name__ == '__main__': - main() - -''' -在多线程之间传递数据,建议使用线程安全的队列,如queue.Queue或multiprocessing.Queue(后者也适用于多进程环境)。 -这些队列提供了线程安全的数据传输机制,可以避免竞态条件和数据损坏。 -''' \ No newline at end of file diff --git a/A 动机与模式/14 多计算单元/数据共享/2 服务进程.py b/A 动机与模式/14 多计算单元/数据共享/2 服务进程.py deleted file mode 100644 index 3042b65..0000000 --- a/A 动机与模式/14 多计算单元/数据共享/2 服务进程.py +++ /dev/null @@ -1,49 +0,0 @@ -''' -使用 multiprocessing.Manager: -Manager 提供了一个可以在不同进程之间共享和修改的数据类型,如 list, dict, Namespace 等。 -它实际上是在背后启动了一个单独的服务器进程,其他进程通过代理来访问这些共享对象。 - -怎么得到最快的一个结果,是一个试错过程:X程创建数目多少、分片的大小 ... - -使用 multiprocessing.Manager 来完成统计词频 -需要注意: - - Manager() 必须用函数包起来,不能按脚本随便放外面,否则会提示freeze_support - - 工作函数需要放到外面,不能做内部函数。否则会提示参数错误 - - 无法在 Jupyter 类似环境运行 -''' - -from cppy.cp_util import * -from collections import Counter -from multiprocessing import Manager, Process - -stop_words = get_stopwords() - -def process_chunk(chunk,word_count): - words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ] - for word in words: # 非常化时间 - word_count[word] = word_count.get(word, 0) + 1 - # word_count.update( Counter(words) ) # 类型不起作用 - -@timing_decorator -def main(): - manager = Manager() - word_count = manager.dict() - - chunks = get_chunks(testfilepath,2800) - print('-------------------',len(chunks)) - processes = [] - for chunk in chunks: - p = Process(target=process_chunk, - args=(chunk,word_count) ) - processes.append(p) - p.start() - - for p in processes: p.join() - - word_count = dict(word_count) - word_freqs = Counter(word_count).most_common(10) - print_word_freqs(word_freqs) - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/A 动机与模式/14 多计算单元/数据共享/3 数据库/ORM/DataQuery.py b/A 动机与模式/17 其它/数据库做中介/ORM/DataQuery.py similarity index 100% rename from A 动机与模式/14 多计算单元/数据共享/3 数据库/ORM/DataQuery.py rename to A 动机与模式/17 其它/数据库做中介/ORM/DataQuery.py diff --git a/A 动机与模式/14 多计算单元/数据共享/3 数据库/ORM/createDb.py b/A 动机与模式/17 其它/数据库做中介/ORM/createDb.py similarity index 100% rename from A 动机与模式/14 多计算单元/数据共享/3 数据库/ORM/createDb.py rename to A 动机与模式/17 其它/数据库做中介/ORM/createDb.py diff --git a/A 动机与模式/14 多计算单元/数据共享/3 数据库/ORM/processData.py b/A 动机与模式/17 其它/数据库做中介/ORM/processData.py similarity index 100% rename from A 动机与模式/14 多计算单元/数据共享/3 数据库/ORM/processData.py rename to A 动机与模式/17 其它/数据库做中介/ORM/processData.py diff --git a/A 动机与模式/14 多计算单元/数据共享/3 数据库/tf.db b/A 动机与模式/17 其它/数据库做中介/tf.db similarity index 100% rename from A 动机与模式/14 多计算单元/数据共享/3 数据库/tf.db rename to A 动机与模式/17 其它/数据库做中介/tf.db diff --git a/A 动机与模式/14 多计算单元/数据共享/3 数据库/数据库.py b/A 动机与模式/17 其它/数据库做中介/数据库.py similarity index 100% rename from A 动机与模式/14 多计算单元/数据共享/3 数据库/数据库.py rename to A 动机与模式/17 其它/数据库做中介/数据库.py diff --git a/C 高性能编程/11 队列.md b/C 高性能编程/11 队列.md new file mode 100644 index 0000000..1f250ff --- /dev/null +++ b/C 高性能编程/11 队列.md @@ -0,0 +1,18 @@ + +队列应用,涉及简单算法到多线程、多进程,以及分布式。 + +应用场景匹配的相应队列: +- 简单算法或小型任务:list(列表)作为队列,append() 入队,pop(0) 出队,简单易用。 +- 单线程高性能:用 collections.deque,效率最高。 +- 异步编程: 用 asyncio.Queue +- 多线程安全:用 queue.Queue +- 多线程优先级任务安全:用 queue.PriorityQueue ,确保高优先级的元素优先被取出。 +- 多进程通信:用 multiprocessing.Queue,如果进程内部存在线程竞争,就需要加锁。 + +注意事项: +- 非线程安全的实现(如 deque,list)在多线程场景中需加锁 。 +- multiprocessing.Queue 利用了操作系统提供的进程间通信(IPC, Inter-Process Communication)机制,具体实现取决于不同操作系统的支持。 + +其它可能用到的相关工具: +Redis,作为中间件提供共享存储和跨进程协调机制,处理缓存、消息队列、分布式锁、分布式状态共享。 +Celery,用于管理异步任务或后台任务,一般用来做周期性任务或者长时间运行的任务。可用 Redis 做存储。 diff --git a/A 动机与模式/14 多计算单元/并发/4 抽象并发.py b/C 高性能编程/词频统计/1 抽象并发.py similarity index 100% rename from A 动机与模式/14 多计算单元/并发/4 抽象并发.py rename to C 高性能编程/词频统计/1 抽象并发.py diff --git a/C 高性能编程/词频统计/2 多线程.py b/C 高性能编程/词频统计/2 多线程.py new file mode 100644 index 0000000..db73c57 --- /dev/null +++ b/C 高性能编程/词频统计/2 多线程.py @@ -0,0 +1,60 @@ +import os +import threading +from queue import Queue +from collections import Counter +import re + +# 共享队列和词频统计器 +file_queue = Queue() +word_counter = Counter() +lock = threading.Lock() # 确保线程安全更新 Counter + +# 读取文件并分词的函数 +def process_file(): + while True: + try: + # 从队列获取文件名,非阻塞 + file_path = file_queue.get_nowait() + except: + break # 队列为空,退出 + try: + with open(file_path, 'r', encoding='utf-8') as f: + text = f.read().lower() + # 简单分词,移除标点 + words = re.findall(r'\b\w+\b', text) + # 线程安全更新词频 + with lock: + word_counter.update(words) + except Exception as e: + print(f"Error processing {file_path}: {e}") + finally: + file_queue.task_done() + +def main(): + # 获取 data 目录下所有 .txt 文件 + data_dir = 'data' + files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')] + + # 将文件路径放入队列 + for file_path in files: + file_queue.put(file_path) + + # 创建并启动多个线程 + num_threads = 4 # 可根据需要调整线程数 + threads = [] + for _ in range(num_threads): + t = threading.Thread(target=process_file) + t.start() + threads.append(t) + + # 等待所有线程完成 + for t in threads: + t.join() + + # 输出前 10 高频词 + print("Top 10 高频词:") + for word, count in word_counter.most_common(10): + print(f"{word}: {count}") + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/C 高性能编程/词频统计/3 多进程.py b/C 高性能编程/词频统计/3 多进程.py new file mode 100644 index 0000000..c6d82a8 --- /dev/null +++ b/C 高性能编程/词频统计/3 多进程.py @@ -0,0 +1,45 @@ +import os +import re +from collections import Counter +from multiprocessing import Pool, Manager + +def process_file(file_path, shared_counter): + """处理单个文件,统计词频""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + text = f.read().lower() + # 简单分词,移除标点 + words = re.findall(r'\b\w+\b', text) + # 更新共享 Counter + shared_counter.update(words) + except Exception as e: + print(f"Error processing {file_path}: {e}") + +def main(): + # 获取 data 目录下所有 .txt 文件 + data_dir = 'data' + files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')] + + # 使用 Manager 创建共享 Counter + with Manager() as manager: + shared_counter = manager.dict(Counter()) + + # 创建进程池 + with Pool(processes=4) as pool: # 可调整进程数 + # 分发任务给进程池 + for file_path in files: + pool.apply_async(process_file, args=(file_path, shared_counter)) + # 关闭池并等待所有进程完成 + pool.close() + pool.join() + + # 转换为普通 Counter 以获取结果 + final_counter = Counter(dict(shared_counter)) + + # 输出前 10 高频词 + print("Top 10 高频词:") + for word, count in final_counter.most_common(10): + print(f"{word}: {count}") + +if __name__ == '__main__': + main() \ No newline at end of file