Compare commits

..

47 Commits
dev ... dev

Author SHA1 Message Date
zj3D 74b622ee04 526-7
1 month ago
zj3D 27c8e450c9 525-23
1 month ago
zj3D 464471282f 525-22
1 month ago
zj3D ca6f8d97ee 99
1 month ago
zj3D a8200ff225 525-22
1 month ago
zj3D 56d6c04878 0525-18
1 month ago
zj3D 7d03c9e5f0 524-20
1 month ago
zj3D 068c0f9bb8 524-3
1 month ago
zj3D 3c919aa5b6 524-2
1 month ago
zj3D f2729b414f 0524_1
1 month ago
zj3D 2398743ab9 532-18
1 month ago
zj3D 6b1604413a 0523-1
1 month ago
zj3D c29f01ece2 052123
1 month ago
zj3D cc841ffaae 0521-4
1 month ago
zj3D a270414ff0 0521-3
1 month ago
zj3D 8e7c4a3117 521_2
1 month ago
zj3D e7f73ef2f4 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
1 month ago
zj3D d727f0cba2 0521
1 month ago
p46318075 dd11216128 Update readme.MD
1 month ago
zj3D 9845e7f38c g04
1 month ago
zj3D 29c7d4a635 g02
1 month ago
zj3D aaba98303b grok01
1 month ago
zj3D 29f26846a0 426
2 months ago
zj3D 3f03b9014d 05
4 months ago
zj3D d782d65fb5 Remove .gitignore file from Git tracking
4 months ago
zj3D 5e83716c4d 04
4 months ago
zj3D d339ef454b 03
4 months ago
zj3D 654dabbb69 02
4 months ago
zj3D 59ea2b479b 01
4 months ago
zj3D 0c531354ce BC01
4 months ago
zj3D dc44e0be4b 04
4 months ago
zj3D 3ea3cbaa23 031
4 months ago
zj3D 0eb30e470f 03
4 months ago
zj3D 4abad6851f 02
4 months ago
zj3D 129f1aaa3f 01
4 months ago
zj3D 617465cec6 07
4 months ago
zj3D 35e58525f1 06
4 months ago
zj3D e5b9607dce 05
4 months ago
zj3D 49bf182cf8 04
4 months ago
zj3D ea53899bbd 04
4 months ago
zj3D 0606fc586c 03
4 months ago
zj3D b77d297f3e 02
4 months ago
zj3D 1712e964cf 02
4 months ago
zj3D 8b9e813ee2 2501
4 months ago
zj3D 7365ebb312 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
4 months ago
zj3D 524a65e492 ABC
4 months ago
p46318075 94800c4b9e Update readme.MD
5 months ago

4
.gitignore vendored

@ -1,4 +0,0 @@
log.txt
/test
/.venv
__pycache__

@ -1,34 +0,0 @@
from collections import Counter
import cppy.cp_util as util
def word_frequency( top_n=10 ):
def decorator(func):
def wrapper(*args, **kwargs):
# 执行被装饰的函数
result = func(*args, **kwargs)
# 初始化词频计数器
word_counts = Counter()
# 分词并计数
for word in util.extract_str_words(result):
word_counts[word] += 1
# 输出所有词的频率最高的n个词
most_common = word_counts.most_common(top_n)
util.print_word_freqs( most_common )
return result
return wrapper
return decorator
# 使用装饰器
@word_frequency( top_n=10 )
def read_file(file_path):
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
read_file( util.testfilepath )

@ -1,56 +0,0 @@
import threading, queue
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
# 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数
class WordFrequencyCounter:
def __init__(self, input_file):
self.word_space = queue.Queue()
self.freq_space = queue.Queue()
for chunk in get_chunks(input_file,3000):
self.word_space.put(chunk)
def process_words(self):
while not self.word_space.empty():
try:
chunk = self.word_space.get_nowait() # 不使用超时,持续获取数据
except queue.Empty:
break # 队列为空,退出循环
# print(f"Worker thread ID: {threading.get_ident()}",len(chunk))
words = [ w for w in chunk if w not in stop_words and len(w) >= 3 ]
word_freqs = Counter(words)
self.freq_space.put(dict(word_freqs)) # 将Counter对象转换为字典
def run(self):
workers = [ threading.Thread(target=self.process_words) for _ in range(5)]
for worker in workers: worker.start()
for worker in workers: worker.join()
word_freqs = Counter() # 初始化一个空的Counter对象
while not self.freq_space.empty():
freqs = self.freq_space.get()
if freqs: # 确保freqs非空
word_freqs.update(freqs)
print_word_freqs ( sort_dict (word_freqs) )
@timing_decorator
def main():
counter = WordFrequencyCounter( testfilepath )
counter.run()
if __name__ == '__main__':
main()
'''
在多线程之间传递数据建议使用线程安全的队列如queue.Queue或multiprocessing.Queue后者也适用于多进程环境
这些队列提供了线程安全的数据传输机制可以避免竞态条件和数据损坏
全局变量不可预测
multiprocessing.Queue 利用了操作系统提供的进程间通信IPC, Inter-Process Communication机制具体实现取决于不同操作系统的支持
在Unix/Linux系统中multiprocessing.Queue通常基于管道pipes共享内存和/或消息队列等机制实现
而在Windows系统上可能使用命名管道named pipes或者内存映射文件memory-mapped files以及某些版本的Windows特有的进程间同步对象如MutexesSemaphores和事件
'''

@ -1,62 +0,0 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
使用 multiprocessing.Manager 来完成统计词频
需要注意
- Manager() 必须用函数包起来,不能按脚本随便放外面否则会提示freeze_support
- 工作函数需要放到外面不能做内部函数否则会提示参数错误
- 无法在 Jupyter 类似环境运行
'''
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(shared_chunks,word_count):
while True:
try:
chunk = shared_chunks.pop(0) # 从共享列表中取出一个数据块
if chunk is None: break # 如果取出的是None表示所有数据块已处理完毕
words = extract_str_words(chunk)
for word in words:
if word not in stop_words:
word_count[word] = word_count.get(word, 0) + 1
except Exception as e:
print(e)
break
@timing_decorator
def main():
# 创建一个Manager实例
manager = Manager()
shared_chunks = manager.list()
word_count = manager.dict()
# 读取文件并按块大小分割,将块添加到共享列表中
chunk_size = 1024 * 10 # 假设每个块是10KB可以根据需要调整
with open(testfilepath, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk: break
shared_chunks.append(chunk)
shared_chunks.append(None)
print('-------------------',len(shared_chunks))
processes = [ Process( target=process_chunk,
args=(shared_chunks,word_count))
for _ in range( 4 ) ] # 假设启动4个工作进程
for p in processes: p.start()
for p in processes: p.join()
# 将Manager类型的字典转换为普通的字典以便使用Counter
word_count = dict(word_count)
word_freqs = Counter(word_count).most_common(10)
print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -1,42 +0,0 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
'''
# 使用 multiprocessing.Manager 来完成统计词频
# 怎么得到最快的一个结果是一个试错过程X程创建数目多少、分片的大小 ...
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(chunk,word_count):
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
for word in words: # 非常化时间
word_count[word] = word_count.get(word, 0) + 1
# word_count.update( Counter(words) ) # 类型不起作用
@timing_decorator
def main():
manager = Manager()
word_count = manager.dict()
chunks = get_chunks(testfilepath,2800)
print('-------------------',len(chunks))
processes = []
for chunk in chunks:
p = Process(target=process_chunk,
args=(chunk,word_count) )
processes.append(p)
p.start()
for p in processes: p.join()
word_count = dict(word_count)
word_freqs = Counter(word_count).most_common(10)
print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -1,48 +0,0 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from multiprocessing.pool import ThreadPool
#
# 多线程
#
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(counts_list):
"""合并多个Counter对象的总和"""
return sum(counts_list, Counter())
def thread_function(chunk, counts_list):
word_count = process_chunk(chunk)
counts_list.append(word_count)
@timing_decorator
def main():
# 读数据按1000个词一组分片
chunks = get_chunks(testfilepath,1000)
# 线程池
pool = ThreadPool(len(chunks)) # 随意指定的线程数
counts_list = pool.map(process_chunk, chunks)
pool.close()
pool.join()
# 合并计数
total_counts = merge_counts(counts_list)
# 输出最高频的n个词
print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -1,42 +0,0 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *
#
# 多进程: 因为创建进程相比计算过程开销太大,结果最慢
#
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(counts_list):
"""合并多个Counter对象的总和"""
return sum(counts_list, Counter())
@timing_decorator
def main():
# 读取文件内容,分割文件内容为多个块,每个块由一个进程处理
chunks = get_chunks(testfilepath,1000)
# 使用多进程处理每个块
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
counts_list = pool.map(process_chunk, chunks)
pool.close()
pool.join()
# 合并计数
total_counts = merge_counts(counts_list)
# 输出最高频的n个词
print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -1,2 +0,0 @@
" my Some sure acquaintance or other, my dear, sure,other I suppose; I am sure I do not
know. sure "

@ -1,64 +0,0 @@
'''
入门级示例是用来帮助理解其他例子
把观察者挂到自己的处理队列上
适当时机调用所有队列上的约定的观察者的 update 方法
如果观察者有多个职能参与不同的任务链不一定要统一命名update方法
这是一个示例性质的原型具体环境下需要调整
'''
import collections
from abc import ABC, abstractmethod
from cppy.cp_util import *
# 定义观察者接口 ,在 Pyhon中并不是必须
class Observer(ABC):
@abstractmethod
def update(self, word):
pass
# 定义具体观察者类,用于统计词频
class WordFrequencyObserver(Observer):
def __init__(self):
self.word_count = collections.Counter()
def update(self, word):
self.word_count[word] += 1
# 定义主题类
class WordSubject:
def __init__(self):
self.observers = []
def attach(self, observer):
self.observers.append(observer)
def notify(self, word):
for observer in self.observers:
observer.update(word)
# 主函数
def main(testfilepath, top_n = 10 ):
stopwords = get_stopwords()
subject = WordSubject()
# 创建一个观察者并附加到主题
observer = WordFrequencyObserver()
subject.attach(observer)
# 处理文件
wordlist = re_split( read_file(testfilepath) )
for word in wordlist:
if word not in stopwords:
subject.notify(word) # 通知
# 打印最高的N个词频
top_words = observer.word_count.most_common(top_n)
print_word_freqs(top_words)
if __name__ == "__main__":
main( testfilepath )

@ -1,69 +0,0 @@
'''
本例的基本模式还是观察者
基类 Subject 提供注册和提醒注册上的对象提醒机制
因为函数和参数混杂在一起传递使得各个模块的处理结构其实是 case by case
'''
from collections import Counter
from typing import List
from cppy.cp_util import *
class Subject:
def register_handler(self, handler: callable, *args, **kwargs):
self.handler = handler
self.args = args
self.kwargs = kwargs
def notify(self, *args, **kwargs):
self.handler( self.data, *self.args, **self.kwargs)
# 组件一TextLoader - 负责读取文本并过滤停用词
class TextLoader(Subject):
def load_text(self, filename: str) -> List[str]:
return extract_file_words(filename)
def notify(self, *args, **kwargs):
filename = args[0]
self.data = self.load_text(filename)
super().notify(self.data, *args, **kwargs)
# 组件二WordCounter - 计算词频
class WordCounter(Subject):
def count_words(self, words: List[str]) -> dict:
return Counter(words)
def notify(self, *args, **kwargs ):
words = args[0]
self.data = self.count_words(words)
super().notify(self.data, *args, **kwargs)
# 组件三TopWordsPresenter - 排序并输出前10个词
class TopWordsPresenter(Subject):
def notify(self, words,*args, **kwargs):
n = args[0]
top_words = words.most_common(n)
print_word_freqs( top_words )
# 主程序逻辑
def main():
loader = TextLoader()
counter = WordCounter()
presenter = TopWordsPresenter()
# 注册事件处理器
loader.register_handler(counter.notify)
counter.register_handler( presenter.notify,10 )
# 触发加载文本并开始流程
loader.notify(testfilepath)
if __name__ == "__main__":
main()

@ -1,86 +0,0 @@
################ 待整理
'''
注册者 = 观察者
每个组件提供注册消息接口和注册消息动作
在其它单元上注册自己对于特定事件消息的响应函数
同时负责自己的注册队列的序贯调用
Python 中有一个Callable类型可以用来判断是否是可以回调类型
from typing import Callable
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
from cppy.cp_util import *
#
# event_manager
#
class EventManager:
def __init__(self):
self.load_handlers = [] # 用于加载文件的事件处理器
self.process_handlers = [] # 用于处理数据的事件处理器
self.end_handlers = [] # 用于结束流程的事件处理器
def register_load_event(self, handler):
self.load_handlers.append(handler)
def register_process_event(self, handler):
self.process_handlers.append(handler)
def register_end_event(self, handler):
self.end_handlers.append(handler)
# 运行框架,按顺序执行注册的事件处理器
def run(self, file_path):
for handler in self.load_handlers: handler(file_path)
for handler in self.process_handlers: handler()
for handler in self.end_handlers: handler()
#
# 功能组件
#
# 定义数据存储类,用于模拟文件内容的加载和处理
class TextData:
_word_event_handlers = []
def __init__( self, event_manager ):
self._stop_words = get_stopwords()
event_manager.register_load_event(self.__load)
event_manager.register_process_event(self.__process_words)
def __load(self, path_to_file):
self._data = re_split( read_file(path_to_file) )
def __process_words(self):
for word in self._data:
if word not in self._stop_words:
for handler in self._word_event_handlers:
handler(word)
def register_word_event(self, handler):
self._word_event_handlers.append(handler)
class WordFrequencyCounter:
def __init__(self, event_manager, data_storage):
self._word_freqs = defaultdict(int) # 存储单词频率
data_storage.register_word_event(self.__increment_count) # 注册单词事件
event_manager.register_end_event(self.__print_freqs) # 注册结束事件
def __increment_count(self, word):
self._word_freqs[word] += 1
def __print_freqs(self):
print_word_freqs ( sort_dict (self._word_freqs) )
if __name__ == '__main__':
em = EventManager()
data_storage = TextData(em)
word_freq_counter = WordFrequencyCounter(em, data_storage)
em.run(testfilepath)

@ -1,107 +0,0 @@
################ 待整理
from cppy.cp_util import *
'''
订阅者 = 注册者 = 观察者
注册回调的一个变体
要点是中心化统一化
为了简化消息订阅可能形成的复杂性
提供一个中心消息管理器统一负责消息的订阅和回调
各个功能组件只是完成自己的功能
在中心管理器上订阅消息挂到自己响应的处理函数上
总结相比较的改变
- 注册的时候通过提供一个类型字段标识不同消息
- 其它实体不做注册和做回调统一这两个功能到一个中心单元
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
#################################################
# Event Manager
#################################################
class EventManager:
def __init__(self):
self._subs = defaultdict(list)
def subscribe(self, event_type, handler):
self._subs[event_type].append(handler)
def publish(self, event):
event_type = event[0]
for handle in self._subs.get(event_type, []):
handle(event)
#################################################
# Application Entities
#################################################
class DataStorage:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('load', self._load)
self._event_manager.subscribe('start', self.produce_words)
def _load(self, event):
self._data = extract_file_words( event[1] )
def produce_words(self, _):
for word in self._data:
self._event_manager.publish(('word', word ))
self._event_manager.publish(('eof', None))
class StopWordFilter:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('load', self.load_stop_words)
self._event_manager.subscribe('word', self.filter_word)
self._stop_words = set()
def load_stop_words(self, _ ):
self._stop_words = set( get_stopwords() )
def filter_word(self, event):
word = event[1]
if word not in self._stop_words:
self._event_manager.publish(('valid_word', word))
class WordFrequencyCounter:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('valid_word', self.count_word)
self._event_manager.subscribe('print', self.print_freqs)
self._word_freqs = {}
def count_word(self, event):
word = event[1]
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
def print_freqs(self, _ ):
print_word_freqs ( sort_dict (self._word_freqs) )
class WordFrequencyApp:
def __init__(self, event_manager):
self._event_manager = event_manager
self._event_manager.subscribe('run', self.start_application)
self._event_manager.subscribe('eof', self.stop_application)
def start_application(self, event):
path_to_file = event[1]
self._event_manager.publish(('load', path_to_file))
self._event_manager.publish(('start', ))
def stop_application(self, _ ):
self._event_manager.publish(('print', ))
def main():
event_manager = EventManager()
DataStorage( event_manager )
StopWordFilter( event_manager )
WordFrequencyCounter( event_manager )
WordFrequencyApp( event_manager )
event_manager.publish(('run', testfilepath ))
if __name__ == "__main__":
main()

@ -1,9 +0,0 @@
注册
- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。
- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。
- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。
- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。
- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。

@ -1,98 +0,0 @@
################ 待整理
'''
应用场景针对各个组件的 notify 方法发指令来驱动所有工作
这是一个示例性质的原型具体分布式环境下需要调整
notify 用了四种写法是和本主题无关的测试
'''
from cppy.cp_util import *
from collections import defaultdict
badmsg = lambda : exec (''' raise Exception("Message not understood " , action ) ''')
class fff:
def __init__(self, d):
self._data = defaultdict( badmsg )
self._data.update(d)
def __getitem__(self, key):
return self._data[key]
class DataStorageMod():
def __init__(self):
self._data = []
def notify(self, action, *args):
return {
'init': lambda : self._init,
'words': lambda : self._words
}.get( action , badmsg )()(*args)
def _init(self, path_to_file):
self._data = re_split( read_file(path_to_file) )
def _words(self):
return self._data
class StopWordMod():
_stop_words = []
def notify(self, action, *args):
return { 'init': self._init,
'is_stop_word': self._is_stop_word
}[ action ](*args)
def _init(self):
self._stop_words = get_stopwords()
def _is_stop_word(self, wordx):
return wordx in self._stop_words
class WordFrequencyMod():
_word_freqs = {}
def notify(self, action, *args):
return fff( {
'increment_count': lambda : self._increment_count,
'sorted': lambda : self._sorted
})[ action ]()(*args)
def _increment_count(self, word):
self._word_freqs[word] = self._word_freqs.get(word,0) + 1
def _sorted(self):
return sort_dict(self._word_freqs)
class ScenarioManager():
def notify(self, action, *args):
if action == 'init':
return self._init( *args)
elif action == 'run':
return self._run()
else:
raise Exception("Message not understood " + action )
def _init(self, path_to_file):
self._storage_manager = DataStorageMod()
self._stop_word_manager = StopWordMod()
self._word_freq_manager = WordFrequencyMod()
self._storage_manager.notify('init', path_to_file)
self._stop_word_manager.notify('init')
def _run(self):
for word in self._storage_manager.notify('words'):
if not self._stop_word_manager.notify('is_stop_word', word):
self._word_freq_manager.notify('increment_count', word )
word_freqs = self._word_freq_manager.notify('sorted')
print_word_freqs(word_freqs)
if __name__ == '__main__':
sm = ScenarioManager()
sm.notify('init', testfilepath)
sm.notify('run')

@ -1,102 +0,0 @@
'''
后续组件挂载到前序组件后续链上
仅提供 self.next_observer 的抽象关系
后续组件接到指令和数据自己决定动作
理论上每个组件可以参与到多个生产队列
本例使用了类来封装消息相对于字符串理论上提供了更丰富的扩展可能
这是一个示例性质的原型具体环境下需要调整
'''
from collections import Counter
from typing import List, Dict
from cppy.cp_util import *
# 定义消息类型
class Message:
def __init__(self, data):
self.data = data
class TokenizedText(Message):
pass
class FilteredText(Message):
pass
class WordFrequency(Message):
pass
# 定义观察者接口
class Observer:
def notify(self, message: Message):
pass
# 切词订阅者
class TokenizerSubscriber(Observer):
def __init__(self, next_observer: Observer):
self.next_observer = next_observer
def notify(self, message: Message):
if not isinstance(message.data, str):
return
tokenized_text = re_split(message.data)
self.next_observer.notify(TokenizedText(tokenized_text))
# 停用词订阅者
class StopWordsRemoverSubscriber(Observer):
def __init__(self, next_observer: Observer, stop_words: List[str]):
self.next_observer = next_observer
self.stop_words = set(stop_words)
def notify(self, message: Message):
if not isinstance(message, TokenizedText):
return
filtered_text = [word for word in message.data if word not in self.stop_words and len(word)>2 ]
self.next_observer.notify(FilteredText(filtered_text))
# 词频统计订阅者
class WordFrequencyCalculatorSubscriber(Observer):
def __init__(self, next_observer: Observer):
self.next_observer = next_observer
def notify(self, message: Message):
if not isinstance(message, FilteredText):
return
word_freq = Counter(message.data)
self.next_observer.notify( WordFrequency(word_freq) )
# 输出前N个词订阅者
class TopNWordsDisplaySubscriber(Observer):
def __init__(self, n: int):
self.n = n
def notify(self, message: Message):
if not isinstance(message, WordFrequency):
return
print_word_freqs( message.data.most_common(self.n) )
# 模拟发布者
def publish_text(text: str, observers: List[Observer]):
for observer in observers:
observer.notify(Message(text))
# 主函数
def main():
text = read_file()
stop_words = get_stopwords()
# 创建订阅者链
display_subscriber = TopNWordsDisplaySubscriber( n=10 )
freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber)
stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words)
tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber)
# 发布文本
publish_text(text, [tokenizer_subscriber])
if __name__ == "__main__":
main()

@ -1,4 +0,0 @@
## 任务
本项目的主要功能任务:做文本文件的分词,过滤常见词,求词频,并排序输出。

@ -1,4 +1,5 @@
# 引入停用词表和测试文件的路径
# -*- coding: utf-8 -*-
from cppy.cp_util import stopwordfilepath, testfilepath
# 准备停用词表
@ -48,3 +49,10 @@ for i in range(n):
# 打印频率最高的前10个词
for tf in word_freqs[:10]:
print(tf[0], '-', tf[1])
'''
想到哪里写到哪里只会Python语言最基础的语法C语言编程风格
所有逻辑所有细节一盘大棋摆在面前
这样的代码,很难维护后来的阅读者会进入一个一望无际的泥塘.
'''

@ -23,8 +23,8 @@ with open(testfilepath, encoding='utf8') as f:
# 打印前10个最常见的单词
for word, freq in word_freqs.most_common(10):
print(f"{word}-{freq}")
'''
相比 A01
使用collections.Counter来计数单词频率从而简化了代码并提高了效率
使用enumerate来获取行号和行内容使用set来存储停用词都有助于提高代码的性能和可读性
使用most_common方法来获取最常见的单词使输出更为简洁

@ -8,7 +8,9 @@ words = re.findall('[a-z]{2,}',
counts = collections.Counter(w for w in words if w not in stopwords)
for (w, c) in counts.most_common(10):
print(w, '-', c)
'''
熟练的软件工程师会如此简单完成任务
后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法
我们假设是要解决一个相对复杂的问题
'''

@ -1,3 +1,6 @@
'''
给代码片段命名打包成函数用函数名概括问题求解的过程方便阅读理解
'''
import string
from collections import Counter
from cppy.cp_util import *

@ -1,7 +1,13 @@
'''
消除全局变量限定函数只有输入输出来和外界打交道
如果消除无用的中间变量那么就是典型的数据处理流水线风格:我要在一个函数中完成某个操作然后调用下一个函数进行处理
流水线风格阅读方便提供栈性能优化的可能
如果需要加参数那么需要柯里化方法即Python里面的闭包函数结构
'''
import re
from cppy.cp_util import *
def extractwords(str_data):
pattern = re.compile('[\W_]+')
word_list = pattern.sub(' ', str_data).lower().split()

@ -1,3 +1,8 @@
'''
把一些公共函数分离出去保留中间变量来增加代码可读性
jupyter notebook 做分析常常如此
'''
import re
from collections import Counter
from cppy.cp_util import *

@ -1,3 +1,5 @@
''' 递归是基础编程思维产生的代码结构 。注意递归深度限制只能解决小规模问题 '''
from cppy.cp_util import *
from collections import Counter

@ -1,3 +1,10 @@
'''
对象化的视角看待问题是抽象出问题中的实体
实体有方法属性程序执行过程就是调动这些实体的交互行为
作为彻底的面向对象的风格主逻辑常常也写做一个类init 产生各个工具run 运行逻辑
本例中实体可理解为几个工具
'''
from collections import Counter
from cppy.cp_util import *

@ -1,5 +1,11 @@
from cppy.cp_util import *
'''
如果认为基于数据和函数的封装就是对象化编程那么类并不是必要的扩展的函数字典也有这个能力
函数利用属性存数据字典有的value是函数有的是value是数据
字典进行对象化程序设计只是缺乏权限控制机制而继承方面可以是通过一个代理模式包装实现
简单使用字典也能满足对象的一些应用场景而且更省资源
'''
def extract_words(obj, path_to_file):
"""
@ -49,4 +55,4 @@ if __name__ == '__main__':
# 获取排序后的单词频率并打印
word_freqs = word_freqs_obj['sorted']()
print_word_freqs(word_freqs)
print_word_freqs(word_freqs)

@ -1,9 +1,13 @@
import abc, re
from cppy.cp_util import *
#
# 接口
#
'''
如果在基类层面上进行设计这样就能获得多个子类同一层面上的抽象操作
在一些语言里面这种模式叫基于接口的编程接口要求纯抽象类
Python 因为对象协议的缘故所以抽象接口设计不是必须
沿用抽象接口设计是为工程性规整
'''
class IDataStorage (metaclass=abc.ABCMeta):
@abc.abstractmethod
def words(self):
@ -74,8 +78,7 @@ IWordFrequencyCounter.register(subclass=WordFrequencyManager)
# 应用类
#
class WordFrequencyController:
def __init__(self, path_to_file):
# self._storage = DataStorageManager1(path_to_file)
def __init__(self, path_to_file):
self.storage = DataStorageManager2(path_to_file)
self.stop_word_manager = StopWordManager()
self.word_freq_counter = WordFrequencyManager()

@ -3,6 +3,9 @@ from dataclasses import dataclass
from collections import Counter
import re
# 对象属性是现代 Python编程喜欢的风格
# 这里使用了dataclass来简化代码
@dataclass
class WordFrequency:
text: str

@ -1,7 +1,7 @@
from cppy.cp_util import *
#
# 生成器
# 生成器 是一种简单异步实现
#
def non_stop_words(testfilepath):
stopwords = get_stopwords()

@ -3,10 +3,19 @@ import aiofiles
from collections import Counter
from cppy.cp_util import *
'''
异步指程序运行期间具有挂起然后等待唤醒继续执行的能力
典型应用场景如数据
源源不断产生,比如网络监听
太大,可能大于内存
有连续处理动作当某一环得到想要结果立刻终止后续数据不需要处理
本例读文件的Io还是太快的爬虫
async 是当前版本 Python 官方异步编程的标准库
aiofiles 是一个第三方库提供异步文件操作的功能
'''
#
# 协程: 有点复杂; 读文件的Io还是太快的爬虫
#
async def read_file(file_path):
async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
content = await file.read()

@ -1,5 +1,7 @@
from cppy.cp_util import *
# 内省Introspection在编程中是指程序在运行时检查对象变量类型或属性的能力。
# 这是一种动态地了解对象的方法,非常有利于调试、优化代码、实现泛型编程等场景。
class Calculator:
def frequencies(self,word_list):

@ -1,3 +1,8 @@
'''
反射是一种动态地访问或修改对象状态和行为的能力包括其属性方法等
有被 Hacker 攻击的风险
'''
import cppy.cp_util as util
import operator

@ -31,3 +31,6 @@ def main():
if __name__ == '__main__':
main()
'''
简单的利用函数式风格
'''

@ -0,0 +1 @@
尾调用方式调用函数做连续的数据处理。代码好读方便栈资源优化一些语言已经实现了这种调用方式释放前面的函数栈但Python当前还没做如此优化

@ -0,0 +1 @@
异常主要发生在参数传递和代码块执行过程。一种原则是:软件不能挂掉。检查参数合理性、检查代码块执行可能的错误,并进行合理结果补齐,保持程序继续运行【 1 软件不能挂掉 】,另外一种情况是发生异常就抛出然后终止程序【 2 时间停止在那一刻 】,或者由上层函数接住,集中统一处理。【 3 预判可能的错误 】。

@ -1,7 +1,9 @@
from collections import Counter
from cppy.cp_util import *
#
# 遇到异常,退出程序
#
def extract_words(path_to_file):
assert(type(path_to_file) is str), "Must be a string!"
assert(path_to_file), "Must be a non-empty string!"

@ -2,7 +2,7 @@ import re, operator, string
from cppy.cp_util import *
#
# The functions
# 遇到异常,给出一个能让程序继续运行下去的默认值
#
def extract_words(path_to_file):
try:

@ -1,6 +1,8 @@
from cppy.cp_util import *
#
# 用断言从事发点给出出错的准确信息
#
def extractWords(path_to_file):
assert(type(path_to_file) is str), "Must be a string"
assert(path_to_file), "Must be a non-empty string"

@ -1,7 +1,11 @@
from collections import Counter
from cppy.cp_util import *
# 缓存系统,使用字典来存储文件路径和对应的词频结果; 可以存到本地磁盘,就可重复利用计算结果
'''
如果函数设计干净没有修改全局变量那么相同的调用参数就会得到必然相同的结果
缓存把每次计算输入输出存起来如果后面有相同的任务调用参数相同就不用重复计算了
'''
word_freq_cache = {}
def calculate_word_frequency(file_path):

@ -1,4 +1,6 @@
# 创建对象是消耗资源的,如果发现对象已经存在,可以返回引用,不创造新对象 。设计模式中这个做法叫享元
# 可以降低资源需求和提升响应速度。更常见的该模式使用场景是各种资源池。
from cppy.cp_util import *
#享元类

@ -0,0 +1,64 @@
'''
模板方法模式 (Template Method Pattern)
适用场景定义词频统计的整体流程骨架允许子类自定义某些步骤如分词或输出格式
如何应用
定义一个抽象类 WordFrequencyAnalyzer包含固定的流程读取文件 -> 分词 -> 统计 -> 输出
具体步骤如分词或输出由子类实现
优点保证流程一致子类只关注具体实现
知识点
理解解继承与抽象类的使用
如何定义固定流程允许子类灵活实现强调流程与实现的分离
'''
from abc import ABC, abstractmethod
from typing import List, Dict
from collections import Counter
class WordFrequencyAnalyzer(ABC):
def analyze(self, directory: str, top_n: int = 10):
texts = self.read_texts(directory)
word_counts = self.count_words(texts)
self.print_results(word_counts, top_n)
@abstractmethod
def read_texts(self, directory: str) -> List[str]:
pass
@abstractmethod
def count_words(self, texts: List[str]) -> Dict[str, int]:
pass
@abstractmethod
def print_results(self, word_counts: Dict[str, int], top_n: int):
pass
class SimpleAnalyzer(WordFrequencyAnalyzer):
def read_texts(self, directory: str) -> List[str]:
import os
texts = []
for filename in os.listdir(directory):
if filename.endswith(".txt"):
with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
texts.append(f.read())
return texts
def count_words(self, texts: List[str]) -> Dict[str, int]:
import jieba
word_counts = Counter()
for text in texts:
words = jieba.cut(text)
word_counts.update([word for word in words if word.strip()])
return dict(word_counts)
def print_results(self, word_counts: Dict[str, int], top_n: int):
top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
for word, count in top_words:
print(f"{word}: {count}")
# 使用
analyzer = SimpleAnalyzer()
analyzer.analyze("data", 10)

@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
'''
设计说明
策略模式定义 Tokenizer 接口允许不同分词策略如中文分词英文分词
工厂模式TokenizerFactory 创建分词器隔离具体实现
解耦数据读取DataReader词频统计WordCounter结果输出ResultPrinter分离各自独立
可扩展支持添加新分词策略或输出格式
解耦
DataReader 只负责读取文件与分词和统计无关
WordCounter 依赖抽象的 Tokenizer不关心具体分词实现
ResultPrinter 只处理输出格式可扩展如输出到文件
策略模式
Tokenizer 接口允许切换分词策略如添加英文分词器
ChineseTokenizer 是具体实现易于替换
工厂模式
TokenizerFactory 封装分词器创建逻辑调用方无需知道具体类
简洁性代码结构清晰模块职责单一
扩展性
可添加新分词器如英文 EnglishTokenizer
可扩展 ResultPrinter 支持不同输出格式
实践可尝试添加英文分词器或新输出格式 CSV
'''
import os
import jieba
from collections import Counter
from abc import ABC, abstractmethod
from typing import List, Dict
# 策略接口:分词器
class Tokenizer(ABC):
@abstractmethod
def tokenize(self, text: str) -> List[str]:
pass
# 具体策略:中文分词(使用 jieba
class ChineseTokenizer(Tokenizer):
def tokenize(self, text: str) -> List[str]:
return [word for word in jieba.cut(text) if word.strip()]
# 工厂:创建分词器
class TokenizerFactory:
@staticmethod
def create_tokenizer(language: str = "chinese") -> Tokenizer:
if language == "chinese":
return ChineseTokenizer()
raise ValueError(f"Unsupported language: {language}")
# 数据读取模块
class DataReader:
def __init__(self, directory: str):
self.directory = directory
def read_texts(self) -> List[str]:
texts = []
for filename in os.listdir(self.directory):
if filename.endswith(".txt"):
with open(os.path.join(self.directory, filename), "r", encoding="utf-8") as f:
texts.append(f.read())
return texts
# 词频统计模块
class WordCounter:
def __init__(self, tokenizer: Tokenizer):
self.tokenizer = tokenizer
def count_words(self, texts: List[str]) -> Dict[str, int]:
word_counts = Counter()
for text in texts:
words = self.tokenizer.tokenize(text)
word_counts.update(words)
return dict(word_counts)
# 结果输出模块
class ResultPrinter:
@staticmethod
def print_top_words(word_counts: Dict[str, int], top_n: int = 10):
top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
for word, count in top_words:
print(f"{word}: {count}")
# 主程序
def main():
# 配置
data_dir = "data"
top_n = 10
# 创建组件
reader = DataReader(data_dir)
tokenizer = TokenizerFactory.create_tokenizer("chinese")
counter = WordCounter(tokenizer)
printer = ResultPrinter()
# 执行流程
texts = reader.read_texts()
word_counts = counter.count_words(texts)
printer.print_top_words(word_counts, top_n)
if __name__ == "__main__":
main()

@ -0,0 +1,105 @@
'''
把观察者挂到自己的处理队列上
'''
import os
import re
import threading
from queue import Queue
from collections import Counter
from abc import ABC, abstractmethod
# 观察者接口
class Observer(ABC):
@abstractmethod
def update(self, word_counts: Counter):
pass
# 具体观察者:打印前 10 高频词
class PrintTopWordsObserver(Observer):
def update(self, word_counts: Counter):
print("Top 10 高频词:")
for word, count in word_counts.most_common(10):
print(f"{word}: {count}")
# 具体观察者:保存词频到文件
class SaveToFileObserver(Observer):
def __init__(self, output_file):
self.output_file = output_file
def update(self, word_counts: Counter):
try:
with open(self.output_file, 'w', encoding='utf-8') as f:
for word, count in word_counts.most_common(10):
f.write(f"{word}: {count}\n")
print(f"词频已保存到 {self.output_file}")
except Exception as e:
print(f"保存失败: {e}")
# 词频统计器(主题)
class WordFrequencyCounter:
def __init__(self):
self.observers = []
self.counter = Counter()
self.queue = Queue()
self.lock = threading.Lock()
def add_observer(self, observer: Observer):
self.observers.append(observer)
def remove_observer(self, observer: Observer):
self.observers.remove(observer)
def notify_observers(self):
for observer in self.observers:
observer.update(self.counter)
def process_file(self):
while True:
try:
file_path = self.queue.get_nowait()
except:
break
try:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read().lower()
words = re.findall(r'\b\w+\b', text)
with self.lock:
self.counter.update(words)
except Exception as e:
print(f"Error processing {file_path}: {e}")
finally:
self.queue.task_done()
def count_words(self, files, num_threads=4):
# 将文件路径放入队列
for file_path in files:
self.queue.put(file_path)
# 创建并启动线程
threads = [threading.Thread(target=self.process_file) for _ in range(num_threads)]
for t in threads:
t.start()
for t in threads:
t.join()
# 通知所有观察者
self.notify_observers()
def main():
# 获取文件列表
data_dir = 'data'
files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')]
# 创建词频统计器
counter = WordFrequencyCounter()
# 添加观察者
counter.add_observer(PrintTopWordsObserver())
counter.add_observer(SaveToFileObserver("word_frequency.txt"))
# 统计词频并通知观察者
counter.count_words(files)
if __name__ == '__main__':
main()

@ -1,3 +1,7 @@
# 插件模式提供一种个别扩展性开发和系统核心开发无关的松耦合结构。
# 简单说,第三方开发者在没有核心框架源码下也能扩展或者改造系统功能
import configparser, importlib.machinery
from cppy.cp_util import *

@ -1,3 +1,7 @@
'''
REST 风格是把数据资源拓展到了互联网环境用Web协议访问
组件可以用任何环境任何语言开发, 只需要遵循共同的 rest 协议
'''
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify, abort
from functools import lru_cache

@ -1,3 +1,10 @@
'''
以查询和独立组件并发操作数据为中心的思路建数据库围绕数据库做查询操作
现代开发模式中编程语言很少直接用 SQL 操作数据库了常常有个隔离具体数据库更方便编程的ORM层
用工具把表数据映射为对象(ORM)
createDb 创建数据对象processData 处理数据DataQuery 查询输出数据相对 共享内存数据 它慢一些
'''
import sqlite3, os.path
from cppy.cp_util import testfilepath,db_filename,extract_file_words

@ -1,10 +1,9 @@
################ 待整理
'''
多线程各个模块比较乱的但是协作序贯的完成了数据处理
各个组件完全不能互操作仅依靠队列发消息进行协作
每个对象运行于独立线程每个对象只暴露一个队伍消息队列的接口
各个组件完全不能互操作仅依靠队列发消息进行协作/
多线程模式下消息队列来共享数据空间相比较传统数据库共享方式访问更快
适合环节多数据可分块有IO-计算性能设计考量要求让各个模块自己适应调整
在某些情况下可以避免复杂的控制流设计使代码简洁
这种模式改造下适合跨进程系统的后台响应对象设计.
'''
from threading import Thread

@ -0,0 +1,52 @@
'''
装饰器模式 (Decorator Pattern)
适用场景动态为词频统计添加功能如过滤停用词忽略标点或添加词性标注
如何应用
定义核心 WordCounter 接口负责基本词频统计
使用装饰器类动态添加功能 StopWordFilterDecorator 过滤停用词
优点功能扩展不修改核心逻辑符合开闭原则
知识点
如何动态扩展功能保持核心代码不变
体会组合优于继承的原则
'''
from abc import ABC, abstractmethod
from typing import List, Dict
from collections import Counter
class WordCounter(ABC):
@abstractmethod
def count_words(self, texts: List[str]) -> Dict[str, int]:
pass
class BasicWordCounter(WordCounter):
def count_words(self, texts: List[str]) -> Dict[str, int]:
import jieba
word_counts = Counter()
for text in texts:
words = jieba.cut(text)
word_counts.update([word for word in words if word.strip()])
return dict(word_counts)
class WordCounterDecorator(WordCounter, ABC):
def __init__(self, counter: WordCounter):
self.counter = counter
class StopWordFilterDecorator(WordCounterDecorator):
def __init__(self, counter: WordCounter, stop_words: List[str]):
super().__init__(counter)
self.stop_words = stop_words
def count_words(self, texts: List[str]) -> Dict[str, int]:
word_counts = self.counter.count_words(texts)
return {word: count for word, count in word_counts.items() if word not in self.stop_words}
# 使用
counter = StopWordFilterDecorator(BasicWordCounter(), stop_words=["", "", ""])
word_counts = counter.count_words(["这是一段测试文本。"])
print(word_counts)

@ -1,3 +1,6 @@
# 自己设计类装饰器,类方法做进行类型申明和检查
from collections import Counter
from cppy.cp_util import *

@ -1,6 +1,9 @@
import time
import cppy.cp_util as util
'''
用反射实现装饰器的效果
'''
# 工具函数
def extract_words(path_to_file):

@ -1,3 +1,7 @@
# Python 作为弱类型语言希望拥有强类型语言类似的规范工整工程性的优点,牺牲一些代码的自由度。
# 可以理解为更好的代码注释和更多的工程约束 。
import cppy.cp_util as util

@ -2,6 +2,7 @@ import sys
import re
from collections import Counter
# 提供带参数的运行命令行环境
# 使用 python command_line_1.py testfilepath 10
# 清洗文本,移除标点符号并转换为小写
@ -43,4 +44,5 @@ def main():
print(f"Error: {e}")
if __name__ == "__main__":
main()
main()

@ -1,6 +1,9 @@
import re
from collections import Counter
# 提供一个命令行交互方式来驱动程序运行
# 清洗文本,移除标点符号并转换为小写
def clean_text(text):
return re.sub(r'[^\w\s]', '', text).lower()

@ -1,3 +1,7 @@
# 命令行菜单驱动程序
# while 循环体中出现一个 case 菜单,每个菜单调用对应函数
import os
import cppy.cp_util as util

@ -1,3 +1,6 @@
# 应用基于一个窗口,窗口基于操作系统的消息机制
import sys
from PyQt5.QtWidgets import QApplication, QWidget, QPushButton, QVBoxLayout, QTextEdit, QFileDialog
import cppy.cp_util as util

@ -1,3 +1,13 @@
'''
Web的MVC三层结构是互联网最火的应用框架
系统需要切分为模型视图控制三层
Python 中最简单的 Web服务器是 flask
app.py - Flask应用的主文件控制器Controller
models.py - 模型Model文件包含词频统计的逻辑
templates/index.html - 视图View文件用于展示结果
'''
from flask import Flask, render_template, request
from models import WordFrequencyModel

@ -2,6 +2,12 @@
import cppy.cp_util as util
from collections import Counter
'''
状态机是计算机程序运行的基础理论
使用状态机的风格来处理文件并计算词频我们可以将整个过程分解为一系列状态转移
每个状态代表处理过程中的一个阶段比如读取文件分割单词计算词频
这种方法在Python中并不常见但它展示了如何使用状态机来管理程序的状态和流程
'''
class WordFrequencyStateMachine:
def __init__(self, file_path):

@ -0,0 +1,8 @@
## 任务
本项目的主要功能任务对指定文件读取文本在停用词过滤后按降序输出前10个高频词
我们研究这个问题的解法在各种情景下的表现形式 。
将 cppy整个目录转移到 anaconda3\Lib\site-packages 下面。里面放了一些共同的代码片段(函数),这样使得模式代码更短小简洁,便于更能集中理解各种模式的核心思想 。

@ -0,0 +1,24 @@
# 目标
本节使用一个书城的各方面业务需求来展示面向对象的常见设计模式 。
# 任务
背景假设为一个综合书城,提供线上线下购买,还经营一个书吧、一个报告厅。
# 说明
面向对象的模式是把编程过程中的一些思路固定化,并给一个名字方便理解 。
它是软件工程中一组经过验证的、可重复使用的代码写法 。
所以,模式不是语法,而是编程思路 。
这样做的好处是,统一大家的代码形式,提高代码可读性、可维护性、可扩展性 。
那为啥,面向过程没有这么做
是因为这个思维提炼过程,充分利用了面向对象语言的特性:封装、继承、多态 。
面向过程语言,没有这些特性,所以,面向过程程序设计一般不谈设计模式 。
因为 Python 对象协议的机制,多态、接口概念发生了根本变化 。使得一些C++、Java 的模式没用了 。比如 “ 原型模式Prototype可以使用copy.deepcopy()非常简便来创建 。另外,很多模式中继承关系也没必要了。但,下面很多示例中依旧保持了基类 。一是致敬经典,二是起到一个工程上更工整和代码注释的作用 。
# 应用场景
面向对象设计模式在管理信息系统和图形用户界面系统应用比较广泛 。

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save