zj3D 4 weeks ago
parent 464471282f
commit 27c8e450c9

@ -1,61 +1,105 @@
''' '''
入门级示例是用来帮助理解其他例子
把观察者挂到自己的处理队列上 把观察者挂到自己的处理队列上
WordSubject 调用所有观察者的 update 方法
''' '''
import collections import os
import re
import threading
from queue import Queue
from collections import Counter
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from cppy.cp_util import *
# 定义观察者接口 # 观察者接口
class Observer(ABC): class Observer(ABC):
@abstractmethod @abstractmethod
def update(self, word): def update(self, word_counts: Counter):
pass pass
# 定义具体观察者类,用于统计词频 # 具体观察者:打印前 10 高频词
class WordFrequencyObserver(Observer): class PrintTopWordsObserver(Observer):
def __init__(self): def update(self, word_counts: Counter):
self.word_count = collections.Counter() print("Top 10 高频词:")
for word, count in word_counts.most_common(10):
def update(self, word): print(f"{word}: {count}")
self.word_count[word] += 1
# 具体观察者:保存词频到文件
class SaveToFileObserver(Observer):
# 定义主题类 def __init__(self, output_file):
class WordSubject: self.output_file = output_file
def update(self, word_counts: Counter):
try:
with open(self.output_file, 'w', encoding='utf-8') as f:
for word, count in word_counts.most_common(10):
f.write(f"{word}: {count}\n")
print(f"词频已保存到 {self.output_file}")
except Exception as e:
print(f"保存失败: {e}")
# 词频统计器(主题)
class WordFrequencyCounter:
def __init__(self): def __init__(self):
self.observers = [] self.observers = []
self.counter = Counter()
self.queue = Queue()
self.lock = threading.Lock()
def attach(self, observer): def add_observer(self, observer: Observer):
self.observers.append(observer) self.observers.append(observer)
def notify(self, word): def remove_observer(self, observer: Observer):
for observer in self.observers: self.observers.remove(observer)
observer.update(word)
# 主函数 def notify_observers(self):
def main(testfilepath, top_n = 10 ): for observer in self.observers:
stopwords = get_stopwords() observer.update(self.counter)
subject = WordSubject()
def process_file(self):
# 创建一个观察者并附加到主题 while True:
observer = WordFrequencyObserver() try:
subject.attach(observer) file_path = self.queue.get_nowait()
except:
# 处理文件 break
wordlist = re_split( read_file(testfilepath) ) try:
for word in wordlist: with open(file_path, 'r', encoding='utf-8') as f:
if word not in stopwords: text = f.read().lower()
subject.notify(word) # 通知 words = re.findall(r'\b\w+\b', text)
with self.lock:
# 打印最高的N个词频 self.counter.update(words)
top_words = observer.word_count.most_common(top_n) except Exception as e:
print_word_freqs(top_words) print(f"Error processing {file_path}: {e}")
finally:
self.queue.task_done()
if __name__ == "__main__":
main( testfilepath ) def count_words(self, files, num_threads=4):
# 将文件路径放入队列
for file_path in files:
self.queue.put(file_path)
# 创建并启动线程
threads = [threading.Thread(target=self.process_file) for _ in range(num_threads)]
for t in threads:
t.start()
for t in threads:
t.join()
# 通知所有观察者
self.notify_observers()
def main():
# 获取文件列表
data_dir = 'data'
files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')]
# 创建词频统计器
counter = WordFrequencyCounter()
# 添加观察者
counter.add_observer(PrintTopWordsObserver())
counter.add_observer(SaveToFileObserver("word_frequency.txt"))
# 统计词频并通知观察者
counter.count_words(files)
if __name__ == '__main__':
main()

@ -1,14 +1,9 @@
''' '''
每个对象运行于独立线程每个对象只暴露一个队伍消息队列的接口 每个对象运行于独立线程每个对象只暴露一个队伍消息队列的接口
这种模式除了分布式也适合多线程的环境 各个组件完全不能互操作仅依靠队列发消息进行协作/
多线程模式下消息队列来共享数据空间相比较传统数据库共享方式访问更快
各个组件完全不能互操作仅依靠队列发消息进行协作
适合环节多数据可分块有IO-计算性能设计考量要求让各个模块自己适应调整
在某些情况下可以避免复杂的控制流设计使代码简洁
消息队列来共享数据空间相比较传统数据库共享方式访问更快 这种模式改造下适合跨进程系统的后台响应对象设计.
''' '''
from threading import Thread from threading import Thread

@ -1,101 +0,0 @@
################ 待整理
'''
每个对象只公开一个接受和发送消息的接口隐藏所有数据和函数
适合分布式系统的远程组件之间无法直接调用只能传递消息
应用场景针对各个组件的 notify 方法发指令来驱动所有工作
这是一个示例性质的原型具体分布式环境下需要调整
notify 用了四种写法是和本主题无关的测试
'''
from cppy.cp_util import *
from collections import defaultdict
badmsg = lambda : exec (''' raise Exception("Message not understood " , action ) ''')
class fff:
def __init__(self, d):
self._data = defaultdict( badmsg )
self._data.update(d)
def __getitem__(self, key):
return self._data[key]
class DataStorageMod():
def __init__(self):
self._data = []
def notify(self, action, *args):
return {
'init': lambda : self._init,
'words': lambda : self._words
}.get( action , badmsg )()(*args)
def _init(self, path_to_file):
self._data = re_split( read_file(path_to_file) )
def _words(self):
return self._data
class StopWordMod():
_stop_words = []
def notify(self, action, *args):
return { 'init': self._init,
'is_stop_word': self._is_stop_word
}[ action ](*args)
def _init(self):
self._stop_words = get_stopwords()
def _is_stop_word(self, wordx):
return wordx in self._stop_words
class WordFrequencyMod():
_word_freqs = {}
def notify(self, action, *args):
return fff( {
'increment_count': lambda : self._increment_count,
'sorted': lambda : self._sorted
})[ action ]()(*args)
def _increment_count(self, word):
self._word_freqs[word] = self._word_freqs.get(word,0) + 1
def _sorted(self):
return sort_dict(self._word_freqs)
class ScenarioManager():
def notify(self, action, *args):
if action == 'init':
return self._init( *args)
elif action == 'run':
return self._run()
else:
raise Exception("Message not understood " + action )
def _init(self, path_to_file):
self._storage_manager = DataStorageMod()
self._stop_word_manager = StopWordMod()
self._word_freq_manager = WordFrequencyMod()
self._storage_manager.notify('init', path_to_file)
self._stop_word_manager.notify('init')
def _run(self):
for word in self._storage_manager.notify('words'):
if not self._stop_word_manager.notify('is_stop_word', word):
self._word_freq_manager.notify('increment_count', word )
word_freqs = self._word_freq_manager.notify('sorted')
print_word_freqs(word_freqs)
if __name__ == '__main__':
sm = ScenarioManager()
sm.notify('init', testfilepath)
sm.notify('run')

@ -1,2 +0,0 @@
改造下适合跨进程系统的后台响应对象设计
Loading…
Cancel
Save