zj3D 1 month ago
parent 2398743ab9
commit f2729b414f

@ -1,9 +0,0 @@
注册
- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。
- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。
- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。
- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。
- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。

@ -1,6 +1,6 @@
################ 待整理
'''
注册者 = 观察者
你也可以把它看作订阅模式
每个组件提供注册消息接口和注册消息动作

@ -2,10 +2,7 @@
入门级示例是用来帮助理解其他例子
把观察者挂到自己的处理队列上
适当时机调用所有队列上的约定的观察者的 update 方法
如果观察者有多个职能参与不同的任务链不一定要统一命名update方法
这是一个示例性质的原型具体环境下需要调整
WordSubject 调用所有观察者的 update 方法
'''
import collections

@ -1,8 +1,7 @@
'''
本例的基本模式还是观察者
基类 Subject 提供注册和提醒注册上的对象提醒机制
每个派生组件做完自己的事情调用基类的 notify 方法
这样就可以实现一个简单的观察者模式也可以说是订阅模式
因为函数和参数混杂在一起传递使得各个模块的处理结构其实是 case by case
'''

@ -1,102 +0,0 @@
'''
后续组件挂载到前序组件后续链上
仅提供 self.next_observer 的抽象关系
后续组件接到指令和数据自己决定动作
理论上每个组件可以参与到多个生产队列
本例使用了类来封装消息相对于字符串理论上提供了更丰富的扩展可能
这是一个示例性质的原型具体环境下需要调整
'''
from collections import Counter
from typing import List, Dict
from cppy.cp_util import *
# 定义消息类型
class Message:
def __init__(self, data):
self.data = data
class TokenizedText(Message):
pass
class FilteredText(Message):
pass
class WordFrequency(Message):
pass
# 定义观察者接口
class Observer:
def notify(self, message: Message):
pass
# 切词订阅者
class TokenizerSubscriber(Observer):
def __init__(self, next_observer: Observer):
self.next_observer = next_observer
def notify(self, message: Message):
if not isinstance(message.data, str):
return
tokenized_text = re_split(message.data)
self.next_observer.notify(TokenizedText(tokenized_text))
# 停用词订阅者
class StopWordsRemoverSubscriber(Observer):
def __init__(self, next_observer: Observer, stop_words: List[str]):
self.next_observer = next_observer
self.stop_words = set(stop_words)
def notify(self, message: Message):
if not isinstance(message, TokenizedText):
return
filtered_text = [word for word in message.data if word not in self.stop_words and len(word)>2 ]
self.next_observer.notify(FilteredText(filtered_text))
# 词频统计订阅者
class WordFrequencyCalculatorSubscriber(Observer):
def __init__(self, next_observer: Observer):
self.next_observer = next_observer
def notify(self, message: Message):
if not isinstance(message, FilteredText):
return
word_freq = Counter(message.data)
self.next_observer.notify( WordFrequency(word_freq) )
# 输出前N个词订阅者
class TopNWordsDisplaySubscriber(Observer):
def __init__(self, n: int):
self.n = n
def notify(self, message: Message):
if not isinstance(message, WordFrequency):
return
print_word_freqs( message.data.most_common(self.n) )
# 模拟发布者
def publish_text(text: str, observers: List[Observer]):
for observer in observers:
observer.notify(Message(text))
# 主函数
def main():
text = read_file()
stop_words = get_stopwords()
# 创建订阅者链
display_subscriber = TopNWordsDisplaySubscriber( n=10 )
freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber)
stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words)
tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber)
# 发布文本
publish_text(text, [tokenizer_subscriber])
if __name__ == "__main__":
main()

@ -0,0 +1,2 @@
改造下适合跨进程系统的后台响应对象设计

@ -1,48 +0,0 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from multiprocessing.pool import ThreadPool
#
# 多线程
#
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(counts_list):
"""合并多个Counter对象的总和"""
return sum(counts_list, Counter())
def thread_function(chunk, counts_list):
word_count = process_chunk(chunk)
counts_list.append(word_count)
@timing_decorator
def main():
# 读数据按1000个词一组分片
chunks = get_chunks(testfilepath,1000)
# 线程池
pool = ThreadPool(len(chunks)) # 随意指定的线程数
counts_list = pool.map(process_chunk, chunks)
pool.close()
pool.join()
# 合并计数
total_counts = merge_counts(counts_list)
# 输出最高频的n个词
print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -1,42 +0,0 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *
#
# 多进程: 因为创建进程相比计算过程开销太大,结果最慢
#
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(counts_list):
"""合并多个Counter对象的总和"""
return sum(counts_list, Counter())
@timing_decorator
def main():
# 读取文件内容,分割文件内容为多个块,每个块由一个进程处理
chunks = get_chunks(testfilepath,1000)
# 使用多进程处理每个块
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
counts_list = pool.map(process_chunk, chunks)
pool.close()
pool.join()
# 合并计数
total_counts = merge_counts(counts_list)
# 输出最高频的n个词
print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -1,52 +0,0 @@
import threading, queue
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
# 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数
class WordFrequencyCounter:
def __init__(self, input_file):
self.word_space = queue.Queue()
self.freq_space = queue.Queue()
for chunk in get_chunks(input_file,3000):
self.word_space.put(chunk)
def process_words(self):
while not self.word_space.empty():
try:
chunk = self.word_space.get_nowait() # 不使用超时,持续获取数据
except queue.Empty:
break # 队列为空,退出循环
# print(f"Worker thread ID: {threading.get_ident()}",len(chunk))
words = [ w for w in chunk if w not in stop_words and len(w) >= 3 ]
word_freqs = Counter(words)
self.freq_space.put(dict(word_freqs)) # 将Counter对象转换为字典
def run(self):
workers = [ threading.Thread(target=self.process_words) for _ in range(5)]
for worker in workers: worker.start()
for worker in workers: worker.join()
word_freqs = Counter() # 初始化一个空的Counter对象
while not self.freq_space.empty():
freqs = self.freq_space.get()
if freqs: # 确保freqs非空
word_freqs.update(freqs)
print_word_freqs ( sort_dict (word_freqs) )
@timing_decorator
def main():
counter = WordFrequencyCounter( testfilepath )
counter.run()
if __name__ == '__main__':
main()
'''
在多线程之间传递数据建议使用线程安全的队列如queue.Queue或multiprocessing.Queue后者也适用于多进程环境
这些队列提供了线程安全的数据传输机制可以避免竞态条件和数据损坏
'''

@ -1,49 +0,0 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
怎么得到最快的一个结果是一个试错过程X程创建数目多少分片的大小 ...
使用 multiprocessing.Manager 来完成统计词频
需要注意
- Manager() 必须用函数包起来,不能按脚本随便放外面否则会提示freeze_support
- 工作函数需要放到外面不能做内部函数否则会提示参数错误
- 无法在 Jupyter 类似环境运行
'''
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(chunk,word_count):
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
for word in words: # 非常化时间
word_count[word] = word_count.get(word, 0) + 1
# word_count.update( Counter(words) ) # 类型不起作用
@timing_decorator
def main():
manager = Manager()
word_count = manager.dict()
chunks = get_chunks(testfilepath,2800)
print('-------------------',len(chunks))
processes = []
for chunk in chunks:
p = Process(target=process_chunk,
args=(chunk,word_count) )
processes.append(p)
p.start()
for p in processes: p.join()
word_count = dict(word_count)
word_freqs = Counter(word_count).most_common(10)
print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -0,0 +1,18 @@
队列应用,涉及简单算法到多线程、多进程,以及分布式。
应用场景匹配的相应队列:
- 简单算法或小型任务list列表作为队列,append() 入队pop(0) 出队,简单易用。
- 单线程高性能:用 collections.deque效率最高。
- 异步编程: 用 asyncio.Queue
- 多线程安全:用 queue.Queue
- 多线程优先级任务安全:用 queue.PriorityQueue ,确保高优先级的元素优先被取出。
- 多进程通信:用 multiprocessing.Queue如果进程内部存在线程竞争就需要加锁。
注意事项:
- 非线程安全的实现(如 deque,list在多线程场景中需加锁 。
- multiprocessing.Queue 利用了操作系统提供的进程间通信IPC, Inter-Process Communication机制具体实现取决于不同操作系统的支持。
其它可能用到的相关工具:
Redis作为中间件提供共享存储和跨进程协调机制处理缓存、消息队列、分布式锁、分布式状态共享。
Celery用于管理异步任务或后台任务一般用来做周期性任务或者长时间运行的任务。可用 Redis 做存储。

@ -0,0 +1,60 @@
import os
import threading
from queue import Queue
from collections import Counter
import re
# 共享队列和词频统计器
file_queue = Queue()
word_counter = Counter()
lock = threading.Lock() # 确保线程安全更新 Counter
# 读取文件并分词的函数
def process_file():
while True:
try:
# 从队列获取文件名,非阻塞
file_path = file_queue.get_nowait()
except:
break # 队列为空,退出
try:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read().lower()
# 简单分词,移除标点
words = re.findall(r'\b\w+\b', text)
# 线程安全更新词频
with lock:
word_counter.update(words)
except Exception as e:
print(f"Error processing {file_path}: {e}")
finally:
file_queue.task_done()
def main():
# 获取 data 目录下所有 .txt 文件
data_dir = 'data'
files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')]
# 将文件路径放入队列
for file_path in files:
file_queue.put(file_path)
# 创建并启动多个线程
num_threads = 4 # 可根据需要调整线程数
threads = []
for _ in range(num_threads):
t = threading.Thread(target=process_file)
t.start()
threads.append(t)
# 等待所有线程完成
for t in threads:
t.join()
# 输出前 10 高频词
print("Top 10 高频词:")
for word, count in word_counter.most_common(10):
print(f"{word}: {count}")
if __name__ == '__main__':
main()

@ -0,0 +1,45 @@
import os
import re
from collections import Counter
from multiprocessing import Pool, Manager
def process_file(file_path, shared_counter):
"""处理单个文件,统计词频"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read().lower()
# 简单分词,移除标点
words = re.findall(r'\b\w+\b', text)
# 更新共享 Counter
shared_counter.update(words)
except Exception as e:
print(f"Error processing {file_path}: {e}")
def main():
# 获取 data 目录下所有 .txt 文件
data_dir = 'data'
files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')]
# 使用 Manager 创建共享 Counter
with Manager() as manager:
shared_counter = manager.dict(Counter())
# 创建进程池
with Pool(processes=4) as pool: # 可调整进程数
# 分发任务给进程池
for file_path in files:
pool.apply_async(process_file, args=(file_path, shared_counter))
# 关闭池并等待所有进程完成
pool.close()
pool.join()
# 转换为普通 Counter 以获取结果
final_counter = Counter(dict(shared_counter))
# 输出前 10 高频词
print("Top 10 高频词:")
for word, count in final_counter.most_common(10):
print(f"{word}: {count}")
if __name__ == '__main__':
main()
Loading…
Cancel
Save