Compare commits

..

3 Commits
dev ... master

4
.gitignore vendored

@ -1,4 +0,0 @@
log.txt
/test
/.venv
__pycache__

@ -1,30 +0,0 @@
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
def process_chunk(chunk):
    """Count the words of one chunk, skipping stop words and short words.

    Args:
        chunk: Iterable of word strings.

    Returns:
        A ``Counter`` of the words that are not in the module-level
        ``stop_words`` and have at least three characters.
    """
    tally = Counter()
    for token in chunk:
        if token in stop_words or len(token) < 3:
            continue
        tally[token] += 1
    return tally
def process_chunks(chunks, word_freqs, x, max):
    """Recursively fold the word counts of ``chunks[x:max]`` into ``word_freqs``.

    The recursion descends to the last chunk first, so the chunks are merged
    in reverse order (index ``max - 1`` down to ``x``).

    Args:
        chunks: Sequence of word lists.
        word_freqs: ``Counter`` that is updated in place.
        x: Index of the first chunk to process.
        max: Exclusive upper bound of the chunk indexes to process.
    """
    # An empty range (e.g. no chunks at all) is a no-op; without this guard
    # ``chunks[x]`` raises IndexError when ``x == max``.
    if x >= max:
        return
    process_chunks(chunks, word_freqs, x + 1, max)
    # ``+=`` mutates the caller's Counter; that is how the result is returned.
    word_freqs += process_chunk(chunks[x])
# def process_chunks( chunks,word_freqs,x,max ):
# word_list = process_chunk(chunks[x])
# word_freqs += Counter(word_list)
# next = x + 1
# if next < max:
# process_chunks(chunks,word_freqs,next,max)
# Split the input into chunks; 2000 is the size argument given to get_chunks.
chunks = get_chunks(testfilepath, 2000)
word_freqs = Counter()
process_chunks(chunks, word_freqs, 0, len(chunks))
top_ten = word_freqs.most_common(10)
print_word_freqs(top_ten)

@ -1,101 +0,0 @@
from collections import Counter
from cppy.cp_util import *
class DataStorageManager:
    """Data model: reads a file and splits its content into words.

    Attributes:
        _data: Result of ``re_split(read_file(path_to_file))``.
    """

    def __init__(self, path_to_file):
        raw_text = read_file(path_to_file)
        self._data = re_split(raw_text)

    def words(self):
        """Return the word list built at construction time."""
        return self._data
class StopWordManager:
    """Stop-word model.

    Attributes:
        _stop_words: Collection returned by ``get_stopwords()``.
    """

    def __init__(self):
        self._stop_words = get_stopwords()

    def is_stop_word(self, word):
        """Return True when ``word`` is contained in the stop-word collection."""
        known = self._stop_words
        return word in known
class WordFrequencyManager:
    """Word-frequency model: counts words and reports them by frequency.

    Attributes:
        _word_freqs: ``Counter`` mapping each word to its occurrence count.
    """

    def __init__(self):
        self._word_freqs = Counter()

    def increment_count(self, word):
        """Add one occurrence of ``word``."""
        self._word_freqs.update((word,))

    def sorted(self):
        """Return ``(word, count)`` pairs, most frequent first."""
        ranking = self._word_freqs.most_common()
        return ranking
class WordFrequencyController:
    """Controller wiring the three managers into the word-count workflow.

    Attributes:
        _storage_manager: ``DataStorageManager`` supplying the words.
        _stop_word_manager: ``StopWordManager`` used to filter words.
        _word_freq_manager: ``WordFrequencyManager`` accumulating the counts.
    """

    def __init__(self, path_to_file):
        self._storage_manager = DataStorageManager(path_to_file)
        self._stop_word_manager = StopWordManager()
        self._word_freq_manager = WordFrequencyManager()

    def run(self):
        """Count every non-stop word, then print the sorted frequencies."""
        for token in self._storage_manager.words():
            if self._stop_word_manager.is_stop_word(token):
                continue
            self._word_freq_manager.increment_count(token)
        print_word_freqs(self._word_freq_manager.sorted())
if __name__ == '__main__':
WordFrequencyController(testfilepath).run()
'''
函数:输入参数并调用后,你必须马上接住返回值。
类:输入参数并实例化后,你可以在需要的时候再去访问需要的数据(实例属性)。
'''

@ -1,52 +0,0 @@
from cppy.cp_util import *
def extract_words(obj, path_to_file):
    """Store the words extracted from a file under ``obj['data']``.

    Args:
        obj (dict): Dictionary that receives the word list.
        path_to_file (str): Path handed to ``extract_file_words``.
    """
    words = extract_file_words(path_to_file)
    obj['data'] = words
def increment_count(obj, w):
    """Add one occurrence of ``w`` to ``obj['freqs']``.

    Args:
        obj (dict): Object whose ``'freqs'`` entry maps word to count.
        w (str): The word to count; a new word starts at 1.
    """
    freqs = obj['freqs']
    if w in freqs:
        freqs[w] = freqs[w] + 1
    else:
        freqs[w] = 1
# 数据存储对象,包含初始化和获取单词的方法
# Dictionary used as an object: one data slot plus two "methods".
data_storage_obj = {'data': []}  # 'data' holds the word list
# 'init' fills the data slot with the words of the given file.
data_storage_obj['init'] = (
    lambda path_to_file: extract_words(data_storage_obj, path_to_file)
)
# 'words' returns the current word list.
data_storage_obj['words'] = lambda: data_storage_obj['data']
# 单词频率对象,包含增加计数和排序的方法
# Dictionary used as an object: one frequency table plus two "methods".
word_freqs_obj = {'freqs': {}}  # 'freqs' maps word -> count
# 'increment_count' adds one occurrence of a word.
word_freqs_obj['increment_count'] = lambda w: increment_count(word_freqs_obj, w)
# 'sorted' returns the table passed through sort_dict.
word_freqs_obj['sorted'] = lambda: sort_dict(word_freqs_obj['freqs'])
if __name__ == '__main__':
    # Load the words of the test file into the storage object.
    data_storage_obj['init'](testfilepath)

    # Feed every word to the frequency object.
    count_word = word_freqs_obj['increment_count']
    for word in data_storage_obj['words']():
        count_word(word)

    # Fetch the sorted frequencies and print them.
    word_freqs = word_freqs_obj['sorted']()
    print_word_freqs(word_freqs)

@ -1,3 +0,0 @@
from cppy.cp_util import *
# Same pipeline as a nested one-liner, written step by step.
words = extract_file_words(testfilepath)
frequencies = get_frequencies(words)
print_word_freqs(sort_dict(frequencies))

@ -1,28 +0,0 @@
from cppy.cp_util import *
# 如果有连续的对数据加工操作,而且总是把共同加工数据对象当第一个参数,可以用本文件夹方法提升阅读体验
# 框架类
class FunBind:
    """Chain free functions that take the shared data as first argument."""

    def bind(self, func, *args, **kwargs):
        """Apply ``func`` and store its result in ``self.data``.

        The first call of a chain has no data yet, so ``func`` is called as
        ``func(*args, **kwargs)``. Every later call receives the current
        data first: ``func(self.data, *args, **kwargs)``.

        Returns:
            ``self``, so that calls can be chained.
        """
        # Decide explicitly instead of catching every exception: a blanket
        # ``except`` would call ``func`` a second time without the data and
        # hide the real error raised by the first call.
        if hasattr(self, 'data'):
            self.data = func(self.data, *args, **kwargs)
        else:
            self.data = func(*args, **kwargs)
        return self
data = (
    FunBind()
    .bind(extract_file_words, testfilepath)
    .bind(get_frequencies)
    .bind(sort_dict)
    .bind(print_word_freqs, 10)
    .data
)
print(data)
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''

@ -1,28 +0,0 @@
from cppy.cp_util import *
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''
class FunPipe:
    """Deferred function call that can be chained with ``|`` like a pipe."""

    def __init__(self, func, *args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def __or__(self, other):
        """Run this stage and feed its result to ``other`` as first argument."""
        upstream = self.func(*self.args, **self.kwargs)
        downstream_args = (upstream,) + tuple(other.args)
        return FunPipe(other.func, *downstream_args, **other.kwargs)

    @property
    def data(self):
        """Execute the stored call and return its result."""
        call = self.func
        return call(*self.args, **self.kwargs)
# Imitate a shell pipeline.
pipe = (
    FunPipe(extract_file_words, testfilepath)
    | FunPipe(get_frequencies)
    | FunPipe(sort_dict)
    | FunPipe(print_word_freqs, 10)
)
pipe.data

@ -1,29 +0,0 @@
from cppy.cp_util import *
class Flow:
    """Fluent wrapper around the word-frequency helper functions.

    Each method reads and/or replaces ``self.data`` and returns ``self`` so
    that the calls can be chained.
    """

    def extract_file_words(self, filepath):
        words = extract_file_words(filepath)
        self.data = words
        return self

    def get_frequencies(self):
        counts = get_frequencies(self.data)
        self.data = counts
        return self

    def sort_dict(self):
        ordered = sort_dict(self.data)
        self.data = ordered
        return self

    def print_word_freqs(self, n):
        # Printing does not replace the stored data.
        print_word_freqs(self.data, n)
        return self
# Call the steps in order.
(
    Flow()
    .extract_file_words(testfilepath)
    .get_frequencies()
    .sort_dict()
    .print_word_freqs(10)
)
'''
连续方法调用看起来比较舒服
但是需要假设
- 每一个类方法返回 self 否则没法连续
- 类方法默认不写第一个参数数据都在 .data 里面
'''

@ -1,50 +0,0 @@
from cppy.cp_util import *
# 装饰器改写类
# - 找到以f_开头的方法
# - 将方法函数的返回值赋值给对象的data属性
# - 返回对象自身
def return_self_decorator(cls):
    """Class decorator that makes every ``f_*`` method chainable.

    Each callable attribute defined directly on ``cls`` whose name starts
    with ``f_`` is replaced by a wrapper that stores the method's return
    value in ``self.data`` and returns ``self``.

    Args:
        cls: The class to rewrite in place.

    Returns:
        The same class object.
    """
    import functools  # local import keeps this block self-contained

    def return_self(func):
        @functools.wraps(func)  # keep the method's name and docstring
        def wrapper(self, *args, **kwargs):
            self.data = func(self, *args, **kwargs)
            return self
        return wrapper

    # Iterate over a snapshot: setattr() below writes into cls.__dict__,
    # which must not be mutated while the live mapping is being iterated.
    for name, method in list(vars(cls).items()):
        if callable(method) and name.startswith('f_'):
            setattr(cls, name, return_self(method))
    return cls
@return_self_decorator
class Flow():
    """Word-frequency steps; the decorator makes the ``f_*`` ones chainable.

    The ``f_*`` methods only return their result; the class decorator
    stores it in ``self.data`` and returns ``self`` instead.
    """

    def test(self):
        # Not prefixed with 'f_', so the decorator leaves it untouched.
        return 'test'

    def f_extract_file_words(self, filepath):
        words = extract_file_words(filepath)
        return words

    def f_get_frequencies(self):
        counts = get_frequencies(self.data)
        return counts

    def f_sort_dict(self):
        ordered = sort_dict(self.data)
        return ordered

    def f_print_word_freqs(self, n):
        # Returns None, so ``self.data`` is None after this step.
        print_word_freqs(self.data, n)
# Call the steps in order.
(
    Flow()
    .f_extract_file_words(testfilepath)
    .f_get_frequencies()
    .f_sort_dict()
    .f_print_word_freqs(10)
)
'''
改写后参与 function flow 功能的方法
- 需要以 'f_' 开头
- 类方法默认不写第一个参数数据都在 .data 里面
仍旧需要特殊的方法写法
所以还是 12种方法比较自然
'''

@ -1,26 +0,0 @@
from cppy.cp_util import *
from collections import Counter
# 定义一个带计数器的元类
class CounterMetaclass(type):
    """Metaclass that gives every class it creates its own ``_counter``."""

    def __new__(mcs, name, bases, attrs):
        # Inject a fresh Counter into the namespace of the new class.
        attrs.update(_counter=Counter())
        new_class = super().__new__(mcs, name, bases, attrs)
        return new_class
# 基于元类创建类
class Word(metaclass=CounterMetaclass):
    """A word whose construction is tallied in the class-level ``_counter``.

    ``_counter`` is injected by ``CounterMetaclass``; creating an instance
    increments the count of its word.
    """

    def __init__(self, word):
        self.word = word
        self._counter[self.word] += 1

    @classmethod
    def get_word_freqs(cls, n) -> list:
        """Return the ``n`` most common ``(word, count)`` pairs.

        The result is the list produced by ``Counter.most_common``, not a
        ``Counter``.
        """
        return cls._counter.most_common(n)
# Instantiating Word is what counts it; the instances are discarded.
for word in extract_file_words(testfilepath):
    Word(word)
print_word_freqs(Word.get_word_freqs(10))
'''
常用于将依赖项如服务或配置自动注入到类中
'''

@ -1,20 +0,0 @@
from cppy.cp_util import *
#
# 生成器
#
def non_stop_words(testfilepath):
    """Yield, in order, every word of the file that is not a stop word."""
    stopwords = get_stopwords()
    text = read_file(testfilepath)
    yield from (token for token in re_split(text) if token not in stopwords)
freqs = {}
for word in non_stop_words(testfilepath):
    if word in freqs:
        freqs[word] += 1
    else:
        freqs[word] = 1
data = sort_dict(freqs)
print_word_freqs(data)

@ -1,56 +0,0 @@
import threading, queue
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
# 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数
class WordFrequencyCounter:
    """Count word frequencies with worker threads fed from a queue.

    Chunks of words wait in ``word_space``; each worker puts one partial
    count per chunk into ``freq_space``; ``run`` merges and prints them.
    """

    def __init__(self, input_file):
        self.word_space = queue.Queue()
        self.freq_space = queue.Queue()
        for piece in get_chunks(input_file, 3000):
            self.word_space.put(piece)

    def process_words(self):
        """Worker body: drain ``word_space``, emitting one dict per chunk."""
        while True:
            try:
                piece = self.word_space.get_nowait()
            except queue.Empty:
                return  # nothing left to process
            kept = [w for w in piece if w not in stop_words and len(w) >= 3]
            # Hand over a plain dict rather than the Counter itself.
            self.freq_space.put(dict(Counter(kept)))

    def run(self):
        """Start five workers, wait for them, then merge and print counts."""
        pool = [threading.Thread(target=self.process_words) for _ in range(5)]
        for thread in pool:
            thread.start()
        for thread in pool:
            thread.join()

        merged = Counter()
        while not self.freq_space.empty():
            partial = self.freq_space.get()
            if partial:  # skip empty partial results
                merged.update(partial)
        print_word_freqs(sort_dict(merged))
@timing_decorator
def main():
    """Run the threaded word count on the test file."""
    WordFrequencyCounter(testfilepath).run()


if __name__ == '__main__':
    main()
'''
在多线程之间传递数据建议使用线程安全的队列如queue.Queue或multiprocessing.Queue后者也适用于多进程环境
这些队列提供了线程安全的数据传输机制可以避免竞态条件和数据损坏
全局变量不可预测
multiprocessing.Queue 利用了操作系统提供的进程间通信IPC, Inter-Process Communication机制具体实现取决于不同操作系统的支持
在Unix/Linux系统中multiprocessing.Queue通常基于管道pipes共享内存和/或消息队列等机制实现
而在Windows系统上可能使用命名管道named pipes或者内存映射文件memory-mapped files以及某些版本的Windows特有的进程间同步对象如MutexesSemaphores和事件
'''

@ -1,62 +0,0 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
使用 multiprocessing.Manager 来完成统计词频
需要注意
- Manager() 必须用函数包起来,不能按脚本随便放外面否则会提示freeze_support
- 工作函数需要放到外面不能做内部函数否则会提示参数错误
- 无法在 Jupyter 类似环境运行
'''
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(shared_chunks, word_count):
    """Worker loop: pop text chunks from a shared list and count their words.

    Args:
        shared_chunks: List-like object (e.g. ``Manager().list()``) holding
            text chunks, optionally terminated by a ``None`` sentinel.
        word_count: Dict-like object (e.g. ``Manager().dict()``) mapping
            word to count; updated in place.

    The worker stops when it pops the ``None`` sentinel or finds the list
    empty. Only one worker can ever receive the single sentinel, so an
    empty list is a normal end condition, not an error.
    """
    local_counts = Counter()
    while True:
        try:
            chunk = shared_chunks.pop(0)
        except IndexError:
            break  # list exhausted (another worker took the sentinel)
        if chunk is None:
            break
        local_counts.update(
            word for word in extract_str_words(chunk) if word not in stop_words
        )
    # Merge once at the end: one proxy round-trip per distinct word instead
    # of one per occurrence.
    # NOTE(review): get-then-set on a shared dict is not atomic across
    # processes; making it safe needs a lock supplied by the caller.
    for word, count in local_counts.items():
        word_count[word] = word_count.get(word, 0) + count
@timing_decorator
def main():
    """Split the test file into text chunks and count words in 4 processes."""
    manager = Manager()
    shared_chunks = manager.list()
    word_count = manager.dict()

    # Queue the file content block by block (chunk_size per read call).
    chunk_size = 1024 * 10
    with open(testfilepath, 'r', encoding='utf-8') as f:
        for chunk in iter(lambda: f.read(chunk_size), ''):
            shared_chunks.append(chunk)
    shared_chunks.append(None)  # end-of-data marker
    print('-------------------',len(shared_chunks))

    # Four worker processes share the chunk list and the result dict.
    processes = []
    for _ in range(4):
        processes.append(
            Process(target=process_chunk, args=(shared_chunks, word_count))
        )
    for worker in processes:
        worker.start()
    for worker in processes:
        worker.join()

    # Copy the proxy into a plain dict so that Counter can rank it.
    plain_counts = dict(word_count)
    print_word_freqs(Counter(plain_counts).most_common(10))


if __name__ == '__main__':
    main()

@ -1,42 +0,0 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
'''
# 使用 multiprocessing.Manager 来完成统计词频
# 怎样得到最快的结果是一个试错过程:进程创建多少个、分片取多大 ...
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(chunk, word_count):
    """Add the words of ``chunk`` to the shared ``word_count`` mapping.

    Stop words and words shorter than three characters are skipped.
    Updating the shared mapping once per word is very slow.
    """
    kept = [w for w in chunk if w not in stop_words and len(w) >= 3]
    for token in kept:
        previous = word_count.get(token, 0)
        word_count[token] = previous + 1
    # word_count.update(Counter(kept)) does not work with this mapping type.
@timing_decorator
def main():
    """Count words with one process per chunk, sharing a Manager dict."""
    manager = Manager()
    word_count = manager.dict()
    chunks = get_chunks(testfilepath, 2800)
    print('-------------------',len(chunks))

    processes = []
    for chunk in chunks:
        worker = Process(target=process_chunk, args=(chunk, word_count))
        worker.start()
        processes.append(worker)
    for worker in processes:
        worker.join()

    plain_counts = dict(word_count)
    print_word_freqs(Counter(plain_counts).most_common(10))


if __name__ == '__main__':
    main()

@ -1,76 +0,0 @@
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify, abort
from functools import lru_cache
from cppy.cp_util import *
from functools import cache
app = Flask(__name__)
# 模拟数据库
books_db = []
# 用于缓存用户数据库的装饰器
@lru_cache(maxsize=None)
def get_books_db():
    """Return the in-memory book list (result is memoized by ``lru_cache``).

    NOTE(review): the cache keeps returning the list object seen on the
    first call; code that rebinds ``books_db`` must call
    ``get_books_db.cache_clear()`` afterwards.
    """
    current = books_db
    return current
#查询所有资源
@app.route('/books', methods=['GET'])
def get_books():
    """Return every stored book as JSON."""
    all_books = get_books_db()
    return jsonify(all_books)
#查询某个资源
@app.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
    """Return the ``content`` field of one book, or 404 if the id is unknown."""
    for candidate in get_books_db():
        if candidate['id'] == book_id:
            return jsonify(candidate['content'])
    abort(404)
# 创建或更新新资源
@app.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
    """Create the book ``book_id`` or update it from the JSON request body.

    Returns:
        The complete book list as JSON with status 200.
    """
    book_to_update = request.json
    print(book_to_update)
    # Drop any memoized list first so the lookup below sees the current
    # ``books_db``. ``functools.cache`` is a decorator and has no
    # ``delete``; the cached accessor is invalidated via ``cache_clear``.
    get_books_db.cache_clear()
    books = get_books_db()
    book = next((b for b in books if b['id'] == book_id), None)
    if book is None:
        # Unknown id: create the resource.
        books.append(book_to_update)
    else:
        # Known id: update the resource in place.
        book.update(book_to_update)
    return jsonify(books), 200
#操作一个资源
@app.route('/books/<int:book_id>/word_frequency', methods=['GET'])
def word_frequency(book_id):
    """Compute the word frequencies of the file referenced by one book.

    The book's ``content`` field is used as the path of the text file.

    Returns:
        The sorted frequencies as JSON with status 200; 404 when no book
        has the given id.
    """
    book = next((b for b in get_books_db() if b['id'] == book_id), None)
    if book is None:
        # Without this check ``book['content']`` fails with a TypeError
        # and the client receives a 500.
        abort(404)
    word_list = extract_file_words(book['content'])
    frequencies = sort_dict(get_frequencies(word_list))
    print_word_freqs(frequencies)
    return jsonify(frequencies), 200
@app.route('/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
    """Delete the book ``book_id``; respond 404 when it does not exist."""
    remaining = [book for book in books_db if book['id'] != book_id]
    if len(remaining) == len(books_db):
        abort(404)  # nothing was removed, so the id is unknown
    # Mutate the list in place: the memoized accessor holds a reference to
    # this list object and would keep serving deleted books if the global
    # were rebound to a new list.
    books_db[:] = remaining
    get_books_db.cache_clear()
    return jsonify({'message': f'book {book_id} deleted'}), 200
if __name__ == '__main__':
app.run(debug=True)

@ -1,45 +0,0 @@
# -*- coding: utf-8 -*-
import requests
from cppy.cp_util import *
# Walk through the REST API of the book server step by step.
BASE_URL = 'http://127.0.0.1:5000'

# Query the collection; the script expects an empty list here.
url = f'{BASE_URL}/books'
response = requests.get(url)
print(response.json())
time.sleep(2)

# - Create resource 1
print('创建一个1号资源')
book_1 = {"id": 1, "title": "Python编程:从入门到实践", "content": testfilepath}
url = f'{BASE_URL}/books/1'
response = requests.put(url, json=book_1)
time.sleep(2)

# - Create resource 2; testfilepath is switched to the second text file
print('创建一个2号资源')
testfilepath = testfilepath.replace('Prey.txt', 'Pride-and-Prejudice.txt')
book_2 = {"id": 2, "title": "深入浅出计算机组成原理", "content": testfilepath}
url = f'{BASE_URL}/books/2'
response = requests.put(url, json=book_2)
time.sleep(2)

# - Create resource 3; testfilepath is switched to the third text file
print('创建一个3号资源')
testfilepath = testfilepath.replace('Pride-and-Prejudice.txt', 'test.txt')
book_3 = {"id": 3, "title": "算法导论", "content": testfilepath}
url = f'{BASE_URL}/books/3'
response = requests.put(url, json=book_3)
time.sleep(2)

# - Query the collection again to see the result
print('查询资源,看到结果')
url = f'{BASE_URL}/books'
response = requests.get(url)
print(response.json())
time.sleep(2)

# - Run the word-frequency operation on resource 1
print('操作1号资源得到词频')
url = f'{BASE_URL}/books/1/word_frequency'
response = requests.get(url)
print_word_freqs(response.json())

@ -1,33 +0,0 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from functools import reduce
stop_words = get_stopwords()
# map - reduce
def process_chunk(chunk):
    """Map step: count the words of one chunk.

    Stop words and words shorter than three characters are filtered out.
    """
    def keep(word):
        return word not in stop_words and len(word) >= 3

    return Counter(filter(keep, chunk))
def merge_counts(count1, count2):
    """Reduce step: return the sum of two partial counts."""
    combined = count1 + count2
    return combined
@timing_decorator
def main():
    """Count the words of the test file with a map/reduce pipeline."""
    # Split the input into chunks; 1000 is the size given to get_chunks.
    chunks = get_chunks(testfilepath, 1000)
    # Map: one Counter per chunk.
    partial_counts = map(process_chunk, chunks)
    # Reduce: the empty Counter as initial value keeps reduce() from
    # raising TypeError when there are no chunks at all.
    total_counts = reduce(merge_counts, partial_counts, Counter())
    # Print the n most frequent words.
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()

@ -1,48 +0,0 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from multiprocessing.pool import ThreadPool
#
# 多线程
#
stop_words = get_stopwords()
def process_chunk(chunk):
    """Return a ``Counter`` of the chunk's words.

    Words found in ``stop_words`` or shorter than three characters are
    left out.
    """
    return Counter(
        word for word in chunk if word not in stop_words and len(word) >= 3
    )
def merge_counts(counts_list):
    """Return the sum of all ``Counter`` objects in ``counts_list``."""
    total = Counter()
    for partial in counts_list:
        total = total + partial
    return total
def thread_function(chunk, counts_list):
    """Count one chunk and append the resulting Counter to ``counts_list``."""
    counts_list.append(process_chunk(chunk))
@timing_decorator
def main():
    """Count the words of the test file with a pool of threads."""
    # Split the input into chunks; 1000 is the size given to get_chunks.
    chunks = get_chunks(testfilepath, 1000)
    # Bound the pool size: one thread per chunk is wasteful for large
    # inputs, and ThreadPool(0) raises ValueError for an empty input.
    workers = max(1, min(32, len(chunks)))
    # The context manager releases the pool even if map() raises.
    with ThreadPool(workers) as pool:
        counts_list = pool.map(process_chunk, chunks)
    # Merge the partial counts.
    total_counts = merge_counts(counts_list)
    # Print the n most frequent words.
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()

@ -1,42 +0,0 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *
#
# 多进程: 因为创建进程相比计算过程开销太大,结果最慢
#
stop_words = get_stopwords()
def process_chunk(chunk):
    """Count the words of a chunk that pass the stop-word and length filter."""
    result = Counter()
    for word in chunk:
        if word not in stop_words and len(word) >= 3:
            result[word] += 1
    return result
def merge_counts(counts_list):
    """Add up every ``Counter`` in ``counts_list`` and return the total."""
    merged = Counter()
    for counts in counts_list:
        merged = merged + counts
    return merged
@timing_decorator
def main():
    """Count the words of the test file with a pool of processes."""
    # Split the file content into chunks; each chunk is one pool task.
    chunks = get_chunks(testfilepath, 1000)
    # The context manager shuts the pool down even if map() raises, so no
    # worker processes are left behind.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        counts_list = pool.map(process_chunk, chunks)
    # Merge the partial counts.
    total_counts = merge_counts(counts_list)
    # Print the n most frequent words.
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()

@ -1,46 +0,0 @@
import sys
import re
from collections import Counter
# 使用 python command_line_1.py testfilepath 10
# 清洗文本,移除标点符号并转换为小写
def clean_text(text):
    """Remove every character that is neither word nor whitespace; lowercase."""
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    return without_punctuation.lower()
# 统计词频
def count_frequencies(text):
    """Return a ``Counter`` of the whitespace-separated words of the cleaned text."""
    tokens = clean_text(text).split()
    return Counter(tokens)
# 主函数
def main():
    """Print the ``n`` most common words of a file given on the command line.

    Usage: ``python command_line_1.py <file_path> <n>``. Exits with status
    1 when the argument count is wrong or ``n`` is not an integer.
    """
    # Check the number of command-line arguments.
    if len(sys.argv) != 3:
        print("Usage: python command_line_1.py <file_path> <n>")
        sys.exit(1)
    file_path = sys.argv[1]
    # Parse n inside a handler so a non-numeric value yields a message
    # instead of an uncaught traceback.
    try:
        n = int(sys.argv[2])
    except ValueError as e:
        print(f"Error: {e}")
        sys.exit(1)
    try:
        # Read the whole file.
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
        # Count and print the n most common words.
        for word, freq in count_frequencies(text).most_common(n):
            print(f"{word}: {freq}")
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except ValueError as e:
        # Also covers UnicodeDecodeError, a ValueError subclass.
        print(f"Error: {e}")


if __name__ == "__main__":
    main()

@ -1,48 +0,0 @@
import re
from collections import Counter
# 清洗文本,移除标点符号并转换为小写
def clean_text(text):
    """Strip punctuation (anything not ``\\w`` or ``\\s``) and lowercase."""
    cleaned = re.sub(r'[^\w\s]', '', text)
    cleaned = cleaned.lower()
    return cleaned
# 统计词频
def count_frequencies(text):
    """Count the words of ``text`` after cleaning it with ``clean_text``."""
    counts = Counter()
    counts.update(clean_text(text).split())
    return counts
# 交互式提示用户输入文件路径和前n个单词的数量
def interactive_mode():
    """Prompt for a file path and a count ``n``; print the top ``n`` words.

    Invalid or non-positive ``n`` prints an input error and returns. A
    missing file and any other error are reported with a message.
    """
    file_path = input("请输入文件路径 >> ")
    try:
        n = int(input("请输入你想要输出的前n个最常见单词的数量 >> "))
        if n <= 0:
            raise ValueError("数量必须大于0。")
    except ValueError as e:
        print(f"输入错误:{e}")
        return

    try:
        # Read the whole file, then count and rank its words.
        with open(file_path, 'r', encoding='utf-8') as handle:
            text = handle.read()
        ranking = count_frequencies(text).most_common(n)
        for word, freq in ranking:
            print(f"{word}: {freq}")
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
    except Exception as e:
        print(f"发生错误: {e}")
# 主函数
def main():
    """Print the welcome banner and start the interactive session."""
    banner = "欢迎使用词频统计工具。"
    print(banner)
    interactive_mode()


if __name__ == "__main__":
    main()

@ -1,30 +0,0 @@
from flask import Flask, render_template, request, redirect, url_for
from collections import Counter
from cppy.cp_util import *
import os
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
    """Show the upload form (GET) or the word counts of an upload (POST).

    The uploaded file is written to a server-chosen temporary file, which
    is removed again even when processing fails.
    """
    if request.method != 'POST':
        return render_template('index.html')

    import tempfile  # local import: only needed for uploads

    upload = request.files['file']
    # Never build the path from the client-supplied filename: it may
    # contain '..' or an absolute path. mkstemp picks a safe unique name.
    fd, tmp_path = tempfile.mkstemp(suffix='.txt')
    os.close(fd)
    try:
        upload.save(tmp_path)
        words = extract_file_words(tmp_path)
    finally:
        os.remove(tmp_path)
    word_counts = Counter(words)
    return render_template('result.html', word_counts=word_counts.most_common())
if __name__ == '__main__':
app.run(debug=True)

@ -1,14 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Upload Text File</title>
</head>
<body>
<h1>Upload a Text File to Count Word Frequencies</h1>
<form action="/" method="post" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Submit">
</form>
</body>
</html>

@ -1,16 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Word Frequencies</title>
</head>
<body>
<h1>Top Word Frequencies:</h1>
<ul>
{% for word, count in word_counts %}
<li>{{ word }}: {{ count }}</li>
{% endfor %}
</ul>
<a href="{{ url_for('index') }}">Back to Upload</a>
</body>
</html>

@ -1,34 +0,0 @@
# 创建对象是消耗资源的,如果发现对象已经存在,可以返回引用,不创造新对象 。设计模式中这个做法叫享元
from cppy.cp_util import *
#享元类
class WordFrequencyController():
    """Flyweight: sorted word frequencies of a file plus a print limit.

    Attributes:
        word_freq: Sorted frequencies computed once at construction.
        number: The ``controllertype`` value, passed on when printing.
    """

    def __init__(self, controllertype, filepath):
        frequencies = get_frequencies(extract_file_words(filepath))
        self.word_freq = sort_dict(frequencies)
        self.number = controllertype

    def print_word_freqs(self):
        """Print the stored frequencies with ``number`` as second argument."""
        print_word_freqs(self.word_freq, self.number)
#享元工厂
class WordFrequencyControllerFactory():
    """Flyweight factory: create each controller once and then reuse it.

    Attributes:
        types: Cache mapping ``(number, testfilepath)`` to the controller
            built for that combination.
    """

    def __init__(self):
        self.types = {}

    def get_WordFrequencyController(self, number, testfilepath):
        """Return the shared controller for ``number`` and ``testfilepath``."""
        # The file path is part of the key: a controller built for one
        # file must not be handed out for a different file.
        key = (number, testfilepath)
        if key not in self.types:
            self.types[key] = WordFrequencyController(number, testfilepath)
            print('new obj: ','*'*30,number)
        else:
            print('ref obj: ','*'*30,number)
        return self.types[key]
if __name__ == "__main__":
    factory = WordFrequencyControllerFactory()
    # Repeated numbers show that existing objects are reused.
    for number in (1, 3, 5, 3, 5, 7):
        WordFrequency = factory.get_WordFrequencyController(number, testfilepath)
        WordFrequency.print_word_freqs()

@ -1,64 +0,0 @@
'''
入门级示例是用来帮助理解其他例子
把观察者挂到自己的处理队列上
适当时机调用所有队列上的约定的观察者的 update 方法
如果观察者有多个职能参与不同的任务链不一定要统一命名update方法
这是一个示例性质的原型具体环境下需要调整
'''
import collections
from abc import ABC, abstractmethod
from cppy.cp_util import *
# 定义观察者接口,在 Python 中并不是必须的
class Observer(ABC):
    """Interface of an observer: subclasses must implement ``update``."""

    @abstractmethod
    def update(self, word):
        """Handle one word published by the subject."""
        ...
# 定义具体观察者类,用于统计词频
class WordFrequencyObserver(Observer):
    """Concrete observer that tallies every word it is notified about."""

    def __init__(self):
        self.word_count = collections.Counter()

    def update(self, word):
        """Add one occurrence of ``word``."""
        self.word_count.update((word,))
# 定义主题类
class WordSubject:
    """Subject that forwards words to every attached observer."""

    def __init__(self):
        self.observers = []

    def attach(self, observer):
        """Register ``observer``; it is notified after earlier ones."""
        self.observers.append(observer)

    def notify(self, word):
        """Call ``update(word)`` on each observer in attachment order."""
        for listener in self.observers:
            listener.update(word)
# 主函数
def main(testfilepath, top_n = 10 ):
    """Count the non-stop words of a file through the observer pattern.

    Args:
        testfilepath: Path of the text file to read.
        top_n: Number of most frequent words to print.
    """
    stopwords = get_stopwords()
    subject = WordSubject()

    # Create one observer and attach it to the subject.
    observer = WordFrequencyObserver()
    subject.attach(observer)

    # Publish every word that is not a stop word.
    for word in re_split(read_file(testfilepath)):
        if word in stopwords:
            continue
        subject.notify(word)

    # Print the top_n most frequent words.
    print_word_freqs(observer.word_count.most_common(top_n))


if __name__ == "__main__":
    main( testfilepath )

@ -1,69 +0,0 @@
'''
本例的基本模式还是观察者
基类 Subject 提供注册和提醒注册上的对象提醒机制
因为函数和参数混杂在一起传递使得各个模块的处理结构其实是 case by case
'''
from collections import Counter
from typing import List
from cppy.cp_util import *
class Subject:
    """Base class holding one handler plus the extra arguments bound to it."""

    def register_handler(self, handler: callable, *args, **kwargs):
        """Remember ``handler`` and the arguments to pass after the data."""
        self.handler, self.args, self.kwargs = handler, args, kwargs

    def notify(self, *args, **kwargs):
        """Call the handler with ``self.data`` and the registered arguments.

        The arguments given to this call itself are not forwarded.
        """
        callback = self.handler
        callback(self.data, *self.args, **self.kwargs)
# 组件一TextLoader - 负责读取文本并过滤停用词
class TextLoader(Subject):
    """Component 1: loads the words of a file and passes them on."""

    def load_text(self, filename: str) -> List[str]:
        """Return the words produced by ``extract_file_words(filename)``."""
        return extract_file_words(filename)

    def notify(self, *args, **kwargs):
        """Load the file named by the first argument, then notify the handler."""
        filename, *_ = args
        self.data = self.load_text(filename)
        super().notify(self.data, *args, **kwargs)
# 组件二WordCounter - 计算词频
class WordCounter(Subject):
    """Component 2: counts words and passes the counts on."""

    def count_words(self, words: List[str]) -> dict:
        """Return a ``Counter`` of ``words``."""
        tally = Counter(words)
        return tally

    def notify(self, *args, **kwargs ):
        """Count the word list given as first argument, then notify the handler."""
        words, *_ = args
        self.data = self.count_words(words)
        super().notify(self.data, *args, **kwargs)
# 组件三TopWordsPresenter - 排序并输出前10个词
class TopWordsPresenter(Subject):
    """Component 3: ranks the counts and prints the top entries."""

    def notify(self, words,*args, **kwargs):
        """Print the ``args[0]`` most common entries of the counter ``words``."""
        limit = args[0]
        print_word_freqs(words.most_common(limit))
# 主程序逻辑
def main():
    """Wire loader -> counter -> presenter and run the chain on the test file."""
    loader, counter, presenter = TextLoader(), WordCounter(), TopWordsPresenter()

    # Each stage hands its result to the next stage's notify method.
    loader.register_handler(counter.notify)
    counter.register_handler(presenter.notify, 10)

    # Loading the text triggers the whole chain.
    loader.notify(testfilepath)


if __name__ == "__main__":
    main()

@ -1,86 +0,0 @@
################ 待整理
'''
注册者 = 观察者
每个组件提供注册消息接口和注册消息动作
在其它单元上注册自己对于特定事件消息的响应函数
同时负责自己的注册队列的序贯调用
Python 中有一个Callable类型可以用来判断是否是可以回调类型
from typing import Callable
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
from cppy.cp_util import *
#
# event_manager
#
class EventManager:
    """Framework that runs registered handlers in three ordered phases."""

    def __init__(self):
        self.load_handlers = []     # called with the file path
        self.process_handlers = []  # called without arguments
        self.end_handlers = []      # called without arguments

    def register_load_event(self, handler):
        self.load_handlers.append(handler)

    def register_process_event(self, handler):
        self.process_handlers.append(handler)

    def register_end_event(self, handler):
        self.end_handlers.append(handler)

    def run(self, file_path):
        """Run the load, process and end handlers, in that order."""
        for on_load in self.load_handlers:
            on_load(file_path)
        for on_process in self.process_handlers:
            on_process()
        for on_end in self.end_handlers:
            on_end()
#
# 功能组件
#
# 定义数据存储类,用于模拟文件内容的加载和处理
class TextData:
    """Loads the words of a file and publishes every non-stop word.

    Registers its own load and process steps with the event manager; other
    components subscribe to single words via ``register_word_event``.
    """

    def __init__( self, event_manager ):
        # Per-instance list: a class-level list would be shared by every
        # TextData object, so handlers registered on one instance would
        # also fire for all the others.
        self._word_event_handlers = []
        self._stop_words = get_stopwords()
        event_manager.register_load_event(self.__load)
        event_manager.register_process_event(self.__process_words)

    def __load(self, path_to_file):
        self._data = re_split( read_file(path_to_file) )

    def __process_words(self):
        for word in self._data:
            if word not in self._stop_words:
                for handler in self._word_event_handlers:
                    handler(word)

    def register_word_event(self, handler):
        """Subscribe ``handler`` to be called with every non-stop word."""
        self._word_event_handlers.append(handler)
class WordFrequencyCounter:
    """Counts published words and prints the result at the end event."""

    def __init__(self, event_manager, data_storage):
        self._word_freqs = defaultdict(int)  # word -> occurrences
        # Subscribe to single words and to the end of the run.
        data_storage.register_word_event(self.__increment_count)
        event_manager.register_end_event(self.__print_freqs)

    def __increment_count(self, word):
        self._word_freqs[word] = self._word_freqs[word] + 1

    def __print_freqs(self):
        ordered = sort_dict(self._word_freqs)
        print_word_freqs(ordered)
if __name__ == '__main__':
em = EventManager()
data_storage = TextData(em)
word_freq_counter = WordFrequencyCounter(em, data_storage)
em.run(testfilepath)

@ -1,107 +0,0 @@
################ 待整理
from cppy.cp_util import *
'''
订阅者 = 注册者 = 观察者
注册回调的一个变体
要点是中心化统一化
为了简化消息订阅可能形成的复杂性
提供一个中心消息管理器统一负责消息的订阅和回调
各个功能组件只是完成自己的功能
在中心管理器上订阅消息挂到自己响应的处理函数上
总结相比较的改变
- 注册的时候通过提供一个类型字段标识不同消息
- 其它实体不做注册和做回调统一这两个功能到一个中心单元
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
#################################################
# Event Manager
#################################################
class EventManager:
    """Central publish/subscribe hub keyed by event type."""

    def __init__(self):
        self._subs = defaultdict(list)

    def subscribe(self, event_type, handler):
        """Register ``handler`` for events whose first item is ``event_type``."""
        self._subs[event_type].append(handler)

    def publish(self, event):
        """Pass ``event`` (a tuple, type first) to every matching handler."""
        kind = event[0]
        # Membership test first, so unknown types add no defaultdict entry.
        if kind not in self._subs:
            return
        for handle in self._subs[kind]:
            handle(event)
#################################################
# Application Entities
#################################################
class DataStorage:
    """Loads the words of a file and publishes them one by one."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        event_manager.subscribe('load', self._load)
        event_manager.subscribe('start', self.produce_words)

    def _load(self, event):
        path = event[1]
        self._data = extract_file_words(path)

    def produce_words(self, _):
        """Publish a 'word' event per word, then one 'eof' event."""
        publish = self._event_manager.publish
        for word in self._data:
            publish(('word', word ))
        publish(('eof', None))
class StopWordFilter:
    """Republishes each 'word' event as 'valid_word' unless it is a stop word."""

    def __init__(self, event_manager):
        self._stop_words = set()
        self._event_manager = event_manager
        event_manager.subscribe('load', self.load_stop_words)
        event_manager.subscribe('word', self.filter_word)

    def load_stop_words(self, _ ):
        self._stop_words = set( get_stopwords() )

    def filter_word(self, event):
        word = event[1]
        if word in self._stop_words:
            return
        self._event_manager.publish(('valid_word', word))
class WordFrequencyCounter:
    """Counts 'valid_word' events and prints the table on 'print'."""

    def __init__(self, event_manager):
        self._word_freqs = {}
        self._event_manager = event_manager
        event_manager.subscribe('valid_word', self.count_word)
        event_manager.subscribe('print', self.print_freqs)

    def count_word(self, event):
        word = event[1]
        if word in self._word_freqs:
            self._word_freqs[word] += 1
        else:
            self._word_freqs[word] = 1

    def print_freqs(self, _ ):
        ordered = sort_dict(self._word_freqs)
        print_word_freqs(ordered)
class WordFrequencyApp:
    """Application flow: 'run' triggers load and start, 'eof' triggers print."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        event_manager.subscribe('run', self.start_application)
        event_manager.subscribe('eof', self.stop_application)

    def start_application(self, event):
        _, path_to_file = event[0], event[1]
        publish = self._event_manager.publish
        publish(('load', path_to_file))
        publish(('start', ))

    def stop_application(self, _ ):
        self._event_manager.publish(('print', ))
def main():
    """Create the components on one event manager and publish 'run'."""
    event_manager = EventManager()
    # The instances are kept alive through their subscribed bound methods.
    for component in (DataStorage, StopWordFilter,
                      WordFrequencyCounter, WordFrequencyApp):
        component(event_manager)
    event_manager.publish(('run', testfilepath ))


if __name__ == "__main__":
    main()

@ -1,9 +0,0 @@
注册
- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。
- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。
- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。
- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。
- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。

@ -1,98 +0,0 @@
################ 待整理
'''
应用场景针对各个组件的 notify 方法发指令来驱动所有工作
这是一个示例性质的原型具体分布式环境下需要调整
notify 用了四种写法是和本主题无关的测试
'''
from cppy.cp_util import *
from collections import defaultdict
def badmsg():
    """Raise the "message not understood" error used as dispatch fallback.

    It is called without arguments (as a ``dict.get`` default that is then
    invoked, and as a ``defaultdict`` factory), so the offending action
    name is not available here.
    """
    # A direct raise replaces an exec() of source text that started with a
    # space and referenced an undefined name, so it failed with a syntax
    # error before ever reaching the intended exception.
    raise Exception("Message not understood")
class fff:
    """Read-only mapping wrapper whose missing keys invoke ``badmsg``."""

    def __init__(self, d):
        table = defaultdict( badmsg )
        table.update(d)
        self._data = table

    def __getitem__(self, key):
        return self._data[key]
class DataStorageMod():
    """Holds the word list; driven through ``notify`` messages."""

    def __init__(self):
        self._data = []

    def notify(self, action, *args):
        """Dispatch ``'init'`` or ``'words'``; other actions invoke ``badmsg``."""
        resolvers = {
            'init': lambda : self._init,
            'words': lambda : self._words
        }
        resolve = resolvers.get( action , badmsg )
        return resolve()(*args)

    def _init(self, path_to_file):
        self._data = re_split( read_file(path_to_file) )

    def _words(self):
        return self._data
class StopWordMod():
    """Holds the stop words; driven through ``notify`` messages."""

    _stop_words = []

    def notify(self, action, *args):
        """Dispatch ``'init'`` or ``'is_stop_word'``; other actions raise KeyError."""
        dispatch = {
            'init': self._init,
            'is_stop_word': self._is_stop_word,
        }
        handler = dispatch[action]
        return handler(*args)

    def _init(self):
        self._stop_words = get_stopwords()

    def _is_stop_word(self, wordx):
        return wordx in self._stop_words
class WordFrequencyMod():
    """Holds the word counts; driven through ``notify`` messages."""

    def __init__(self):
        # Per-instance table: a class-level dict would be shared by every
        # instance, so counts would leak from one object into another.
        self._word_freqs = {}

    def notify(self, action, *args):
        """Dispatch ``'increment_count'`` or ``'sorted'`` via ``fff``."""
        return fff( {
            'increment_count': lambda : self._increment_count,
            'sorted': lambda : self._sorted
        })[ action ]()(*args)

    def _increment_count(self, word):
        self._word_freqs[word] = self._word_freqs.get(word,0) + 1

    def _sorted(self):
        return sort_dict(self._word_freqs)
class ScenarioManager():
    """Drives the three modules exclusively through ``notify`` messages."""

    def notify(self, action, *args):
        """Handle ``'init'`` (with a file path) or ``'run'``; else raise."""
        if action == 'init':
            return self._init( *args)
        if action == 'run':
            return self._run()
        raise Exception("Message not understood " + action )

    def _init(self, path_to_file):
        self._storage_manager = DataStorageMod()
        self._stop_word_manager = StopWordMod()
        self._word_freq_manager = WordFrequencyMod()
        self._storage_manager.notify('init', path_to_file)
        self._stop_word_manager.notify('init')

    def _run(self):
        for word in self._storage_manager.notify('words'):
            if self._stop_word_manager.notify('is_stop_word', word):
                continue
            self._word_freq_manager.notify('increment_count', word )
        print_word_freqs(self._word_freq_manager.notify('sorted'))
if __name__ == '__main__':
sm = ScenarioManager()
sm.notify('init', testfilepath)
sm.notify('run')

@ -1,24 +0,0 @@
from cppy.cp_util import *
# 这个例子没有实际意义,是用来帮助理解其他例子
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
def readfile(file_path, func):
    """Read the file, then continue with ``func(data, frequencies)``."""
    text = read_file(file_path)
    func(text, frequencies)
def extractwords(str_data,func):
    """Extract the words, then continue with ``func(words, sort)``."""
    words = extract_str_words(str_data)
    func(words, sort)
def frequencies(word_list, func):
    """Count the words, then continue with ``func(counts, printall)``."""
    func(get_frequencies(word_list), printall)
def sort(wf, func):
    """Sort the frequencies, then continue with ``func(sorted, None)``."""
    ordered = sort_dict(wf)
    func(ordered, None)
def printall(word_freqs, _ ):
    """Final step of the chain: print the frequencies (second arg unused)."""
    print_word_freqs(word_freqs)
    return None
if __name__ == "__main__":
readfile(testfilepath, extractwords)

@ -1,102 +0,0 @@
'''
后续组件挂载到前序组件后续链上
仅提供 self.next_observer 的抽象关系
后续组件接到指令和数据自己决定动作
理论上每个组件可以参与到多个生产队列
本例使用了类来封装消息相对于字符串理论上提供了更丰富的扩展可能
这是一个示例性质的原型具体环境下需要调整
'''
from collections import Counter
from typing import List, Dict
from cppy.cp_util import *
# 定义消息类型
class Message:
    """Base message type; carries its payload in ``data``."""

    def __init__(self, data):
        payload = data
        self.data = payload
class TokenizedText(Message):
    """Message produced by the tokenizer stage."""


class FilteredText(Message):
    """Message produced by the stop-word remover stage."""


class WordFrequency(Message):
    """Message produced by the frequency calculator stage."""
# 定义观察者接口
class Observer:
    """Observer interface; the default ``notify`` does nothing."""

    def notify(self, message: Message):
        """Handle ``message``; subclasses override this."""
        return None
# 切词订阅者
class TokenizerSubscriber(Observer):
    """Splits text messages into words and forwards a ``TokenizedText``."""

    def __init__(self, next_observer: Observer):
        self.next_observer = next_observer

    def notify(self, message: Message):
        # Only messages whose payload is a string are handled.
        if isinstance(message.data, str):
            tokens = re_split(message.data)
            self.next_observer.notify(TokenizedText(tokens))
# 停用词订阅者
class StopWordsRemoverSubscriber(Observer):
def __init__(self, next_observer: Observer, stop_words: List[str]):
self.next_observer = next_observer
self.stop_words = set(stop_words)
def notify(self, message: Message):
if not isinstance(message, TokenizedText):
return
filtered_text = [word for word in message.data if word not in self.stop_words and len(word)>2 ]
self.next_observer.notify(FilteredText(filtered_text))
# 词频统计订阅者
class WordFrequencyCalculatorSubscriber(Observer):
def __init__(self, next_observer: Observer):
self.next_observer = next_observer
def notify(self, message: Message):
if not isinstance(message, FilteredText):
return
word_freq = Counter(message.data)
self.next_observer.notify( WordFrequency(word_freq) )
# 输出前N个词订阅者
class TopNWordsDisplaySubscriber(Observer):
def __init__(self, n: int):
self.n = n
def notify(self, message: Message):
if not isinstance(message, WordFrequency):
return
print_word_freqs( message.data.most_common(self.n) )
# 模拟发布者
def publish_text(text: str, observers: List[Observer]):
for observer in observers:
observer.notify(Message(text))
# 主函数
def main():
    """Build the subscriber chain and publish the sample text through it."""
    # read_file() has a required path parameter; the bare call raised
    # TypeError. testfilepath comes from the cp_util star import.
    text = read_file(testfilepath)
    stop_words = get_stopwords()

    # Build the chain back to front: each stage is handed its successor.
    display_subscriber = TopNWordsDisplaySubscriber( n=10 )
    freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber)
    stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words)
    tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber)

    # Publish the text to the head of the chain.
    publish_text(text, [tokenizer_subscriber])


if __name__ == "__main__":
    main()

@ -1,25 +0,0 @@
import requests
from cppy.cp_util import *
def main():
    """Run the word-frequency pipeline against the three local HTTP services.

    Raises:
        requests.HTTPError: if any service answers with an error status.
    """
    # read_file() has a required path parameter; the bare call raised TypeError.
    content = read_file(testfilepath)

    # Tokenize
    tokenize_response = requests.post("http://localhost:7770/tokenize", json={"text": content})
    # Fail on the HTTP error itself instead of a KeyError on the error payload.
    tokenize_response.raise_for_status()
    words = tokenize_response.json()["words"]

    # Count
    count_response = requests.post("http://localhost:7771/count", json={"words": words})
    count_response.raise_for_status()
    word_count = count_response.json()["word_count"]

    # Sort
    sort_response = requests.post("http://localhost:7772/sort", json={"word_count": word_count})
    sort_response.raise_for_status()
    top_10_words = sort_response.json()["top_10_words"]

    print("Top 10 words:")
    print_word_freqs(top_10_words)


if __name__ == "__main__":
    main()

@ -1,14 +0,0 @@
from fastapi import FastAPI
from collections import Counter
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/count")
async def count(words_list: dict):  # {"words": ["word1", "word2", ...]}
    """Count how often each entry of ``words_list["words"]`` occurs."""
    tally = Counter()
    tally.update(words_list["words"])
    return {"word_count": dict(tally)}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port= 7771)

@ -1,13 +0,0 @@
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.post("/sort")
async def sort(word_count_dict: dict):
    """Return the ten highest-count entries of ``word_count_dict["word_count"]``."""
    def by_count(pair):
        return pair[1]

    ranked = list(word_count_dict["word_count"].items())
    # Stable descending sort: ties keep their original relative order.
    ranked.sort(key=by_count, reverse=True)
    return {"top_10_words": ranked[:10]}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port= 7772)

@ -1,13 +0,0 @@
from fastapi import FastAPI
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/tokenize")
async def tokenize(text: dict):  # {"text": "..."}
    """Extract the words of the posted text.

    The client posts ``{"text": ...}`` as a JSON body. A parameter annotated
    ``str`` is read by FastAPI from the query string, so such a request was
    rejected; a ``dict`` parameter receives the JSON body, matching the
    ``/count`` and ``/sort`` services.

    Returns:
        ``{"words": [...]}`` with the extracted words.
    """
    # Still accept a plain string when the function is called directly.
    raw_text = text["text"] if isinstance(text, dict) else text
    words = extract_str_words(raw_text)
    return {"words": words}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port= 7770)

@ -1,5 +0,0 @@
[Plugins]
;; Options: plugins/f1.pyc, plugins/f2.pyc
frequencies = plugins/f2.pyc

@ -1,30 +0,0 @@
import configparser, importlib.machinery
from cppy.cp_util import *
class PluginManager:
    """Load the plugin modules named in ``config.ini`` and hand them out by name."""

    def __init__(self):
        self.plugins = {}

    def load_plugins(self):
        """Load the ``frequencies`` plugin from the ``[Plugins]`` section of config.ini.

        Both ``config.ini`` and a relative plugin path are resolved against
        the directory of this file, so the process working directory is no
        longer changed as a side effect.

        Raises:
            configparser.Error: if the section or option is missing.
            ImportError: if no loader handles the configured file.
        """
        # Function-scope import: the file only imports importlib.machinery.
        import importlib.util

        base_dir = os.path.dirname(os.path.abspath(__file__))
        config = configparser.ConfigParser()
        config.read(os.path.join(base_dir, "config.ini"))
        plugin_path = os.path.join(base_dir, config.get("Plugins", "frequencies"))

        # spec_from_file_location picks the loader from the file suffix and
        # replaces the deprecated Loader.load_module() call; the module also
        # gets a real name instead of ''.
        spec = importlib.util.spec_from_file_location("word_freqs_plugin", plugin_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load plugin from {plugin_path!r}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        self.plugins['word_freqs'] = module

    def get_plugin(self, name):
        """Return the plugin registered under *name*, or ``None`` if absent."""
        return self.plugins.get(name)
# Create the PluginManager instance and load the configured plugin
plugin_manager = PluginManager()
plugin_manager.load_plugins()
wordlist = extract_file_words(testfilepath) # extract the words of the sample file
word_freqs = plugin_manager.get_plugin('word_freqs').top_word(wordlist) # call top_word on the loaded plugin module
print_word_freqs(word_freqs) # print the word frequencies

@ -1,28 +0,0 @@
import os
import py_compile

# Resolve every path against this script so it works from any working
# directory. Previously the sources were compiled relative to the cwd while
# the results were collected relative to __file__.
script_dir = os.path.dirname(os.path.abspath(__file__))
target_dir = os.path.join(script_dir, '..', 'plugins')  # ../plugins next to this directory

# Make sure the target directory exists
os.makedirs(target_dir, exist_ok=True)

for plugin_name in ('f1', 'f2'):
    source_file = os.path.join(script_dir, plugin_name + '.py')
    target_file = os.path.join(target_dir, plugin_name + '.pyc')
    # Write the bytecode straight to its destination. This replaces the
    # copy-out-of-__pycache__ step, which derived the target name from the
    # first two characters of every .pyc found there.
    # doraise=True turns a compile error into an exception instead of a
    # message on stderr followed by a missing plugin.
    py_compile.compile(source_file, cfile=target_file, doraise=True)
    print(f"Compiled {source_file} to {target_file}.")

@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-
import collections
def top_word(word_list):
    """Return the ten most common entries of *word_list* as ``(word, count)`` pairs."""
    tally = collections.Counter()
    tally.update(word_list)
    return tally.most_common(10)

@ -1,16 +0,0 @@
import cppy.cp_util as util
def extract_words(path_to_file:str) -> list:
    """Return the words of the file, as produced by ``util.extract_file_words``."""
    words = util.extract_file_words(path_to_file)
    return words


def frequencies( word_list:list ) -> dict :
    """Return the frequency mapping built by ``util.get_frequencies``."""
    counts = util.get_frequencies(word_list)
    return counts


def sort(word_freq:dict) -> list :
    """Return the entries of *word_freq* ordered by ``util.sort_dict``."""
    ranked = util.sort_dict(word_freq)
    return ranked


if __name__ == "__main__":
    words = extract_words( util.testfilepath )
    word_freqs = sort( frequencies(words) )
    util.print_word_freqs(word_freqs)

@ -1,36 +0,0 @@
from cppy.cp_util import *
from dataclasses import dataclass
from collections import Counter
import re
@dataclass
class WordFrequency:
    """Word-frequency calculator over a single text.

    Attributes:
        text: the text to analyse.
        stop_words: words to ignore; defaults to ``get_stopwords()``.
            Always stored as a set after construction.
    """
    text: str
    stop_words: set = None

    def __post_init__(self):
        # Fall back to the shared stop-word list when none was supplied.
        if self.stop_words is None:
            self.stop_words = get_stopwords()
        # get_stopwords() returns a list; a set makes the per-word membership
        # test in tokenize() constant-time and matches the annotation.
        self.stop_words = set(self.stop_words)

    def tokenize(self):
        """Return the lower-cased words, without stop words or words under 3 characters."""
        words = re.findall(r'\b\w+\b', self.text.lower())
        return [word for word in words if word not in self.stop_words and len(word)>2]

    def get_top_n(self, n=10):
        """Return the *n* most common words as ``(word, count)`` pairs."""
        word_freqs = Counter(self.tokenize())
        return word_freqs.most_common(n)


# Usage example
if __name__ == '__main__':
    # read_file() has a required path parameter; the bare call raised TypeError.
    text = read_file(testfilepath)
    word_freq = WordFrequency( text )

    # Compute and print the frequencies
    top_words = word_freq.get_top_n()
    print_word_freqs(top_words)

@ -1,25 +0,0 @@
from cppy.cp_util import *
def extractWords(path_to_file):
    """Return the words of the file after checking the argument.

    Raises:
        AssertionError: if *path_to_file* is not exactly a ``str`` or is empty.
    """
    assert type(path_to_file) is str, "Must be a string"
    assert path_to_file, "Must be a non-empty string"
    words = extract_file_words(path_to_file)
    return words
def frequencies(word_list):
    """Return the frequency mapping of *word_list* after checking the argument.

    Raises:
        AssertionError: if *word_list* is not exactly a ``list`` or is empty.
    """
    assert type(word_list) is list, "Must be a list"
    assert len(word_list) > 0, "Must be a non-empty list"
    counts = get_frequencies(word_list)
    return counts
def sort(word_freqs):
    """Return the sorted entries of *word_freqs* after checking the argument.

    Raises:
        AssertionError: if *word_freqs* is not exactly a ``dict`` or is empty.
    """
    assert type(word_freqs) is dict, "Must be a dictionary"
    assert len(word_freqs) > 0, "Must be a non-empty dictionary"
    ranked = sort_dict(word_freqs)
    return ranked


if __name__ == '__main__':
    try:
        words = extractWords( testfilepath )
        word_freqs = sort(frequencies(words))
        print_word_freqs(word_freqs)
    except Exception as e:
        print(f" Something wrong: {e}")

@ -1,192 +0,0 @@
import site
import os, re, time
import string, operator
################################################################################
# 变量
################################################################################
# Sample text used by the examples. The earlier assignments ('test.txt',
# 'pride-and-prejudice.txt') were dead stores overwritten immediately, so
# only the effective value is kept.
testfilename = 'Prey.txt'
db_filename = "tf.db"

# Default to the directory that contains this package, so basePath is always
# bound. Previously it was only assigned inside the loop and a NameError
# followed when no site-packages entry matched.
basePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
site_packages = site.getsitepackages()
for package in site_packages:
    # Any matching entry wins; the last match is kept, as before.
    if 'package' in package:
        basePath = package

stopwordfilepath = os.path.join(basePath, 'cppy', 'data', 'stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy', 'data', testfilename)
################################################################################
# 项目函数
################################################################################
def read_file(path_to_file=testfilepath):
    """
    Read the whole content of a text file.

    Several example scripts call ``read_file()`` with no argument, so the
    path now defaults to ``testfilepath``, matching ``get_chunks``.

    Args:
        path_to_file (str): path of the file; defaults to testfilepath

    Returns:
        str: the file content, decoded as UTF-8
    """
    with open(path_to_file, encoding='utf-8') as f:
        return f.read()
def re_split(data):
    """
    Split a string into lower-case words.

    Every run of non-word characters and underscores is treated as a
    separator.

    Args:
        data (str): input string

    Returns:
        list: the lower-cased words, in order
    """
    # Raw string: '\W' in a plain literal is an invalid escape sequence.
    pattern = re.compile(r'[\W_]+')
    return pattern.sub(' ', data).lower().split()
def get_stopwords(path_to_file=stopwordfilepath):
    """
    Load the stop-word list.

    The file is read as comma-separated words. Surrounding whitespace
    (including a trailing newline on the last word) is stripped and empty
    entries are dropped, so every listed word can actually match. All single
    lower-case ASCII letters are appended.

    Args:
        path_to_file (str): stop-word file path; defaults to stopwordfilepath

    Returns:
        list: the stop words
    """
    with open(path_to_file, encoding='utf-8') as f:
        raw_words = f.read().split(',')
    data = [word.strip() for word in raw_words]
    data = [word for word in data if word]
    data.extend(string.ascii_lowercase)
    return data
def get_chunks(file_path=testfilepath, chunk_size=1000):
    """
    Split the words of a file into consecutive chunks.

    Args:
        file_path (str): file path; defaults to testfilepath
        chunk_size (int): number of words per chunk; defaults to 1000

    Returns:
        list: list of word lists; the last one may be shorter
    """
    words = re_split(read_file(file_path))
    chunks = []
    for start in range(0, len(words), chunk_size):
        chunks.append(words[start:start + chunk_size])
    return chunks
def extract_file_words(path_to_file):
    """
    Extract the words of a file, without stop words and words shorter than 3.

    Delegates to ``extract_str_words`` so the filtering rule lives in one
    place instead of being duplicated.

    Args:
        path_to_file (str): file path

    Returns:
        list: the remaining words, in order
    """
    return extract_str_words(read_file(path_to_file))
def extract_str_words(data_str):
    """
    Extract the words of a string, without stop words and words shorter than 3.

    Args:
        data_str (str): input string

    Returns:
        list: the remaining words, in order
    """
    word_list = re_split(data_str)
    # A set gives constant-time membership tests; the list returned by
    # get_stopwords() was scanned once per word.
    stop_words = set(get_stopwords())
    return [w for w in word_list if (w not in stop_words) and len(w) >= 3]
def count_word(word, word_freqs, stopwords):
    """
    Increment the count of *word* in place unless it is a stop word.

    Args:
        word (str): the word
        word_freqs (dict): frequency mapping, updated in place
        stopwords (list): stop words
    """
    if word in stopwords:
        return
    previous = word_freqs.get(word, 0)
    word_freqs[word] = previous + 1
def get_frequencies(word_list):
    """
    Count how often each word occurs.

    Args:
        word_list (list): words

    Returns:
        dict: word -> count, in order of first occurrence
    """
    tally = {}
    for token in word_list:
        if token in tally:
            tally[token] += 1
        else:
            tally[token] = 1
    return tally
def sort_dict(word_freq):
    """
    Order the entries of a frequency mapping by descending count.

    Args:
        word_freq (dict): word -> count

    Returns:
        list: ``(word, count)`` pairs; ties keep the mapping's order
    """
    pairs = list(word_freq.items())
    pairs.sort(key=operator.itemgetter(1), reverse=True)
    return pairs
def print_word_freqs(word_freqs, n=10):
    """
    Print the first *n* entries as ``word - count`` lines.

    Args:
        word_freqs (list): ``(word, count)`` pairs
        n (int): number of entries to print; defaults to 10
    """
    for pair in word_freqs[:n]:
        word, count = pair
        print(word, '-', count)
################################################################################
# 通用工具
################################################################################
def timing_decorator(func):
    """Decorator that prints the run time of *func* in milliseconds.

    The wrapper keeps the name and docstring of *func* and returns its
    result unchanged.
    """
    # Function-scope import: functools is not among this module's imports.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the interval cannot be skewed by
        # system clock adjustments.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        print(f"{func.__name__} 运行时间: {run_time*1000:.2f}")
        return result

    return wrapper
def test():
    """Print the fixed greeting ``cppy welcome``."""
    greeting = 'cppy welcome'
    print(greeting)

@ -1,4 +0,0 @@
## 任务
本项目的主要功能任务:做文本文件的分词,过滤常见词,求词频,并排序输出。

@ -1,74 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为普通做法即使用requests库通过Post请求爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """
    Crawl the news titles for each keyword sequentially, one page at a time.

    Args:
        keywords: keywords to search for
        begin_date: start date of the search
        end_date: end date of the search
        size: maximum number of items returned by one request
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    logging.info("开始运行普通爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first ten result pages are crawled
    title_list = []
    # The total covers every keyword, not just one. The context manager
    # closes the bar even when a request fails.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='普通爬取进度', unit='', ncols=80) as pbar:
        for keyword in keywords:
            for current in range(1, page_count + 1):
                logging.info(f'keyword: {keyword}, current: {current}')
                config = spider.get_config(keyword, current)
                data = spider.fetch(config)
                title_list += spider.parse(data)
                pbar.update(size)
    spider.save(title_list)
    logging.info("爬取完成")


if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -1,86 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多进程做法即使用多进程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import multiprocessing
import tqdm
lock = multiprocessing.Lock()
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """
    Crawl the news titles for each keyword, fetching pages in a process pool.

    Args:
        keywords: keywords to search for
        begin_date: start date of the search
        end_date: end date of the search
        size: maximum number of items returned by one request
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    # The start message used to be the sequential script's text.
    logging.info("开始运行多进程爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first ten result pages are crawled
    title_list = []
    # The total covers every keyword. The context manager closes the bar
    # even when a worker raises.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='多进程爬取进度', unit='', ncols=80) as pbar:
        with multiprocessing.Pool(processes=5) as pool:
            results = []
            for keyword in keywords:
                for current in range(1, page_count + 1):
                    logging.info(f'keyword: {keyword}, current: {current}')
                    config = spider.get_config(keyword, current)
                    results.append(pool.apply_async(spider.fetch, (config, )))
            for result in results:
                data = result.get()
                title_list += spider.parse(data)
                # Only this parent-process loop touches the bar, so the
                # update needs no lock.
                pbar.update(size)
    spider.save(title_list)
    logging.info("爬取完成")


if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -1,89 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多线程做法即使用多线程并行爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import util
import logging
from typing import List
import tqdm
lock = threading.Lock()
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """
    Crawl the news titles for each keyword, fetching pages in a thread pool.

    Args:
        keywords: keywords to search for
        begin_date: start date of the search
        end_date: end date of the search
        size: maximum number of items returned by one request
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    logging.info("开始运行多线程爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first ten result pages are crawled
    title_list = []
    tasks = []
    # The total covers every keyword. The context manager closes the bar
    # even when a request fails.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='多线程爬取进度', unit='', ncols=80) as pbar:
        with ThreadPoolExecutor(max_workers=5) as executor:
            for keyword in keywords:
                for current in range(1, page_count + 1):
                    logging.info(f'keyword: {keyword}, current: {current}')
                    config = spider.get_config(keyword, current)
                    tasks.append(executor.submit(spider.fetch, config))
            for future in as_completed(tasks):
                data = future.result()
                title_list += spider.parse(data)
                # Advance when a page has actually been fetched. The bar used
                # to advance at submit time and was full before any request
                # finished. This loop runs in the calling thread only, so no
                # lock is needed.
                pbar.update(size)
    spider.save(title_list)
    logging.info("爬取完成")


if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -1,89 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为协程做法即使用gevent库通过协程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import gevent
from gevent import monkey
# 打补丁使标准库能够与gevent协同工作
monkey.patch_all()
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """
    Crawl the news titles for each keyword, one gevent greenlet per page.

    Args:
        keywords: keywords to search for
        begin_date: start date of the search
        end_date: end date of the search
        size: maximum number of items returned by one request
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    logging.info("开始运行协程爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first ten result pages are crawled
    title_list = []
    # The total covers every keyword. The context manager closes the bar
    # even when joining the greenlets raises.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='协程爬取进度', unit='', ncols=80) as pbar:

        def fetch_and_parse(keyword, current):
            """Fetch one result page and add its titles to title_list."""
            logging.info(f'keyword: {keyword}, current: {current}')
            config = spider.get_config(keyword, current)
            data = spider.fetch(config)
            titles = spider.parse(data)
            title_list.extend(titles)
            pbar.update(size)

        jobs = [
            gevent.spawn(fetch_and_parse, keyword, current)
            for keyword in keywords
            for current in range(1, page_count + 1)
        ]
        gevent.joinall(jobs)
    spider.save(title_list)
    logging.info("爬取完成")


if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -1,85 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为异步做法,即使用 asyncio 异步并发爬取网页内容,再使用 json 提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
asyncio.run(
main_async(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10))
```
"""
import asyncio
import util
import logging
from typing import List
import tqdm
@util.timeit_async
async def main_async(keywords: List[str],
                     begin_date: str,
                     end_date: str,
                     size: int = 10):
    """
    Crawl the news titles for each keyword, one asyncio task per page.

    Args:
        keywords: keywords to search for
        begin_date: start date of the search
        end_date: end date of the search
        size: maximum number of items returned by one request
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    logging.info("开始运行异步爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first ten result pages are crawled
    title_list = []
    tasks = []
    # The total covers every keyword. The context manager closes the bar
    # even when a task raises.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='异步爬取进度', unit='', ncols=80) as pbar:
        for keyword in keywords:
            for current in range(1, page_count + 1):
                logging.info(f'keyword: {keyword}, current: {current}')
                config = spider.get_config(keyword, current)
                tasks.append(asyncio.create_task(spider.fetch_async(config)))
        for task in asyncio.as_completed(tasks):
            data = await task
            title_list += spider.parse(data)
            pbar.update(size)
    spider.save(title_list)
    logging.info("爬取完成")


if __name__ == "__main__":
    asyncio.run(
        main_async(keywords=['灾害'],
                   begin_date='2018-01-01',
                   end_date='2018-12-31',
                   size=10))

@ -1,25 +0,0 @@
# 目标
本节使用一个爬虫任务来展示如何追求代码的性能 。
充分理解线程、协程、进程、同步、异步、阻塞、非阻塞等概念,并能够根据具体场景选择合适的并发模型。
主线问题如何解决IO和计算速度不匹配、如何任务分解、分发和协作 。
# 任务
# 讨论分析
普通做法:连续进行了五次测试,时间分别为34.231s、34.091s、34.164s、34.226s、33.958s,平均时间为34.134s。
多进程(进程数=5):连续进行了五次测试,时间分别为7.719s、7.716s、7.690s、7.730s、7.711s,平均时间为7.7132s。
多线程(线程数=5):连续进行了五次测试,时间分别为7.185s、7.964s、6.983s、6.969s、7.035s,平均时间为7.2272s。
协程:连续进行了五次测试,时间分别为3.775s、3.807s、3.733s、3.824s、3.744s,平均时间为3.776s。
异步:连续进行了五次测试,时间分别为6.975s、7.675s、7.018s、7.032s、7.049s,平均时间为7.1498s。
为保证公平性,每一次Post请求后休眠3秒。
可以看出,协程的性能最好,普通做法的性能最差,多线程、多进程和异步的性能介于两者之间。
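协程版本为全部10个页面各启动一个协程,所有请求及其3秒休眠同时进行,因此总耗时接近单次请求的耗时;而多进程和多线程版本只开了5个工作进程/线程,10个页面需要分两轮完成,耗时约为协程的两倍。
另外,异步版本并没有达到协程的速度,可能是因为 fetch_async 内部仍然调用了阻塞的 requests.post,请求期间事件循环被阻塞,只有 asyncio.sleep 的休眠部分能够并发。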
总的来说,协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。
# 总结
协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。

@ -1,188 +0,0 @@
"""
"""
import re
import time
import functools
import json
import asyncio
import requests
from typing import Any, Dict, List
class Spider:
    """
    Crawler for the search endpoint of the Tianshui municipal government site.

    Args:
        keywords (List[str]): keywords to search for
        begin_date (str): start date of the search
        end_date (str): end date of the search
        size (int): maximum number of items returned by one request

    Attributes:
        URL (str): search endpoint
    """
    # Tianshui municipal government website.
    # NOTE(review): the two literals join into a double slash
    # ("aop_component//webber"); kept unchanged, confirm it is intended.
    URL = ('https://www.tianshui.gov.cn/aop_component/'
           '/webber/search/search/search/queryPage')

    def __init__(self, keywords: List[str], begin_date: str, end_date: str,
                 size: int):
        self.keywords = keywords
        self.begin_date = begin_date
        self.end_date = end_date
        self.size = size

    def get_config(self, keyword: str, current: int) -> Dict[str, Any]:
        """
        Build the JSON request body for one result page.

        Args:
            keyword (str): keyword to search for
            current (int): page number to request

        Returns:
            Dict[str, Any]: request body
        """
        return {
            "aliasName": "article_data,open_data,mailbox_data,article_file",
            "keyWord": keyword,
            "lastkeyWord": keyword,
            "searchKeyWord": False,
            "orderType": "score",
            "searchType": "text",
            "searchScope": "3",
            "searchOperator": 0,
            "searchDateType": "custom",
            "searchDateName": f"{self.begin_date}-{self.end_date}",
            "beginDate": self.begin_date,
            "endDate": self.end_date,
            "showId": "c2ee13065aae85d7a998b8a3cd645961",
            "auditing": ["1"],
            "owner": "1912126876",
            "token": "tourist",
            "urlPrefix": "/aop_component/",
            "page": {
                "current": current,
                "size": self.size,
                "pageSizes": [2, 5, 10, 20, 50, 100],
                "total": 0,
                "totalPage": 0,
                "indexs": []
            },
            "advance": False,
            "advanceKeyWord": "",
            "lang": "i18n_zh_CN"
        }

    def generate_headers(self) -> dict:
        """
        Build the request headers.

        Returns:
            dict: request headers
        """
        return {
            'Authorization':
            'tourist',
            'User-Agent':
            ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit'
             '/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari'
             '/537.36 Edg/124.0.0.0')
        }

    def fetch(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Blocking variant: POST *config*, sleep 3 seconds, return the decoded JSON.

        Args:
            config (Dict[str, Any]): request body

        Returns:
            Dict[str, Any]: decoded response
        """
        response = requests.post(self.URL,
                                 headers=self.generate_headers(),
                                 json=config).text
        time.sleep(3)
        return json.loads(response)

    async def fetch_async(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Async variant: POST *config*, sleep 3 seconds, return the decoded JSON.

        The blocking ``requests.post`` call runs in the event loop's default
        executor, so other tasks keep running while the request is in flight.
        Calling it directly inside the coroutine blocked the whole loop.

        Args:
            config (Dict[str, Any]): request body

        Returns:
            Dict[str, Any]: decoded response
        """
        post = functools.partial(requests.post,
                                 self.URL,
                                 headers=self.generate_headers(),
                                 json=config)
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, post)
        await asyncio.sleep(3)
        return json.loads(response.text)

    def parse(self, data: Dict[str, Any]) -> List[str]:
        """
        Extract the titles from one decoded response.

        At most ``self.size`` records are read. A page with fewer records
        (for example the last page) returns what it has instead of raising
        IndexError.

        Args:
            data (Dict[str, Any]): decoded response

        Returns:
            List[str]: titles with HTML tags removed
        """
        records = data['data']['page']['records']
        title_list = []
        for record in records[:self.size]:
            # Strip HTML tags from the title.
            title_list.append(re.sub('<[^>]*>', '', record['title']))
        return title_list

    def save(self, title_list: List[str]):
        """
        Persist the titles (placeholder: currently does nothing).
        """
        pass
# 时间装饰器
def timeit(func):
    """
    Decorator that prints how long *func* took, in seconds.

    The wrapper keeps the name and docstring of *func*, like the sibling
    ``timeit_async``.

    Args:
        func: function to time

    Return:
        the wrapping function; it returns whatever *func* returns
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f'{func.__name__} cost: {time.time() - start}')
        return result

    return wrapper
def timeit_async(func):
    """Decorator for coroutine functions that prints the elapsed seconds of each awaited call."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        started_at = time.time()
        outcome = await func(*args, **kwargs)
        elapsed = time.time() - started_at
        print(f'{func.__name__} cost: {elapsed}')
        return outcome

    return wrapper

@ -1,29 +0,0 @@
# 目标
本节使用一个书城的各种业务环节来展示面向对象的各种设计模式 。
# 任务
背景假设为一个综合书城,提供线上线下购买,还经营一个书吧、一个报告厅。
# 说明
面向对象的模式把编程过程中的一些思路固定化,并给一个名字方便理解 。
它是软件工程中一组经过验证的、可重复使用的代码写法 。
所以,模式不是语法,而是编程思路 。
这样做的好处是,统一大家的代码形式,提高代码可读性、可维护性、可扩展性 。
那为啥,面向过程没有这么做?
是因为这个思维提炼过程,充分利用了面向对象语言的特性:封装、继承、多态 。
面向过程语言,没有这些特性,所以,面向过程语言没有面向对象模式 。
因为 Python 对象协议的机制,多态、接口概念发生了根本变化 。
很多模式中,类的继承关系没必要了。下面示例中很多依旧保持了基类 。
一是致敬经典,二是起到一个工程上更工整和强注释的作用 。
另外,Python的动态语言的特性 。使得一些C++、Java 的模式没用了 。
比如“原型模式(Prototype)”,可以使用 copy.deepcopy() 非常简便地创建。
# 应用场景
面向对象设计模式在管理信息系统和图形用户界面系统应用比较广泛 。

@ -1,17 +0,0 @@
'''
全局只允许一个实例的办法
在该电商系统中全局只有一个数据库连接使用单例模式确保在整个应用程序内只创建一次数据库连接实例
'''
class DatabaseConnection:
    """Singleton: every construction returns the same instance."""

    _instance = None

    def __new__(cls):
        # Reuse the stored instance once it exists.
        if cls._instance:
            return cls._instance
        instance = super().__new__(cls)
        cls._instance = instance
        # Connect only once, when the single instance is created.
        instance.connect_to_db()
        return instance

    def connect_to_db(self):
        """Placeholder for the code that opens the database connection."""
        pass

@ -1,31 +0,0 @@
# 如果有一个选择结构来决定实现不同的类,再面向对象设计里面一般把这个选择做成一个类,叫做工厂模式
# 定义一个ProductFactory类用于创建不同类型的商品实例如电子产品、书籍等。具体的产品由子类实现。
#
class Product:
    """Base product with a name and a price."""

    def __init__(self, name, price):
        self.name = name
        self.price = price


class Electronic(Product):
    """Product that also records its brand."""

    def __init__(self, name, price, brand):
        super().__init__(name, price)
        self.brand = brand


class Book(Product):
    """Product that also records its author."""

    def __init__(self, name, price, author):
        super().__init__(name, price)
        self.author = author


class ProductFactory:
    """Factory that picks the product class from a type string."""

    @staticmethod
    def create_product(product_type, *args, **kwargs):
        """Create an ``Electronic`` or ``Book`` from the remaining arguments.

        Raises:
            ValueError: if *product_type* is neither ``'electronic'`` nor ``'book'``.
        """
        if product_type == 'electronic':
            return Electronic(*args, **kwargs)
        if product_type == 'book':
            return Book(*args, **kwargs)
        raise ValueError("Invalid product type")


# Create a product through the factory
product = ProductFactory.create_product('book', 'Python编程艺术', 50.0, 'Mark Lutz')

@ -1,114 +0,0 @@
'''
建造者模式Builder Pattern允许构建一个复杂对象的各个部分然后一步一步地返回这个对象的完整版本
将建造者模式应用于网购的下单和出库过程时我们可以设计一个Order类来表示订单
以及一个OrderBuilder类来构建订单的各个部分
此外我们还可以引入一个ShoppingCart类来表示购物车以及一个Inventory类来处理库存和出库逻辑
'''
######################################################################
# Order类它包含订单的基本信息如下单时间、用户信息、订单项列表
######################################################################
from datetime import datetime
class OrderItem:
    """One order line: a product id and the ordered quantity."""

    def __init__(self, product_id, quantity):
        self.product_id = product_id
        self.quantity = quantity


class Order:
    """An order with its user, its items, its creation time and a status."""

    def __init__(self, user_id, order_items, order_time=None):
        self.user_id = user_id
        self.order_items = order_items
        self.order_time = order_time or datetime.now()
        self.status = "PLACED"  # a new order starts as placed

    def __str__(self):
        return f"Order for user {self.user_id} placed at {self.order_time}. Status: {self.status}"

    def fulfill(self, inventory):
        """Deduct every item from *inventory*; all or nothing.

        Args:
            inventory: object providing ``deduct_stock(product_id, quantity)``
                (returns a bool) and ``add_stock(product_id, quantity)``.

        Returns:
            True and status ``"FULFILLED"`` when every item could be
            deducted; otherwise False, with the stock restored to its
            previous state and the status unchanged.
        """
        deducted = []
        for item in self.order_items:
            if inventory.deduct_stock(item.product_id, item.quantity):
                deducted.append(item)
                continue
            # Roll back the items already taken. A failed order used to
            # leave the stock partially deducted.
            for done in deducted:
                inventory.add_stock(done.product_id, done.quantity)
            return False
        self.status = "FULFILLED"
        return True


######################################################################
# OrderBuilder builds an order step by step
######################################################################
class OrderBuilder:
    """Fluent builder that collects a user and items, then creates an ``Order``."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget the user and all items collected so far."""
        self._user_id = None
        self._order_items = []

    def for_user(self, user_id):
        """Set the user; returns the builder for chaining."""
        self._user_id = user_id
        return self

    def add_item(self, product_id, quantity):
        """Append one item; returns the builder for chaining."""
        self._order_items.append(OrderItem(product_id, quantity))
        return self

    def build(self):
        """Create the order from the collected data.

        Raises:
            ValueError: if no user or no item has been set.
        """
        if not self._user_id or not self._order_items:
            raise ValueError("Order cannot be built without user and items.")
        # Hand over a copy, so later add_item() calls on the builder do not
        # change an order that was already built.
        return Order(self._user_id, list(self._order_items))
######################################################################
# 购物车和库存类
######################################################################
class ShoppingCart:
    """Per-user cart mapping product ids to quantities."""

    def __init__(self, user_id):
        self.user_id = user_id
        self.items = {}  # {product_id: quantity}

    def add_to_cart(self, product_id, quantity):
        """Add *quantity* units of *product_id*, accumulating with earlier additions."""
        already_in_cart = self.items.get(product_id, 0)
        self.items[product_id] = already_in_cart + quantity

    def checkout(self):
        """Return the cart content as ``OrderItem`` objects and empty the cart."""
        order_items = []
        for product_id, quantity in self.items.items():
            order_items.append(OrderItem(product_id, quantity))
        self.items.clear()  # the cart is empty after checkout
        return order_items
class Inventory:
    """Stock levels per product id."""

    def __init__(self):
        self.stock = {}  # {product_id: quantity}

    def add_stock(self, product_id, quantity):
        """Increase the stock of *product_id* by *quantity*."""
        on_hand = self.stock.get(product_id, 0)
        self.stock[product_id] = on_hand + quantity

    def deduct_stock(self, product_id, quantity):
        """Remove *quantity* units if available.

        Returns:
            True when the stock was reduced, False when it was insufficient
            (the stock is then left unchanged).
        """
        if self.stock.get(product_id, 0) < quantity:
            return False
        self.stock[product_id] -= quantity
        return True
######################################################################
# 模拟整个下单和出库过程
######################################################################
# 初始化库存和购物车
inventory = Inventory()
inventory.add_stock("book1", 10)
inventory.add_stock("book2", 5)

cart = ShoppingCart(user_id="user123")
cart.add_to_cart("book1", 2)
cart.add_to_cart("book2", 1)

# Check out: take the items out of the cart (this empties it) ...
order_items = cart.checkout()
# ... and feed them into the builder one by one.
order_builder = OrderBuilder().for_user("user123")
for item in order_items:
    order_builder.add_item(item.product_id, item.quantity)
order = order_builder.build()  # create the order object
print(order)  # show the order

# Ship the order from the inventory
if not order.fulfill(inventory):
    print("Order fulfillment failed due to insufficient stock.")
else:
    print("Order has been fulfilled.")

@ -1,55 +0,0 @@
'''
享元模式Flyweight Pattern可以用来减少对象的创建数量比如对于重复的书籍信息或者频繁请求的书籍分类可以通过享元模式来共享这些信息以提高内存使用效率和系统性能
在下面的代码中BookFlyweight 是享元抽象类它使用了一个类级别的字典 _books 来存储已经创建的书籍对象__new__ 方法被用来在创建新实例之前检查是否已经存在具有相同ISBN的书籍对象如果已经存在就返回那个对象的引用如果不存在就创建一个新对象并将其存储在 _books 字典中
请注意在这个例子中我故意尝试使用相同的ISBN但不同的标题来创建书籍对象以展示不正确的使用方式在真正的享元模式实现中一旦对象被创建并且其内在状态被设置在这个例子中是由ISBN标题和作者定义的就不应该再修改这些状态如果需要处理变化的状态通常会将这部分状态外部化并通过方法的参数传递给享元对象
另外要注意的是享元模式主要适用于大量细粒度对象且这些对象可以共享状态的情况在书籍的例子中ISBN是一个很好的共享状态的键但标题和作者通常不应该在对象创建后被改变因此这个例子更多的是为了展示享元模式的基本结构和原理而不是一个完全贴合实际的实现在实际应用中需要更仔细地设计享元对象的不可变状态和可变状态
'''
# 享元抽象类
class BookFlyweight:
    """Flyweight book: at most one shared instance per ISBN."""

    _books = {}  # ISBN -> shared instance

    def __new__(cls, isbn, title, author):
        # Return the shared instance when this ISBN is already known. Title
        # and author are then ignored.
        try:
            return cls._books[isbn]
        except KeyError:
            pass
        book = super().__new__(cls)
        cls._books[isbn] = book
        book.set_book_info(title, author)
        return book

    def set_book_info(self, title, author):
        """Store title and author on the instance."""
        self.title = title
        self.author = author

    def get_book_info(self):
        """Return ``"<title> by <author>"``."""
        return f"{self.title} by {self.author}"


# Flyweight factory
class BookFactory:
    """Factory front for ``BookFlyweight``."""

    @staticmethod
    def get_book(isbn, title, author):
        """Return the shared book for *isbn*, creating it on first use."""
        return BookFlyweight(isbn, title, author)
# 客户端代码
if __name__ == "__main__":
    # Two requests with the same ISBN return the same shared object.
    book1 = BookFactory.get_book("123456789", "The Great Gatsby", "F. Scott Fitzgerald")
    book2 = BookFactory.get_book("123456789", "The Same Book With Different Title?", "F. Scott Fitzgerald")

    # The second call finds the ISBN already registered, so its title is
    # ignored and the state stored by the first call stays in place. The
    # earlier comments here claimed the title was overwritten; it is not.
    print(book1.get_book_info())  # prints: The Great Gatsby by F. Scott Fitzgerald
    print(book2.get_book_info())  # prints: The Great Gatsby by F. Scott Fitzgerald

    # A different ISBN yields a different object.
    book3 = BookFactory.get_book("987654321", "1984", "George Orwell")
    print(book3.get_book_info())  # prints: 1984 by George Orwell

    # Identity checks
    print(book1 is book2)  # prints: True
    print(book1 is book3)  # prints: False

@ -1,23 +0,0 @@
# 装饰器模式允许我们在不修改原有类的基础上,动态地添加额外的功能。
# 就增加功能来说,装饰器模式比生成子类更为灵活。
# 餐吧的顾客可以选择为他们的咖啡添加额外的调料。
class Beverage:
    """A drink with a description and a price (0.0 by default)."""

    def __init__(self, description):
        self.description = description
        self.price = 0.0

    def cost(self):
        """Return the price of this beverage."""
        return self.price


class CondimentDecorator(Beverage):  # the decorator
    """Wrap a beverage and add a condiment's description and surcharge."""

    def __init__(self, beverage, description, price_increase):
        self.beverage = beverage
        self.price_increase = price_increase
        self.description = f"{beverage.description}, {description}"

    def cost(self):
        """Return the wrapped beverage's cost plus the surcharge."""
        base_cost = self.beverage.cost()
        return base_cost + self.price_increase


# Using the decorator
coffee = Beverage("Espresso")
coffee_with_chocolate = CondimentDecorator(coffee, "Chocolate", 0.50)

@ -1,42 +0,0 @@
'''
适配器模式Adapter
应用将一个类的接口转换成客户期望的另一个接口使得原本由于接口不兼容而无法一起工作的类能够一起工作
'''
########################################################################
# 定义一个目标接口Target和一个与之不兼容的类Adaptee
############################################################################
# 目标接口
class Target:
    """The interface the client expects."""

    def request(self):
        return None


# The class that needs adapting
class Adaptee:
    """Existing class whose interface does not match ``Target``."""

    def specific_request(self):
        print("Called Adaptee's specific_request.")


########################################################################
# Adapter implements the Target interface and holds an Adaptee instance,
# so request() can forward to the Adaptee's specific_request().
# One side comes in by inheritance, the other as a constructor argument.
############################################################################
class Adapter(Target):
    """Expose an ``Adaptee`` through the ``Target`` interface."""

    def __init__(self, adaptee):
        self.adaptee = adaptee

    def request(self):
        # Forward to the wrapped object's own method.
        wrapped = self.adaptee
        wrapped.specific_request()


if __name__ == "__main__":
    # Wrap an Adaptee in an Adapter ...
    adapter = Adapter(Adaptee())
    # ... and call it through the Target interface.
    adapter.request()

@ -1,60 +0,0 @@
'''
代理模式Proxy Pattern为其他对象提供一种代理以控制对这个对象的访问
在书城的业务背景中代理模式Proxy Pattern可以应用于多种场景例如实现延迟加载访问控制远程代理等
下面示例展示如何使用代理模式来控制对书城中书籍对象的访问
假设我们有一个Book类代表书城中的书籍和一个BookProxy类作为Book的代理类来控制对书籍的访问
'''
# 书籍类
class Book:
def __init__(self, title, author, price):
self.title = title
self.author = author
self.price = price
self.is_loaded = False # 假设书籍内容初始时是未加载的
def load_content(self):
# 模拟加载书籍内容的过程,这里仅打印一条消息
print(f"Loading content for book '{self.title}' by {self.author}...")
self.is_loaded = True
def display(self):
if not self.is_loaded:
self.load_content()
print(f"Book Title: {self.title}")
print(f"Author: {self.author}")
print(f"Price: {self.price}")
print("Content is loaded and displayed.")
# 书籍代理类
class BookProxy:
    """Proxy that stands in front of a book and guards calls to ``display``."""

    def __init__(self, book):
        self.book = book

    def display(self):
        """Print the simulated permission check, then delegate to the real book.

        A proxy is the place for extra steps such as permission checks or
        access logging; here the check is only a printed message and is
        assumed to pass.
        """
        print("Checking access permissions...")
        target = self.book
        target.display()
# 客户端代码
if __name__ == "__main__":
    # Create a book (content not loaded yet), wrap it in a proxy, and access
    # the book only through the proxy.
    book = Book(title="The Great Gatsby", author="F. Scott Fitzgerald", price=29.99)
    book_proxy = BookProxy(book)
    book_proxy.display()
'''
在这个示例中Book类有一个load_content方法来模拟加载书籍内容的过程以及一个display方法来显示书籍的信息
在实际应用中load_content可能会执行更加复杂的操作如从数据库或远程服务器加载数据
BookProxy类作为代理包装了对Book对象的访问
在这个简单的例子中它在调用display方法之前执行了一个模拟的权限检查
在实际应用中代理类可以执行各种操作如缓存懒加载权限验证等
客户端代码通过创建BookProxy对象来间接访问Book对象而不是直接访问
这种方式提供了一种灵活的控制机制使得可以在不修改原始类的情况下增加额外的功能或控制逻辑
'''

@ -1,75 +0,0 @@
'''
在书城的业务背景中外观模式Facade Pattern可以用于提供一个简化的接口以隐藏系统的复杂性
假设书城提供了多种服务如用户认证购物车管理订单处理等外观模式可以将这些服务整合到一个统一的接口中
使客户端能够更方便地使用这些服务
下面是一个简单的实现代码示例展示如何使用外观模式来整合书城的不同服务
'''
# 用户服务类
class UserService:
    """User authentication service of the bookstore."""

    def authenticate(self, username, password):
        """Print an authentication notice and report success.

        This example accepts every username/password pair.
        """
        print(f"Authenticating user {username}...")
        authenticated = True
        return authenticated
# 购物车服务类
class CartService:
    """Shopping-cart service; both operations only print what they would do."""

    def add_to_cart(self, user_id, book_id):
        """Report that ``book_id`` was added to the cart of ``user_id``."""
        message = f"User {user_id} added book {book_id} to the cart."
        print(message)

    def remove_from_cart(self, user_id, book_id):
        """Report that ``book_id`` was removed from the cart of ``user_id``."""
        message = f"User {user_id} removed book {book_id} from the cart."
        print(message)
# 订单服务类
class OrderService:
    """Order service; creating an order prints a notice and returns a fixed id."""

    def create_order(self, user_id, cart_items):
        """Print the order being created and return the placeholder order id."""
        print(f"Creating order for user {user_id} with items {cart_items}...")
        order_id = "OrderID123"
        return order_id
# 书城外观类
class BookstoreFacade:
    """Facade giving clients one simple entry point to the user, cart and order services."""

    def __init__(self):
        self.user_service = UserService()
        self.cart_service = CartService()
        self.order_service = OrderService()

    def _login(self, username, password):
        """Authenticate, print the outcome, and return the user id (None on failure)."""
        if not self.user_service.authenticate(username, password):
            print("Login failed.")
            return None
        print("Login successful.")
        # The user id is fixed at 1 in this example; a real application would
        # obtain it from the authentication service.
        return 1

    def login_and_add_to_cart(self, username, password, book_id):
        """Log the user in and, on success, add ``book_id`` to their cart."""
        user_id = self._login(username, password)
        if user_id is None:
            return
        self.cart_service.add_to_cart(user_id, book_id)

    def checkout(self, username, password):
        """Log the user in and create an order from the items in their cart.

        Prints the created order id, or a notice when the cart is empty.
        """
        user_id = self._login(username, password)
        if user_id is None:
            return
        cart_items = self.get_cart_items(user_id)
        if not cart_items:
            print("Your cart is empty.")
            return
        order_id = self.order_service.create_order(user_id, cart_items)
        print(f"Order created with ID: {order_id}")

    def get_cart_items(self, user_id):
        """Return the cart contents; simplified to a fixed list of book ids."""
        return [1, 2, 3]
# 客户端代码
if __name__ == "__main__":
    bookstore = BookstoreFacade()
    credentials = ("alice", "password123")
    # Log in and put a book into the cart.
    bookstore.login_and_add_to_cart(*credentials, "book456")
    # Check out, which creates the order.
    bookstore.checkout(*credentials)

@ -1,85 +0,0 @@
'''
在书城的业务背景中组合模式Composite Pattern可以用于构建树形结构比如书籍的分类结构
每个分类可以包含子分类也可以包含具体的书籍通过这种方式可以方便地管理和遍历整个书籍分类体系
下面是一个简单的实现代码示例展示如何使用组合模式来构建书城的书籍分类结构
'''
from abc import ABC, abstractmethod
# 组件抽象类
class BookComponent(ABC):
    """Abstract component shared by book categories (composites) and books (leaves)."""

    @abstractmethod
    def add(self, component):
        """Add a child component."""

    @abstractmethod
    def remove(self, component):
        """Remove a child component."""

    @abstractmethod
    def display(self, depth):
        """Print this component at the given nesting depth."""
# 叶子节点:书籍类
class Book(BookComponent):
    """Leaf node: a single book, which cannot hold children."""

    def __init__(self, title, author):
        self.title, self.author = title, author

    def add(self, component):
        """Print a notice; a leaf cannot take children."""
        print("Cannot add to a leaf node")

    def remove(self, component):
        """Print a notice; a leaf has no children to remove."""
        print("Cannot remove from a leaf node")

    def display(self, depth):
        """Print the book prefixed by one dash per nesting level."""
        indent = "-" * depth
        print(indent + f" {self.title} by {self.author}")
# 复合节点:书籍分类类
class BookCategory(BookComponent):
    """Composite node: a category holding books and/or sub-categories."""

    def __init__(self, name):
        self.name = name
        self.children = []

    def add(self, component):
        """Append ``component`` to this category's children."""
        self.children.append(component)

    def remove(self, component):
        """Remove ``component`` from this category's children."""
        self.children.remove(component)

    def display(self, depth):
        """Print this category, then every child one level deeper."""
        indent = "-" * depth
        print(indent + self.name)
        child_depth = depth + 1
        for node in self.children:
            node.display(child_depth)
# 客户端代码
if __name__ == "__main__":
    # Categories and the books that go into them.
    fiction, non_fiction = BookCategory("Fiction"), BookCategory("Non-Fiction")
    novel = Book("The Great Gatsby", "F. Scott Fitzgerald")
    biography = Book("Steve Jobs", "Walter Isaacson")
    programming = Book("Clean Code", "Robert C. Martin")

    # Assemble the tree: books into categories, categories under one root.
    fiction.add(novel)
    for entry in (biography, programming):
        non_fiction.add(entry)
    root = BookCategory("Root")
    for branch in (fiction, non_fiction):
        root.add(branch)

    # Print the whole category tree.
    root.display(0)
'''
在这个示例中,BookComponent 是一个抽象类,定义了所有组件(无论是分类还是书籍)都应该有的方法:add、remove 和 display。
Book 类是叶子节点,代表具体的书籍,它实现了 BookComponent 接口。
add 和 remove 方法对于书籍来说是不适用的,因此它们只是打印一条错误消息。
BookCategory 类是复合节点,代表书籍的分类,它可以包含其他分类或书籍。因此,它实现了 add 和 remove 方法来管理子节点,并且实现了 display 方法来显示分类及其子节点的信息。
客户端代码创建了一些书籍和分类对象,并构建了一个书籍分类结构。最后,通过调用根分类的 display 方法,可以显示整个书籍分类结构。
'''

@ -1,94 +0,0 @@
'''
桥接模式Bridge Pattern可以将抽象与实现解耦让它们可以独立变化
这在处理多种分类的书籍时特别有用比如你想在不同的平台上展示这些书籍同时这些书籍还分属不同的分类
下面是一个简单的实现代码示例展示如何使用桥接模式来构建书城的书籍分类与展示平台
'''
# 定义书籍接口
class IBook:
    """Book interface: concrete books provide a title and an author."""

    def get_title(self):
        """Return None here; concrete books override this."""
        return None

    def get_author(self):
        """Return None here; concrete books override this."""
        return None
# 具体书籍实现
class NovelBook(IBook):
    """Concrete book that stores its title and author."""

    def __init__(self, title, author):
        self.title, self.author = title, author

    def get_title(self):
        """Return the stored title."""
        return self.title

    def get_author(self):
        """Return the stored author."""
        return self.author
# 定义抽象分类
class BookCategory:
    """A named category that collects books."""

    def __init__(self, name):
        self.name, self.books = name, []

    def add_book(self, book):
        """Append ``book`` to this category."""
        self.books.append(book)

    def get_books(self):
        """Return the category's own list of books (not a copy)."""
        return self.books
# 定义抽象展示平台
class DisplayPlatform:
    """Abstract display platform; concrete platforms decide how a book is shown."""

    def display(self, book):
        """Return None here; concrete platforms override this."""
        return None
# 具体展示平台实现
class WebDisplayPlatform(DisplayPlatform):
    """Display platform that formats a book for the web."""

    def display(self, book):
        """Return the web presentation line for ``book``."""
        title, author = book.get_title(), book.get_author()
        return f"On the web: {title} by {author}"
class MobileDisplayPlatform(DisplayPlatform):
    """Display platform that formats a book for mobile."""

    def display(self, book):
        """Return the mobile presentation line for ``book``."""
        title, author = book.get_title(), book.get_author()
        return f"On mobile: {title} by {author}"
# 桥接类,将分类与展示平台连接起来
class BookShop:
    """Bridge that pairs a book category with a display platform."""

    def __init__(self, category, platform):
        self.category, self.platform = category, platform

    def show_books(self):
        """Print every book of the category as rendered by the platform."""
        render = self.platform.display
        for item in self.category.get_books():
            print(render(item))
# 客户端代码
if __name__ == "__main__":
    # Books and the category that holds them.
    novel1 = NovelBook("The Great Gatsby", "F. Scott Fitzgerald")
    novel2 = NovelBook("1984", "George Orwell")
    fiction_category = BookCategory("Fiction")
    for novel in (novel1, novel2):
        fiction_category.add_book(novel)

    # The two display platforms.
    web_platform, mobile_platform = WebDisplayPlatform(), MobileDisplayPlatform()

    # One shop per platform, both showing the same category.
    web_bookshop = BookShop(fiction_category, web_platform)
    web_bookshop.show_books()
    mobile_bookshop = BookShop(fiction_category, mobile_platform)
    mobile_bookshop.show_books()
'''
在这个示例中
IBook 是一个接口定义了书籍应有的行为比如获取标题和作者
NovelBook 是一个具体书籍类实现了 IBook 接口
BookCategory 是一个书籍分类类它可以包含多个书籍实例
DisplayPlatform 是一个抽象展示平台类定义了如何展示书籍
WebDisplayPlatform MobileDisplayPlatform 是具体展示平台类分别实现了 DisplayPlatform 接口以提供不同的展示方式
BookShop 是一个桥接类它将书籍分类与展示平台连接起来通过 show_books 方法可以展示分类中的所有书籍
'''

@ -1,66 +0,0 @@
# 和工厂模式类似,不过这里的结果只是产生不同的类方法
# 设想书店有多种折扣策略,比如“普通会员折扣”、“金牌会员折扣”和“无折扣”。每种折扣策略都是一个具体的策略实现。
from abc import ABC, abstractmethod
########################################################
# 创建折扣策略接口
########################################################
class DiscountStrategy(ABC):
    """Interface for discount strategies."""

    @abstractmethod
    def calculate_discount(self, book_price):
        """Return the price to charge for a book priced ``book_price``."""
########################################################
# 创建实现了DiscountStrategy接口的具体折扣策略类
########################################################
class NoDiscountStrategy(DiscountStrategy):
    """Strategy that applies no discount."""

    def calculate_discount(self, book_price):
        """Return the original price unchanged."""
        unchanged = book_price
        return unchanged
class RegularMemberDiscountStrategy(DiscountStrategy):
    """Strategy for regular members: pay 90% of the price."""

    def calculate_discount(self, book_price):
        """Return ``book_price`` multiplied by 0.9."""
        rate = 0.9
        return book_price * rate
class GoldMemberDiscountStrategy(DiscountStrategy):
    """Strategy for gold members: pay 80% of the price."""

    def calculate_discount(self, book_price):
        """Return ``book_price`` multiplied by 0.8."""
        rate = 0.8
        return book_price * rate
########################################################
# 定义Book类和Bookstore类。Book类包含书籍的信息和价格Bookstore类则使用折扣策略来计算书籍的折后价
########################################################
class Book:
    """A book with a title and a list price."""

    def __init__(self, title, price):
        self.title, self.price = title, price
class Bookstore:
    """Bookstore that prices books through a replaceable discount strategy."""

    def __init__(self, discount_strategy):
        self.discount_strategy = discount_strategy

    def set_discount_strategy(self, discount_strategy):
        """Switch to another discount strategy."""
        self.discount_strategy = discount_strategy

    def calculate_final_price(self, book):
        """Return the price of ``book`` after the current strategy is applied."""
        return self.discount_strategy.calculate_discount(book.price)
if __name__ == "__main__":
    book = Book("The Great Gatsby", 30.0)

    # One instance of every discount strategy.
    no_discount = NoDiscountStrategy()
    regular_discount = RegularMemberDiscountStrategy()
    gold_discount = GoldMemberDiscountStrategy()

    # Start without a discount, then swap strategies on the same bookstore.
    bookstore = Bookstore(no_discount)
    print(f"No Discount: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")
    for label, strategy in (
        ("Regular Member Discount", regular_discount),
        ("Gold Member Discount", gold_discount),
    ):
        bookstore.set_discount_strategy(strategy)
        print(f"{label}: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")

@ -1,45 +0,0 @@
# 观察者模式允许一个对象(观察者)监听另一个对象(主题)的状态变化,并在状态变化时得到通知。
# 主类信息发生变化时,通知已登记的各个对象(把自己当作参数传过去),由它们自行处理变化。
# 当购物车中的商品数量发生变化时,库存系统和价格计算系统需要实时更新。
# 观察者模式Observer或发布-订阅模式Publish-Subscribe
import abc
class Observer(metaclass=abc.ABCMeta):
    """Interface for objects that want to be told when a cart changes."""

    @abc.abstractmethod
    def update(self, cart):
        """React to a change of ``cart``."""
class InventorySystem(Observer):
    """Observer placeholder for the inventory system."""

    def update(self, cart):
        """Placeholder for the stock-update logic; currently does nothing."""
        return None
class PriceCalculator(Observer):
    """Observer placeholder for the price calculator."""

    def update(self, cart):
        """Placeholder for the total-price recalculation; currently does nothing."""
        return None
class ShoppingCart:
    """Subject of the observer pattern: a cart that notifies attached observers."""

    def __init__(self):
        self._items = {}
        self.observers = []

    def add_item(self, item_id, quantity):
        """Set the quantity for ``item_id`` and notify every observer."""
        self._items[item_id] = quantity
        self._notify()

    def attach(self, observer):
        """Register ``observer`` for future notifications."""
        self.observers.append(observer)

    def _notify(self):
        """Call ``update`` on each observer, passing this cart."""
        for listener in self.observers:
            listener.update(self)
# Create the shopping cart and attach the observers.
cart = ShoppingCart()
inventory_system = InventorySystem()
price_calculator = PriceCalculator()
cart.attach(inventory_system)
cart.attach(price_calculator)
cart.add_item('item1', 2) # adding an item notifies both the inventory system and the price calculator

@ -1,80 +0,0 @@
'''
状态模式State Pattern允许一个对象在其内部状态改变时改变它的行为
这个模式将状态封装成独立的类并将状态转换的逻辑分散到这些类中从而减少相互间的依赖
以下是一个使用状态模式的简单示例我们将创建一个订单类Order它有几个状态
Placed已下单Paid已支付Fulfilled已履行和Delivered已交付
每个状态都是一个类它们继承自一个抽象状态类OrderState
'''
################################################################################
# 定义抽象状态类和一些具体的状态类
################################################################################
class OrderState:
    """Base class of the order states."""

    def handle(self, order):
        """Do nothing here; concrete states override this."""
        return None
class PlacedState(OrderState):
    """State of an order that has been placed."""

    def handle(self, order):
        """Announce the placed order and switch it to the paid state."""
        print("Order placed. Waiting for payment...")
        following = order.get_paid_state()
        order.set_state(following)
class PaidState(OrderState):
    """State of an order that has been paid."""

    def handle(self, order):
        """Announce the payment and switch the order to the fulfilled state."""
        print("Order paid. Preparing for fulfillment...")
        following = order.get_fulfilled_state()
        order.set_state(following)
class FulfilledState(OrderState):
    """State of an order that has been fulfilled."""

    def handle(self, order):
        """Announce the fulfilment and switch the order to the delivered state."""
        print("Order fulfilled. Preparing for delivery...")
        following = order.get_delivered_state()
        order.set_state(following)
class DeliveredState(OrderState):
    """Final state: the order has been delivered."""

    def handle(self, order):
        """Announce completion; the order's state is left unchanged."""
        message = "Order delivered. Process completed."
        print(message)
################################################################################
# 定义Order类它包含一个对当前状态的引用并且能够通过set_state方法改变其状态
################################################################################
class Order:
    """Context of the state pattern: an order that delegates work to its current state."""

    def __init__(self):
        self._state = None
        # Every new order starts in the placed state.
        self.set_state(self.get_placed_state())

    def set_state(self, state):
        """Replace the current state."""
        self._state = state

    def get_state(self):
        """Return the current state object."""
        return self._state

    def get_placed_state(self):
        """Return a new placed state."""
        return PlacedState()

    def get_paid_state(self):
        """Return a new paid state."""
        return PaidState()

    def get_fulfilled_state(self):
        """Return a new fulfilled state."""
        return FulfilledState()

    def get_delivered_state(self):
        """Return a new delivered state."""
        return DeliveredState()

    def process(self):
        """Let the current state handle this order (it may switch the state)."""
        current = self._state
        current.handle(self)
################################################################################
# 创建一个Order对象并模拟其状态转换
################################################################################
if __name__ == "__main__":
    order = Order()
    # Four steps walk the order through Placed -> Paid -> Fulfilled -> Delivered.
    # The fourth call runs in the Delivered state, which only prints its
    # message and leaves the state as it is.
    for _ in range(4):
        order.process()
################################################################################
# 这个例子中每个状态类都负责决定下一个状态是什么并在handle方法中触发状态转换。
# Order类不直接知道所有可能的状态转换这些逻辑被封装在状态类中。
# 这使得添加新的状态或修改现有状态的行为变得更加容易因为不需要修改Order类本身。
################################################################################

@ -1,71 +0,0 @@
'''
模板方法模式Template Method
定义算法的骨架而将一些步骤延迟到子类中实现
'''
from abc import ABC, abstractmethod
class AbstractClass(ABC):
    """Skeleton of an algorithm whose individual steps are supplied by subclasses."""

    def template_method(self):
        """Run the steps of the algorithm in their fixed order."""
        sequence = (
            "base_operation1",
            "required_operations1",
            "base_operation2",
            "hook1",
            "required_operations2",
            "base_operation3",
            "hook2",
        )
        for step in sequence:
            getattr(self, step)()

    @abstractmethod
    def base_operation1(self):
        """First base step; must be implemented by subclasses."""

    @abstractmethod
    def base_operation2(self):
        """Second base step; must be implemented by subclasses."""

    @abstractmethod
    def base_operation3(self):
        """Third base step; must be implemented by subclasses."""

    @abstractmethod
    def required_operations1(self):
        """First required step; must be implemented by subclasses."""

    @abstractmethod
    def required_operations2(self):
        """Second required step; must be implemented by subclasses."""

    def hook1(self):
        """Optional hook; does nothing unless a subclass overrides it."""

    def hook2(self):
        """Another optional hook; does nothing unless a subclass overrides it."""
class ConcreteClass(AbstractClass):
    """Concrete implementation of every step used by the template method."""

    def base_operation1(self):
        """Print the first base-step message."""
        print("AbstractClass says: I am doing the bulk of the work")

    def base_operation2(self):
        """Print the second base-step message."""
        print("AbstractClass says: But I let subclasses override some operations")

    def base_operation3(self):
        """Print the third base-step message."""
        print("AbstractClass says: But I am doing the bulk of the work anyway")

    def required_operations1(self):
        """Print the first required-step message."""
        print("ConcreteClass says: Implemented Operation1")

    def required_operations2(self):
        """Print the second required-step message."""
        print("ConcreteClass says: Implemented Operation2")

    def hook1(self):
        """Override the first hook with a printed message."""
        print("ConcreteClass says: Overridden Hook1")

    def hook2(self):
        """Override the second hook with a no-op."""
        # Behaves exactly like the empty default in AbstractClass.
        return None
if __name__ == "__main__":
    # Run the whole algorithm through the template method.
    concrete_class = ConcreteClass()
    run_algorithm = concrete_class.template_method
    run_algorithm()

@ -1,86 +0,0 @@
'''
中介者模式Mediator Pattern是一种行为型设计模式它定义了一个中介对象来封装一系列对象之间的交互
中介者使各对象不需要显式地相互引用从而使其耦合松散而且可以独立地改变它们之间的交互
设想书店管理系统其中包含了多个组件比如库存管理订单处理顾客管理等
这些组件之间需要相互通信来协同工作我们可以使用中介者模式来减少这些组件之间的直接依赖
在这个示例中BookstoreMediator 充当了中介者的角色它协调了库存管理订单处理和顾客管理之间的交互
当某个事件发生时比如顾客下订单相应的组件会通过中介者发送通知中介者再根据事件类型协调其他组件的响应
'''
from abc import ABC, abstractmethod
#定义一个中介者接口和具体的中介者实现
# 中介者接口
class Mediator(ABC):
    """Mediator interface: components report events through ``notify``."""

    @abstractmethod
    def notify(self, sender, event):
        """Handle ``event`` reported by ``sender``."""
# 具体的中介者实现
class BookstoreMediator(Mediator):
    """Concrete mediator coordinating inventory, order handling and customer notices."""

    def __init__(self, inventory, order, customer):
        self.inventory, self.order, self.customer = inventory, order, customer

    def notify(self, sender, event):
        """React to ``event``; only 'book_ordered' is handled, anything else is ignored."""
        if event != 'book_ordered':
            return
        print("BookstoreMediator: Order notification received. Checking inventory...")
        if not self.inventory.has_stock():
            print("BookstoreMediator: Inventory out of stock. Cancelling order...")
            self.order.cancel_order()
            self.customer.notify_order_failure()
            return
        print("BookstoreMediator: Inventory has stock. Processing order...")
        self.order.process_order()
        self.customer.notify_order_success()
# 可以添加更多的事件处理逻辑
# 库存管理组件
class Inventory:
    """Inventory management component."""

    def has_stock(self):
        """Report stock availability; simplified to always return True."""
        in_stock = True
        return in_stock
# 订单处理组件
class Order:
    """Order handling component; both operations only print a notice."""

    def process_order(self):
        """Print that the order is being processed."""
        message = "Order: Order is being processed..."
        print(message)

    def cancel_order(self):
        """Print that the order is being cancelled."""
        message = "Order: Order is being cancelled..."
        print(message)
# 顾客管理组件
class Customer:
    """Customer management component; both operations only print a notice."""

    def notify_order_success(self):
        """Print the order-success notice."""
        message = "Customer: Order successful. Notifying customer..."
        print(message)

    def notify_order_failure(self):
        """Print the order-failure notice."""
        message = "Customer: Order failed. Notifying customer..."
        print(message)
# 组件之间的交互通过中介者进行
class BookstoreComponent:
    """Component that talks to the rest of the system only through a mediator."""

    def __init__(self, mediator):
        self.mediator = mediator

    def send_notification(self, event):
        """Report ``event`` to the mediator, naming this component as the sender."""
        hub = self.mediator
        hub.notify(self, event)
# 示例使用
if __name__ == "__main__":
    # The three collaborating components.
    inventory, order, customer = Inventory(), Order(), Customer()

    # The mediator receives the components it coordinates.
    mediator = BookstoreMediator(inventory, order, customer)

    # A component tied to the mediator (standing in for the ordering side)
    # raises the "book ordered" event.
    order_component = BookstoreComponent(mediator)
    order_component.send_notification('book_ordered')

@ -1,96 +0,0 @@
'''
职责链模式Chain of Responsibility Pattern
一种行为设计模式它允许你将请求沿着处理者链进行发送
收到请求后每个处理者均可对请求进行处理或将其传递给链上的下个处理者
在书城的业务背景中可以想象有这样的场景
客户下了一个订单该订单需要经历多个处理步骤比如验证库存计算价格生成发货通知等
这些处理步骤按照一定的顺序组织成一个处理链
'''
class Order:
    """An order for a quantity of one book."""

    def __init__(self, book_id, quantity):
        self.book_id, self.quantity = book_id, quantity
class Handler:
    """Base class of the handlers that form the processing chain."""

    def __init__(self, successor=None):
        self.successor = successor

    def handle_request(self, order):
        """Process ``order``; every concrete handler has to implement this."""
        raise NotImplementedError("Subclasses must implement handle_request()")

    def successor_handle_request(self, order):
        """Pass ``order`` to the next handler and return its result.

        Returns None when this handler is the last one in the chain.
        """
        following = self.successor
        if following is None:
            return None
        return following.handle_request(order)
class InventoryHandler(Handler):
    """Chain step that checks the stock before the order may continue."""

    def handle_request(self, order):
        """Continue down the chain when stock suffices; otherwise return False."""
        if not check_inventory(order.book_id, order.quantity):
            print("InventoryHandler: Inventory check failed, not enough stock.")
            return False
        print("InventoryHandler: Inventory checked, enough stock.")
        return self.successor_handle_request(order)
class PricingHandler(Handler):
    """Chain step that calculates and prints the order total."""

    def handle_request(self, order):
        """Print the total price, then hand the order to the next handler."""
        total = calculate_price(order.book_id, order.quantity)
        print(f"PricingHandler: Price calculated, total: {total}")
        return self.successor_handle_request(order)
class NotificationHandler(Handler):
    """Chain step that notifies the customer; it never calls a successor."""

    def handle_request(self, order):
        """Send the notification, print a confirmation and return True."""
        send_notification(order)
        print("NotificationHandler: Notification sent to customer.")
        handled = True
        return handled
# Dummy functions to simulate the bookstore logic
def check_inventory(book_id, quantity):
    """Simulate the stock check; always reports enough stock."""
    enough_stock = True
    return enough_stock
def calculate_price(book_id, quantity):
    """Simulate pricing with a fixed price of 10.0 per book."""
    unit_price = 10.0
    return unit_price * quantity
def send_notification(order):
    """Simulate notifying the customer; does nothing and returns None."""
    return None
# Setting up the Chain of Responsibility
# Build the chain back to front: inventory -> pricing -> notification.
notification_handler = NotificationHandler()
pricing_handler = PricingHandler(successor=notification_handler)
inventory_handler = InventoryHandler(successor=pricing_handler)

# Client code
order = Order(book_id="12345", quantity=2)
result = inventory_handler.handle_request(order)
print("Order processing completed successfully." if result else "Order processing failed.")
'''
在这个例子中我们有一个Handler基类它定义了一个处理请求的基本框架
具体的处理逻辑则在子类InventoryHandlerPricingHandler和NotificationHandler中实现
这些子类形成了职责链通过successor引用链接在一起
客户下了一个订单后这个订单请求会从库存处理器InventoryHandler开始处理
如果库存足够请求将被传递给价格处理器PricingHandler来计算总价
然后价格处理器再将请求传递给通知处理器NotificationHandler
最后发送通知给客户
如果任何处理器无法处理请求例如库存不足处理将在那里终止并且返回相应的结果
这个模式的好处是可以很容易地改变处理的顺序或者增加/删除处理器而不需要修改已有的代码
'''

@ -1,75 +0,0 @@
'''
命令模式Command Pattern是一种行为设计模式它封装了一个请求作为一个对象从而让你使用不同的请求把客户端与服务端操作解耦在书城的业务背景中命令模式可以用来实现例如添加购物车项结算订单发送通知等操作
下面是一个简单的使用Python实现的命令模式示例以书城的购物车操作为例
'''
# 购物车类
class ShoppingCart:
    """Receiver of the commands: a cart holding item dictionaries."""

    def __init__(self):
        self.items = []

    def add_item(self, book_id, quantity):
        """Append an entry for ``book_id`` with ``quantity`` and print a notice."""
        entry = {'book_id': book_id, 'quantity': quantity}
        self.items.append(entry)
        print(f"Added {quantity} of book ID {book_id} to the cart.")

    def remove_item(self, book_id):
        """Drop every entry for ``book_id`` and print a notice."""
        self.items = list(filter(lambda entry: entry['book_id'] != book_id, self.items))
        print(f"Removed book ID {book_id} from the cart.")
# 命令接口
class Command:
    """Command interface."""

    def execute(self):
        """Do nothing here; concrete commands override this."""
        return None
# 具体命令类 - 添加购物车项
class AddToCartCommand(Command):
    """Concrete command: add a quantity of a book to a shopping cart."""

    def __init__(self, shopping_cart, book_id, quantity):
        self.shopping_cart = shopping_cart
        self.book_id, self.quantity = book_id, quantity

    def execute(self):
        """Add the stored book and quantity to the stored cart."""
        cart = self.shopping_cart
        cart.add_item(self.book_id, self.quantity)
# 具体命令类 - 移除购物车项
class RemoveFromCartCommand(Command):
    """Concrete command: remove a book from a shopping cart."""

    def __init__(self, shopping_cart, book_id):
        self.shopping_cart, self.book_id = shopping_cart, book_id

    def execute(self):
        """Remove the stored book from the stored cart."""
        cart = self.shopping_cart
        cart.remove_item(self.book_id)
# 调用者Invoker
class Invoker:
    """Invoker that keeps a history of commands and runs each one as it arrives."""

    def __init__(self):
        self.commands = []

    def store_and_execute_command(self, command):
        """Record ``command`` in the history, then execute it."""
        history = self.commands
        history.append(command)
        command.execute()
# 客户端代码
if __name__ == "__main__":
    cart, invoker = ShoppingCart(), Invoker()

    # Add two copies of a book to the cart ...
    add_command = AddToCartCommand(cart, "12345", 2)
    invoker.store_and_execute_command(add_command)

    # ... and take the same book out again.
    remove_command = RemoveFromCartCommand(cart, "12345")
    invoker.store_and_execute_command(remove_command)
'''
在这个例子中我们有一个ShoppingCart类它有两个方法add_item和remove_item用于添加和移除购物车项然后我们定义了一个命令接口Command和一个调用者InvokerInvoker可以存储并执行命令
具体命令类AddToCartCommand和RemoveFromCartCommand实现了Command接口并在execute方法中调用了ShoppingCart的相应操作
客户端代码创建了ShoppingCart实例和Invoker实例并创建了添加和移除购物车项的具体命令对象这些命令对象通过调用者的store_and_execute_command方法被存储和执行
这个命令模式的实现将购物车操作和实际的执行逻辑解耦允许客户端在不知道具体操作的情况下存储和执行命令这对于支持撤销操作记录命令历史排队命令等高级功能非常有用
'''

@ -1,75 +0,0 @@
'''
备忘录模式Memento Pattern
在不破坏封装的前提下捕获一个对象的内部状态并在该对象之外保存这个状态
备忘录模式Memento Pattern可以用来保存和恢复书籍对象的内部状态
例如用户可能想要保存他们当前阅读的书籍状态比如阅读进度高亮部分等以便之后可以恢复到这个状态
'''
# 书籍类,包含阅读状态
class Book:
    """Originator of the memento pattern: a book with a reading position and highlights."""

    def __init__(self, title, author):
        self.title = title
        self.author = author
        self.page = 1  # reading starts on page 1
        self.highlights = []  # highlighted passages, in the order they were added

    def read(self, page):
        """Move to ``page`` and print the reading position."""
        self.page = page
        print(f"Reading page {self.page} of '{self.title}' by {self.author}")

    def highlight(self, text):
        """Record ``text`` as highlighted and print it."""
        self.highlights.append(text)
        print(f"Highlighted text: {text}")

    def get_current_state(self):
        """Return a memento holding the current page and a copy of the highlights."""
        return BookMemento(self.page, self.highlights[:])

    def restore_state(self, memento):
        """Restore the page and highlights saved in ``memento``.

        The highlight list is copied so that highlighting after a restore does
        not modify the memento, which can then be restored again unchanged.
        """
        self.page = memento.get_page()
        self.highlights = list(memento.get_highlights())
# 备忘录类,用于保存书籍状态
class BookMemento:
    """Memento holding a saved page number and list of highlights."""

    def __init__(self, page, highlights):
        self.page, self.highlights = page, highlights

    def get_page(self):
        """Return the saved page."""
        return self.page

    def get_highlights(self):
        """Return the saved list of highlights (the stored object itself)."""
        return self.highlights
# 客户端代码
if __name__ == "__main__":
    book = Book("The Hobbit", "J.R.R. Tolkien")

    # Read and highlight, then snapshot that state.
    book.read(50)
    book.highlight("Gollum's precious!")
    memento = book.get_current_state()

    # Move on, changing both the page and the highlights.
    book.read(100)
    book.highlight("The dragon is coming!")

    # Roll back to the snapshot and show what was restored.
    print("Restoring state...")
    book.restore_state(memento)
    print(f"Current page is {book.page}")
    print("Highlighted texts:")
    for highlight in book.highlights:
        print(highlight)
'''
在这个示例中Book 类代表了一本书包含了页码和高亮部分等状态信息BookMemento 类用于保存书籍的某一时刻的状态Book 类提供了 get_current_state 方法来创建并返回一个包含当前阅读状态的 BookMemento 对象同样地Book 类也提供了 restore_state 方法来从 BookMemento 对象中恢复状态
客户端代码创建了一个书籍对象对其进行了阅读和高亮操作然后保存了当前状态之后改变了书籍的状态并使用之前保存的状态恢复了书籍对象
这样用户就可以在需要的时候回到之前的阅读状态无论是在同一个会话中还是在不同的会话中这种模式在需要提供撤销恢复功能的应用中非常有用
'''

@ -1,72 +0,0 @@
'''
访问者模式Visitor Pattern
表示一个作用于某对象结构中的各元素的操作它使你可以在不改变各元素的类的前提下定义作用于这些元素的新操作
访问者模式Visitor Pattern可以用于将操作逻辑从对象结构中分离出来假设我们有一个书城系统其中包含不同类型的书籍如小说教材等我们想要对不同类型的书籍执行不同的操作如打印价格增加库存等访问者模式允许我们定义一个新的操作而无需改变书籍的类结构
'''
# 书籍类作为元素Element角色
class Book:
    """Element of the visitor pattern: a book that accepts visitors."""

    def __init__(self, title, price, category):
        self.title, self.price, self.category = title, price, category

    def accept(self, visitor):
        """Let ``visitor`` operate on this book."""
        visit = visitor.visit
        visit(self)
# 小说类,继承自书籍类
class NovelBook(Book):
    """A book in the "Novel" category."""

    def __init__(self, title, price):
        super().__init__(title=title, price=price, category="Novel")
# 教材类,继承自书籍类
class Textbook(Book):
    """A book in the "Textbook" category."""

    def __init__(self, title, price):
        super().__init__(title=title, price=price, category="Textbook")
# 访问者接口
class IVisitor:
    """Visitor interface."""

    def visit(self, book):
        """Do nothing here; concrete visitors override this."""
        return None
# 具体的访问者类,实现访问者接口
class PricePrinter(IVisitor):
    """Visitor that prints the price of each visited book."""

    def visit(self, book):
        """Print the title and price of ``book``."""
        line = f"Price of '{book.title}': ${book.price}"
        print(line)
class StockAdder(IVisitor):
    """Visitor that reports adding a fixed number of copies to each visited book."""

    def __init__(self, additional_stock):
        self.additional_stock = additional_stock

    def visit(self, book):
        """Print the stock addition for ``book``.

        Only a message is printed; a real system would update the stock in a
        database or similar store.
        """
        copies = self.additional_stock
        print(f"Adding {copies} copies of '{book.title}' to stock")
# 客户端代码
if __name__ == "__main__":
    # The books to visit.
    novel = NovelBook("The Great Gatsby", 19.99)
    textbook = Textbook("Introduction to Algorithms", 49.99)

    # The visitors: one prints prices, one adds five copies to the stock.
    price_printer = PricePrinter()
    stock_adder = StockAdder(5)

    # Every book accepts the price printer first, then the stock adder.
    for visitor in (price_printer, stock_adder):
        for item in (novel, textbook):
            item.accept(visitor)
'''
在这个示例中Book 类是元素角色它包含一个 accept 方法该方法接受一个访问者对象NovelBook Textbook 是具体元素它们继承自 Book IVisitor 是访问者接口声明了一个 visit 方法PricePrinter StockAdder 是实现了 IVisitor 接口的具体访问者类它们分别用于打印书籍价格和增加书籍库存
客户端代码创建了书籍对象和访问者对象并通过调用书籍的 accept 方法来接受访问者的访问这样访问者就可以对书籍执行相应的操作
需要注意的是访问者模式的一个主要缺点是增加新的操作意味着增加新的访问者类这可能会导致类的数量增加此外如果需要在访问者中访问对象结构的复杂关系代码可能会变得难以理解和维护因此在使用访问者模式时需要权衡其优缺点
'''

@ -0,0 +1,72 @@
import site
import os,re
import string,operator
################################################################################
# 变量
################################################################################
# Name of the sample text to analyse and of the database file.
testfilename = 'Prey.txt'
db_filename = "tf.db"

# Base directory: the last site-packages entry whose path contains 'package'.
site_packages = site.getsitepackages()
for package in site_packages:
    if 'package' not in package:
        continue
    basePath = package

# Data files are looked up under <basePath>/cppy/data.
stopwordfilepath = os.path.join(basePath, 'cppy', 'data', 'stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy', 'data', testfilename)
################################################################################
# 函数
################################################################################
def read_file(path_to_file):
    """Return the whole content of the UTF-8 text file at ``path_to_file``."""
    with open(path_to_file, encoding='utf-8') as source:
        return source.read()
def re_split( data ):
    """Split ``data`` into lower-case words.

    Every run of non-word characters and underscores is treated as a
    separator. Returns the list of words in their original order; an empty or
    separator-only string yields an empty list.
    """
    # Raw string: '\W' in a normal string literal is an invalid escape sequence.
    pattern = re.compile(r'[\W_]+')
    data = pattern.sub(' ', data).lower()
    return data.split()
def get_stopwords( path_to_file = stopwordfilepath ):
    """Return the stop words as a list.

    The list holds the comma-separated entries of the UTF-8 file at
    ``path_to_file`` followed by the 26 lower-case ASCII letters.
    """
    with open(path_to_file, encoding='utf-8') as source:
        entries = source.read().split(',')
    return [*entries, *string.ascii_lowercase]
def extract_file_words(path_to_file):
    """Return the words of the file at ``path_to_file`` that survive filtering.

    The file is read as UTF-8 text and then filtered by ``extract_str_words``
    (stop words and words shorter than three characters are dropped), so both
    functions share a single filtering rule.
    """
    return extract_str_words( read_file(path_to_file) )
def extract_str_words(data_str):
    """Return the words of ``data_str`` that survive filtering.

    The text is split with ``re_split``; stop words and words shorter than
    three characters are dropped. Order and duplicates are preserved.
    """
    word_list = re_split( data_str )
    # A set makes each membership test O(1); get_stopwords() returns a list,
    # which would otherwise be scanned once for every word.
    stop_words = set(get_stopwords())
    return [ w for w in word_list if len(w) >= 3 and w not in stop_words ]
def count_word(word, word_freqs, stopwords):
    """Increment the count of ``word`` in ``word_freqs`` unless it is a stop word."""
    if word in stopwords:
        return
    previous = word_freqs.get(word, 0)
    word_freqs[word] = previous + 1
def get_frequencies(word_list):
    """Return a dict mapping each word in ``word_list`` to its number of occurrences."""
    tally = {}
    for token in word_list:
        if token in tally:
            tally[token] += 1
        else:
            tally[token] = 1
    return tally
def sort_dict (word_freq):
    """Return the (word, count) pairs of ``word_freq`` sorted by count, highest first."""
    pairs = list(word_freq.items())
    pairs.sort(key=lambda pair: pair[1], reverse=True)
    return pairs
def print_word_freqs( word_freqs, n = 10):
    """Print the first ``n`` (word, count) pairs of ``word_freqs``, one "word - count" per line."""
    leading = word_freqs[:n]
    for word, count in leading:
        print(word, '-', count)
def test():
    """Print the package greeting."""
    greeting = 'cppy welcome'
    print( greeting )

@ -1,4 +0,0 @@
## 代码为啥要这样写,我要这样写代码

@ -1,19 +1,17 @@
# 引入停用词表和测试文件的路径 import string
from cppy.cp_util import stopwordfilepath, testfilepath from cppy.cp_util import *
# 准备停用词表 # 准备词和停用词表
word_freqs = []
with open( stopwordfilepath,encoding='utf-8' ) as f: with open( stopwordfilepath,encoding='utf-8' ) as f:
stop_words = f.read().split(',') stop_words = f.read().split(',')
for letter in 'abcdefghijklmnopqrstuvwxyz': stop_words.extend(list(string.ascii_lowercase))
stop_words.append(letter)
# 读文件,逐行扫描文本,发现词,确定不是停用词,计数
word_freqs = []
for line in open( testfilepath ,encoding='utf-8' ): for line in open( testfilepath ,encoding='utf-8' ):
start_char = None start_char = None
i = 0 i = 0
for c in line: for c in line:
if start_char is None: if start_char == None:
if c.isalnum(): if c.isalnum():
# 一个单词开始 # 一个单词开始
start_char = i start_char = i
@ -34,17 +32,15 @@ for line in open(testfilepath, encoding='utf-8'):
pair_index += 1 pair_index += 1
if not found: if not found:
word_freqs.append([word, 1]) word_freqs.append([word, 1])
elif len(word_freqs) > 1:
for n in reversed(range(pair_index)):
if word_freqs[pair_index][1] > word_freqs[n][1]:
# 交换
word_freqs[n], word_freqs[pair_index] = word_freqs[pair_index], word_freqs[n]
pair_index = n
# 重置开始标记 # 重置开始标记
start_char = None start_char = None
i += 1 i += 1
# 使用冒泡排序对词频进行排序 for tf in word_freqs[0:10]:
n = len(word_freqs)
for i in range(n):
for j in range(0, n - i - 1):
if word_freqs[j][1] < word_freqs[j + 1][1]:
word_freqs[j], word_freqs[j + 1] = word_freqs[j + 1], word_freqs[j]
# 打印频率最高的前10个词
for tf in word_freqs[:10]:
print(tf[0], '-', tf[1]) print(tf[0], '-', tf[1])

@ -0,0 +1,33 @@
import re
from collections import Counter
import string
from cppy.cp_util import stopwordfilepath,testfilepath
# Build the stop-word set: every lower-case ASCII letter plus the
# comma-separated entries of the stop-word file (stripped of whitespace).
stop_words = set(string.ascii_lowercase)
with open(stopwordfilepath, encoding='utf-8') as f:
    for line in f:
        stop_words |= {word.strip() for word in line.split(',')}

# Read the test file and pick out the words. The pattern only matches runs of
# at least two word characters, so single characters never reach the counter.
with open(testfilepath, encoding='utf-8') as f:
    text = f.read().lower()
words = re.findall(r'\b\w{2,}\b', text)

# Count every word that is not a stop word.
word_freqs = Counter(word for word in words if word not in stop_words)

# Print the ten most frequent words.
most_common_words = word_freqs.most_common(10)
for word, freq in most_common_words:
    print(f'{word} - {freq}')
# 修改逻辑:A01 没有排除逗号的影响,同时一边提取一边排序,资源占用大。
# 解决方案:引入 re 将逗号去除,并且引入 Counter 进行计数。

@ -1,5 +1,4 @@
from cppy.cp_util import stopwordfilepath, testfilepath from cppy.cp_util import *
import string
from collections import Counter from collections import Counter
# 准备词和停用词表 # 准备词和停用词表
@ -23,6 +22,7 @@ with open(testfilepath, encoding='utf8') as f:
# 打印前10个最常见的单词 # 打印前10个最常见的单词
for word, freq in word_freqs.most_common(10): for word, freq in word_freqs.most_common(10):
print(f"{word}-{freq}") print(f"{word}-{freq}")
''' '''
相比 A01 相比 A01
使用collections.Counter来计数单词频率从而简化了代码并提高了效率 使用collections.Counter来计数单词频率从而简化了代码并提高了效率

@ -1,13 +1,12 @@
import re import re, sys, collections
import collections from cppy.cp_util import *
from cppy.cp_util import stopwordfilepath, testfilepath
stopwords = set(open( stopwordfilepath,encoding = 'utf8' ).read().split(',')) stopwords = set(open( stopwordfilepath,encoding = 'utf8' ).read().split(','))
words = re.findall('[a-z]{2,}', words = re.findall('[a-z]{2,}', open( testfilepath,encoding = 'utf8').read().lower())
open(testfilepath, encoding='utf8').read().lower())
counts = collections.Counter(w for w in words if w not in stopwords) counts = collections.Counter(w for w in words if w not in stopwords)
for (w, c) in counts.most_common(10): for (w, c) in counts.most_common(10):
print(w, '-', c) print(w, '-', c)
''' '''
熟练的软件工程师会如此简单完成任务 熟练的软件工程师会如此简单完成任务
后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法 后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法

@ -2,28 +2,24 @@ import concurrent.futures
from collections import Counter from collections import Counter
import cppy.cp_util as util import cppy.cp_util as util
'''
concurrent.futures模块为Python中的并发编程提供了一个统一接口,
这个模块隐藏了低层次的线程和进程创建同步和清理的细节,提供了一个更高层次的API来处理并发任务
当前版本推荐它与asyncio模块结合使用完成Python中的各种异步编程任务
'''
stop_words = util.get_stopwords()
class WordFrequencyAgent: class WordFrequencyAgent:
def __init__(self, words): def __init__(self, words):
self.words = words self.words = words
def compute_word_frequency(self): def compute_word_frequency(self):
words = [ w for w in self.words if ( not w in stop_words ) and len(w) >= 3 ] self.word_freq = Counter(self.words)
self.word_freq = Counter( words)
def get_word_frequency(self): def get_word_frequency(self):
return self.word_freq return self.word_freq
# 将文本分割成多个部分并为每个部分创建一个Agent # 将文本分割成多个部分并为每个部分创建一个Agent
def create_agents( words ): def create_agents(words, num_agents = 4 ):
return [ WordFrequencyAgent(chunk) for chunk in words ] text_chunks = [ words[i::num_agents] for i in range(num_agents) ]
agents = [ WordFrequencyAgent(chunk) for chunk in text_chunks ]
return agents
def compute_all_word_frequencies(agents): def compute_all_word_frequencies(agents):
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
@ -31,7 +27,13 @@ def compute_all_word_frequencies(agents):
future_to_agent = {executor.submit(agent.compute_word_frequency): agent for agent in agents} future_to_agent = {executor.submit(agent.compute_word_frequency): agent for agent in agents}
for future in concurrent.futures.as_completed(future_to_agent): for future in concurrent.futures.as_completed(future_to_agent):
agent = future_to_agent[future] agent = future_to_agent[future]
data = future.result() # 词频被保存在agent中 try:
# 获取计算结果,但不处理异常
data = future.result()
except Exception as exc:
print(f'生成 {agent.text_chunk[:10]}... 的词频时出错: {exc}')
# 词频已经被保存在agent中
# 所有Agent计算完成后合并它们的词频结果 # 所有Agent计算完成后合并它们的词频结果
def merge_word_frequencies(agents): def merge_word_frequencies(agents):
@ -40,13 +42,11 @@ def merge_word_frequencies(agents):
merged_freq.update(agent.get_word_frequency()) merged_freq.update(agent.get_word_frequency())
return merged_freq return merged_freq
@util.timing_decorator
def main(): if __name__ == '__main__':
words = util.get_chunks(util.testfilepath) words = util.extract_file_words(util.testfilepath) # 从文本抽词
agents = create_agents(words) # 创建代理 agents = create_agents(words) # 创建代理
compute_all_word_frequencies(agents) # 计算 compute_all_word_frequencies(agents) # 计算
merged_word_freq = merge_word_frequencies(agents) # 合并结果 merged_word_freq = merge_word_frequencies(agents) # 合并结果
util.print_word_freqs(merged_word_freq.most_common(10)) # 排序输出 for (w, c) in merged_word_freq.most_common(10): # 排序输出
print(w, '-', c)
if __name__ == '__main__':
main()

@ -0,0 +1,45 @@
import sys, collections
from cppy.cp_util import *
class WordFrequenciesModel:
    """Model: the word frequencies of the current file."""

    def __init__(self, path_to_file):
        self.update(path_to_file)

    def update(self, path_to_file):
        """Recount the words of ``path_to_file`` into ``self.freqs``.

        If reading the file raises IOError, "File not found" is printed and
        the frequencies become an empty dict.
        """
        try:
            words = extract_file_words(path_to_file)
        except IOError:
            print("File not found")
            self.freqs = {}
        else:
            self.freqs = collections.Counter(words)
class WordFrequenciesView:
    """View: data presentation."""

    def __init__(self, model):
        """Remember the model whose ``freqs`` attribute will be rendered."""
        self._model = model

    def render(self):
        """Pass the model's frequencies through ``sort_dict`` and print them."""
        print_word_freqs(sort_dict(self._model.freqs))
class WordFrequencyController:
    """Controller: operation logic connecting user input, model and view."""

    def __init__(self, model, view):
        """Store the collaborators and render the view once immediately."""
        self._model, self._view = model, view
        view.render()

    def run(self):
        """Prompt for file paths until the user enters 'q' or stdin is exhausted.

        Each entered path is handed to the model's ``update`` and the view is
        rendered again afterwards.
        """
        while True:
            print("Enter the file path (or 'q' to quit): ", file=sys.stderr, flush=True)
            line = sys.stdin.readline()
            # readline() returns '' only at end of input; without this check
            # the loop would spin forever once stdin is closed.
            if not line:
                break
            filename = line.strip()
            if filename.lower() == 'q':
                break
            self._model.update(filename)
            self._view.render()
# Script entry: wire model, view and controller together, then start the
# interactive prompt loop.
# NOTE(review): testfilepath is not defined in this file; presumably it comes
# from the star import of cppy.cp_util — confirm.
m = WordFrequenciesModel( testfilepath )
v = WordFrequenciesView(m)
# Constructing the controller already renders the view once.
c = WordFrequencyController(m, v)
c.run()

@ -2,10 +2,8 @@ import string
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
################################
# data # data
################################ data = []
data = ''
words = [] words = []
word_freqs = [] word_freqs = []
@ -15,12 +13,17 @@ word_freqs = []
def read_file(path_to_file): def read_file(path_to_file):
global data global data
with open(path_to_file,encoding='utf-8') as f: with open(path_to_file,encoding='utf-8') as f:
data = f.read() data = data + list(f.read())
def extractwords(): def filter_chars_and_normalize():
global data global data
global words global words
words = data.lower().split() for i in range(len(data)):
data[i] = ' ' if not data[i].isalnum() else data[i].lower()
data_str = ''.join(data)
words = words + data_str.split()
with open(stopwordfilepath) as f: with open(stopwordfilepath) as f:
stop_words = set(f.read().split(',')) stop_words = set(f.read().split(','))
stop_words.update(string.ascii_lowercase) stop_words.update(string.ascii_lowercase)
@ -38,7 +41,7 @@ def sort():
if __name__ == "__main__": if __name__ == "__main__":
read_file( testfilepath ) read_file( testfilepath )
extractwords() filter_chars_and_normalize()
frequencies() frequencies()
sort() sort()

@ -2,26 +2,31 @@ import re
from cppy.cp_util import * from cppy.cp_util import *
def extractwords(str_data): def filter_chars_and_normalize(str_data):
pattern = re.compile('[\W_]+') pattern = re.compile('[\W_]+')
word_list = pattern.sub(' ', str_data).lower().split() word_list = pattern.sub(' ', str_data).lower().split()
stop_words = get_stopwords() stop_words = get_stopwords()
return [w for w in word_list if not w in stop_words] return [w for w in word_list if not w in stop_words]
def frequencies(word_list): def frequencies(word_list):
word_freqs = {} word_freqs = {}
for word in word_list: for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1 word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs return word_freqs
def sort(word_freq): def sort(word_freq):
return sorted( word_freq.items(), key=lambda x: x[1], reverse=True ) return sorted( word_freq.items(), key=lambda x: x[1], reverse=True )
def print_all(word_freqs, n=10):
    """Print the first ``n`` (word, count) pairs as ``word - count`` lines."""
    leading = word_freqs[:n]
    for entry in leading:
        word, freq = entry
        print(word, '-', freq)
if __name__ == "__main__": if __name__ == "__main__":
txtcontent = read_file( testfilepath ) print_all(sort(frequencies(
word_list = extractwords( txtcontent ) filter_chars_and_normalize(
word_freqs = frequencies( word_list ) read_file( testfilepath ))))
word_sorts = sort ( word_freqs ) )
for tf in word_sorts[:10]:
print(tf[0], '-', tf[1])

@ -0,0 +1,39 @@
import re, operator
from cppy.cp_util import *
def print_text(word_freqs, func):
    """Print the frequencies via ``print_word_freqs``, then call ``func(None)``."""
    continuation = func
    print_word_freqs(word_freqs)
    continuation(None)
def frequencies(word_list, func):
    """Count the words with ``get_frequencies`` and forward the result.

    ``func`` receives the counts together with ``print_text`` as the next
    continuation.
    """
    func(get_frequencies(word_list), print_text)
def scan(str_data, func):
    """Split the text on whitespace and forward the word list.

    ``func`` receives the words together with ``frequencies`` as the next
    continuation.
    """
    tokens = str_data.split()
    func(tokens, frequencies)
def filter_chars(str_data, func):
    """Replace each run of non-word characters or underscores with one space.

    The cleaned text is forwarded to ``func`` together with ``scan`` as the
    next continuation.
    """
    # Raw string: '\W' inside a normal literal is an invalid escape sequence
    # and triggers a SyntaxWarning on recent Python versions.
    pattern = re.compile(r'[\W_]+')
    func(pattern.sub(' ', str_data), scan)
def remove_stop_words(word_list, func):
    """Drop every word found in ``get_stopwords()`` and forward the rest.

    ``func`` receives the remaining words together with ``sort`` as the next
    continuation.
    """
    stop_words = get_stopwords()
    kept = []
    for word in word_list:
        if word in stop_words:
            continue
        kept.append(word)
    func(kept, sort)
def sort(wf, func):
    """Order the (word, count) items by count, highest first, and forward them.

    ``func`` receives the sorted list together with ``no_op`` as the next
    continuation.
    """
    by_count = operator.itemgetter(1)
    ranked = sorted(wf.items(), key=by_count, reverse=True)
    func(ranked, no_op)
def no_op(func):
    """Do nothing with ``func``; always returns ``None``."""
    return None
def normalize(str_data, func):
    """Lower-case the text and forward it.

    ``func`` receives the lowered text together with ``remove_stop_words`` as
    the next continuation.
    """
    lowered = str_data.lower()
    func(lowered, remove_stop_words)
def read_file(path_to_file, func):
    """Read the whole UTF-8 file and forward its contents.

    ``func`` receives the text together with ``normalize`` as the next
    continuation.
    """
    with open(path_to_file, encoding='utf-8') as source:
        contents = source.read()
    func(contents, normalize)
if __name__ == "__main__":
    # Start the pipeline: read_file hands its result to filter_chars.
    read_file(testfilepath, func=filter_chars)

@ -0,0 +1,53 @@
from collections import Counter
from cppy.cp_util import *
class DataStorageManager:
    """Data model: keeps the result of ``re_split`` applied to a file's contents."""

    def __init__(self, path_to_file):
        """Read ``path_to_file`` and store its split contents."""
        self._data = re_split(read_file(path_to_file))

    def words(self):
        """Return the stored data."""
        stored = self._data
        return stored
class StopWordManager:
    """Stop-word model: answers membership queries against ``get_stopwords()``."""

    def __init__(self):
        """Fetch the stop-word collection once."""
        self._stop_words = get_stopwords()

    def is_stop_word(self, word):
        """Return True when ``word`` is contained in the stop-word collection."""
        known = self._stop_words
        return word in known
class WordFrequencyManager:
    """Word-frequency model: counts words in a ``Counter``."""

    def __init__(self):
        """Start with an empty counter."""
        self._word_freqs = Counter()

    def increment_count(self, word):
        """Add one occurrence of ``word``."""
        counts = self._word_freqs
        counts[word] = counts[word] + 1

    def sorted(self):
        """Return all (word, count) pairs, most frequent first."""
        ranking = self._word_freqs.most_common()
        return ranking
class WordFrequencyController:
    """Controller: feeds the non-stop words of a file into the frequency model."""

    def __init__(self, path_to_file):
        """Create the three managers; the storage manager loads ``path_to_file``."""
        self._storage_manager = DataStorageManager(path_to_file)
        self._stop_word_manager = StopWordManager()
        self._word_freq_manager = WordFrequencyManager()

    def run(self):
        """Count every word that is not a stop word, then print the sorted result."""
        is_stop_word = self._stop_word_manager.is_stop_word
        count = self._word_freq_manager.increment_count
        for word in self._storage_manager.words():
            if is_stop_word(word):
                continue
            count(word)
        print_word_freqs(self._word_freq_manager.sorted())
if __name__ == '__main__':
    # Build the controller for the test file and run the whole flow.
    controller = WordFrequencyController(testfilepath)
    controller.run()

@ -0,0 +1,41 @@
from cppy.cp_util import *
def extract_words(obj, path_to_file):
    """Store ``re_split`` of the file's contents under ``obj['data']``."""
    contents = read_file(path_to_file)
    obj['data'] = re_split(contents)
def load_stop_words(obj):
    """Store the result of ``get_stopwords()`` under ``obj['stop_words']``."""
    loaded = get_stopwords()
    obj['stop_words'] = loaded
def increment_count(obj, w):
    """Increase the count of ``w`` in ``obj['freqs']``, starting at 1 when new."""
    freqs = obj['freqs']
    if w in freqs:
        freqs[w] = freqs[w] + 1
    else:
        freqs[w] = 1
# Dict used as an object: 'data' is the state, the lambdas act as its methods.
data_storage_obj = {'data': []}
# 'init' fills 'data' through extract_words for the given path.
data_storage_obj['init'] = lambda path_to_file: extract_words(data_storage_obj, path_to_file)
# 'words' returns whatever is currently stored under 'data'.
data_storage_obj['words'] = lambda: data_storage_obj['data']
# Dict used as an object: 'stop_words' is the state, the lambdas act as its methods.
stop_words_obj = {'stop_words': []}
# 'init' fills 'stop_words' through load_stop_words.
stop_words_obj['init'] = lambda: load_stop_words(stop_words_obj)
# 'is_stop_word' tests membership in the current 'stop_words' value.
stop_words_obj['is_stop_word'] = lambda word: word in stop_words_obj['stop_words']
# Dict used as an object: 'freqs' is the state, the lambdas act as its methods.
word_freqs_obj = {'freqs': {}}
# 'increment_count' delegates to the module-level increment_count function.
word_freqs_obj['increment_count'] = lambda w: increment_count(word_freqs_obj, w)
# 'sorted' passes the current 'freqs' dict through sort_dict.
word_freqs_obj['sorted'] = lambda: sort_dict(word_freqs_obj['freqs'])
if __name__ == '__main__':
    # Initialise the two stateful objects before counting.
    data_storage_obj['init'](testfilepath)
    stop_words_obj['init']()

    # Count every word that is not a stop word.
    for w in data_storage_obj['words']():
        if stop_words_obj['is_stop_word'](w):
            continue
        word_freqs_obj['increment_count'](w)

    # Print the first ten entries of the sorted result.
    word_freqs = word_freqs_obj['sorted']()
    for w, c in word_freqs[0:10]:
        print(w, '-', c)

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save