Compare commits

...

78 Commits
master ... dev

Author SHA1 Message Date
p46318075 3d0220d49b Merge pull request 'dev' (#17) from pcz4qfnkl/CodePattern:dev into dev
3 months ago
Yao 36afa1d669 refactor: 优化代码,提高可读性和效率
4 months ago
Yao 15736d7393 refactor(code): 优化代码,提高可读性和效率
4 months ago
Yao f170c936d8 feat: 添加了根据关键词爬取天水市人民政府网站上指定日期内新闻标题的功能,并提供了多线程、多进程、协程和异步四种实现方式。
4 months ago
zj3D ceb9955051 ABC
4 months ago
zj3D 4606a87618 0803
4 months ago
zj3D 2a27a2c748 拉萨饭店
7 months ago
zj3D 26b6f4c88b 结构调整25
8 months ago
zj3D fa3e01dedc 大修 12
8 months ago
zj3D 850a3eb772 注册消息
8 months ago
zj3D 8d5c578da8 插件简化
8 months ago
zj3D 88606f2bce 修订 11
8 months ago
zj3D ebe28f7670 修 12
8 months ago
zj3D c8946209bf 大修 10
8 months ago
zj3D 2d46194636 大修 11
8 months ago
zj3D 5099345721 大修10
8 months ago
zj3D cd8186dd68 大修 10
8 months ago
zj3D 50952795a8 大修 9
8 months ago
zj3D 2cac3f2788 over
8 months ago
zj3D 44b0c00567 大修 9
8 months ago
zj3D 83c156a3d5 大修 8
8 months ago
zj3D 31a4dfc8e5 非中心化
8 months ago
zj3D e27ecadb25 流式调用
8 months ago
zj3D 9d74a5c184 大修 8
8 months ago
zj3D a3bc46dae3 大修6
8 months ago
zj3D f2ff5c8d4e 清理
8 months ago
zj3D 18f3901592 大修改6
8 months ago
zj3D 44c1f9eb1e 大修改4
8 months ago
zj3D b86f626e94 大调整2
8 months ago
zj3D fe94d8ed1b 享元修订
8 months ago
zj3D b15c7505f6 调整
8 months ago
zj3D cab45b3281 词频对象设计模式修订
8 months ago
zj3D 4aa6f8469d 调整
8 months ago
p46318075 f8f3f10d2e Add readme.MD
8 months ago
zj3D 7db531d2fc 大调整
8 months ago
zj3D 41a14b6705 设计模式
8 months ago
zj3D ac7fb13827 设计模式
8 months ago
zj3D 1920e47a1c 设计模型
8 months ago
p46318075 c5932334fa Merge pull request '观察者模式' (#15) from p26zockiw/CodePattern:master into dev
8 months ago
zj3D 239c0188d0 调整
8 months ago
zj3D bfcaab3439 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
8 months ago
zj3D f52645e7b2 test
8 months ago
p46318075 b6fc9ef4c3 Merge pull request '修改restful模式' (#14) from pbr4nzfkh/CodePattern:dev into dev
8 months ago
pbr4nzfkh 28f60e8216 restful 服务端
8 months ago
pbr4nzfkh fdf6166100 Delete '基本结构/042 restful/tf-35-app.py'
8 months ago
pbr4nzfkh ffdae7d329 restful 客户端
8 months ago
pbr4nzfkh 0b9d4a63d6 Delete '基本结构/042 restful/tf-35-request.py'
8 months ago
pbr4nzfkh ada14b9a7b restful 服务端
8 months ago
pbr4nzfkh c8cd7bbc0c Delete '基本结构/042 restful/tf-35-app.py'
8 months ago
zj3D c99a655997 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
8 months ago
zj3D 950cb41e08 debug
8 months ago
p46318075 f131c63ff4 Merge pull request '修改map-reduce模式' (#13) from pbr4nzfkh/CodePattern:dev into dev
8 months ago
zj3D 2518a5cd85 结构调整
9 months ago
pbr4nzfkh e993c23ed1 Delete '计算设备/map-reduce/tf-32.py'
9 months ago
pbr4nzfkh fb95636bb1 tf-31 map-reduce模式
9 months ago
pbr4nzfkh 285b016a30 tf-92多进程模式
9 months ago
pbr4nzfkh 1ebf2a45fe tf-91多线程模式
9 months ago
pbr4nzfkh 740f5aabff Delete '计算设备/map-reduce/tf_92.py'
9 months ago
pbr4nzfkh 2288c18e8a Delete '计算设备/map-reduce/tf_91.py'
9 months ago
pbr4nzfkh 028c7ddb07 Delete '计算设备/map-reduce/tf-31.py'
9 months ago
zj3D 9bf690d62c 风格统一
9 months ago
zj3D 041fced368 patch
9 months ago
p46318075 254c11c3c9 Merge pull request '修改restful模式' (#12) from pbr4nzfkh/CodePattern:dev into dev
9 months ago
pbr4nzfkh 29dbff26cc restful_app
9 months ago
pbr4nzfkh 4134d794ab Delete '基本结构/042 restful/tf-35-app.py'
9 months ago
pbr4nzfkh d54c43b459 restful-app
9 months ago
pbr4nzfkh 365c8bb76a Delete '基本结构/042 restful/test.txt'
9 months ago
pbr4nzfkh f8055c0044 restful-request
9 months ago
pbr4nzfkh 726a8795c7 Delete '基本结构/042 restful/tf-34.py'
9 months ago
zj3D b4a280c55c 类型申明
9 months ago
zj3D bfbc1120ec 运行时间装饰器
9 months ago
zj3D 3c439ef8d7 update
9 months ago
zj3D 445088fde8 增加一种终端模式
9 months ago
zj3D 0e55cabe5c print
9 months ago
zj3D 856fdcc1e1 1
9 months ago
zj3D 8545ada6c2 111
9 months ago
p46318075 a59ae791b3 003
9 months ago
p46318075 905b75036b Merge pull request '002' (#9) from master into dev
9 months ago

4
.gitignore vendored

@ -0,0 +1,4 @@
log.txt
/test
/.venv
__pycache__

@ -1,17 +1,19 @@
import string
from cppy.cp_util import *
# 引入停用词表和测试文件的路径
from cppy.cp_util import stopwordfilepath, testfilepath
# 准备词和停用词表
word_freqs = []
with open( stopwordfilepath,encoding='utf-8' ) as f:
# 准备停用词表
with open(stopwordfilepath, encoding='utf-8') as f:
stop_words = f.read().split(',')
stop_words.extend(list(string.ascii_lowercase))
for letter in 'abcdefghijklmnopqrstuvwxyz':
stop_words.append(letter)
for line in open( testfilepath ,encoding='utf-8' ):
# 读文件,逐行扫描文本,发现词,确定不是停用词,计数
word_freqs = []
for line in open(testfilepath, encoding='utf-8'):
start_char = None
i = 0
for c in line:
if start_char == None:
if start_char is None:
if c.isalnum():
# 一个单词开始
start_char = i
@ -32,15 +34,17 @@ for line in open( testfilepath ,encoding='utf-8' ):
pair_index += 1
if not found:
word_freqs.append([word, 1])
elif len(word_freqs) > 1:
for n in reversed(range(pair_index)):
if word_freqs[pair_index][1] > word_freqs[n][1]:
# 交换
word_freqs[n], word_freqs[pair_index] = word_freqs[pair_index], word_freqs[n]
pair_index = n
# 重置开始标记
start_char = None
i += 1
for tf in word_freqs[0:10]:
# 使用冒泡排序对词频进行排序
n = len(word_freqs)
for i in range(n):
for j in range(0, n - i - 1):
if word_freqs[j][1] < word_freqs[j + 1][1]:
word_freqs[j], word_freqs[j + 1] = word_freqs[j + 1], word_freqs[j]
# 打印频率最高的前10个词
for tf in word_freqs[:10]:
print(tf[0], '-', tf[1])

@ -1,4 +1,5 @@
from cppy.cp_util import *
from cppy.cp_util import stopwordfilepath, testfilepath
import string
from collections import Counter
# 准备词和停用词表
@ -7,7 +8,7 @@ stop_words.update(list(string.ascii_lowercase))
# 读取文件并计算单词频率
word_freqs = Counter()
with open(testfilepath,encoding = 'utf8') as f:
with open(testfilepath, encoding='utf8') as f:
for line_num, line in enumerate(f, 1):
start_char = None
for i, c in enumerate(line):
@ -22,7 +23,6 @@ with open(testfilepath,encoding = 'utf8') as f:
# 打印前10个最常见的单词
for word, freq in word_freqs.most_common(10):
print(f"{word}-{freq}")
'''
相比 A01
使用collections.Counter来计数单词频率从而简化了代码并提高了效率

@ -1,12 +1,13 @@
import re, sys, collections
from cppy.cp_util import *
import re
import collections
from cppy.cp_util import stopwordfilepath, testfilepath
stopwords = set(open( stopwordfilepath,encoding = 'utf8' ).read().split(','))
words = re.findall('[a-z]{2,}', open( testfilepath,encoding = 'utf8').read().lower())
stopwords = set(open(stopwordfilepath, encoding='utf8').read().split(','))
words = re.findall('[a-z]{2,}',
open(testfilepath, encoding='utf8').read().lower())
counts = collections.Counter(w for w in words if w not in stopwords)
for (w, c) in counts.most_common(10):
print(w, '-', c)
'''
熟练的软件工程师会如此简单完成任务
后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法

@ -2,8 +2,10 @@ import string
from collections import Counter
from cppy.cp_util import *
################################
# data
data = []
################################
data = ''
words = []
word_freqs = []
@ -13,17 +15,12 @@ word_freqs = []
def read_file(path_to_file):
global data
with open(path_to_file,encoding='utf-8') as f:
data = data + list(f.read())
data = f.read()
def filter_chars_and_normalize():
def extractwords():
global data
global words
for i in range(len(data)):
data[i] = ' ' if not data[i].isalnum() else data[i].lower()
data_str = ''.join(data)
words = words + data_str.split()
words = data.lower().split()
with open(stopwordfilepath) as f:
stop_words = set(f.read().split(','))
stop_words.update(string.ascii_lowercase)
@ -41,7 +38,7 @@ def sort():
if __name__ == "__main__":
read_file( testfilepath )
filter_chars_and_normalize()
extractwords()
frequencies()
sort()

@ -2,31 +2,26 @@ import re
from cppy.cp_util import *
def filter_chars_and_normalize(str_data):
def extractwords(str_data):
pattern = re.compile('[\W_]+')
word_list = pattern.sub(' ', str_data).lower().split()
stop_words = get_stopwords()
return [w for w in word_list if not w in stop_words]
def frequencies(word_list):
word_freqs = {}
for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs
def sort(word_freq):
return sorted( word_freq.items(), key=lambda x: x[1], reverse=True )
def print_all(word_freqs, n = 10 ):
for word, freq in word_freqs[ :n ]:
print(word, '-', freq)
if __name__ == "__main__":
print_all(sort(frequencies(
filter_chars_and_normalize(
read_file( testfilepath ))))
)
txtcontent = read_file( testfilepath )
word_list = extractwords( txtcontent )
word_freqs = frequencies( word_list )
word_sorts = sort ( word_freqs )
for tf in word_sorts[:10]:
print(tf[0], '-', tf[1])

@ -0,0 +1,30 @@
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def process_chunks( chunks,word_freqs,x,max ):
    """Recursively fold the word counts of ``chunks[x:max]`` into ``word_freqs``.

    The recursion descends to the last chunk first and accumulates on the
    way back, so chunks are merged from the highest index down to ``x``.

    Args:
        chunks: sequence of word lists.
        word_freqs: Counter updated in place with the merged counts.
        x: index of the chunk handled by this call.
        max: exclusive upper bound of the indices to process.
    """
    # Nothing to do when the index is past the data (e.g. an empty chunk
    # list); previously this raised IndexError on chunks[x].
    if x >= len(chunks):
        return
    following = x + 1
    if following < max:
        process_chunks(chunks, word_freqs, following, max)
    # process_chunk already returns a Counter; += merges it in place.
    word_freqs += process_chunk(chunks[x])
# def process_chunks( chunks,word_freqs,x,max ):
# word_list = process_chunk(chunks[x])
# word_freqs += Counter(word_list)
# next = x + 1
# if next < max:
# process_chunks(chunks,word_freqs,next,max)
# Read the data and split it into chunks (chunk size 2000, as passed to get_chunks)
chunks = get_chunks(testfilepath,2000)
word_freqs = Counter()
process_chunks( chunks,word_freqs,0,len(chunks) )
print_word_freqs( word_freqs.most_common(10) )

@ -0,0 +1,101 @@
from collections import Counter
from cppy.cp_util import *
class DataStorageManager:
    """Data model: reads a file and keeps its content split into words.

    Attributes:
        _data: result of ``re_split(read_file(path_to_file))``.

    Methods:
        words(): return the stored word list.
    """

    def __init__(self, path_to_file):
        raw_text = read_file(path_to_file)
        self._data = re_split(raw_text)

    def words(self):
        """Return the word list built at construction time."""
        stored = self._data
        return stored
class StopWordManager:
    """Stop-word model.

    Attributes:
        _stop_words: collection returned by ``get_stopwords()``.

    Methods:
        is_stop_word(word): tell whether ``word`` is a stop word.
    """

    def __init__(self):
        self._stop_words = get_stopwords()

    def is_stop_word(self, word):
        """Return True when ``word`` is contained in the stop-word collection."""
        known = self._stop_words
        return word in known
class WordFrequencyManager:
    """Word-frequency model: counts words and reports them by frequency.

    Attributes:
        _word_freqs: Counter mapping each word to its number of occurrences.

    Methods:
        increment_count(word): record one more occurrence of ``word``.
        sorted(): return (word, count) pairs, most frequent first.
    """

    def __init__(self):
        self._word_freqs = Counter()

    def increment_count(self, word):
        """Record one more occurrence of ``word``."""
        tally = self._word_freqs
        tally[word] = tally[word] + 1

    def sorted(self):
        """Return all (word, count) pairs ordered by descending count."""
        ranking = self._word_freqs.most_common()
        return ranking
class WordFrequencyController:
    """Controller: wires the three models together and drives the run.

    Attributes:
        _storage_manager: DataStorageManager holding the words of the file.
        _stop_word_manager: StopWordManager used to filter words.
        _word_freq_manager: WordFrequencyManager accumulating the counts.

    Methods:
        run(): count every non-stop word and print the sorted frequencies.
    """

    def __init__(self, path_to_file):
        self._storage_manager = DataStorageManager(path_to_file)
        self._stop_word_manager = StopWordManager()
        self._word_freq_manager = WordFrequencyManager()

    def run(self):
        """Count each word that is not a stop word, then print the ranking."""
        for candidate in self._storage_manager.words():
            if self._stop_word_manager.is_stop_word(candidate):
                continue
            self._word_freq_manager.increment_count(candidate)

        ranking = self._word_freq_manager.sorted()
        print_word_freqs(ranking)
if __name__ == '__main__':
WordFrequencyController(testfilepath).run()
'''
函数输入参数调用后你的马上接住返回值
类输入参数后实例化后你可以需要的时候去访问你需要的数据实例属性
'''

@ -0,0 +1,52 @@
from cppy.cp_util import *
def extract_words(obj, path_to_file):
    """Store the words extracted from a file under ``obj['data']``.

    Args:
        obj (dict): dictionary object that receives the word list.
        path_to_file (str): path handed to ``extract_file_words``.
    """
    extracted = extract_file_words(path_to_file)
    obj['data'] = extracted
def increment_count(obj, w):
    """Add one to the count of ``w``, starting at 1 for an unseen word.

    Args:
        obj (dict): dictionary object whose ``'freqs'`` entry maps words
            to counts.
        w (str): the word to count.
    """
    freqs = obj['freqs']
    freqs[w] = freqs.get(w, 0) + 1
# 数据存储对象,包含初始化和获取单词的方法
data_storage_obj = {
'data': [], # 存储单词列表
'init': lambda path_to_file: extract_words(data_storage_obj, path_to_file
), # 初始化方法,提取文件中的单词
'words': lambda: data_storage_obj['data'] # 获取单词列表的方法
}
# 单词频率对象,包含增加计数和排序的方法
word_freqs_obj = {
'freqs': {}, # 存储单词频率的字典
'increment_count':
lambda w: increment_count(word_freqs_obj, w), # 增加单词计数的方法
'sorted': lambda: sort_dict(word_freqs_obj['freqs']) # 获取排序后的单词频率的方法
}
if __name__ == '__main__':
# 初始化数据存储对象,提取文件中的单词
data_storage_obj['init'](testfilepath)
# 遍历单词列表,增加单词的计数
for word in data_storage_obj['words']():
word_freqs_obj['increment_count'](word)
# 获取排序后的单词频率并打印
word_freqs = word_freqs_obj['sorted']()
print_word_freqs(word_freqs)

@ -0,0 +1,3 @@
from cppy.cp_util import *
print_word_freqs( sort_dict ( get_frequencies ( extract_file_words(testfilepath) )))

@ -0,0 +1,28 @@
from cppy.cp_util import *
# 如果有连续的对数据加工操作,而且总是把共同加工数据对象当第一个参数,可以用本文件夹方法提升阅读体验
# 框架类
class FunBind:
    """Chain free functions that all take the shared data as first argument.

    The first ``bind`` call produces the initial ``data``; every later call
    passes the current ``data`` as the first positional argument and stores
    the result back on ``data``.
    """

    def bind(self, func, *args, **kwargs):
        """Apply ``func`` and store its result in ``self.data``.

        Args:
            func: callable to run.
            *args, **kwargs: extra arguments forwarded after the data.

        Returns:
            self, so calls can be chained.
        """
        # Decide explicitly whether data exists yet. The previous bare
        # ``except:`` re-ran *any* failing step without the data, which hid
        # real errors and could silently produce a wrong value.
        if hasattr(self, 'data'):
            self.data = func(self.data, *args, **kwargs)
        else:
            self.data = func(*args, **kwargs)
        return self
data = FunBind()\
.bind(extract_file_words,testfilepath)\
.bind(get_frequencies)\
.bind(sort_dict)\
.bind(print_word_freqs,10)\
.data
print(data)
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''

@ -0,0 +1,28 @@
from cppy.cp_util import *
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''
class FunPipe:
    """A pipeline stage: a function plus the arguments it will be called with.

    ``a | b`` runs stage ``a`` immediately and returns a new stage that
    calls ``b``'s function with ``a``'s result placed before ``b``'s own
    arguments. Reading ``data`` runs the stage and returns its result.
    """

    def __init__(self, func, *args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

    def __or__(self, other):
        upstream = self.func(*self.args, **self.kwargs)
        downstream_args = (upstream,) + tuple(other.args)
        return FunPipe(other.func, *downstream_args, **other.kwargs)

    @property
    def data(self):
        """Call the stored function with the stored arguments."""
        call = self.func
        return call(*self.args, **self.kwargs)
# 模仿管道
pipe = FunPipe(extract_file_words,testfilepath) | FunPipe(get_frequencies) | FunPipe(sort_dict) | FunPipe(print_word_freqs, 10)
pipe.data

@ -0,0 +1,29 @@
from cppy.cp_util import *
class Flow:
    """Fluent wrapper around the word-frequency helper functions.

    Every method stores its result in ``self.data`` (where it produces one)
    and returns ``self`` so the calls can be chained.
    """

    def extract_file_words(self, filepath):
        """Load the words of ``filepath`` into ``self.data``."""
        words = extract_file_words(filepath)
        self.data = words
        return self

    def get_frequencies(self):
        """Replace ``self.data`` with its word frequencies."""
        freqs = get_frequencies(self.data)
        self.data = freqs
        return self

    def sort_dict(self):
        """Replace ``self.data`` with its sorted form."""
        ordered = sort_dict(self.data)
        self.data = ordered
        return self

    def print_word_freqs(self, n):
        """Print ``self.data`` via ``print_word_freqs`` with argument ``n``."""
        current = self.data
        print_word_freqs(current, n)
        return self
# 顺序调用
Flow().extract_file_words(testfilepath).get_frequencies().sort_dict().print_word_freqs(10)
'''
连续方法调用看起来比较舒服
但是需要假设
- 每一个类方法返回 self 否则没法连续
- 类方法默认不写第一个参数数据都在 .data 里面
'''

@ -0,0 +1,50 @@
from cppy.cp_util import *
# 装饰器改写类
# - 找到以f_开头的方法
# - 将方法函数的返回值赋值给对象的data属性
# - 返回对象自身
def return_self_decorator(cls):
    """Class decorator that makes every ``f_*`` method chainable.

    Each callable attribute defined directly on ``cls`` whose name starts
    with ``f_`` is replaced by a wrapper that stores the method's return
    value in ``self.data`` and returns ``self``.

    Args:
        cls: the class to rewrite.

    Returns:
        The same class object, modified in place.
    """
    from functools import wraps

    def return_self(func):
        # Keep the wrapped method's name and docstring for introspection.
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            self.data = func(self, *args, **kwargs)
            return self  # hand back the instance to allow chaining
        return wrapper

    # Iterate over a snapshot: setattr() below mutates the class namespace,
    # and vars(cls) is a live view of it.
    for name, method in list(vars(cls).items()):
        if callable(method) and name.startswith('f_'):
            setattr(cls, name, return_self(method))
    return cls
@return_self_decorator
class Flow():
    """Word-frequency pipeline whose ``f_*`` steps are made chainable.

    The class decorator stores each ``f_*`` method's return value in
    ``self.data`` and returns ``self``; the methods therefore just return
    the new data.
    """

    def test(self):
        """Plain method (no ``f_`` prefix), left untouched by the decorator."""
        return 'test'

    def f_extract_file_words(self, filepath):
        """Return the words extracted from ``filepath``."""
        return extract_file_words(filepath)

    def f_get_frequencies(self):
        """Return the frequencies of the current data."""
        return get_frequencies(self.data)

    def f_sort_dict(self):
        """Return the current data in sorted form."""
        return sort_dict(self.data)

    def f_print_word_freqs(self, n):
        """Print the current data and keep it unchanged.

        Returning the data matters: the decorator assigns the return value
        to ``self.data``, so an implicit ``None`` would wipe the result.
        """
        print_word_freqs(self.data, n)
        return self.data
# 顺序调用
Flow().f_extract_file_words(testfilepath).f_get_frequencies().f_sort_dict().f_print_word_freqs(10)
'''
改写后参与 function flow 功能的方法
- 需要以 'f_' 开头
- 类方法默认不写第一个参数数据都在 .data 里面
仍旧需要特殊的方法写法
所以还是 12种方法比较自然
'''

@ -0,0 +1,26 @@
from cppy.cp_util import *
from collections import Counter
# Metaclass that equips every class it creates with its own counter.
class CounterMetaclass(type):
    """Inject a fresh ``_counter`` (a Counter) into each created class."""

    def __new__(mcs, name, bases, attrs):
        attrs['_counter'] = Counter()
        return super().__new__(mcs, name, bases, attrs)


# Class created through the metaclass.
class Word( metaclass=CounterMetaclass ):
    """A word occurrence; instantiating it bumps the class-level counter."""

    def __init__(self, word):
        self.word = word
        self._counter[self.word] += 1

    @classmethod
    def get_word_freqs(cls, n) -> list[tuple[str, int]]:
        """Return the ``n`` most common (word, count) pairs.

        ``Counter.most_common`` yields a list of pairs, not a Counter, so
        the annotation reflects that.
        """
        return cls._counter.most_common(n)
for word in extract_file_words ( testfilepath ) : Word(word)
print_word_freqs(Word.get_word_freqs(10))
'''
常用于将依赖项如服务或配置自动注入到类中
'''

@ -0,0 +1,20 @@
from cppy.cp_util import *
#
# 生成器
#
def non_stop_words(testfilepath):
    """Generate, in order, the words of a file that are not stop words.

    Args:
        testfilepath: path passed to ``read_file``.

    Yields:
        Each word from ``re_split`` of the file content that is not in the
        collection returned by ``get_stopwords()``.
    """
    stopwords = get_stopwords()
    for candidate in re_split( read_file(testfilepath) ):
        if candidate in stopwords:
            continue
        yield candidate  # hand out one non-stop word
freqs = {}
for word in non_stop_words(testfilepath):
freqs[word] = freqs.get(word, 0) + 1
data = sort_dict(freqs)
print_word_freqs(data)

@ -3,6 +3,10 @@ import aiofiles
from collections import Counter
from cppy.cp_util import *
#
# 协程: 有点复杂; 读文件的Io还是太快的爬虫
#
async def read_file(file_path):
async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
content = await file.read()
@ -21,7 +25,8 @@ async def main():
top_words = await count_words(text)
wordfreqs += top_words
for word, count in wordfreqs.most_common(10):
print(f"{word}: {count//10}")
print(f"{word}: {count//10}") # 突出 Io 的提升价值
# 运行异步主函数
asyncio.run(main())

@ -1,7 +1,7 @@
from collections import Counter
from cppy.cp_util import *
class AcceptTypes:
class TypesCheck:
def __init__(self, *args):
self._args = args
@ -9,22 +9,23 @@ class AcceptTypes:
def wrapped_f(*args, **kwargs):
for i, arg_type in enumerate(self._args):
if not isinstance(args[i], arg_type):
raise TypeError(f"Argument {i} expected {arg_type}, got {type(args[i])}")
raise TypeError(f" {i} expected {arg_type}, got {type(args[i])}")
return f(*args, **kwargs)
return wrapped_f
@AcceptTypes(str)
@TypesCheck(str)
def extract_words_(path_to_file):
return extract_file_words(path_to_file)
@AcceptTypes(list)
@TypesCheck(list)
def frequencies_(word_list):
return Counter(word_list)
@AcceptTypes(Counter)
@TypesCheck(Counter)
def sort_(word_freq):
return word_freq.most_common()
if __name__ == '__main__':
word_freqs = sort_(frequencies_(extract_words_( testfilepath )))
print_word_freqs(word_freqs)

@ -21,17 +21,15 @@ class sortTaskHandler:
def handle_task(task_type,*args):
handler_class_name = f"{task_type}TaskHandler" # 构建处理器类名
# 使用globals()获取当前全局符号表
handler_class = globals().get(handler_class_name)
if handler_class:
handler = handler_class() # 实例化处理器类
return handler.handle(*args) # 调用处理方法
else:
print(f"No handler found for task type: {task_type}")
print(f"No found for task type: {task_type}")
if __name__ == '__main__':
word_list = handle_task("words",util.testfilepath)
word_freq = handle_task("frequencies",word_list)
word_sort = handle_task("sort",word_freq)
util.print_word_freqs(word_sort)
word_list = handle_task("words",util.testfilepath)
word_freq = handle_task("frequencies",word_list)
word_sort = handle_task("sort",word_freq)
util.print_word_freqs(word_sort)

@ -0,0 +1,56 @@
import threading, queue
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
# 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数
class WordFrequencyCounter:
    """Count word frequencies with several threads sharing two queues.

    ``word_space`` holds the chunks still to be processed and
    ``freq_space`` collects one frequency dict per processed chunk.
    """

    def __init__(self, input_file):
        self.word_space = queue.Queue()
        self.freq_space = queue.Queue()
        for piece in get_chunks(input_file,3000):
            self.word_space.put(piece)

    def process_words(self):
        """Worker body: take chunks until the input queue is empty."""
        while True:
            try:
                chunk = self.word_space.get_nowait()  # never block waiting for work
            except queue.Empty:
                return  # nothing left to do
            kept = [ w for w in chunk if w not in stop_words and len(w) >= 3 ]
            # Publish the per-chunk counts as a plain dict.
            self.freq_space.put(dict(Counter(kept)))

    def run(self):
        """Start five workers, wait for them, then merge and print the counts."""
        workers = []
        for _ in range(5):
            workers.append(threading.Thread(target=self.process_words))
        for t in workers:
            t.start()
        for t in workers:
            t.join()

        merged = Counter()
        while not self.freq_space.empty():
            partial = self.freq_space.get()
            if partial:  # skip empty results
                merged.update(partial)
        print_word_freqs ( sort_dict (merged) )
@timing_decorator
def main():
counter = WordFrequencyCounter( testfilepath )
counter.run()
if __name__ == '__main__':
main()
'''
在多线程之间传递数据建议使用线程安全的队列如queue.Queue或multiprocessing.Queue后者也适用于多进程环境
这些队列提供了线程安全的数据传输机制可以避免竞态条件和数据损坏
全局变量不可预测
multiprocessing.Queue 利用了操作系统提供的进程间通信IPC, Inter-Process Communication机制具体实现取决于不同操作系统的支持
在Unix/Linux系统中multiprocessing.Queue通常基于管道pipes共享内存和/或消息队列等机制实现
而在Windows系统上可能使用命名管道named pipes或者内存映射文件memory-mapped files以及某些版本的Windows特有的进程间同步对象如MutexesSemaphores和事件
'''

@ -0,0 +1,62 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
使用 multiprocessing.Manager 来完成统计词频
需要注意
- Manager() 必须用函数包起来,不能按脚本随便放外面否则会提示freeze_support
- 工作函数需要放到外面不能做内部函数否则会提示参数错误
- 无法在 Jupyter 类似环境运行
'''
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(shared_chunks,word_count):
    """Worker loop: pop text chunks from a shared list and count their words.

    Args:
        shared_chunks: shared list of text chunks; a ``None`` entry marks
            the end of the data.
        word_count: shared mapping from word to count, updated in place.

    The loop ends when ``None`` is popped, when the list is empty, or when
    an unexpected error occurs (which is printed).
    """
    while True:
        try:
            chunk = shared_chunks.pop(0)  # take the next chunk
        except IndexError:
            # List drained. Only one None sentinel is queued, so every
            # worker except the one that pops it ends up here; that is a
            # normal shutdown, not an error worth printing.
            break
        except Exception as e:
            print(e)
            break
        if chunk is None:
            break  # sentinel: all chunks have been handed out
        try:
            for word in extract_str_words(chunk):
                if word not in stop_words:
                    # NOTE(review): get() and the assignment are two separate
                    # calls on the shared mapping, so concurrent workers may
                    # overwrite each other's increments - confirm whether a
                    # lock is needed.
                    word_count[word] = word_count.get(word, 0) + 1
        except Exception as e:
            print(e)
            break
@timing_decorator
def main():
# 创建一个Manager实例
manager = Manager()
shared_chunks = manager.list()
word_count = manager.dict()
# 读取文件并按块大小分割,将块添加到共享列表中
chunk_size = 1024 * 10 # 假设每个块是10KB可以根据需要调整
with open(testfilepath, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk: break
shared_chunks.append(chunk)
shared_chunks.append(None)
print('-------------------',len(shared_chunks))
processes = [ Process( target=process_chunk,
args=(shared_chunks,word_count))
for _ in range( 4 ) ] # 假设启动4个工作进程
for p in processes: p.start()
for p in processes: p.join()
# 将Manager类型的字典转换为普通的字典以便使用Counter
word_count = dict(word_count)
word_freqs = Counter(word_count).most_common(10)
print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -0,0 +1,42 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
'''
# 使用 multiprocessing.Manager 来完成统计词频
# 怎么得到最快的一个结果是一个试错过程X程创建数目多少、分片的大小 ...
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(chunk,word_count):
    """Add the word counts of one chunk to the shared ``word_count`` mapping.

    Args:
        chunk: iterable of words.
        word_count: shared mapping from word to count, updated in place.

    Words that are stop words or shorter than three characters are ignored.
    """
    words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
    # Count locally first so the shared mapping is touched once per distinct
    # word instead of once per occurrence (each access is time-consuming).
    for word, seen in Counter(words).items():
        # NOTE(review): read-then-write is still two separate calls on the
        # shared mapping; concurrent processes may lose updates - confirm.
        word_count[word] = word_count.get(word, 0) + seen
@timing_decorator
def main():
manager = Manager()
word_count = manager.dict()
chunks = get_chunks(testfilepath,2800)
print('-------------------',len(chunks))
processes = []
for chunk in chunks:
p = Process(target=process_chunk,
args=(chunk,word_count) )
processes.append(p)
p.start()
for p in processes: p.join()
word_count = dict(word_count)
word_freqs = Counter(word_count).most_common(10)
print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -4,20 +4,10 @@ from cppy.cp_util import testfilepath,db_filename,extract_file_words
# 数据库表结构
TABLES = {
'documents': '''CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL
)''',
'words': '''CREATE TABLE IF NOT EXISTS words (
doc_id INTEGER NOT NULL,
value TEXT NOT NULL,
FOREIGN KEY (doc_id) REFERENCES documents (id)
doc_name INTEGER NOT NULL,
value TEXT NOT NULL
)''',
'characters': '''CREATE TABLE IF NOT EXISTS characters (
word_id INTEGER NOT NULL,
value TEXT NOT NULL,
FOREIGN KEY (word_id) REFERENCES words (id)
)'''
}
@ -33,15 +23,10 @@ def create_db_schema(connection):
def load_file_into_database(path_to_file, connection):
words = extract_file_words( path_to_file )
doc_name = os.path.basename(testfilepath).split('.')[0]
c = connection.cursor()
c.execute("INSERT INTO documents (name) VALUES (?)", (path_to_file,))
doc_id = c.lastrowid
for w in words:
c.execute("INSERT INTO words (doc_id, value) VALUES (?, ?)", (doc_id, w))
word_id = c.lastrowid
for char in w:
c.execute("INSERT INTO characters (word_id, value) VALUES (?, ?)", (word_id, char))
c.execute("INSERT INTO words (doc_name, value) VALUES (?, ?)", (doc_name, w))
connection.commit()
c.close()
@ -49,11 +34,9 @@ def load_file_into_database(path_to_file, connection):
# 建数据库,处理数据入库
#######################################################
# 获取当前文件所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构造数据库文件的完整路径
current_dir = os.path.dirname(os.path.abspath(__file__))
db_file_path = os.path.join(current_dir, db_filename)
if os.path.exists(db_file_path):
os.remove(db_file_path)
@ -69,3 +52,10 @@ with sqlite3.connect(db_file_path) as connection:
c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10")
for row in c.fetchall():
print(row[0], '-', row[1])
'''
也可以把数据库看做解决共享数据的竞争死锁的办法
不过本例中的计算太快
用数据库共享数据成本太高
'''

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify, abort
from functools import lru_cache
from cppy.cp_util import *
from functools import cache
app = Flask(__name__)
# 模拟数据库
books_db = []
# Accessor for the in-memory book "database".
def get_books_db():
    """Return the current module-level ``books_db`` list.

    Deliberately not memoised: a cached accessor keeps returning the first
    list object it saw, so any code that rebinds ``books_db`` to a new list
    would leave readers looking at stale data.
    """
    return books_db
#查询所有资源
@app.route('/books', methods=['GET'])
def get_books():
return jsonify(get_books_db())
#查询某个资源
@app.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
book = next((book for book in get_books_db() if book['id'] == book_id), None)
if book is None:
abort(404)
return jsonify(book['content'])
# Create a resource, or update it when it already exists.
@app.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
    """Create or update the book identified by ``book_id``.

    The JSON body of the request is appended as a new record when no book
    has that id, otherwise it is merged into the existing record.

    Returns:
        The whole book list as JSON with status 200.
    """
    book_to_update = request.json
    print(book_to_update)

    books = get_books_db()
    book = next((book for book in books if book['id'] == book_id), None)

    if book is None:
        # Resource does not exist yet: create it.
        books.append(book_to_update)
    else:
        # Resource exists: update it in place.
        book.update(book_to_update)

    # The list is mutated in place, so there is nothing to invalidate. The
    # former ``cache.delete(get_books_db)`` call raised AttributeError on
    # every request because ``functools.cache`` has no ``delete`` attribute.
    return jsonify(books), 200
# Run an operation on one resource.
@app.route('/books/<int:book_id>/word_frequency', methods=['GET'])
def word_frequency(book_id):
    """Return the sorted word frequencies of the file a book points to.

    The book's ``'content'`` field is used as the path of the file to
    analyse. Responds with 404 when no book has the given id.
    """
    book = next((book for book in get_books_db() if book['id'] == book_id), None)
    if book is None:
        # Previously this fell through to ``None['content']`` (TypeError).
        abort(404)
    filepath = book['content']
    word_list = extract_file_words(filepath)
    word_frequency = get_frequencies(word_list)
    word_frequency = sort_dict(word_frequency)
    print_word_freqs(word_frequency)
    return jsonify(word_frequency), 200
@app.route('/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
    """Delete the book with the given id, or respond 404 if there is none."""
    remaining = [book for book in books_db if book['id'] != book_id]
    # Compare before replacing the data: if nothing was filtered out, the id
    # was not present. The old check ran after the filtering and compared
    # against a list that was always empty.
    if len(remaining) == len(books_db):
        abort(404)  # no such book
    # Replace the contents in place so every holder of the list object
    # sees the deletion.
    books_db[:] = remaining
    return jsonify({'message': f'book {book_id} deleted'}), 200
if __name__ == '__main__':
app.run(debug=True)

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
import requests
from cppy.cp_util import *
# 查询资源,得到空列表
url = 'http://127.0.0.1:5000//books'
response = requests.get(url)
print(response.json())
time.sleep(2)
# - 创建一个1号资源
print('创建一个1号资源')
book_1 = {"id": 1, "title": "Python编程:从入门到实践", "content": testfilepath}
url = 'http://127.0.0.1:5000/books/1'
response = requests.put(url,json=book_1)
time.sleep(2)
# - Create resource 2 (the testfilepath variable is re-pointed at another file)
print('创建一个2号资源')
testfilepath = testfilepath.replace('Prey.txt','Pride-and-Prejudice.txt')
book_2 = {"id": 2, "title": "深入浅出计算机组成原理", "content": testfilepath}
url = 'http://127.0.0.1:5000/books/2'
response = requests.put(url,json=book_2)
time.sleep(2)
# - Create resource 3 (testfilepath is re-pointed again; there happen to be exactly 3 files)
print('创建一个3号资源')
testfilepath = testfilepath.replace('Pride-and-Prejudice.txt','test.txt')
book_3 = {"id": 3, "title": "算法导论", "content": testfilepath}
url = 'http://127.0.0.1:5000/books/3'
response = requests.put(url,json=book_3)
time.sleep(2)
# - 查询资源,看到结果
print('查询资源,看到结果')
url = 'http://127.0.0.1:5000//books'
response = requests.get(url)
print(response.json())
time.sleep(2)
# - 操作1号资源得到词频
print('操作1号资源得到词频')
url = 'http://127.0.0.1:5000/books/1/word_frequency'
response = requests.get(url)
print_word_freqs(response.json())

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from functools import reduce
stop_words = get_stopwords()
# map - reduce
def process_chunk(chunk): # 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(count1,count2):
return count1 + count2
@timing_decorator
def main():
    """Map-reduce word count: map each chunk to a Counter, reduce by merging."""
    # Read the data and split it into chunks (size 1000 passed to get_chunks).
    chunks = get_chunks(testfilepath,1000)

    # Map: count the words of every chunk.
    counts_list = list(map(process_chunk, chunks))

    # Reduce: merge the per-chunk counts. The empty Counter is the initial
    # value so that an input without any chunk yields an empty result
    # instead of a TypeError from reduce().
    total_counts = reduce(merge_counts, counts_list, Counter())

    # Print the 10 most frequent words.
    print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from multiprocessing.pool import ThreadPool
#
# 多线程
#
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(counts_list):
    """Return the sum of several Counter objects as a new Counter."""
    total = Counter()
    for partial in counts_list:
        total = total + partial
    return total
def thread_function(chunk, counts_list):
word_count = process_chunk(chunk)
counts_list.append(word_count)
@timing_decorator
def main():
    """Count words with a thread pool and print the 10 most frequent ones."""
    # Read the data and split it into chunks (size 1000 passed to get_chunks).
    chunks = get_chunks(testfilepath,1000)

    # Thread pool. One thread per chunk is unbounded and fails outright for
    # zero chunks (the pool needs at least one worker), so clamp the size.
    worker_count = max(1, min(len(chunks), 32))
    # The context manager releases the pool even if a task raises; map()
    # has returned all results before the block is left.
    with ThreadPool(worker_count) as pool:
        counts_list = pool.map(process_chunk, chunks)

    # Merge the per-chunk counts.
    total_counts = merge_counts(counts_list)

    # Print the 10 most frequent words.
    print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *
#
# 多进程: 因为创建进程相比计算过程开销太大,结果最慢
#
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(counts_list):
"""合并多个Counter对象的总和"""
return sum(counts_list, Counter())
@timing_decorator
def main():
# 读取文件内容,分割文件内容为多个块,每个块由一个进程处理
chunks = get_chunks(testfilepath,1000)
# 使用多进程处理每个块
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
counts_list = pool.map(process_chunk, chunks)
pool.close()
pool.join()
# 合并计数
total_counts = merge_counts(counts_list)
# 输出最高频的n个词
print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -2,24 +2,28 @@ import concurrent.futures
from collections import Counter
import cppy.cp_util as util
'''
concurrent.futures模块为Python中的并发编程提供了一个统一接口,
这个模块隐藏了低层次的线程和进程创建同步和清理的细节,提供了一个更高层次的API来处理并发任务
当前版本推荐它与asyncio模块结合使用完成Python中的各种异步编程任务
'''
stop_words = util.get_stopwords()
class WordFrequencyAgent:
def __init__(self, words):
self.words = words
def compute_word_frequency(self):
self.word_freq = Counter(self.words)
words = [ w for w in self.words if ( not w in stop_words ) and len(w) >= 3 ]
self.word_freq = Counter( words)
def get_word_frequency(self):
return self.word_freq
# 将文本分割成多个部分并为每个部分创建一个Agent
def create_agents(words, num_agents = 4 ):
text_chunks = [ words[i::num_agents] for i in range(num_agents) ]
agents = [ WordFrequencyAgent(chunk) for chunk in text_chunks ]
return agents
def create_agents( words ):
return [ WordFrequencyAgent(chunk) for chunk in words ]
def compute_all_word_frequencies(agents):
with concurrent.futures.ThreadPoolExecutor() as executor:
@ -27,13 +31,7 @@ def compute_all_word_frequencies(agents):
future_to_agent = {executor.submit(agent.compute_word_frequency): agent for agent in agents}
for future in concurrent.futures.as_completed(future_to_agent):
agent = future_to_agent[future]
try:
# 获取计算结果,但不处理异常
data = future.result()
except Exception as exc:
print(f'生成 {agent.text_chunk[:10]}... 的词频时出错: {exc}')
# 词频已经被保存在agent中
data = future.result() # 词频被保存在agent中
# 所有Agent计算完成后合并它们的词频结果
def merge_word_frequencies(agents):
@ -42,11 +40,13 @@ def merge_word_frequencies(agents):
merged_freq.update(agent.get_word_frequency())
return merged_freq
if __name__ == '__main__':
words = util.extract_file_words(util.testfilepath) # 从文本抽词
@util.timing_decorator
def main():
words = util.get_chunks(util.testfilepath)
agents = create_agents(words) # 创建代理
compute_all_word_frequencies(agents) # 计算
merged_word_freq = merge_word_frequencies(agents) # 合并结果
for (w, c) in merged_word_freq.most_common(10): # 排序输出
print(w, '-', c)
util.print_word_freqs(merged_word_freq.most_common(10)) # 排序输出
if __name__ == '__main__':
main()

@ -0,0 +1,46 @@
import sys
import re
from collections import Counter
# 使用 python command_line_1.py testfilepath 10
# Clean the text: drop punctuation, then lowercase.
def clean_text(text):
    """Return ``text`` lowercased, without characters that are neither word
    characters nor whitespace."""
    without_punct = re.sub(r'[^\w\s]', '', text)
    lowered = without_punct.lower()
    return lowered
# Count word frequencies.
def count_frequencies(text):
    """Return a Counter of the whitespace-separated words of the cleaned text."""
    tokens = clean_text(text).split()
    return Counter(tokens)
# 主函数
def main():
    """Print the ``n`` most common words of a text file.

    Command line: ``python command_line_1.py <file_path> <n>``.
    Exits with status 1 when the argument count is wrong or ``<n>`` is not
    an integer. A missing or undecodable file is reported on stdout.
    """
    usage = "Usage: python command_line_1.py <file_path> <n>"
    if len(sys.argv) != 3:
        print(usage)
        sys.exit(1)

    file_path = sys.argv[1]
    # Validate <n> up front so a typo gives a usage hint instead of a traceback.
    try:
        n = int(sys.argv[2])
    except ValueError:
        print(f"Error: <n> must be an integer, got {sys.argv[2]!r}")
        print(usage)
        sys.exit(1)

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
        frequencies = count_frequencies(text)
        for word, freq in frequencies.most_common(n):
            print(f"{word}: {freq}")
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except ValueError as e:
        # UnicodeDecodeError is a ValueError subclass, so bad encodings land here.
        print(f"Error: {e}")


if __name__ == "__main__":
    main()

@ -0,0 +1,48 @@
import re
from collections import Counter
# 清洗文本,移除标点符号并转换为小写
def clean_text(text):
    """Drop punctuation (anything that is not a word character or
    whitespace) from *text* and return the lower-cased result."""
    punctuation = r'[^\w\s]'
    return re.sub(punctuation, '', text).lower()
# 统计词频
def count_frequencies(text):
    """Count how often each token of the cleaned text occurs."""
    cleaned = clean_text(text)
    return Counter(cleaned.split())
# 交互式提示用户输入文件路径和前n个单词的数量
def interactive_mode():
    """Ask for a file path and a positive count, then print the most
    common words of that file.

    Invalid counts, a missing file and any other error are reported on
    stdout; nothing is raised to the caller.
    """
    file_path = input("请输入文件路径 >> ")

    try:
        n = int(input("请输入你想要输出的前n个最常见单词的数量 >> "))
        if not n > 0:
            raise ValueError("数量必须大于0。")
    except ValueError as e:
        print(f"输入错误:{e}")
        return

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
        top_entries = count_frequencies(text).most_common(n)
        for word, freq in top_entries:
            print(f"{word}: {freq}")
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
    except Exception as e:
        print(f"发生错误: {e}")
# 主函数
def main():
    """Greet the user and start the interactive word-frequency session."""
    greeting = "欢迎使用词频统计工具。"
    print(greeting)
    interactive_mode()


if __name__ == "__main__":
    main()

@ -0,0 +1,30 @@
from flask import Flask, render_template, request, redirect, url_for
from collections import Counter
from cppy.cp_util import *
import os
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
    """Serve the upload form (GET) or the word counts of an uploaded file (POST).

    The upload is written to a private temporary file instead of a path
    built from the client-supplied filename, and that file is removed even
    when word extraction fails.
    """
    if request.method != 'POST':
        return render_template('index.html')

    import tempfile  # local import: only this handler needs it

    uploaded = request.files['file']
    # Never trust uploaded.filename for a filesystem path (path traversal).
    fd, temp_path = tempfile.mkstemp(suffix='.txt')
    os.close(fd)
    try:
        uploaded.save(temp_path)
        words = extract_file_words(temp_path)
    finally:
        os.remove(temp_path)

    word_counts = Counter(words)
    return render_template('result.html', word_counts=word_counts.most_common())
if __name__ == '__main__':
app.run(debug=True)

@ -0,0 +1,14 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Upload Text File</title>
</head>
<body>
<h1>Upload a Text File to Count Word Frequencies</h1>
<form action="/" method="post" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Submit">
</form>
</body>
</html>

@ -0,0 +1,16 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Word Frequencies</title>
</head>
<body>
<h1>Top Word Frequencies:</h1>
<ul>
{% for word, count in word_counts %}
<li>{{ word }}: {{ count }}</li>
{% endfor %}
</ul>
<a href="{{ url_for('index') }}">Back to Upload</a>
</body>
</html>

@ -23,7 +23,7 @@ top_10_words = calculate_word_frequency(testfilepath)
print_word_freqs(top_10_words)
'''
python 提供了一种缓存调用函数的机制
Python 提供了一个缓存调用函数的装饰器
import functools
# 使用 functools.lru_cache 缓存结果

@ -0,0 +1,34 @@
# 创建对象是消耗资源的,如果发现对象已经存在,可以返回引用,不创造新对象 。设计模式中这个做法叫享元
from cppy.cp_util import *
#享元类
class WordFrequencyController():
    """Flyweight object: the sorted word frequencies of one file together
    with the number of entries to print."""

    def __init__(self, controllertype, filepath):
        # controllertype is later passed on as the number of entries to print.
        self.number = controllertype
        frequencies = get_frequencies(extract_file_words(filepath))
        self.word_freq = sort_dict(frequencies)

    def print_word_freqs(self):
        """Print the first ``self.number`` entries of the sorted frequencies."""
        # Inside a method body this name resolves to the module-level helper.
        print_word_freqs(self.word_freq, self.number)
#享元工厂
class WordFrequencyControllerFactory():
    """Flyweight factory: hands out one shared controller per
    (number, file path) combination."""

    def __init__(self):
        # Maps (number, testfilepath) -> WordFrequencyController.
        self.types = {}

    def get_WordFrequencyController(self, number, testfilepath):
        """Return the cached controller for this request, creating it on first use.

        The cache key includes the file path: a controller stores the
        frequencies of one specific file, so keying on ``number`` alone
        would return data for the wrong file.
        """
        key = (number, testfilepath)
        if key not in self.types:
            self.types[key] = WordFrequencyController(number, testfilepath)  # create a new object
            print('new obj: ', '*' * 30, number)
        else:
            print('ref obj: ', '*' * 30, number)
        return self.types[key]  # reuse the existing object
if __name__ == "__main__":
    # Request controllers for repeated numbers to show object reuse.
    factory = WordFrequencyControllerFactory()
    for number in (1, 3, 5, 3, 5, 7):
        controller = factory.get_WordFrequencyController(number, testfilepath)
        controller.print_word_freqs()

@ -0,0 +1,64 @@
'''
入门级示例是用来帮助理解其他例子
把观察者挂到自己的处理队列上
适当时机调用所有队列上的约定的观察者的 update 方法
如果观察者有多个职能参与不同的任务链不一定要统一命名update方法
这是一个示例性质的原型具体环境下需要调整
'''
import collections
from abc import ABC, abstractmethod
from cppy.cp_util import *
# 定义观察者接口 ,在 Python 中并不是必须
class Observer(ABC):
    """Abstract observer interface; concrete observers implement ``update``."""

    @abstractmethod
    def update(self, word):
        """Handle one notified word."""
        return None
# 定义具体观察者类,用于统计词频
class WordFrequencyObserver(Observer):
    """Concrete observer that tallies every word it is notified about."""

    def __init__(self):
        self.word_count = collections.Counter()

    def update(self, word):
        """Add one occurrence of *word* to the tally."""
        self.word_count.update((word,))
# 定义主题类
class WordSubject:
    """Subject that forwards each word to all attached observers."""

    def __init__(self):
        self.observers = []

    def attach(self, observer):
        """Append *observer* to the notification list."""
        self.observers += [observer]

    def notify(self, word):
        """Call ``update(word)`` on every observer, in attachment order."""
        for listener in self.observers:
            listener.update(word)
# 主函数
def main(testfilepath, top_n=10):
    """Count the non-stop-words of a file through the observer and print the top entries.

    Args:
        testfilepath: path of the text file to process.
        top_n: number of most frequent words to print.
    """
    # Build the set once: membership is tested for every word of the file.
    stopwords = set(get_stopwords())
    subject = WordSubject()

    # Create one observer and attach it to the subject.
    observer = WordFrequencyObserver()
    subject.attach(observer)

    # Process the file: every non-stop-word is pushed to the observers.
    for word in re_split(read_file(testfilepath)):
        if word not in stopwords:
            subject.notify(word)

    # Print the top_n most frequent words.
    print_word_freqs(observer.word_count.most_common(top_n))


if __name__ == "__main__":
    main(testfilepath)

@ -0,0 +1,69 @@
'''
本例的基本模式还是观察者
基类 Subject 提供注册和提醒注册上的对象提醒机制
因为函数和参数混杂在一起传递使得各个模块的处理结构其实是 case by case
'''
from collections import Counter
from typing import List
from cppy.cp_util import *
class Subject:
    """Base class that stores one handler plus bound arguments and calls it
    with ``self.data`` on ``notify``."""

    def register_handler(self, handler: callable, *args, **kwargs):
        """Remember *handler* and the extra arguments to pass to it."""
        self.handler, self.args, self.kwargs = handler, args, kwargs

    def notify(self, *args, **kwargs):
        """Invoke the registered handler with ``self.data`` and the registered arguments.

        The arguments given to ``notify`` itself are not forwarded here.
        """
        callback = self.handler
        callback(self.data, *self.args, **self.kwargs)
# 组件一TextLoader - 负责读取文本并过滤停用词
class TextLoader(Subject):
    """Component 1: loads the word list of a file and hands it to the registered handler."""

    def load_text(self, filename: str) -> List[str]:
        """Return the words extracted from *filename*."""
        words = extract_file_words(filename)
        return words

    def notify(self, *args, **kwargs):
        """Load the file named by the first positional argument, then trigger the handler."""
        self.data = self.load_text(args[0])
        super().notify(self.data, *args, **kwargs)
# 组件二WordCounter - 计算词频
class WordCounter(Subject):
    """Component 2: counts the received words and hands the counts to the registered handler."""

    def count_words(self, words: List[str]) -> dict:
        """Return a Counter of *words*."""
        tally = Counter(words)
        return tally

    def notify(self, *args, **kwargs):
        """Count the word list given as first positional argument, then trigger the handler."""
        self.data = self.count_words(args[0])
        super().notify(self.data, *args, **kwargs)
# 组件三TopWordsPresenter - 排序并输出前10个词
class TopWordsPresenter(Subject):
    """Component 3: prints the most common entries of the received counts."""

    def notify(self, words, *args, **kwargs):
        """Print the ``args[0]`` most common entries of *words*."""
        limit = args[0]
        print_word_freqs(words.most_common(limit))
# 主程序逻辑
def main():
    """Wire loader -> counter -> presenter and run the chain on the test file."""
    loader, counter, presenter = TextLoader(), WordCounter(), TopWordsPresenter()

    # Each stage registers the next stage's notify as its handler.
    counter.register_handler(presenter.notify, 10)
    loader.register_handler(counter.notify)

    # Loading the text starts the whole flow.
    loader.notify(testfilepath)


if __name__ == "__main__":
    main()

@ -0,0 +1,86 @@
################ 待整理
'''
注册者 = 观察者
每个组件提供注册消息接口和注册消息动作
在其它单元上注册自己对于特定事件消息的响应函数
同时负责自己的注册队列的序贯调用
Python 中有一个Callable类型可以用来判断是否是可以回调类型
from typing import Callable
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
from cppy.cp_util import *
#
# event_manager
#
class EventManager:
    """Keeps three handler queues (load, process, end) and runs them in that order."""

    def __init__(self):
        self.load_handlers = []     # handlers that receive the file path
        self.process_handlers = []  # handlers that process the loaded data
        self.end_handlers = []      # handlers that finish the run

    def register_load_event(self, handler):
        self.load_handlers += [handler]

    def register_process_event(self, handler):
        self.process_handlers += [handler]

    def register_end_event(self, handler):
        self.end_handlers += [handler]

    def run(self, file_path):
        """Call the load handlers with *file_path*, then the process and end handlers."""
        for on_load in self.load_handlers:
            on_load(file_path)
        for on_process in self.process_handlers:
            on_process()
        for on_end in self.end_handlers:
            on_end()
#
# 功能组件
#
# 定义数据存储类,用于模拟文件内容的加载和处理
class TextData:
    """Loads the words of a file and feeds every non-stop-word to the
    registered word handlers."""

    def __init__(self, event_manager):
        # Per-instance handler list: a class-level list would be shared by
        # every TextData object and leak handlers between them.
        self._word_event_handlers = []
        # A set gives constant-time stop-word checks.
        self._stop_words = set(get_stopwords())
        event_manager.register_load_event(self.__load)
        event_manager.register_process_event(self.__process_words)

    def __load(self, path_to_file):
        """Read the file and split it into words."""
        self._data = re_split(read_file(path_to_file))

    def __process_words(self):
        """Pass each non-stop-word to every registered word handler."""
        for word in self._data:
            if word not in self._stop_words:
                for handler in self._word_event_handlers:
                    handler(word)

    def register_word_event(self, handler):
        """Register *handler* to be called with each non-stop-word."""
        self._word_event_handlers.append(handler)
class WordFrequencyCounter:
    """Counts the words it receives and prints the sorted result at the end event."""

    def __init__(self, event_manager, data_storage):
        self._word_freqs = defaultdict(int)  # word -> occurrences
        # Subscribe to word events first, then to the end event.
        data_storage.register_word_event(self.__increment_count)
        event_manager.register_end_event(self.__print_freqs)

    def __increment_count(self, word):
        self._word_freqs[word] = self._word_freqs[word] + 1

    def __print_freqs(self):
        ranked = sort_dict(self._word_freqs)
        print_word_freqs(ranked)
if __name__ == '__main__':
em = EventManager()
data_storage = TextData(em)
word_freq_counter = WordFrequencyCounter(em, data_storage)
em.run(testfilepath)

@ -0,0 +1,107 @@
################ 待整理
from cppy.cp_util import *
'''
订阅者 = 注册者 = 观察者
注册回调的一个变体
要点是中心化统一化
为了简化消息订阅可能形成的复杂性
提供一个中心消息管理器统一负责消息的订阅和回调
各个功能组件只是完成自己的功能
在中心管理器上订阅消息挂到自己响应的处理函数上
总结相比较的改变
- 注册的时候通过提供一个类型字段标识不同消息
- 其它实体不做注册和做回调统一这两个功能到一个中心单元
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
#################################################
# Event Manager
#################################################
class EventManager:
    """Central publish/subscribe hub keyed by event type."""

    def __init__(self):
        self._subs = defaultdict(list)  # event type -> list of handlers

    def subscribe(self, event_type, handler):
        """Register *handler* for events whose first element is *event_type*."""
        self._subs[event_type].append(handler)

    def publish(self, event):
        """Deliver *event* (a tuple starting with its type) to all subscribers."""
        # .get() avoids creating an empty entry for unknown event types.
        subscribers = self._subs.get(event[0], [])
        for deliver in subscribers:
            deliver(event)
#################################################
# Application Entities
#################################################
class DataStorage:
    """Loads the words of a file on 'load' and publishes them one by one on 'start'."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        for event_type, handler in (('load', self._load),
                                    ('start', self.produce_words)):
            self._event_manager.subscribe(event_type, handler)

    def _load(self, event):
        # event[1] carries the file path
        self._data = extract_file_words(event[1])

    def produce_words(self, _):
        """Publish a 'word' event per word, then a final 'eof' event."""
        publish = self._event_manager.publish
        for word in self._data:
            publish(('word', word))
        publish(('eof', None))
class StopWordFilter:
    """Re-publishes every 'word' that is not a stop word as 'valid_word'."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        self._event_manager.subscribe('load', self.load_stop_words)
        self._event_manager.subscribe('word', self.filter_word)
        self._stop_words = set()

    def load_stop_words(self, _):
        """Load the stop-word set when the 'load' event arrives."""
        self._stop_words = set(get_stopwords())

    def filter_word(self, event):
        """Forward the word in ``event[1]`` unless it is a stop word."""
        candidate = event[1]
        if candidate in self._stop_words:
            return
        self._event_manager.publish(('valid_word', candidate))
class WordFrequencyCounter:
    """Counts 'valid_word' events and prints the sorted counts on 'print'."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        self._event_manager.subscribe('valid_word', self.count_word)
        self._event_manager.subscribe('print', self.print_freqs)
        self._word_freqs = {}

    def count_word(self, event):
        """Increment the count of the word in ``event[1]``."""
        word = event[1]
        seen_so_far = self._word_freqs.get(word, 0)
        self._word_freqs[word] = seen_so_far + 1

    def print_freqs(self, _):
        ranked = sort_dict(self._word_freqs)
        print_word_freqs(ranked)
class WordFrequencyApp:
    """Drives the run: turns 'run' into 'load' + 'start', and 'eof' into 'print'."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        self._event_manager.subscribe('run', self.start_application)
        self._event_manager.subscribe('eof', self.stop_application)

    def start_application(self, event):
        """Publish 'load' with the file path from ``event[1]``, then 'start'."""
        load_event = ('load', event[1])
        self._event_manager.publish(load_event)
        self._event_manager.publish(('start', ))

    def stop_application(self, _):
        """Ask for the results to be printed."""
        self._event_manager.publish(('print', ))
def main():
    """Create the event manager, let every component subscribe, then publish 'run'."""
    event_manager = EventManager()
    # Construction order matters: it fixes the subscription order per event type.
    for component in (DataStorage, StopWordFilter,
                      WordFrequencyCounter, WordFrequencyApp):
        component(event_manager)
    event_manager.publish(('run', testfilepath))


if __name__ == "__main__":
    main()

@ -0,0 +1,9 @@
注册
- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。
- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。
- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。
- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。
- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。

@ -0,0 +1,98 @@
################ 待整理
'''
应用场景针对各个组件的 notify 方法发指令来驱动所有工作
这是一个示例性质的原型具体分布式环境下需要调整
notify 用了四种写法是和本主题无关的测试
'''
from cppy.cp_util import *
from collections import defaultdict
def badmsg():
    """Fallback used by the dispatch tables for an unknown action: always raises.

    The previous ``exec``-based lambda could never raise the intended error:
    its source string began with a space (IndentationError) and referred to
    an ``action`` name that is not in scope.
    """
    raise Exception("Message not understood")
class fff:
    """Read-only mapping wrapper whose missing keys trigger ``badmsg``."""

    def __init__(self, d):
        table = defaultdict(badmsg)
        table.update(d)
        self._data = table

    def __getitem__(self, key):
        # A missing key makes the defaultdict call badmsg().
        return self._data[key]
class DataStorageMod():
    """Holds the words of a file; driven through ``notify('init' | 'words')``."""

    def __init__(self):
        self._data = []

    def notify(self, action, *args):
        """Dispatch *action* through a dict of lambdas; unknown actions fall back to ``badmsg``."""
        selectors = {
            'init': lambda: self._init,
            'words': lambda: self._words,
        }
        select = selectors.get(action, badmsg)
        return select()(*args)

    def _init(self, path_to_file):
        text = read_file(path_to_file)
        self._data = re_split(text)

    def _words(self):
        return self._data
class StopWordMod():
    """Stop-word lookup; driven through ``notify('init' | 'is_stop_word')``."""

    _stop_words = []

    def notify(self, action, *args):
        """Dispatch *action* through a dict of bound methods (KeyError if unknown)."""
        handlers = {
            'init': self._init,
            'is_stop_word': self._is_stop_word,
        }
        return handlers[action](*args)

    def _init(self):
        # Rebinds the attribute on the instance; the class-level list is left untouched.
        self._stop_words = get_stopwords()

    def _is_stop_word(self, wordx):
        return wordx in self._stop_words
class WordFrequencyMod():
    """Word counter; driven through ``notify('increment_count' | 'sorted')``."""

    def __init__(self):
        # Per-instance dict: a class-level dict is mutated in place by
        # _increment_count and would be shared by every instance.
        self._word_freqs = {}

    def notify(self, action, *args):
        """Dispatch *action* through an ``fff`` table of lambdas."""
        return fff({
            'increment_count': lambda: self._increment_count,
            'sorted': lambda: self._sorted
        })[action]()(*args)

    def _increment_count(self, word):
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def _sorted(self):
        return sort_dict(self._word_freqs)
class ScenarioManager():
    """Coordinates the three modules; driven through ``notify('init' | 'run')``."""

    def notify(self, action, *args):
        """Dispatch *action* with plain comparisons; unknown actions raise."""
        if action == 'init':
            return self._init(*args)
        if action == 'run':
            return self._run()
        raise Exception("Message not understood " + action)

    def _init(self, path_to_file):
        self._storage_manager = DataStorageMod()
        self._stop_word_manager = StopWordMod()
        self._word_freq_manager = WordFrequencyMod()
        self._storage_manager.notify('init', path_to_file)
        self._stop_word_manager.notify('init')

    def _run(self):
        for word in self._storage_manager.notify('words'):
            if self._stop_word_manager.notify('is_stop_word', word):
                continue
            self._word_freq_manager.notify('increment_count', word)

        word_freqs = self._word_freq_manager.notify('sorted')
        print_word_freqs(word_freqs)
if __name__ == '__main__':
sm = ScenarioManager()
sm.notify('init', testfilepath)
sm.notify('run')

@ -0,0 +1,24 @@
from cppy.cp_util import *
# 这个例子没有实际意义,是用来帮助理解其他例子
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
def readfile(file_path, func):
    """Read the file, then continue with *func*, passing ``frequencies`` as the step after it."""
    text = read_file(file_path)
    func(text, frequencies)


def extractwords(str_data, func):
    """Extract the words, then continue with *func*, passing ``sort`` as the step after it."""
    words = extract_str_words(str_data)
    func(words, sort)


def frequencies(word_list, func):
    """Count the words, then continue with *func*, passing ``printall`` as the step after it."""
    counts = get_frequencies(word_list)
    func(counts, printall)


def sort(wf, func):
    """Sort the counts, then continue with *func*; there is no further step (``None``)."""
    ranked = sort_dict(wf)
    func(ranked, None)


def printall(word_freqs, _):
    """Final step: print the sorted frequencies."""
    print_word_freqs(word_freqs)


if __name__ == "__main__":
    # Only the first step is started here; each function names its successor.
    readfile(testfilepath, extractwords)

@ -0,0 +1,102 @@
'''
后续组件挂载到前序组件后续链上
仅提供 self.next_observer 的抽象关系
后续组件接到指令和数据自己决定动作
理论上每个组件可以参与到多个生产队列
本例使用了类来封装消息相对于字符串理论上提供了更丰富的扩展可能
这是一个示例性质的原型具体环境下需要调整
'''
from collections import Counter
from typing import List, Dict
from cppy.cp_util import *
# 定义消息类型
class Message:
    """Base message type: wraps an arbitrary payload in ``data``."""

    def __init__(self, data):
        self.data = data


class TokenizedText(Message):
    """Message type sent by the tokenizer stage."""


class FilteredText(Message):
    """Message type sent by the stop-word remover stage."""


class WordFrequency(Message):
    """Message type sent by the frequency calculator stage."""
# 定义观察者接口
class Observer:
    """Observer interface: subclasses override ``notify``."""

    def notify(self, message: Message):
        """Default implementation does nothing."""
        return None
# 切词订阅者
class TokenizerSubscriber(Observer):
    """Tokenizing stage: splits a text payload into words and forwards them."""

    def __init__(self, next_observer: Observer):
        self.next_observer = next_observer

    def notify(self, message: Message):
        """Forward the split words as ``TokenizedText``; non-string payloads are ignored."""
        if isinstance(message.data, str):
            tokens = re_split(message.data)
            self.next_observer.notify(TokenizedText(tokens))
# 停用词订阅者
class StopWordsRemoverSubscriber(Observer):
    """Filtering stage: drops stop words and words shorter than three characters."""

    def __init__(self, next_observer: Observer, stop_words: List[str]):
        self.next_observer = next_observer
        self.stop_words = set(stop_words)

    def notify(self, message: Message):
        """Forward the kept words as ``FilteredText``; other message types are ignored."""
        if not isinstance(message, TokenizedText):
            return
        kept = []
        for word in message.data:
            if word in self.stop_words:
                continue
            if len(word) > 2:
                kept.append(word)
        self.next_observer.notify(FilteredText(kept))
# 词频统计订阅者
class WordFrequencyCalculatorSubscriber(Observer):
    """Counting stage: turns a filtered word list into a Counter."""

    def __init__(self, next_observer: Observer):
        self.next_observer = next_observer

    def notify(self, message: Message):
        """Forward the counts as ``WordFrequency``; other message types are ignored."""
        if isinstance(message, FilteredText):
            counts = Counter(message.data)
            self.next_observer.notify(WordFrequency(counts))
# 输出前N个词订阅者
class TopNWordsDisplaySubscriber(Observer):
    """Final stage: prints the ``n`` most common words."""

    def __init__(self, n: int):
        self.n = n

    def notify(self, message: Message):
        """Print the top entries of a ``WordFrequency`` message; other types are ignored."""
        if isinstance(message, WordFrequency):
            top_entries = message.data.most_common(self.n)
            print_word_freqs(top_entries)
# 模拟发布者
def publish_text(text: str, observers: List[Observer]):
    """Simulated publisher: send *text* wrapped in a fresh ``Message`` to every observer."""
    for subscriber in observers:
        envelope = Message(text)
        subscriber.notify(envelope)
# 主函数
def main():
    """Build the subscriber chain back to front and publish the test file's text into it."""
    # read_file is declared with a path parameter, so pass the test file explicitly.
    text = read_file(testfilepath)
    stop_words = get_stopwords()

    # Each stage receives the stage that follows it, so build from the end.
    display_subscriber = TopNWordsDisplaySubscriber(n=10)
    freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber)
    stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words)
    tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber)

    # Publish the text to the head of the chain.
    publish_text(text, [tokenizer_subscriber])


if __name__ == "__main__":
    main()

@ -1,26 +1,34 @@
################ 待整理
'''
多线程各个模块比较乱的但是协作序贯的完成了数据处理
各个组件完全不能互操作仅依靠队列发消息进行协作
适合环节多数据可分块有IO-计算性能设计考量要求让各个模块自己适应调整
在某些情况下可以避免复杂的控制流设计使代码简洁
'''
from threading import Thread
from queue import Queue
from cppy.cp_util import *
class ActiveWFObject(Thread):
class ThreadObject(Thread):
def __init__(self):
super().__init__()
self.queue = Queue()
self._stopMe = False
self._over = False
self.start()
def run(self):
while not self._stopMe:
while not self._over:
message = self.queue.get()
self._dispatch(message)
if message[0] == 'die':
self._stopMe = True
if message[0] == 'over':
break
def send(receiver, message):
receiver.queue.put(message)
class DataStorageManager(ActiveWFObject):
""" Models the contents of the file """
class TxtManager(ThreadObject):
_data = ''
def _dispatch(self, message):
@ -29,22 +37,20 @@ class DataStorageManager(ActiveWFObject):
elif message[0] == 'send_word_freqs':
self._process_words(message[1:])
else:
# forward
send(self._stop_word_manager, message)
def _init(self, message):
path_to_file = message[0]
self._data = extract_file_words(message[0])
self._stop_word_manager = message[1]
self._data = extract_file_words(path_to_file)
def _process_words(self, message):
recipient = message[0]
for w in self._data:
send(self._stop_word_manager, ['filter', w])
send(self._stop_word_manager, ['top10', recipient])
send(self._stop_word_manager, ['topWord', recipient])
class StopWordManager(ActiveWFObject):
""" Models the stop word filter """
class FilterManager(ThreadObject):
_stop_words = []
def _dispatch(self, message):
@ -53,7 +59,6 @@ class StopWordManager(ActiveWFObject):
elif message[0] == 'filter':
return self._filter(message[1:])
else:
# forward
send(self._word_freqs_manager, message)
def _init(self, message):
@ -65,31 +70,29 @@ class StopWordManager(ActiveWFObject):
if word not in self._stop_words:
send(self._word_freqs_manager, ['word', word])
class WordFrequencyManager(ActiveWFObject):
""" Keeps the word frequency data """
class WFManager(ThreadObject):
_word_freqs = {}
def _dispatch(self, message):
if message[0] == 'word':
self._increment_count(message[1:])
elif message[0] == 'top10':
self._top10(message[1:])
elif message[0] == 'topWord':
self._topWord(message[1:])
def _increment_count(self, message):
word, = message
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
def _top10(self, message):
def _topWord(self, message):
recipient = message[0]
freqs_sorted = sort_dict ( self._word_freqs )
send(recipient, ['top10', freqs_sorted])
class WordFrequencyController(ActiveWFObject):
send(recipient, ['topWord', freqs_sorted])
class MyController(ThreadObject):
def _dispatch(self, message):
if message[0] == 'run':
self._run(message[1:])
elif message[0] == 'top10':
elif message[0] == 'topWord':
self._display(message[1:])
else:
raise Exception("Message not understood " + message[0])
@ -101,20 +104,20 @@ class WordFrequencyController(ActiveWFObject):
def _display(self, message):
word_freqs, = message
print_word_freqs( word_freqs)
send(self._storage_manager, ['die'])
self._stopMe = True
send(self._storage_manager, ['over'])
self._over = True
if __name__ == '__main__':
word_freq_manager = WordFrequencyManager()
stop_word_manager = StopWordManager()
storage_manager = DataStorageManager()
word_freq_manager = WFManager()
stop_word_manager = FilterManager()
storage_manager = TxtManager()
wfcontroller = MyController()
send(stop_word_manager, ['init', word_freq_manager])
send(storage_manager, ['init', testfilepath, stop_word_manager])
wfcontroller = WordFrequencyController()
send(stop_word_manager, ['init', word_freq_manager])
send(wfcontroller, ['run', storage_manager])
# Wait for the active objects to finish
[t.join() for t in [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]]
# 等待所有管理器完成工作
threads = [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]
for thread in threads: thread.join()

@ -0,0 +1,25 @@
import requests
from cppy.cp_util import *
def main():
    """Run the tokenize -> count -> sort pipeline against the three local services.

    Raises:
        requests.HTTPError: when any service answers with an error status.
    """
    # read_file is declared with a path parameter, so pass the test file explicitly.
    content = read_file(testfilepath)

    # Tokenize
    tokenize_response = requests.post("http://localhost:7770/tokenize", json={"text": content})
    # Fail loudly on an HTTP error instead of with a KeyError on the error body.
    tokenize_response.raise_for_status()
    words = tokenize_response.json()["words"]

    # Count
    count_response = requests.post("http://localhost:7771/count", json={"words": words})
    count_response.raise_for_status()
    word_count = count_response.json()["word_count"]

    # Sort
    sort_response = requests.post("http://localhost:7772/sort", json={"word_count": word_count})
    sort_response.raise_for_status()
    top_10_words = sort_response.json()["top_10_words"]

    print("Top 10 words:")
    print_word_freqs(top_10_words)


if __name__ == "__main__":
    main()

@ -0,0 +1,14 @@
from fastapi import FastAPI
from collections import Counter
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/count")
async def count(words_list: dict):
    """Count the words of a JSON body shaped like {"words": ["word1", "word2", ...]}."""
    tally = Counter(words_list["words"])
    return {"word_count": dict(tally)}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7771)

@ -0,0 +1,13 @@
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.post("/sort")
async def sort(word_count_dict: dict):
    """Return the ten highest-count entries of the body's "word_count" mapping."""
    def by_count(entry):
        return entry[1]

    ranked = sorted(word_count_dict["word_count"].items(), key=by_count, reverse=True)
    return {"top_10_words": ranked[:10]}
return {"top_10_words": top_10_words}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7772)

@ -0,0 +1,13 @@
from fastapi import FastAPI
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/tokenize")
async def tokenize(text: dict):
    """Extract the words of a JSON body shaped like {"text": "..."}.

    The parameter is declared as a dict so the JSON body is read as the
    request body, as the /count and /sort services do. A plain ``str``
    parameter is treated as a query parameter, which the client never sends.
    """
    words = extract_str_words(text["text"])
    return {"words": words}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7770)

@ -0,0 +1,5 @@
[Plugins]
;; Options: plugins/f1.pyc, plugins/f2.pyc
frequencies = plugins/f2.pyc

@ -0,0 +1,30 @@
import configparser, importlib.machinery
from cppy.cp_util import *
class PluginManager:
    """Loads the compiled plugin named in config.ini and serves it by name."""

    def __init__(self):
        self.plugins = {}

    def load_plugins(self):
        """Read ``[Plugins] frequencies`` from config.ini and load that .pyc file.

        Changes the working directory to this file's directory so the
        relative paths in config.ini resolve.
        """
        here = os.path.dirname(os.path.abspath(__file__))
        os.chdir(here)

        parser = configparser.ConfigParser()
        parser.read("config.ini")
        plugin_path = parser.get("Plugins", "frequencies")

        # Load the plugin from its bytecode file.
        loader = importlib.machinery.SourcelessFileLoader('', plugin_path)
        self.plugins['word_freqs'] = loader.load_module()

    def get_plugin(self, name):
        """Return the plugin registered under *name*, or None."""
        return self.plugins.get(name)
# 创建 PluginManager 实例
plugin_manager = PluginManager()
plugin_manager.load_plugins()
wordlist = extract_file_words(testfilepath) # 提取文件中的单词
word_freqs = plugin_manager.get_plugin('word_freqs').top_word(wordlist) # 调用实例方法
print_word_freqs(word_freqs) # 打印词频

@ -0,0 +1,28 @@
import py_compile
py_compile.compile('f1.py')
py_compile.compile('f2.py')
import os
import shutil
# 设置源目录和目标目录
script_dir = os.path.dirname(__file__)
source_dir = os.path.join(script_dir, '__pycache__')    # __pycache__ next to this script
target_dir = os.path.join(script_dir, '..', 'plugins')  # plugins directory one level up

# Make sure the target directory exists.
os.makedirs(target_dir, exist_ok=True)

# Move every compiled file into the plugins directory under a short name.
for filename in os.listdir(source_dir):
    if not filename.endswith('.pyc'):
        continue
    # The first two characters of the compiled name become the plugin name.
    new_filename = filename[:2]
    source_file = os.path.join(source_dir, filename)
    target_file = os.path.join(target_dir, new_filename + '.pyc')
    shutil.copyfile(source_file, target_file)
    os.remove(source_file)
    print(f"Copied {filename} to {target_file} and removed original file.")

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import operator
def top25(word_list):
def top_word(word_list):
word_freqs = {}
for w in word_list:
if w in word_freqs:

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
import collections
def top_word(word_list):
    """Return the ten most common words of *word_list* as (word, count) pairs."""
    return collections.Counter(word_list).most_common(10)

@ -0,0 +1,16 @@
import cppy.cp_util as util
def extract_words(path_to_file: str) -> list:
    """Return the words extracted from the file at *path_to_file*."""
    words = util.extract_file_words(path_to_file)
    return words
def frequencies(word_list: list) -> dict:
    """Return the frequency mapping computed by ``util.get_frequencies``."""
    counts = util.get_frequencies(word_list)
    return counts
def sort(word_freq: dict) -> list:
    """Return the entries of *word_freq* as ordered by ``util.sort_dict``."""
    ranked = util.sort_dict(word_freq)
    return ranked
if __name__ == "__main__":
word_freqs = sort( frequencies(extract_words( util.testfilepath )) )
util.print_word_freqs(word_freqs)

@ -0,0 +1,36 @@
from cppy.cp_util import *
from dataclasses import dataclass
from collections import Counter
import re
@dataclass
class WordFrequency:
    """Word-frequency calculator for one text.

    Attributes:
        text: the text to analyse.
        stop_words: words to ignore; when omitted the default stop-word
            list is loaded. Always stored as a set.
    """
    text: str
    stop_words: set = None

    def __post_init__(self):
        # Fall back to the default list when no stop words were supplied.
        if self.stop_words is None:
            self.stop_words = get_stopwords()
        # Normalise to a set: tokenize() tests membership for every token.
        self.stop_words = set(self.stop_words)

    def tokenize(self):
        """Return the lower-cased words longer than two characters that are not stop words."""
        words = re.findall(r'\b\w+\b', self.text.lower())
        return [word for word in words
                if word not in self.stop_words and len(word) > 2]

    def get_top_n(self, n=10):
        """Return the *n* most common tokens as (word, count) pairs."""
        return Counter(self.tokenize()).most_common(n)
# 使用示例
if __name__ == '__main__':
    # read_file is declared with a path parameter, so pass the test file explicitly.
    text = read_file(testfilepath)
    word_freq = WordFrequency(text)

    # Compute and print the most frequent words.
    top_words = word_freq.get_top_n()
    print_word_freqs(top_words)

@ -3,25 +3,25 @@ from cppy.cp_util import *
def extract_words(path_to_file):
assert(type(path_to_file) is str), "I need a string!"
assert(path_to_file), "I need a non-empty string!"
assert(type(path_to_file) is str), "Must be a string!"
assert(path_to_file), "Must be a non-empty string!"
try:
with open(path_to_file,encoding='utf-8') as f:
str_data = f.read()
except IOError as e:
print("I/O error({0}) when opening {1}: {2}! I quit!".format(e.errno, path_to_file, e.strerror))
print("I/O error({0}) when opening {1}: {2}".format(e.errno, path_to_file, e.strerror))
raise e
return re_split(str_data)
def remove_stop_words(word_list):
assert(type(word_list) is list), "I need a list!"
assert(type(word_list) is list), "Must be a list!"
try:
stop_words = get_stopwords()
except IOError as e:
print("I/O error({0}) opening stops_words.txt: {1}! I quit!".format(e.errno, e.strerror))
print("I/O error({0}) opening stops_words.txt: {1}".format(e.errno, e.strerror))
raise e
return [w for w in word_list if not w in stop_words]

@ -0,0 +1,25 @@
from cppy.cp_util import *
def extractWords(path_to_file):
    """Return the words of the file; the argument must be a non-empty ``str``."""
    is_string = type(path_to_file) is str
    assert is_string, "Must be a string"
    assert path_to_file, "Must be a non-empty string"
    return extract_file_words(path_to_file)
def frequencies(word_list):
    """Return the word frequencies; the argument must be a non-empty ``list``."""
    is_list = type(word_list) is list
    assert is_list, "Must be a list"
    assert word_list != [], "Must be a non-empty list"
    return get_frequencies(word_list)
def sort(word_freqs):
    """Return the sorted frequencies; the argument must be a non-empty ``dict``."""
    is_dict = type(word_freqs) is dict
    assert is_dict, "Must be a dictionary"
    assert word_freqs != {}, "Must be a non-empty dictionary"
    return sort_dict(word_freqs)
if __name__ == '__main__':
try:
word_freqs = sort(frequencies(extractWords( testfilepath )))
print_word_freqs(word_freqs)
except Exception as e:
print(" Something wrong: {0}".format(e) )

@ -56,5 +56,4 @@ state_machine = WordFrequencyStateMachine(util.testfilepath)
word_frequencies = state_machine.run()
# 打印结果
for word, freq in word_frequencies.most_common(10):
print(f"{word}: {freq}")
util.print_word_freqs(word_frequencies.most_common(10))

@ -0,0 +1,192 @@
import site
import os, re, time
import string, operator
################################################################################
# 变量
################################################################################
# Alternative test files; the last assignment is the one in effect.
testfilename = 'test.txt'
testfilename = 'pride-and-prejudice.txt'
testfilename = 'Prey.txt'

db_filename = "tf.db"

# Locate the directory that contains the installed ``cppy`` package data.
site_packages = site.getsitepackages()
_package_dirs = [package for package in site_packages if 'package' in package]
# Prefer the directory that really holds cppy/data.
basePath = next(
    (package for package in _package_dirs
     if os.path.isdir(os.path.join(package, 'cppy', 'data'))),
    None,
)
if basePath is None:
    # Otherwise keep the last matching entry, or fall back to the parent of
    # this file's directory so basePath is always defined.
    basePath = (_package_dirs[-1] if _package_dirs
                else os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

stopwordfilepath = os.path.join(basePath, 'cppy', 'data', 'stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy', 'data', testfilename)
################################################################################
# 项目函数
################################################################################
def read_file(path_to_file=None):
    """Read and return the content of a UTF-8 text file.

    Args:
        path_to_file (str): file path; when omitted, the module's default
            test file (``testfilepath``) is read, as ``get_chunks`` does.

    Returns:
        str: the file content.
    """
    if path_to_file is None:
        # Resolved at call time so callers may use read_file() with no argument.
        path_to_file = testfilepath
    with open(path_to_file, encoding='utf-8') as f:
        return f.read()
def re_split(data):
    """Split a string into lower-case words.

    Every run of non-word characters and underscores is replaced by a
    space before splitting.

    Args:
        data (str): input string.

    Returns:
        list: the words, in order of appearance.
    """
    # Raw string: '\W' in a normal string literal is an invalid escape sequence.
    normalized = re.sub(r'[\W_]+', ' ', data).lower()
    return normalized.split()
def get_stopwords(path_to_file=stopwordfilepath):
    """Return the stop-word list.

    Args:
        path_to_file (str): stop-word file path, ``stopwordfilepath`` by default.

    Returns:
        list: the comma-separated entries of the file followed by the 26
        lower-case ASCII letters.
    """
    with open(path_to_file, encoding='utf-8') as f:
        stop_words = f.read().split(',')
    # Single letters are treated as stop words too.
    stop_words += list(string.ascii_lowercase)
    return stop_words
def get_chunks(file_path=testfilepath, chunk_size=1000):
    """Split the words of a file into consecutive chunks.

    Args:
        file_path (str): file path, ``testfilepath`` by default.
        chunk_size (int): number of words per chunk, 1000 by default.

    Returns:
        list: list of word lists; the last one may be shorter.
    """
    content = re_split(read_file(file_path))
    chunks = []
    for start in range(0, len(content), chunk_size):
        chunks.append(content[start:start + chunk_size])
    return chunks
def extract_file_words(path_to_file):
    """Extract the words of a file, dropping stop words and words shorter than 3 characters.

    Args:
        path_to_file (str): file path.

    Returns:
        list: the remaining words, in order of appearance.
    """
    # Same filtering as for a string: reuse it instead of duplicating it.
    return extract_str_words(read_file(path_to_file))
def extract_str_words(data_str):
    """Extract the words of a string, dropping stop words and words shorter than 3 characters.

    Args:
        data_str (str): input string.

    Returns:
        list: the remaining words, in order of appearance.
    """
    word_list = re_split(data_str)
    # Set membership is constant time; the list would be scanned for every word.
    stop_words = set(get_stopwords())
    return [w for w in word_list if len(w) >= 3 and w not in stop_words]
def count_word(word, word_freqs, stopwords):
    """Add one occurrence of *word* to *word_freqs* unless it is a stop word.

    Args:
        word (str): the word.
        word_freqs (dict): word -> count mapping, updated in place.
        stopwords (list): words to ignore.
    """
    if word in stopwords:
        return
    word_freqs[word] = word_freqs.get(word, 0) + 1
def get_frequencies(word_list):
    """Count the occurrences of every word.

    Args:
        word_list (list): the words.

    Returns:
        dict: word -> count, keyed in order of first appearance.
    """
    word_freqs = {}
    for word in word_list:
        if word in word_freqs:
            word_freqs[word] += 1
        else:
            word_freqs[word] = 1
    return word_freqs
def sort_dict(word_freq):
    """Sort a frequency mapping by count, highest first.

    Args:
        word_freq (dict): word -> count mapping.

    Returns:
        list: (word, count) pairs in descending count order.
    """
    pairs = list(word_freq.items())
    pairs.sort(key=operator.itemgetter(1), reverse=True)
    return pairs
def print_word_freqs(word_freqs, n=10):
    """Print the first *n* (word, count) pairs, one ``word - count`` line each.

    Args:
        word_freqs (list): (word, count) pairs.
        n (int): number of pairs to print, 10 by default.
    """
    for entry in word_freqs[:n]:
        w, c = entry
        print(w, '-', c)
################################################################################
# 通用工具
################################################################################
def timing_decorator(func):
    """Decorator that prints the wrapped function's run time in milliseconds.

    The wrapper keeps the wrapped function's name and docstring and
    returns its result unchanged.
    """
    import functools  # local import: only this decorator needs it

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so it is suited to measuring intervals.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)  # call the original function
        run_time = time.perf_counter() - start_time
        print(f"{func.__name__} 运行时间: {run_time*1000:.2f}")
        return result

    return wrapper
def test():
    """Print a short greeting."""
    greeting = 'cppy welcome'
    print(greeting)

@ -0,0 +1,4 @@
## 任务
本项目的主要功能任务:做文本文件的分词,过滤常见词,求词频,并排序输出。

@ -0,0 +1,74 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为普通做法即使用requests库通过Post请求爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """Crawl the news titles related to the given keywords, page by page, and save them.

    Args:
        keywords: keywords used to search for news.
        begin_date: start date of the search.
        end_date: end date of the search.
        size: maximum number of news or policy items returned per request.
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')

    logging.info("开始运行普通爬取")

    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)

    title_list = []
    # Ten pages are fetched per keyword, so the total scales with the keyword
    # count; the context manager closes the bar even if a request fails.
    with tqdm.tqdm(total=size * 10 * len(keywords), desc='普通爬取进度',
                   unit='', ncols=80) as pbar:
        for keyword in keywords:
            for current in range(1, 11):
                logging.info(f'keyword: {keyword}, current: {current}')
                config = spider.get_config(keyword, current)
                data = spider.fetch(config)
                title_list += spider.parse(data)
                pbar.update(size)

        spider.save(title_list)

    logging.info("爬取完成")


if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -0,0 +1,86 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多进程做法即使用多进程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import multiprocessing
import tqdm
# Guards progress-bar updates in main().
lock = multiprocessing.Lock()


@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """Crawl news titles using a pool of 5 worker processes.

    One ``spider.fetch`` call per (keyword, page) is submitted to the pool for
    the first 10 pages of every keyword; the parent process then collects the
    results in submission order, parses them and hands all titles to
    ``spider.save``.

    Args:
        keywords: Keywords to search news for.
        begin_date: Start date of the search range.
        end_date: End date of the search range.
        size: Maximum number of records returned by one request.
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    # The original message was copy-pasted from the sequential script.
    logging.info("开始运行多进程爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first 10 pages of each keyword are crawled
    title_list = []
    # The bar advances by `size` per request, so the total has to cover every
    # keyword; the context manager closes the bar even if a request raises.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='多进程爬取进度',
                   unit='',
                   ncols=80) as pbar:
        with multiprocessing.Pool(processes=5) as pool:
            results = []
            for keyword in keywords:
                for current in range(1, page_count + 1):
                    logging.info(f'keyword: {keyword}, current: {current}')
                    config = spider.get_config(keyword, current)
                    results.append(pool.apply_async(spider.fetch, (config, )))
            for result in results:
                data = result.get()
                title_list += spider.parse(data)
                # `with` releases the lock even if the update raises.
                with lock:
                    pbar.update(size)
    spider.save(title_list)
    logging.info("爬取完成")
# Script entry point: run the multiprocessing crawl once with the default
# keyword and the 2018 date range.
if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -0,0 +1,89 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多线程做法即使用多线程并行爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import util
import logging
from typing import List
import tqdm
# Guards progress-bar updates in main().
lock = threading.Lock()


@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """Crawl news titles using a pool of 5 worker threads.

    One ``spider.fetch`` call per (keyword, page) is submitted to the executor
    for the first 10 pages of every keyword; results are parsed as they
    complete and all titles are handed to ``spider.save``.

    Args:
        keywords: Keywords to search news for.
        begin_date: Start date of the search range.
        end_date: End date of the search range.
        size: Maximum number of records returned by one request.
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    logging.info("开始运行多线程爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first 10 pages of each keyword are crawled
    title_list = []
    tasks = []
    # The bar advances by `size` per request, so the total has to cover every
    # keyword; the context manager closes the bar even if a request raises.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='多线程爬取进度',
                   unit='',
                   ncols=80) as pbar:
        with ThreadPoolExecutor(max_workers=5) as executor:
            for keyword in keywords:
                for current in range(1, page_count + 1):
                    logging.info(f'keyword: {keyword}, current: {current}')
                    config = spider.get_config(keyword, current)
                    tasks.append(executor.submit(spider.fetch, config))
            for future in as_completed(tasks):
                data = future.result()
                title_list += spider.parse(data)
                # Advance the bar when a request has actually finished, not
                # when it was merely submitted.
                with lock:
                    pbar.update(size)
    spider.save(title_list)
    logging.info("爬取完成")
# Script entry point: run the multithreaded crawl once with the default
# keyword and the 2018 date range.
if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -0,0 +1,89 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为协程做法即使用gevent库通过协程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import gevent
from gevent import monkey
# 打补丁使标准库能够与gevent协同工作
monkey.patch_all()
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
    """Crawl news titles using one gevent greenlet per request.

    A greenlet is spawned for each (keyword, page) pair over the first 10
    pages of every keyword; each greenlet fetches and parses its page and
    appends the titles to a shared list, which is handed to ``spider.save``
    once all greenlets have been joined.

    Args:
        keywords: Keywords to search news for.
        begin_date: Start date of the search range.
        end_date: End date of the search range.
        size: Maximum number of records returned by one request.
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    logging.info("开始运行协程爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first 10 pages of each keyword are crawled
    title_list = []

    def fetch_and_parse(keyword, current):
        # Body of one greenlet: fetch a single page and collect its titles.
        logging.info(f'keyword: {keyword}, current: {current}')
        config = spider.get_config(keyword, current)
        data = spider.fetch(config)
        titles = spider.parse(data)
        title_list.extend(titles)
        pbar.update(size)

    # The bar advances by `size` per request, so the total has to cover every
    # keyword; the context manager closes the bar even if spawning raises.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='协程爬取进度',
                   unit='',
                   ncols=80) as pbar:
        jobs = [
            gevent.spawn(fetch_and_parse, keyword, current)
            for keyword in keywords
            for current in range(1, page_count + 1)
        ]
        gevent.joinall(jobs)
    spider.save(title_list)
    logging.info("爬取完成")
# Script entry point: run the gevent crawl once with the default keyword and
# the 2018 date range.
if __name__ == "__main__":
    main(keywords=['灾害'],
         begin_date='2018-01-01',
         end_date='2018-12-31',
         size=10)

@ -0,0 +1,85 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为异步做法,即使用 asyncio 异步并发爬取网页内容,再使用 json 提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
asyncio.run(
main_async(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10))
```
"""
import asyncio
import util
import logging
from typing import List
import tqdm
@util.timeit_async
async def main_async(keywords: List[str],
                     begin_date: str,
                     end_date: str,
                     size: int = 10):
    """Crawl news titles using one asyncio task per request.

    A task running ``spider.fetch_async`` is created for each (keyword, page)
    pair over the first 10 pages of every keyword; results are parsed as the
    tasks complete and all titles are handed to ``spider.save``.

    Args:
        keywords: Keywords to search news for.
        begin_date: Start date of the search range.
        end_date: End date of the search range.
        size: Maximum number of records returned by one request.
    """
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s",
                        datefmt="%Y-%m-%d %H:%M:%S",
                        filename='log.txt',
                        encoding='utf-8')
    logging.info("开始运行异步爬取")
    spider = util.Spider(keywords=keywords,
                         begin_date=begin_date,
                         end_date=end_date,
                         size=size)
    page_count = 10  # only the first 10 pages of each keyword are crawled
    title_list = []
    tasks = []
    # The bar advances by `size` per request, so the total has to cover every
    # keyword; the context manager closes the bar even if a task raises.
    with tqdm.tqdm(total=size * page_count * len(keywords),
                   desc='异步爬取进度',
                   unit='',
                   ncols=80) as pbar:
        for keyword in keywords:
            for current in range(1, page_count + 1):
                logging.info(f'keyword: {keyword}, current: {current}')
                config = spider.get_config(keyword, current)
                tasks.append(asyncio.create_task(spider.fetch_async(config)))
        for task in asyncio.as_completed(tasks):
            data = await task
            title_list += spider.parse(data)
            pbar.update(size)
    spider.save(title_list)
    logging.info("爬取完成")
# Script entry point: run the asyncio crawl once with the default keyword and
# the 2018 date range.
if __name__ == "__main__":
    asyncio.run(
        main_async(keywords=['灾害'],
                   begin_date='2018-01-01',
                   end_date='2018-12-31',
                   size=10))

@ -0,0 +1,25 @@
# 目标
本节使用一个爬虫任务来展示如何追求代码的性能 。
充分理解线程、协程、进程、同步、异步、阻塞、非阻塞等概念,并能够根据具体场景选择合适的并发模型。
主线问题:如何解决 IO 和计算速度不匹配,如何进行任务分解、分发和协作。
# 任务
# 讨论分析
普通做法:连续进行了五次测试,时间分别为 34.231s、34.091s、34.164s、34.226s、33.958s,平均时间为 34.134s。
多进程(进程数=5):连续进行了五次测试,时间分别为 7.719s、7.716s、7.690s、7.730s、7.711s,平均时间为 7.7132s。
多线程(线程数=5):连续进行了五次测试,时间分别为 7.185s、7.964s、6.983s、6.969s、7.035s,平均时间为 7.2272s。
协程:连续进行了五次测试,时间分别为 3.775s、3.807s、3.733s、3.824s、3.744s,平均时间为 3.776s。
异步:连续进行了五次测试,时间分别为 6.975s、7.675s、7.018s、7.032s、7.049s,平均时间为 7.1498s。
为保证公平性,每一次 Post 请求后休眠 3 秒。
可以看出,协程的性能最好,普通做法的性能最差,多线程、多进程和异步的性能介于两者之间。
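多进程和多线程的并发数被限制为 5,10 个请求需要分两批完成;而协程没有并发数限制,10 个请求可以同时等待,所以协程的耗时最短。
另外,异步的耗时与多线程相当、明显慢于协程:测试时 fetch_async 中仍使用阻塞式的 requests.post 发送请求,事件循环在请求期间被阻塞,只有 3 秒的休眠是并发的;asyncio 本身是单线程的,并不存在线程切换。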
总的来说,协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。
# 总结
协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。

@ -0,0 +1,188 @@
"""
"""
import re
import time
import functools
import json
import asyncio
import requests
from typing import Any, Dict, List
class Spider:
    """Client for the search API of the Tianshui municipal government site.

    Args:
        keywords (List[str]): Keywords to search news for.
        begin_date (str): Start date of the search range.
        end_date (str): End date of the search range.
        size (int): Maximum number of records returned by one request.

    Attributes:
        URL (str): Search endpoint that every request is POSTed to.
    """
    # Tianshui municipal people's government website
    URL = ('https://www.tianshui.gov.cn/aop_component/'
           '/webber/search/search/search/queryPage')
    # Matches any HTML tag; compiled once instead of on every title.
    _TAG_PATTERN = re.compile(r'<[^>]*>')

    def __init__(self, keywords: List[str], begin_date: str, end_date: str,
                 size: int):
        self.keywords = keywords
        self.begin_date = begin_date
        self.end_date = end_date
        self.size = size

    def get_config(self, keyword: str, current: int) -> Dict[str, Any]:
        """Build the JSON payload for one search request.

        Args:
            keyword (str): Keyword to search for.
            current (int): Page number to request (sent as ``page.current``).

        Returns:
            Dict[str, Any]: Request payload.
        """
        return {
            "aliasName": "article_data,open_data,mailbox_data,article_file",
            "keyWord": keyword,
            "lastkeyWord": keyword,
            "searchKeyWord": False,
            "orderType": "score",
            "searchType": "text",
            "searchScope": "3",
            "searchOperator": 0,
            "searchDateType": "custom",
            "searchDateName": f"{self.begin_date}-{self.end_date}",
            "beginDate": self.begin_date,
            "endDate": self.end_date,
            "showId": "c2ee13065aae85d7a998b8a3cd645961",
            "auditing": ["1"],
            "owner": "1912126876",
            "token": "tourist",
            "urlPrefix": "/aop_component/",
            "page": {
                "current": current,
                "size": self.size,
                "pageSizes": [2, 5, 10, 20, 50, 100],
                "total": 0,
                "totalPage": 0,
                "indexs": []
            },
            "advance": False,
            "advanceKeyWord": "",
            "lang": "i18n_zh_CN"
        }

    def generate_headers(self) -> dict:
        """Build the HTTP headers sent with every request.

        Returns:
            dict: Request headers.
        """
        return {
            'Authorization':
            'tourist',
            'User-Agent':
            ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit'
             '/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari'
             '/537.36 Edg/124.0.0.0')
        }

    def _post(self, config: Dict[str, Any]) -> str:
        """POST ``config`` as JSON to ``URL`` and return the response text."""
        return requests.post(self.URL,
                             headers=self.generate_headers(),
                             json=config).text

    def fetch(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Blocking fetch: POST the payload, sleep 3 seconds, decode the JSON.

        Args:
            config (Dict[str, Any]): Request payload from ``get_config``.

        Returns:
            Dict[str, Any]: Decoded JSON response.
        """
        response = self._post(config)
        time.sleep(3)
        return json.loads(response)

    async def fetch_async(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Asynchronous fetch: POST the payload, sleep 3 seconds, decode JSON.

        Args:
            config (Dict[str, Any]): Request payload from ``get_config``.

        Returns:
            Dict[str, Any]: Decoded JSON response.
        """
        # requests is a blocking library: run the POST in the default executor
        # so the event loop can keep serving the other tasks meanwhile.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, self._post, config)
        await asyncio.sleep(3)
        return json.loads(response)

    def parse(self, data: Dict[str, Any]) -> List[str]:
        """Extract the record titles from a decoded response.

        Args:
            data (Dict[str, Any]): Decoded JSON response; the records are read
                from ``data['data']['page']['records']``.

        Returns:
            List[str]: Titles with HTML tags removed, at most ``size`` of them.
        """
        records = data['data']['page']['records']
        # Iterate over the records actually returned: a page may hold fewer
        # than `size` entries, and indexing range(size) raised IndexError.
        return [
            self._TAG_PATTERN.sub('', record['title'])
            for record in records[:self.size]
        ]

    def save(self, title_list: List[str]):
        """Save the collected titles (currently a no-op placeholder)."""
        pass
# 时间装饰器
def timeit(func):
    """Decorate ``func`` so that every call prints its wall-clock duration.

    Args:
        func: The function to time.

    Returns:
        A wrapper that calls ``func`` with the same arguments, prints
        ``"<name> cost: <seconds>"`` and returns ``func``'s result.
    """
    @functools.wraps(func)  # keep func's metadata, as timeit_async does
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f'{func.__name__} cost: {time.time() - start}')
        return result

    return wrapper
def timeit_async(func):
    """Decorate coroutine function ``func`` so each awaited call is timed.

    The returned coroutine function awaits ``func`` with the same arguments,
    prints ``"<name> cost: <seconds>"`` and returns ``func``'s result.
    """
    @functools.wraps(func)
    async def timed(*args, **kwargs):
        began = time.time()
        outcome = await func(*args, **kwargs)
        print(f'{func.__name__} cost: {time.time() - began}')
        return outcome

    return timed

@ -0,0 +1,29 @@
# 目标
本节使用一个书城的各种业务环节来展示面向对象的各种设计模式 。
# 任务
背景假设为一个综合书城,提供线上线下购买,还经营一个书吧、一个报告厅。
# 说明
面向对象的模式把编程过程中的一些思路固定化,并给一个名字方便理解 。
它是软件工程中一组经过验证的、可重复使用的代码写法 。
所以,模式不是语法,而是编程思路 。
这样做的好处是,统一大家的代码形式,提高代码可读性、可维护性、可扩展性 。
那为啥面向过程没有这么做?
是因为这个思维提炼过程,充分利用了面向对象语言的特性:封装、继承、多态 。
面向过程语言,没有这些特性,所以,面向过程语言没有面向对象模式 。
因为 Python 对象协议的机制,多态、接口概念发生了根本变化 。
很多模式中,类的继承关系没必要了。下面示例中很多依旧保持了基类 。
一是致敬经典,二是起到一个工程上更工整和强注释的作用 。
另外,Python的动态语言的特性 。使得一些C++、Java 的模式没用了 。
比如“原型模式”(Prototype),可以使用 copy.deepcopy() 非常简便地创建。
# 应用场景
面向对象设计模式在管理信息系统和图形用户界面系统应用比较广泛 。

@ -0,0 +1,17 @@
'''
全局只允许一个实例的办法
在该电商系统中全局只有一个数据库连接使用单例模式确保在整个应用程序内只创建一次数据库连接实例
'''
class DatabaseConnection:
    """Singleton: every construction returns the same connection object.

    The first construction creates the instance and calls ``connect_to_db``
    once; later constructions return that cached instance.
    """
    # The single shared instance; None until the first construction.
    _instance = None

    def __new__(cls):
        # Compare against None explicitly: a truthiness test would re-create
        # the instance for a subclass whose objects evaluate as falsy.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.connect_to_db()
        return cls._instance

    def connect_to_db(self):
        # Code that connects to the database goes here...
        pass

@ -0,0 +1,31 @@
# 如果有一个选择结构来决定实现不同的类,在面向对象设计里面一般把这个选择做成一个类,叫做工厂模式
# 定义一个ProductFactory类用于创建不同类型的商品实例如电子产品、书籍等。具体的产品由子类实现。
#
class Product:
    """Base product carrying the attributes shared by all goods."""

    def __init__(self, name, price):
        # Store the common attributes in one step.
        self.name, self.price = name, price
class Electronic(Product):
    """Electronic product: a ``Product`` that also records its brand."""

    def __init__(self, name, price, brand):
        Product.__init__(self, name, price)
        self.brand = brand
class Book(Product):
    """Book product: a ``Product`` that also records its author."""

    def __init__(self, name, price, author):
        Product.__init__(self, name, price)
        self.author = author
class ProductFactory:
    """Factory that picks the concrete product class from a type name."""

    @staticmethod
    def create_product(product_type, *args, **kwargs):
        """Create the product named by ``product_type``.

        ``'electronic'`` builds an ``Electronic`` and ``'book'`` builds a
        ``Book``; the remaining arguments are forwarded to that constructor.

        Raises:
            ValueError: If ``product_type`` is neither of the known names.
        """
        # Guard clauses: return as soon as the type name is recognised.
        if product_type == 'book':
            return Book(*args, **kwargs)
        if product_type == 'electronic':
            return Electronic(*args, **kwargs)
        raise ValueError("Invalid product type")
# Create a product through the factory; 'book' selects the Book class and the
# remaining arguments are forwarded to its constructor.
product = ProductFactory.create_product('book', 'Python编程艺术', 50.0, 'Mark Lutz')

@ -0,0 +1,114 @@
'''
建造者模式Builder Pattern允许构建一个复杂对象的各个部分然后一步一步地返回这个对象的完整版本
将建造者模式应用于网购的下单和出库过程时我们可以设计一个Order类来表示订单
以及一个OrderBuilder类来构建订单的各个部分
此外我们还可以引入一个ShoppingCart类来表示购物车以及一个Inventory类来处理库存和出库逻辑
'''
######################################################################
# Order类它包含订单的基本信息如下单时间、用户信息、订单项列表
######################################################################
from datetime import datetime
class OrderItem:
    """One line of an order: a product id and the quantity ordered."""

    def __init__(self, product_id, quantity):
        self.product_id, self.quantity = product_id, quantity
class Order:
    """An order placed by a user.

    Attributes:
        user_id: Identifier of the ordering user.
        order_items: Items of the order; each exposes ``product_id`` and
            ``quantity``.
        order_time: Time the order was placed; defaults to ``datetime.now()``.
        status: ``"PLACED"`` initially, ``"FULFILLED"`` after a successful
            ``fulfill``.
    """

    def __init__(self, user_id, order_items, order_time=None):
        self.user_id = user_id
        self.order_items = order_items
        self.order_time = order_time or datetime.now()
        self.status = "PLACED"  # an order starts out as placed

    def __str__(self):
        return f"Order for user {self.user_id} placed at {self.order_time}. Status: {self.status}"

    def fulfill(self, inventory):
        """Deduct every item from ``inventory``; all-or-nothing.

        Args:
            inventory: Object providing ``deduct_stock(product_id, quantity)``
                (returns a bool) and ``add_stock(product_id, quantity)``.

        Returns:
            True if all items were deducted and the status was set to
            ``"FULFILLED"``; False if any item lacked stock, in which case
            the stock already deducted for earlier items is put back.
        """
        deducted = []
        for item in self.order_items:
            if inventory.deduct_stock(item.product_id, item.quantity):
                deducted.append(item)
                continue
            # Roll back so a failed order does not leave stock half-removed.
            for done in deducted:
                inventory.add_stock(done.product_id, done.quantity)
            return False
        self.status = "FULFILLED"
        return True
######################################################################
# OrderBuilder类用于构建订单
######################################################################
class OrderBuilder:
    """Step-by-step builder for ``Order`` objects with a fluent interface."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard the user and items collected so far."""
        self._user_id = None
        self._order_items = []

    def for_user(self, user_id):
        """Set the ordering user; returns the builder for chaining."""
        self._user_id = user_id
        return self

    def add_item(self, product_id, quantity):
        """Append one order item; returns the builder for chaining."""
        self._order_items.append(OrderItem(product_id, quantity))
        return self

    def build(self):
        """Create the ``Order`` from the collected user and items.

        Raises:
            ValueError: If no user or no item has been set.
        """
        if not self._user_id or not self._order_items:
            raise ValueError("Order cannot be built without user and items.")
        # Hand the order its own copy of the list so that later add_item()
        # calls on this builder cannot change an order that was already built.
        return Order(self._user_id, list(self._order_items))
######################################################################
# 购物车和库存类
######################################################################
class ShoppingCart:
    """Per-user cart mapping product ids to the quantity put in the cart."""

    def __init__(self, user_id):
        self.user_id = user_id
        self.items = {}  # {product_id: quantity}

    def add_to_cart(self, product_id, quantity):
        """Add ``quantity`` of a product, accumulating with what is there."""
        already_in_cart = self.items.get(product_id, 0)
        self.items[product_id] = already_in_cart + quantity

    def checkout(self):
        """Turn the cart content into ``OrderItem`` objects and empty the cart."""
        order_items = []
        for product_id, quantity in self.items.items():
            order_items.append(OrderItem(product_id, quantity))
        self.items.clear()  # the cart is empty after checkout
        return order_items
class Inventory:
    """Stock levels per product id."""

    def __init__(self):
        self.stock = {}  # {product_id: quantity}

    def add_stock(self, product_id, quantity):
        """Increase the stock of a product by ``quantity``."""
        on_hand = self.stock.get(product_id, 0)
        self.stock[product_id] = on_hand + quantity

    def deduct_stock(self, product_id, quantity):
        """Remove ``quantity`` from stock if enough is available.

        Returns:
            True if the stock was reduced, False if there was not enough.
        """
        available = self.stock.get(product_id, 0)
        if available < quantity:
            return False
        self.stock[product_id] -= quantity
        return True
######################################################################
# 模拟整个下单和出库过程
######################################################################
# Set up the inventory and the shopping cart
inventory = Inventory()
inventory.add_stock("book1", 10)
inventory.add_stock("book2", 5)
cart = ShoppingCart(user_id="user123")
cart.add_to_cart("book1", 2)
cart.add_to_cart("book2", 1)
# Build the order with OrderBuilder
order_items = cart.checkout()  # check out: get the order items and empty the cart
order_builder = OrderBuilder().for_user("user123")
for item in order_items:
    order_builder.add_item(item.product_id, item.quantity)
order = order_builder.build()  # build the Order object
print(order)  # print the order summary
# Take the ordered items out of stock
if order.fulfill(inventory):
    print("Order has been fulfilled.")
else:
    print("Order fulfillment failed due to insufficient stock.")

@ -0,0 +1,55 @@
'''
享元模式Flyweight Pattern可以用来减少对象的创建数量比如对于重复的书籍信息或者频繁请求的书籍分类可以通过享元模式来共享这些信息以提高内存使用效率和系统性能
在下面的代码中BookFlyweight 是享元抽象类它使用了一个类级别的字典 _books 来存储已经创建的书籍对象__new__ 方法被用来在创建新实例之前检查是否已经存在具有相同ISBN的书籍对象如果已经存在就返回那个对象的引用如果不存在就创建一个新对象并将其存储在 _books 字典中
请注意在这个例子中我故意尝试使用相同的ISBN但不同的标题来创建书籍对象以展示不正确的使用方式在真正的享元模式实现中一旦对象被创建并且其内在状态被设置在这个例子中是由ISBN标题和作者定义的就不应该再修改这些状态如果需要处理变化的状态通常会将这部分状态外部化并通过方法的参数传递给享元对象
另外要注意的是享元模式主要适用于大量细粒度对象且这些对象可以共享状态的情况在书籍的例子中ISBN是一个很好的共享状态的键但标题和作者通常不应该在对象创建后被改变因此这个例子更多的是为了展示享元模式的基本结构和原理而不是一个完全贴合实际的实现在实际应用中需要更仔细地设计享元对象的不可变状态和可变状态
'''
# 享元抽象类
class BookFlyweight:
    """Book object shared per ISBN.

    Constructing with an ISBN that was seen before returns the existing
    object; the title and author passed on that later call are ignored.
    """
    _books = {}  # {isbn: shared BookFlyweight instance}

    def __new__(cls, isbn, title, author):
        # Create and register the book only for an ISBN not seen before.
        registry = cls._books
        if isbn not in registry:
            registry[isbn] = super().__new__(cls)
            registry[isbn].set_book_info(title, author)
        return registry[isbn]

    def set_book_info(self, title, author):
        """Store the title and author on this book."""
        self.title, self.author = title, author

    def get_book_info(self):
        """Return the book formatted as ``"<title> by <author>"``."""
        return f"{self.title} by {self.author}"
# 享元工厂类
class BookFactory:
    """Factory front-end that hands out ``BookFlyweight`` objects."""

    @staticmethod
    def get_book(isbn, title, author):
        """Return the ``BookFlyweight`` constructed from the given arguments."""
        book = BookFlyweight(isbn, title, author)
        return book
# 客户端代码
if __name__ == "__main__":
    # Request the same ISBN twice; both calls return the same shared object.
    book1 = BookFactory.get_book("123456789", "The Great Gatsby", "F. Scott Fitzgerald")
    book2 = BookFactory.get_book("123456789", "The Same Book With Different Title?", "F. Scott Fitzgerald")
    # The different title passed in the second call is ignored:
    # BookFlyweight.__new__ only calls set_book_info when an ISBN is first
    # seen, so the shared object keeps the title and author of the first call.
    # This illustrates that a flyweight's intrinsic state is fixed at creation
    # and is not modified by later requests.
    print(book1.get_book_info())  # prints: The Great Gatsby by F. Scott Fitzgerald
    print(book2.get_book_info())  # prints: The Great Gatsby by F. Scott Fitzgerald
    # A different ISBN yields a different object.
    book3 = BookFactory.get_book("987654321", "1984", "George Orwell")
    print(book3.get_book_info())  # prints: 1984 by George Orwell
    # Check object identity.
    print(book1 is book2)  # prints: True
    print(book1 is book3)  # prints: False

@ -0,0 +1,23 @@
# 装饰器模式允许我们在不修改原有类的基础上,动态地添加额外的功能。
# 就增加功能来说,装饰器模式比生成子类更为灵活。
# 餐吧的顾客可以选择为他们的咖啡添加额外的调料。
class Beverage:
    """A drink with a description and a base price.

    Args:
        description: Text describing the drink.
        price: Base price of the drink; defaults to 0.0, which is the value
            every beverage had before the price became configurable.
    """

    def __init__(self, description, price=0.0):
        self.description = description
        self.price = price

    def cost(self):
        """Return the price of the drink."""
        return self.price
class CondimentDecorator(Beverage):
    """Decorator that adds a condiment to an existing beverage.

    Args:
        beverage: The beverage being decorated.
        description: Name of the condiment; appended to the wrapped
            beverage's description.
        price_increase: Amount the condiment adds to the cost.
    """

    def __init__(self, beverage, description, price_increase):
        # Initialise the Beverage part too, so the decorator has every
        # attribute a Beverage has (``price`` was missing before). ``price``
        # keeps the base default here; the total is what ``cost()`` returns.
        super().__init__(f"{beverage.description}, {description}")
        self.beverage = beverage
        self.price_increase = price_increase

    def cost(self):
        """Return the wrapped beverage's cost plus the condiment's price."""
        return self.beverage.cost() + self.price_increase
# Use the decorator pattern: wrap a plain espresso with a chocolate condiment
# that adds 0.50 to its cost.
coffee = Beverage("Espresso")
coffee_with_chocolate = CondimentDecorator(coffee, "Chocolate", 0.50)

@ -0,0 +1,42 @@
'''
适配器模式Adapter
应用将一个类的接口转换成客户期望的另一个接口使得原本由于接口不兼容而无法一起工作的类能够一起工作
'''
########################################################################
# 定义一个目标接口Target和一个与之不兼容的类Adaptee
############################################################################
# 目标接口
class Target:
    """Interface the client works against."""

    def request(self):
        """Placeholder operation; does nothing in this base class."""
        return None
# 需要适配的类
class Adaptee:
    """Existing class whose interface does not match ``Target``."""

    def specific_request(self):
        """Print a message showing that the adaptee was called."""
        message = "Called Adaptee's specific_request."
        print(message)
########################################################################
# 定义一个适配器类Adapter它实现了Target接口并且持有Adaptee的实例
# 从而能够在request方法中调用Adaptee的specific_request方法
# 一个继承,一个当参数加入构造函数
############################################################################
# 适配器
class Adapter(Target):
    """Adapter exposing an adaptee through the ``Target`` interface."""

    def __init__(self, adaptee):
        self.adaptee = adaptee

    def request(self):
        """Forward the call to the adaptee's ``specific_request``."""
        delegate = self.adaptee.specific_request
        delegate()
if __name__ == "__main__":
    # Create the Adaptee instance
    adaptee = Adaptee()
    # Create the Adapter, passing the Adaptee instance to its constructor
    adapter = Adapter(adaptee)
    # The client calls request(); Adapter.request forwards the call to the
    # Adaptee's specific_request
    adapter.request()

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save