Compare commits

..

100 Commits
master ... dev

Author SHA1 Message Date
zj3D 3f03b9014d 05
2 weeks ago
zj3D d782d65fb5 Remove .gitignore file from Git tracking
2 weeks ago
zj3D 5e83716c4d 04
2 weeks ago
zj3D d339ef454b 03
2 weeks ago
zj3D 654dabbb69 02
2 weeks ago
zj3D 59ea2b479b 01
2 weeks ago
zj3D 0c531354ce BC01
2 weeks ago
zj3D dc44e0be4b 04
2 weeks ago
zj3D 3ea3cbaa23 031
2 weeks ago
zj3D 0eb30e470f 03
2 weeks ago
zj3D 4abad6851f 02
2 weeks ago
zj3D 129f1aaa3f 01
2 weeks ago
zj3D 617465cec6 07
2 weeks ago
zj3D 35e58525f1 06
2 weeks ago
zj3D e5b9607dce 05
2 weeks ago
zj3D 49bf182cf8 04
2 weeks ago
zj3D ea53899bbd 04
2 weeks ago
zj3D 0606fc586c 03
2 weeks ago
zj3D b77d297f3e 02
2 weeks ago
zj3D 1712e964cf 02
2 weeks ago
zj3D 8b9e813ee2 2501
2 weeks ago
zj3D 7365ebb312 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
3 weeks ago
zj3D 524a65e492 ABC
3 weeks ago
p46318075 94800c4b9e Update readme.MD
2 months ago
p46318075 3d0220d49b Merge pull request 'dev' (#17) from pcz4qfnkl/CodePattern:dev into dev
7 months ago
Yao 36afa1d669 refactor: 优化代码,提高可读性和效率
7 months ago
Yao 15736d7393 refactor(code): 优化代码,提高可读性和效率
7 months ago
Yao f170c936d8 feat: 添加了根据关键词爬取天水市人民政府网站上指定日期内新闻标题的功能,并提供了多线程、多进程、协程和异步四种实现方式。
7 months ago
zj3D ceb9955051 ABC
8 months ago
zj3D 4606a87618 0803
8 months ago
zj3D 2a27a2c748 拉萨饭店
11 months ago
zj3D 26b6f4c88b 结构调整25
1 year ago
zj3D fa3e01dedc 大修 12
1 year ago
zj3D 850a3eb772 注册消息
1 year ago
zj3D 8d5c578da8 插件简化
1 year ago
zj3D 88606f2bce 修订 11
1 year ago
zj3D ebe28f7670 修 12
1 year ago
zj3D c8946209bf 大修 10
1 year ago
zj3D 2d46194636 大修 11
1 year ago
zj3D 5099345721 大修10
1 year ago
zj3D cd8186dd68 大修 10
1 year ago
zj3D 50952795a8 大修 9
1 year ago
zj3D 2cac3f2788 over
1 year ago
zj3D 44b0c00567 大修 9
1 year ago
zj3D 83c156a3d5 大修 8
1 year ago
zj3D 31a4dfc8e5 非中心化
1 year ago
zj3D e27ecadb25 流式调用
1 year ago
zj3D 9d74a5c184 大修 8
1 year ago
zj3D a3bc46dae3 大修6
1 year ago
zj3D f2ff5c8d4e 清理
1 year ago
zj3D 18f3901592 大修改6
1 year ago
zj3D 44c1f9eb1e 大修改4
1 year ago
zj3D b86f626e94 大调整2
1 year ago
zj3D fe94d8ed1b 享元修订
1 year ago
zj3D b15c7505f6 调整
1 year ago
zj3D cab45b3281 词频对象设计模式修订
1 year ago
zj3D 4aa6f8469d 调整
1 year ago
p46318075 f8f3f10d2e Add readme.MD
1 year ago
zj3D 7db531d2fc 大调整
1 year ago
zj3D 41a14b6705 设计模式
1 year ago
zj3D ac7fb13827 设计模式
1 year ago
zj3D 1920e47a1c 设计模型
1 year ago
p46318075 c5932334fa Merge pull request '观察者模式' (#15) from p26zockiw/CodePattern:master into dev
1 year ago
zj3D 239c0188d0 调整
1 year ago
zj3D bfcaab3439 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
1 year ago
zj3D f52645e7b2 test
1 year ago
p46318075 b6fc9ef4c3 Merge pull request '修改restful模式' (#14) from pbr4nzfkh/CodePattern:dev into dev
1 year ago
pbr4nzfkh 28f60e8216 restful 服务端
1 year ago
pbr4nzfkh fdf6166100 Delete '基本结构/042 restful/tf-35-app.py'
1 year ago
pbr4nzfkh ffdae7d329 restful 客户端
1 year ago
pbr4nzfkh 0b9d4a63d6 Delete '基本结构/042 restful/tf-35-request.py'
1 year ago
pbr4nzfkh ada14b9a7b restful 服务端
1 year ago
pbr4nzfkh c8cd7bbc0c Delete '基本结构/042 restful/tf-35-app.py'
1 year ago
zj3D c99a655997 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
1 year ago
zj3D 950cb41e08 debug
1 year ago
p46318075 f131c63ff4 Merge pull request '修改map-reduce模式' (#13) from pbr4nzfkh/CodePattern:dev into dev
1 year ago
zj3D 2518a5cd85 结构调整
1 year ago
pbr4nzfkh e993c23ed1 Delete '计算设备/map-reduce/tf-32.py'
1 year ago
pbr4nzfkh fb95636bb1 tf-31 map-reduce模式
1 year ago
pbr4nzfkh 285b016a30 tf-92多进程模式
1 year ago
pbr4nzfkh 1ebf2a45fe tf-91多线程模式
1 year ago
pbr4nzfkh 740f5aabff Delete '计算设备/map-reduce/tf_92.py'
1 year ago
pbr4nzfkh 2288c18e8a Delete '计算设备/map-reduce/tf_91.py'
1 year ago
pbr4nzfkh 028c7ddb07 Delete '计算设备/map-reduce/tf-31.py'
1 year ago
zj3D 9bf690d62c 风格统一
1 year ago
zj3D 041fced368 patch
1 year ago
p46318075 254c11c3c9 Merge pull request '修改restful模式' (#12) from pbr4nzfkh/CodePattern:dev into dev
1 year ago
pbr4nzfkh 29dbff26cc restful_app
1 year ago
pbr4nzfkh 4134d794ab Delete '基本结构/042 restful/tf-35-app.py'
1 year ago
pbr4nzfkh d54c43b459 restful-app
1 year ago
pbr4nzfkh 365c8bb76a Delete '基本结构/042 restful/test.txt'
1 year ago
pbr4nzfkh f8055c0044 restful-request
1 year ago
pbr4nzfkh 726a8795c7 Delete '基本结构/042 restful/tf-34.py'
1 year ago
zj3D b4a280c55c 类型申明
1 year ago
zj3D bfbc1120ec 运行时间装饰器
1 year ago
zj3D 3c439ef8d7 update
1 year ago
zj3D 445088fde8 增加一种终端模式
1 year ago
zj3D 0e55cabe5c print
1 year ago
zj3D 856fdcc1e1 1
1 year ago
zj3D 8545ada6c2 111
1 year ago

@ -1,17 +1,19 @@
import string # 引入停用词表和测试文件的路径
from cppy.cp_util import * from cppy.cp_util import stopwordfilepath, testfilepath
# 准备词和停用词表 # 准备停用词表
word_freqs = [] with open(stopwordfilepath, encoding='utf-8') as f:
with open( stopwordfilepath,encoding='utf-8' ) as f:
stop_words = f.read().split(',') stop_words = f.read().split(',')
stop_words.extend(list(string.ascii_lowercase)) for letter in 'abcdefghijklmnopqrstuvwxyz':
stop_words.append(letter)
for line in open( testfilepath ,encoding='utf-8' ): # 读文件,逐行扫描文本,发现词,确定不是停用词,计数
word_freqs = []
for line in open(testfilepath, encoding='utf-8'):
start_char = None start_char = None
i = 0 i = 0
for c in line: for c in line:
if start_char == None: if start_char is None:
if c.isalnum(): if c.isalnum():
# 一个单词开始 # 一个单词开始
start_char = i start_char = i
@ -32,15 +34,23 @@ for line in open( testfilepath ,encoding='utf-8' ):
pair_index += 1 pair_index += 1
if not found: if not found:
word_freqs.append([word, 1]) word_freqs.append([word, 1])
elif len(word_freqs) > 1:
for n in reversed(range(pair_index)):
if word_freqs[pair_index][1] > word_freqs[n][1]:
# 交换
word_freqs[n], word_freqs[pair_index] = word_freqs[pair_index], word_freqs[n]
pair_index = n
# 重置开始标记 # 重置开始标记
start_char = None start_char = None
i += 1 i += 1
for tf in word_freqs[0:10]: # 使用冒泡排序对词频进行排序
n = len(word_freqs)
for i in range(n):
for j in range(0, n - i - 1):
if word_freqs[j][1] < word_freqs[j + 1][1]:
word_freqs[j], word_freqs[j + 1] = word_freqs[j + 1], word_freqs[j]
# 打印频率最高的前10个词
for tf in word_freqs[:10]:
print(tf[0], '-', tf[1]) print(tf[0], '-', tf[1])
'''
想到哪里写到哪里
用的最基础的编程思想没有使用 Python 高级语法特性数据结构和算法
'''

@ -1,4 +1,5 @@
from cppy.cp_util import * from cppy.cp_util import stopwordfilepath, testfilepath
import string
from collections import Counter from collections import Counter
# 准备词和停用词表 # 准备词和停用词表
@ -7,7 +8,7 @@ stop_words.update(list(string.ascii_lowercase))
# 读取文件并计算单词频率 # 读取文件并计算单词频率
word_freqs = Counter() word_freqs = Counter()
with open(testfilepath,encoding = 'utf8') as f: with open(testfilepath, encoding='utf8') as f:
for line_num, line in enumerate(f, 1): for line_num, line in enumerate(f, 1):
start_char = None start_char = None
for i, c in enumerate(line): for i, c in enumerate(line):

@ -1,8 +1,10 @@
import re, sys, collections import re
from cppy.cp_util import * import collections
from cppy.cp_util import stopwordfilepath, testfilepath
stopwords = set(open( stopwordfilepath,encoding = 'utf8' ).read().split(',')) stopwords = set(open(stopwordfilepath, encoding='utf8').read().split(','))
words = re.findall('[a-z]{2,}', open( testfilepath,encoding = 'utf8').read().lower()) words = re.findall('[a-z]{2,}',
open(testfilepath, encoding='utf8').read().lower())
counts = collections.Counter(w for w in words if w not in stopwords) counts = collections.Counter(w for w in words if w not in stopwords)
for (w, c) in counts.most_common(10): for (w, c) in counts.most_common(10):
print(w, '-', c) print(w, '-', c)

@ -2,8 +2,10 @@ import string
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
################################
# data # data
data = [] ################################
data = ''
words = [] words = []
word_freqs = [] word_freqs = []
@ -13,17 +15,12 @@ word_freqs = []
def read_file(path_to_file): def read_file(path_to_file):
global data global data
with open(path_to_file,encoding='utf-8') as f: with open(path_to_file,encoding='utf-8') as f:
data = data + list(f.read()) data = f.read()
def filter_chars_and_normalize(): def extractwords():
global data global data
global words global words
for i in range(len(data)): words = data.lower().split()
data[i] = ' ' if not data[i].isalnum() else data[i].lower()
data_str = ''.join(data)
words = words + data_str.split()
with open(stopwordfilepath) as f: with open(stopwordfilepath) as f:
stop_words = set(f.read().split(',')) stop_words = set(f.read().split(','))
stop_words.update(string.ascii_lowercase) stop_words.update(string.ascii_lowercase)
@ -41,7 +38,7 @@ def sort():
if __name__ == "__main__": if __name__ == "__main__":
read_file( testfilepath ) read_file( testfilepath )
filter_chars_and_normalize() extractwords()
frequencies() frequencies()
sort() sort()

@ -2,31 +2,26 @@ import re
from cppy.cp_util import * from cppy.cp_util import *
def filter_chars_and_normalize(str_data): def extractwords(str_data):
pattern = re.compile('[\W_]+') pattern = re.compile('[\W_]+')
word_list = pattern.sub(' ', str_data).lower().split() word_list = pattern.sub(' ', str_data).lower().split()
stop_words = get_stopwords() stop_words = get_stopwords()
return [w for w in word_list if not w in stop_words] return [w for w in word_list if not w in stop_words]
def frequencies(word_list): def frequencies(word_list):
word_freqs = {} word_freqs = {}
for word in word_list: for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1 word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs return word_freqs
def sort(word_freq): def sort(word_freq):
return sorted( word_freq.items(), key=lambda x: x[1], reverse=True ) return sorted( word_freq.items(), key=lambda x: x[1], reverse=True )
def print_all(word_freqs, n = 10 ):
for word, freq in word_freqs[ :n ]:
print(word, '-', freq)
if __name__ == "__main__": if __name__ == "__main__":
print_all(sort(frequencies( txtcontent = read_file( testfilepath )
filter_chars_and_normalize( word_list = extractwords( txtcontent )
read_file( testfilepath )))) word_freqs = frequencies( word_list )
) word_sorts = sort ( word_freqs )
for tf in word_sorts[:10]:
print(tf[0], '-', tf[1])

@ -0,0 +1,30 @@
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
def process_chunk(chunk):
    """Count the words of one chunk, skipping stop words and short words.

    Args:
        chunk: iterable of word strings.

    Returns:
        Counter of every word that is not in the module-level ``stop_words``
        and is at least three characters long.
    """
    counts = Counter()
    for token in chunk:
        if token in stop_words or len(token) < 3:
            continue
        counts[token] += 1
    return counts
def process_chunks(chunks, word_freqs, x, max):
    """Recursively fold the counts of ``chunks[x:max]`` into ``word_freqs``.

    The recursion descends before it counts, so chunks are merged from
    index ``max - 1`` back down to ``x``.

    Args:
        chunks: indexable sequence of word lists.
        word_freqs: Counter that is updated in place.
        x: index of the first chunk to process.
        max: exclusive upper bound on the chunk index. The name shadows the
            builtin but is kept so keyword callers keep working.
    """
    if x >= max:
        # Nothing to process (for example an empty chunk list); indexing
        # chunks[x] here would raise IndexError.
        return
    following = x + 1
    if following < max:
        process_chunks(chunks, word_freqs, following, max)
    # process_chunk already returns a Counter; ``+=`` mutates the caller's
    # Counter in place, which is how the result is handed back.
    word_freqs += process_chunk(chunks[x])
# def process_chunks( chunks,word_freqs,x,max ):
# word_list = process_chunk(chunks[x])
# word_freqs += Counter(word_list)
# next = x + 1
# if next < max:
# process_chunks(chunks,word_freqs,next,max)
# 读数据,按2000个词一组分片
chunks = get_chunks(testfilepath,2000)
word_freqs = Counter()
process_chunks( chunks,word_freqs,0,len(chunks) )
print_word_freqs( word_freqs.most_common(10) )

@ -0,0 +1,101 @@
from collections import Counter
from cppy.cp_util import *
class DataStorageManager:
    """Data model: reads a file and keeps its content split into words.

    Attributes:
        _data: result of ``re_split(read_file(path_to_file))``.

    Methods:
        words(): return the stored word list.
    """

    def __init__(self, path_to_file):
        raw_text = read_file(path_to_file)
        self._data = re_split(raw_text)

    def words(self):
        """Return the word list built at construction time."""
        return self._data
class StopWordManager:
    """Stop-word model.

    Attributes:
        _stop_words: collection returned by ``get_stopwords()``.

    Methods:
        is_stop_word(word): tell whether ``word`` is a stop word.
    """

    def __init__(self):
        self._stop_words = get_stopwords()

    def is_stop_word(self, word):
        """Return True when ``word`` is in the stop-word collection."""
        found = word in self._stop_words
        return found
class WordFrequencyManager:
    """Word-frequency model: counts words and reports them by frequency.

    Attributes:
        _word_freqs: Counter mapping each word to its number of occurrences.

    Methods:
        increment_count(word): add one occurrence of ``word``.
        sorted(): return (word, count) pairs, most frequent first.
    """

    def __init__(self):
        self._word_freqs = Counter()

    def increment_count(self, word):
        """Record one more occurrence of ``word``."""
        self._word_freqs.update((word,))

    def sorted(self):
        """Return all (word, count) pairs ordered by descending count."""
        ranked = self._word_freqs.most_common()
        return ranked
class WordFrequencyController:
    """Controller: wires the three models together and runs the pipeline.

    Attributes:
        _storage_manager: DataStorageManager holding the words of the file.
        _stop_word_manager: StopWordManager used to filter words.
        _word_freq_manager: WordFrequencyManager that accumulates counts.

    Methods:
        run(): count every non-stop word and print the sorted frequencies.
    """

    def __init__(self, path_to_file):
        self._storage_manager = DataStorageManager(path_to_file)
        self._stop_word_manager = StopWordManager()
        self._word_freq_manager = WordFrequencyManager()

    def run(self):
        """Count each word that is not a stop word, then print the result."""
        is_stop_word = self._stop_word_manager.is_stop_word
        count_word = self._word_freq_manager.increment_count
        for word in self._storage_manager.words():
            if is_stop_word(word):
                continue
            count_word(word)

        print_word_freqs(self._word_freq_manager.sorted())
if __name__ == '__main__':
WordFrequencyController(testfilepath).run()
'''
函数:输入参数调用后,你得马上接住返回值
类:输入参数实例化后,你可以在需要的时候去访问你需要的数据(实例属性)
'''

@ -0,0 +1,52 @@
from cppy.cp_util import *
def extract_words(obj, path_to_file):
    """Extract the words of a file and store them under ``obj['data']``.

    Args:
        obj (dict): dictionary object that receives the data.
        path_to_file (str): path of the file to read.
    """
    words = extract_file_words(path_to_file)
    obj['data'] = words
def increment_count(obj, w):
    """Add one to the count of ``w``; a word not seen before starts at 1.

    Args:
        obj (dict): dictionary object whose ``'freqs'`` entry maps
            word -> count.
        w (str): the word to count.
    """
    freqs = obj['freqs']
    freqs[w] = freqs.get(w, 0) + 1
# Data-storage object: a plain dict carrying its data plus the closures that
# act as its "methods".
data_storage_obj = {'data': []}  # 'data' holds the word list
# Initializer: extract the words of the given file into 'data'.
data_storage_obj['init'] = (
    lambda path_to_file: extract_words(data_storage_obj, path_to_file))
# Accessor: return the current word list.
data_storage_obj['words'] = lambda: data_storage_obj['data']
# Word-frequency object: a plain dict carrying the counts plus the closures
# that act as its "methods".
word_freqs_obj = {'freqs': {}}  # 'freqs' maps word -> count
# Add one occurrence of a word.
word_freqs_obj['increment_count'] = (
    lambda w: increment_count(word_freqs_obj, w))
# Return the frequencies after sorting them with sort_dict.
word_freqs_obj['sorted'] = lambda: sort_dict(word_freqs_obj['freqs'])
if __name__ == '__main__':
# 初始化数据存储对象,提取文件中的单词
data_storage_obj['init'](testfilepath)
# 遍历单词列表,增加单词的计数
for word in data_storage_obj['words']():
word_freqs_obj['increment_count'](word)
# 获取排序后的单词频率并打印
word_freqs = word_freqs_obj['sorted']()
print_word_freqs(word_freqs)

@ -0,0 +1,3 @@
from cppy.cp_util import *
print_word_freqs( sort_dict ( get_frequencies ( extract_file_words(testfilepath) )))

@ -0,0 +1,28 @@
from cppy.cp_util import *
# 如果有连续的对数据加工操作,而且总是把共同加工数据对象当第一个参数,可以用本文件夹方法提升阅读体验
# 框架类
class FunBind:
    """Chain free functions over one shared piece of data.

    The first ``bind`` call produces the data; every later call passes the
    current data as the first positional argument of the next function.
    The final value is available as ``.data``.
    """

    def bind(self, func, *args, **kwargs):
        """Run ``func`` as the next stage and store its result in ``data``.

        Args:
            func: callable for this stage.
            *args, **kwargs: extra arguments for ``func`` (everything except
                the leading data argument).

        Returns:
            self, so calls can be chained.

        Raises:
            Whatever ``func`` raises. Errors are no longer swallowed and
            retried without the data argument.
        """
        if hasattr(self, 'data'):
            self.data = func(self.data, *args, **kwargs)
        else:
            # First stage: there is no data yet, so func creates it.
            self.data = func(*args, **kwargs)
        return self
data = FunBind()\
.bind(extract_file_words,testfilepath)\
.bind(get_frequencies)\
.bind(sort_dict)\
.bind(print_word_freqs,10)\
.data
print(data)
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''

@ -0,0 +1,28 @@
from cppy.cp_util import *
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''
class FunPipe:
    """Wrap a function call so stages can be joined with ``|`` like a pipe.

    ``FunPipe(f, a) | FunPipe(g, b)`` runs ``f(a)`` immediately and returns a
    new FunPipe that will call ``g(f(a), b)``. Reading ``.data`` runs the
    final stage and returns its result.
    """

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __or__(self, other):
        """Evaluate this stage and feed its result into ``other``."""
        if not isinstance(other, FunPipe):
            # Let Python raise the normal TypeError for an unsupported
            # operand, before this stage has been executed.
            return NotImplemented
        _data = self.func(*self.args, **self.kwargs)
        return FunPipe(other.func, _data, *other.args, **other.kwargs)

    @property
    def data(self):
        """Run the wrapped call and return its result."""
        return self.func(*self.args, **self.kwargs)
# 模仿管道
pipe = FunPipe(extract_file_words,testfilepath) | FunPipe(get_frequencies) | FunPipe(sort_dict) | FunPipe(print_word_freqs, 10)
pipe.data

@ -0,0 +1,29 @@
from cppy.cp_util import *
class Flow:
    """Fluent wrapper: each method updates ``self.data`` and returns self."""

    def extract_file_words(self, filepath):
        """Load the words of ``filepath`` into ``data``."""
        words = extract_file_words(filepath)
        self.data = words
        return self

    def get_frequencies(self):
        """Replace ``data`` with ``get_frequencies(data)``."""
        frequencies = get_frequencies(self.data)
        self.data = frequencies
        return self

    def sort_dict(self):
        """Replace ``data`` with ``sort_dict(data)``."""
        ordered = sort_dict(self.data)
        self.data = ordered
        return self

    def print_word_freqs(self, n):
        """Pass ``data`` and ``n`` to ``print_word_freqs``; data is unchanged."""
        print_word_freqs(self.data, n)
        return self
# 顺序调用
Flow().extract_file_words(testfilepath).get_frequencies().sort_dict().print_word_freqs(10)
'''
连续方法调用看起来比较舒服
但是需要假设
- 每一个类方法返回 self 否则没法连续
- 类方法默认不写第一个参数数据都在 .data 里面
'''

@ -0,0 +1,50 @@
from cppy.cp_util import *
# 装饰器改写类
# - 找到以f_开头的方法
# - 将方法函数的返回值赋值给对象的data属性
# - 返回对象自身
def return_self_decorator(cls):
    """Class decorator that makes every ``f_*`` method chainable.

    Each callable class attribute whose name starts with ``f_`` is replaced
    by a wrapper that stores the method's return value in ``self.data`` and
    returns ``self``. Other attributes are left untouched.

    Args:
        cls: the class to rewrite.

    Returns:
        The same class object, modified in place.
    """
    import functools

    def return_self(func):
        # functools.wraps keeps the method's __name__ and __doc__.
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            self.data = func(self, *args, **kwargs)
            return self  # return the instance so calls can be chained
        return wrapper

    # Iterate over a snapshot: setattr below writes to the class namespace
    # that is being walked.
    for name, method in list(vars(cls).items()):
        if callable(method) and name.startswith('f_'):
            setattr(cls, name, return_self(method))
    return cls
@return_self_decorator
class Flow:
    """Word-frequency pipeline whose ``f_*`` methods are made chainable."""

    def test(self):
        """Plain method without the ``f_`` prefix."""
        return 'test'

    def f_extract_file_words(self, filepath):
        """Return the words extracted from ``filepath``."""
        words = extract_file_words(filepath)
        return words

    def f_get_frequencies(self):
        """Return ``get_frequencies`` applied to the current ``data``."""
        frequencies = get_frequencies(self.data)
        return frequencies

    def f_sort_dict(self):
        """Return ``sort_dict`` applied to the current ``data``."""
        ordered = sort_dict(self.data)
        return ordered

    def f_print_word_freqs(self, n):
        """Pass the current ``data`` and ``n`` to ``print_word_freqs``."""
        # No return value, so this method yields None.
        print_word_freqs(self.data, n)
# 顺序调用
Flow().f_extract_file_words(testfilepath).f_get_frequencies().f_sort_dict().f_print_word_freqs(10)
'''
改写后参与 function flow 功能的方法
- 需要以 'f_' 开头
- 类方法默认不写第一个参数数据都在 .data 里面
仍旧需要特殊的方法写法
所以还是 12种方法比较自然
'''

@ -0,0 +1,26 @@
from cppy.cp_util import *
from collections import Counter
# 定义一个带计数器的元类
class CounterMetaclass(type):
    """Metaclass that gives every class it creates its own ``_counter``."""

    def __new__(mcs, name, bases, attrs):
        # Injected at class-creation time, so each class built with this
        # metaclass gets a separate Counter.
        attrs['_counter'] = Counter()
        return super().__new__(mcs, name, bases, attrs)


# Class built with the metaclass: creating an instance counts its word.
class Word(metaclass=CounterMetaclass):
    """A word whose instantiation is tallied in the class-level counter."""

    def __init__(self, word):
        self.word = word
        self._counter[self.word] += 1

    @classmethod
    def get_word_freqs(cls, n) -> list:
        """Return the ``n`` most common (word, count) pairs as a list."""
        return cls._counter.most_common(n)
for word in extract_file_words ( testfilepath ) : Word(word)
print_word_freqs(Word.get_word_freqs(10))
'''
常用于将依赖项如服务或配置自动注入到类中
'''

@ -0,0 +1,20 @@
from cppy.cp_util import *
#
# 生成器
#
def non_stop_words(testfilepath):
    """Generate, in order, every word of the file that is not a stop word.

    Args:
        testfilepath: path of the text file to read.

    Yields:
        Each item of ``re_split(read_file(testfilepath))`` that is not in
        ``get_stopwords()``.
    """
    stopwords = get_stopwords()
    text = read_file(testfilepath)
    yield from (token for token in re_split(text) if token not in stopwords)
freqs = {}
for word in non_stop_words(testfilepath):
freqs[word] = freqs.get(word, 0) + 1
data = sort_dict(freqs)
print_word_freqs(data)

@ -3,6 +3,10 @@ import aiofiles
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
#
# 协程: 有点复杂; 读文件的Io还是太快的爬虫
#
async def read_file(file_path): async def read_file(file_path):
async with aiofiles.open(file_path, 'r', encoding='utf-8') as file: async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
content = await file.read() content = await file.read()
@ -21,7 +25,8 @@ async def main():
top_words = await count_words(text) top_words = await count_words(text)
wordfreqs += top_words wordfreqs += top_words
for word, count in wordfreqs.most_common(10): for word, count in wordfreqs.most_common(10):
print(f"{word}: {count//10}") print(f"{word}: {count//10}") # 突出 Io 的提升价值
# 运行异步主函数 # 运行异步主函数
asyncio.run(main()) asyncio.run(main())

@ -1,7 +1,7 @@
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
class AcceptTypes: class TypesCheck:
def __init__(self, *args): def __init__(self, *args):
self._args = args self._args = args
@ -9,22 +9,23 @@ class AcceptTypes:
def wrapped_f(*args, **kwargs): def wrapped_f(*args, **kwargs):
for i, arg_type in enumerate(self._args): for i, arg_type in enumerate(self._args):
if not isinstance(args[i], arg_type): if not isinstance(args[i], arg_type):
raise TypeError(f"Argument {i} expected {arg_type}, got {type(args[i])}") raise TypeError(f" {i} expected {arg_type}, got {type(args[i])}")
return f(*args, **kwargs) return f(*args, **kwargs)
return wrapped_f return wrapped_f
@AcceptTypes(str) @TypesCheck(str)
def extract_words_(path_to_file): def extract_words_(path_to_file):
return extract_file_words(path_to_file) return extract_file_words(path_to_file)
@AcceptTypes(list) @TypesCheck(list)
def frequencies_(word_list): def frequencies_(word_list):
return Counter(word_list) return Counter(word_list)
@AcceptTypes(Counter) @TypesCheck(Counter)
def sort_(word_freq): def sort_(word_freq):
return word_freq.most_common() return word_freq.most_common()
if __name__ == '__main__': if __name__ == '__main__':
word_freqs = sort_(frequencies_(extract_words_( testfilepath ))) word_freqs = sort_(frequencies_(extract_words_( testfilepath )))
print_word_freqs(word_freqs) print_word_freqs(word_freqs)

@ -21,17 +21,15 @@ class sortTaskHandler:
def handle_task(task_type,*args): def handle_task(task_type,*args):
handler_class_name = f"{task_type}TaskHandler" # 构建处理器类名 handler_class_name = f"{task_type}TaskHandler" # 构建处理器类名
# 使用globals()获取当前全局符号表
handler_class = globals().get(handler_class_name) handler_class = globals().get(handler_class_name)
if handler_class: if handler_class:
handler = handler_class() # 实例化处理器类 handler = handler_class() # 实例化处理器类
return handler.handle(*args) # 调用处理方法 return handler.handle(*args) # 调用处理方法
else: else:
print(f"No handler found for task type: {task_type}") print(f"No found for task type: {task_type}")
if __name__ == '__main__': word_list = handle_task("words",util.testfilepath)
word_list = handle_task("words",util.testfilepath) word_freq = handle_task("frequencies",word_list)
word_freq = handle_task("frequencies",word_list) word_sort = handle_task("sort",word_freq)
word_sort = handle_task("sort",word_freq) util.print_word_freqs(word_sort)
util.print_word_freqs(word_sort)

@ -0,0 +1,56 @@
import threading, queue
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
# 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数
class WordFrequencyCounter:
    """Count word frequencies with worker threads that share two queues.

    ``word_space`` holds the chunks still to be counted; ``freq_space``
    collects one partial frequency dict per processed chunk.
    """

    def __init__(self, input_file):
        self.word_space = queue.Queue()
        self.freq_space = queue.Queue()
        for chunk in get_chunks(input_file, 3000):
            self.word_space.put(chunk)

    def process_words(self):
        """Worker loop: take chunks until the queue is empty and count them."""
        while not self.word_space.empty():
            try:
                # Non-blocking get: another worker may have emptied the
                # queue since the check above.
                chunk = self.word_space.get_nowait()
            except queue.Empty:
                break
            kept = [w for w in chunk if w not in stop_words and len(w) >= 3]
            # Stored as a plain dict rather than a Counter.
            self.freq_space.put(dict(Counter(kept)))

    def run(self):
        """Start five workers, wait for them, merge and print the counts."""
        workers = []
        for _ in range(5):
            workers.append(threading.Thread(target=self.process_words))
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        totals = Counter()
        while not self.freq_space.empty():
            partial = self.freq_space.get()
            if partial:  # skip empty partial results
                totals.update(partial)

        print_word_freqs(sort_dict(totals))
@timing_decorator
def main():
    """Build a WordFrequencyCounter for the test file and run it."""
    WordFrequencyCounter(testfilepath).run()
if __name__ == '__main__':
main()
'''
在多线程之间传递数据,建议使用线程安全的队列,如 queue.Queue 或 multiprocessing.Queue,后者也适用于多进程环境。
这些队列提供了线程安全的数据传输机制,可以避免竞态条件和数据损坏。
全局变量不可预测。
multiprocessing.Queue 利用了操作系统提供的进程间通信(IPC, Inter-Process Communication)机制,具体实现取决于不同操作系统的支持。
在 Unix/Linux 系统中,multiprocessing.Queue 通常基于管道(pipes)、共享内存和/或消息队列等机制实现。
而在 Windows 系统上,可能使用命名管道(named pipes)或者内存映射文件(memory-mapped files),以及某些版本的 Windows 特有的进程间同步对象,如 Mutexes、Semaphores 和事件。
'''

@ -0,0 +1,62 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
使用 multiprocessing.Manager 来完成统计词频
需要注意
- Manager() 必须用函数包起来,不能按脚本随便放外面否则会提示freeze_support
- 工作函数需要放到外面不能做内部函数否则会提示参数错误
- 无法在 Jupyter 类似环境运行
'''
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def _merge_counts(word_count, local_counts):
    """Add every count in ``local_counts`` to the shared ``word_count``."""
    for word, occurrences in local_counts.items():
        word_count[word] = word_count.get(word, 0) + occurrences


def process_chunk(shared_chunks, word_count, lock=None):
    """Worker: drain text chunks from a shared list and count their words.

    Args:
        shared_chunks: Manager list proxy of text chunks, terminated by one
            ``None`` sentinel.
        word_count: Manager dict proxy (word -> count) shared by all workers.
        lock: optional lock (for example ``manager.Lock()``) held while
            merging into ``word_count``. Without it the get-then-set merge
            can lose updates when two workers touch the same word.
    """
    # Count locally and merge once at the end, so the shared dict is
    # touched once per distinct word instead of once per occurrence.
    local_counts = Counter()
    while True:
        try:
            chunk = shared_chunks.pop(0)  # take one chunk off the shared list
        except IndexError:
            # Another worker took the last item; nothing is left to do.
            break
        if chunk is None:
            # End marker: put it back so the other workers also see it,
            # instead of failing on an empty list.
            shared_chunks.append(None)
            break
        try:
            words = extract_str_words(chunk)
        except Exception as e:
            # Best effort, as before: report the problem and stop this worker.
            print(e)
            break
        local_counts.update(w for w in words if w not in stop_words)

    if lock is None:
        _merge_counts(word_count, local_counts)
    else:
        with lock:
            _merge_counts(word_count, local_counts)


@timing_decorator
def main():
    """Split the test file into chunks, count them in 4 processes, print top 10."""
    chunk_size = 1024 * 10  # characters read per chunk; tune as needed
    # The with-block shuts the manager's server process down when done.
    with Manager() as manager:
        shared_chunks = manager.list()
        word_count = manager.dict()
        lock = manager.Lock()

        # NOTE(review): fixed-size reads can cut a word in two at a chunk
        # boundary; confirm whether that inaccuracy is acceptable here.
        with open(testfilepath, 'r', encoding='utf-8') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                shared_chunks.append(chunk)
        shared_chunks.append(None)  # end-of-data sentinel
        print('-------------------', len(shared_chunks))

        processes = [Process(target=process_chunk,
                             args=(shared_chunks, word_count, lock))
                     for _ in range(4)]  # four worker processes
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        # Copy into a plain dict while the manager is still alive.
        totals = dict(word_count)

    word_freqs = Counter(totals).most_common(10)
    print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -0,0 +1,42 @@
'''
使用 multiprocessing.Manager:
Manager 提供了一个可以在不同进程之间共享和修改的数据类型 list, dict, Namespace
它实际上是在背后启动了一个单独的服务器进程其他进程通过代理来访问这些共享对象
'''
# 使用 multiprocessing.Manager 来完成统计词频
# 怎么得到最快的一个结果是一个试错过程X程创建数目多少、分片的大小 ...
from cppy.cp_util import *
from collections import Counter
from multiprocessing import Manager, Process
stop_words = get_stopwords()
def process_chunk(chunk, word_count, lock=None):
    """Count the words of one chunk and add them to the shared dict.

    Args:
        chunk: iterable of word strings.
        word_count: Manager dict proxy (word -> count) shared by all
            processes.
        lock: optional lock (for example ``manager.Lock()``) held while
            merging. Without it the get-then-set merge can lose updates
            when two processes touch the same word.
    """
    # Count locally first: every access to the proxy is a round trip to the
    # manager process, which is what made the per-word loop slow.
    local_counts = Counter(
        w for w in chunk if w not in stop_words and len(w) >= 3)

    def merge():
        # The proxy's update() has dict semantics (it overwrites instead of
        # adding), so the counts are added key by key.
        for word, occurrences in local_counts.items():
            word_count[word] = word_count.get(word, 0) + occurrences

    if lock is None:
        merge()
    else:
        with lock:
            merge()


@timing_decorator
def main():
    """Start one process per chunk of the test file and print the top 10."""
    # The with-block shuts the manager's server process down when done.
    with Manager() as manager:
        word_count = manager.dict()
        lock = manager.Lock()

        chunks = get_chunks(testfilepath, 2800)
        print('-------------------', len(chunks))

        processes = []
        for chunk in chunks:
            p = Process(target=process_chunk,
                        args=(chunk, word_count, lock))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        # Copy into a plain dict while the manager is still alive.
        totals = dict(word_count)

    word_freqs = Counter(totals).most_common(10)
    print_word_freqs(word_freqs)
if __name__ == '__main__':
main()

@ -4,20 +4,10 @@ from cppy.cp_util import testfilepath,db_filename,extract_file_words
# 数据库表结构 # 数据库表结构
TABLES = { TABLES = {
'documents': '''CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL
)''',
'words': '''CREATE TABLE IF NOT EXISTS words ( 'words': '''CREATE TABLE IF NOT EXISTS words (
doc_id INTEGER NOT NULL, doc_name INTEGER NOT NULL,
value TEXT NOT NULL, value TEXT NOT NULL
FOREIGN KEY (doc_id) REFERENCES documents (id)
)''', )''',
'characters': '''CREATE TABLE IF NOT EXISTS characters (
word_id INTEGER NOT NULL,
value TEXT NOT NULL,
FOREIGN KEY (word_id) REFERENCES words (id)
)'''
} }
@ -33,15 +23,10 @@ def create_db_schema(connection):
def load_file_into_database(path_to_file, connection): def load_file_into_database(path_to_file, connection):
words = extract_file_words( path_to_file ) words = extract_file_words( path_to_file )
doc_name = os.path.basename(testfilepath).split('.')[0]
c = connection.cursor() c = connection.cursor()
c.execute("INSERT INTO documents (name) VALUES (?)", (path_to_file,))
doc_id = c.lastrowid
for w in words: for w in words:
c.execute("INSERT INTO words (doc_id, value) VALUES (?, ?)", (doc_id, w)) c.execute("INSERT INTO words (doc_name, value) VALUES (?, ?)", (doc_name, w))
word_id = c.lastrowid
for char in w:
c.execute("INSERT INTO characters (word_id, value) VALUES (?, ?)", (word_id, char))
connection.commit() connection.commit()
c.close() c.close()
@ -49,11 +34,9 @@ def load_file_into_database(path_to_file, connection):
# 建数据库,处理数据入库 # 建数据库,处理数据入库
####################################################### #######################################################
# 获取当前文件所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构造数据库文件的完整路径 # 构造数据库文件的完整路径
current_dir = os.path.dirname(os.path.abspath(__file__))
db_file_path = os.path.join(current_dir, db_filename) db_file_path = os.path.join(current_dir, db_filename)
if os.path.exists(db_file_path): if os.path.exists(db_file_path):
os.remove(db_file_path) os.remove(db_file_path)
@ -69,3 +52,10 @@ with sqlite3.connect(db_file_path) as connection:
c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10") c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10")
for row in c.fetchall(): for row in c.fetchall():
print(row[0], '-', row[1]) print(row[0], '-', row[1])
'''
也可以把数据库看做解决共享数据的竞争死锁的办法
不过本例中的计算太快
用数据库共享数据成本太高
'''

@ -0,0 +1,76 @@
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify, abort
from functools import lru_cache
from cppy.cp_util import *
from functools import cache
app = Flask(__name__)
# 模拟数据库
books_db = []
# Cached accessor for the in-memory books list.
@lru_cache(maxsize=None)
def get_books_db():
    """Return the module-level ``books_db`` list.

    Because of ``lru_cache`` the list object seen on the first call keeps
    being returned until ``get_books_db.cache_clear()`` is called.
    """
    db = books_db
    return db
# List all resources
@app.route('/books', methods=['GET'])
def get_books():
    """Return every stored book as JSON."""
    all_books = get_books_db()
    return jsonify(all_books)
# Look up a single resource
@app.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
    """Return the ``content`` field of the book with ``book_id``, or 404."""
    for book in get_books_db():
        if book['id'] == book_id:
            return jsonify(book['content'])
    # No book carries this id.
    abort(404)
# Create a new resource or update an existing one
@app.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
    """Create the book ``book_id`` from the JSON body, or update it.

    Returns:
        The full book list as JSON with status 200. Responds 400 when the
        body is not a JSON object.
    """
    book_to_update = request.json
    print(book_to_update)
    if not isinstance(book_to_update, dict):
        abort(400)
    # Every lookup in this module reads book['id'], so make sure it exists.
    book_to_update.setdefault('id', book_id)

    # Work on the live module-level list (mutated in place, so no ``global``
    # is needed) rather than on a possibly stale cached reference.
    books = books_db
    book = next((book for book in books if book['id'] == book_id), None)
    if book is None:
        # The resource does not exist yet: create it.
        books.append(book_to_update)
    else:
        # The resource exists: update it.
        book.update(book_to_update)

    # Drop the cached list so get_books_db() re-reads books_db.
    # (functools.cache has no ``delete``; cache_clear() lives on the
    # decorated function.)
    get_books_db.cache_clear()
    return jsonify(books), 200
# Run an operation on a resource
@app.route('/books/<int:book_id>/word_frequency', methods=['GET'])
def word_frequency(book_id):
    """Return the sorted word frequencies of the file behind a book.

    The book's ``content`` field is used as the path of the text file.
    Responds 404 when no book has ``book_id``.
    """
    book = next((book for book in get_books_db() if book['id'] == book_id), None)
    if book is None:
        # Without this guard an unknown id fails on book['content'] and
        # surfaces as a 500.
        abort(404)
    filepath = book['content']
    word_list = extract_file_words(filepath)
    frequencies = sort_dict(get_frequencies(word_list))
    print_word_freqs(frequencies)
    return jsonify(frequencies), 200
@app.route('/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
    """Delete the book with ``book_id``; respond 404 when it does not exist."""
    remaining = [book for book in books_db if book['id'] != book_id]
    if len(remaining) == len(books_db):
        abort(404)  # nothing was removed: the book does not exist
    # Replace the contents in place so every existing reference to the list
    # (including the one cached by get_books_db) sees the deletion.
    books_db[:] = remaining
    return jsonify({'message': f'book {book_id} deleted'}), 200
if __name__ == '__main__':
app.run(debug=True)

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
import requests
from cppy.cp_util import *
import time  # explicit import; the pauses previously relied on the star import

BASE_URL = 'http://127.0.0.1:5000'
PAUSE_SECONDS = 2


def _show_all_books():
    """GET /books and print the decoded JSON body."""
    response = requests.get(f'{BASE_URL}/books')
    print(response.json())


# List the resources; an empty list is expected on a fresh server.
_show_all_books()
time.sleep(PAUSE_SECONDS)

# Create resources 1 to 3, each pointing at a different text file. The
# second and third paths are derived from testfilepath by swapping the
# file name.
second_path = testfilepath.replace('Prey.txt', 'Pride-and-Prejudice.txt')
third_path = second_path.replace('Pride-and-Prejudice.txt', 'test.txt')
books = [
    {"id": 1, "title": "Python编程:从入门到实践", "content": testfilepath},
    {"id": 2, "title": "深入浅出计算机组成原理", "content": second_path},
    {"id": 3, "title": "算法导论", "content": third_path},
]
for book in books:
    print(f"创建一个{book['id']}号资源")
    response = requests.put(f"{BASE_URL}/books/{book['id']}", json=book)
    time.sleep(PAUSE_SECONDS)

# List the resources again to see the result.
print('查询资源,看到结果')
_show_all_books()
time.sleep(PAUSE_SECONDS)

# Run the word-frequency operation on resource 1.
print('操作1号资源得到词频')
response = requests.get(f'{BASE_URL}/books/1/word_frequency')
print_word_freqs(response.json())

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from functools import reduce
stop_words = get_stopwords()
# map - reduce
def process_chunk(chunk):
    """Map step: count one chunk's words, dropping stop words and short words.

    Args:
        chunk: iterable of word strings.

    Returns:
        Counter of the words that are not in the module-level ``stop_words``
        and have at least three characters.
    """
    counts = Counter()
    for token in chunk:
        if token in stop_words or len(token) < 3:
            continue
        counts[token] += 1
    return counts
def merge_counts(count1, count2):
    """Reduce step: return the sum of two partial counts as a new object."""
    merged = count1 + count2
    return merged
@timing_decorator
def main():
    """Count word frequencies of the test file with map/reduce; print top 10."""
    # Split the input with get_chunks(testfilepath, 1000).
    chunks = get_chunks(testfilepath, 1000)

    # Map: turn every chunk into its own Counter.
    counts_list = list(map(process_chunk, chunks))

    # Reduce: merge the partial counts. The Counter() initializer makes an
    # empty chunk list yield an empty result instead of a TypeError.
    total_counts = reduce(merge_counts, counts_list, Counter())

    # Print the ten most frequent words.
    print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from multiprocessing.pool import ThreadPool
#
# 多线程
#
stop_words = get_stopwords()
def process_chunk(chunk):
    """Count one chunk's words, dropping stop words and short words.

    Args:
        chunk: iterable of word strings.

    Returns:
        Counter of the words that are not in the module-level ``stop_words``
        and have at least three characters.
    """
    counts = Counter()
    for token in chunk:
        if token in stop_words or len(token) < 3:
            continue
        counts[token] += 1
    return counts
def merge_counts(counts_list):
    """Return one new Counter holding the summed counts of all given Counters.

    Args:
        counts_list: iterable of Counter objects; none of them is modified.

    Returns:
        A new Counter; empty when ``counts_list`` is empty.
    """
    # Accumulate in place. sum(counts_list, Counter()) builds a fresh copy
    # of the running total for every element.
    total = Counter()
    for counts in counts_list:
        total += counts
    return total
def thread_function(chunk, counts_list):
    """Count one chunk and append the resulting Counter to ``counts_list``."""
    counts_list.append(process_chunk(chunk))
@timing_decorator
def main():
    """Count word frequencies of the test file with a thread pool; print top 10."""
    # Split the input with get_chunks(testfilepath, 1000).
    chunks = get_chunks(testfilepath, 1000)

    # One thread per chunk (an arbitrary choice), but at least one:
    # ThreadPool(0) raises ValueError when there are no chunks.
    pool = ThreadPool(max(1, len(chunks)))
    try:
        counts_list = pool.map(process_chunk, chunks)
    finally:
        # Release the worker threads even if a chunk fails.
        pool.close()
        pool.join()

    # Merge the partial counts.
    total_counts = merge_counts(counts_list)

    # Print the ten most frequent words.
    print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *
#
# 多进程: 因为创建进程相比计算过程开销太大,结果最慢
#
stop_words = get_stopwords()
def process_chunk(chunk):
    """Count the words of ``chunk`` that survive stop-word filtering.

    Words listed in the module-level ``stop_words`` and words shorter than
    three characters are ignored.
    """
    def keep(word):
        # Same test as the filter condition: not a stop word, length >= 3.
        return (word not in stop_words) and len(word) >= 3

    return Counter([word for word in chunk if keep(word)])
def merge_counts(counts_list):
    """Combine a sequence of Counter objects into a single Counter."""
    merged = Counter()
    for counts in counts_list:
        merged = merged + counts
    return merged
@timing_decorator
def main():
    """Count word frequencies with a process pool and print the ten most common."""
    # Split the file content into chunks; each chunk is handled by a process.
    chunks = get_chunks(testfilepath, 1000)
    # One worker process per CPU core.
    worker_total = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=worker_total)
    counts_list = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()
    # Merge the partial counts and print the n most frequent words.
    print_word_freqs(merge_counts(counts_list).most_common(10))
if __name__ == '__main__':
main()

@ -2,24 +2,28 @@ import concurrent.futures
from collections import Counter from collections import Counter
import cppy.cp_util as util import cppy.cp_util as util
'''
concurrent.futures模块为Python中的并发编程提供了一个统一接口,
这个模块隐藏了低层次的线程和进程创建同步和清理的细节,提供了一个更高层次的API来处理并发任务
当前版本推荐它与asyncio模块结合使用完成Python中的各种异步编程任务
'''
stop_words = util.get_stopwords()
class WordFrequencyAgent: class WordFrequencyAgent:
def __init__(self, words): def __init__(self, words):
self.words = words self.words = words
def compute_word_frequency(self): def compute_word_frequency(self):
self.word_freq = Counter(self.words) words = [ w for w in self.words if ( not w in stop_words ) and len(w) >= 3 ]
self.word_freq = Counter( words)
def get_word_frequency(self): def get_word_frequency(self):
return self.word_freq return self.word_freq
# 将文本分割成多个部分并为每个部分创建一个Agent # 将文本分割成多个部分并为每个部分创建一个Agent
def create_agents(words, num_agents = 4 ): def create_agents( words ):
text_chunks = [ words[i::num_agents] for i in range(num_agents) ] return [ WordFrequencyAgent(chunk) for chunk in words ]
agents = [ WordFrequencyAgent(chunk) for chunk in text_chunks ]
return agents
def compute_all_word_frequencies(agents): def compute_all_word_frequencies(agents):
with concurrent.futures.ThreadPoolExecutor() as executor: with concurrent.futures.ThreadPoolExecutor() as executor:
@ -27,13 +31,7 @@ def compute_all_word_frequencies(agents):
future_to_agent = {executor.submit(agent.compute_word_frequency): agent for agent in agents} future_to_agent = {executor.submit(agent.compute_word_frequency): agent for agent in agents}
for future in concurrent.futures.as_completed(future_to_agent): for future in concurrent.futures.as_completed(future_to_agent):
agent = future_to_agent[future] agent = future_to_agent[future]
try: data = future.result() # 词频被保存在agent中
# 获取计算结果,但不处理异常
data = future.result()
except Exception as exc:
print(f'生成 {agent.text_chunk[:10]}... 的词频时出错: {exc}')
# 词频已经被保存在agent中
# 所有Agent计算完成后合并它们的词频结果 # 所有Agent计算完成后合并它们的词频结果
def merge_word_frequencies(agents): def merge_word_frequencies(agents):
@ -42,11 +40,13 @@ def merge_word_frequencies(agents):
merged_freq.update(agent.get_word_frequency()) merged_freq.update(agent.get_word_frequency())
return merged_freq return merged_freq
@util.timing_decorator
if __name__ == '__main__': def main():
words = util.extract_file_words(util.testfilepath) # 从文本抽词 words = util.get_chunks(util.testfilepath)
agents = create_agents(words) # 创建代理 agents = create_agents(words) # 创建代理
compute_all_word_frequencies(agents) # 计算 compute_all_word_frequencies(agents) # 计算
merged_word_freq = merge_word_frequencies(agents) # 合并结果 merged_word_freq = merge_word_frequencies(agents) # 合并结果
for (w, c) in merged_word_freq.most_common(10): # 排序输出 util.print_word_freqs(merged_word_freq.most_common(10)) # 排序输出
print(w, '-', c)
if __name__ == '__main__':
main()

@ -0,0 +1,46 @@
import sys
import re
from collections import Counter
# 使用 python command_line_1.py testfilepath 10
def clean_text(text):
    """Strip punctuation from ``text`` and return it in lower case.

    Every character that is neither a word character nor whitespace is
    removed.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    return without_punctuation.lower()
def count_frequencies(text):
    """Return a Counter of the whitespace-separated words of the cleaned text."""
    words = clean_text(text).split()
    return Counter(words)
def main():
    """Command-line entry point: print the ``n`` most common words of a file.

    Expects exactly two arguments, ``<file_path>`` and ``<n>``. Prints a usage
    line and exits with status 1 when the argument count is wrong.
    """
    # Check the number of command-line arguments.
    if len(sys.argv) != 3:
        print("Usage: python command_line_1.py <file_path> <n>")
        sys.exit(1)

    file_path = sys.argv[1]

    try:
        # Parse n inside the try block so that a non-numeric argument is
        # reported by the ValueError handler below instead of a traceback.
        n = int(sys.argv[2])

        # Open the file and read its content.
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()

        # Count the word frequencies and take the n most common words.
        frequencies = count_frequencies(text)
        most_common = frequencies.most_common(n)

        # Print the result.
        for word, freq in most_common:
            print(f"{word}: {freq}")

    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except ValueError as e:
        print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,48 @@
import re
from collections import Counter
def clean_text(text):
    """Return ``text`` lower-cased with all punctuation removed.

    Only word characters and whitespace survive the clean-up.
    """
    stripped = re.sub(r'[^\w\s]', '', text)
    lowered = stripped.lower()
    return lowered
def count_frequencies(text):
    """Count how often each word occurs in the cleaned ``text``."""
    cleaned = clean_text(text)
    return Counter(cleaned.split())
def interactive_mode():
    """Prompt for a file path and a count, then print the top-n words.

    An invalid or non-positive count is reported and ends the function.
    A missing file and any other error while processing are reported as well.
    """
    file_path = input("请输入文件路径 >> ")

    try:
        n = int(input("请输入你想要输出的前n个最常见单词的数量 >> "))
        if not n > 0:
            raise ValueError("数量必须大于0。")
    except ValueError as e:
        print(f"输入错误:{e}")
        return

    try:
        # Open the file and read its content.
        with open(file_path, 'r', encoding='utf-8') as handle:
            content = handle.read()

        # Count the frequencies and print the n most common words.
        for word, freq in count_frequencies(content).most_common(n):
            print(f"{word}: {freq}")
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
    except Exception as e:
        print(f"发生错误: {e}")
def main():
    """Print the welcome banner and start the interactive session."""
    welcome = "欢迎使用词频统计工具。"
    print(welcome)
    interactive_mode()
if __name__ == "__main__":
main()

@ -0,0 +1,30 @@
from flask import Flask, render_template, request, redirect, url_for
from collections import Counter
from cppy.cp_util import *
import os
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
    """Serve the upload form (GET) or the word counts of an uploaded file (POST).

    On POST the uploaded file is written to a temporary file, its words are
    counted, the temporary file is removed and ``result.html`` is rendered
    with the (word, count) pairs ordered by frequency.
    """
    if request.method == 'POST':
        import tempfile  # only needed by the upload branch

        # Fetch the uploaded file.
        file = request.files['file']

        # Never build the path from the client-supplied file name: it is
        # untrusted input (e.g. "../../x") and the hard-coded '/temp'
        # directory may not exist. Let the OS hand out a private temp file.
        fd, filename = tempfile.mkstemp(suffix='.txt')
        os.close(fd)
        try:
            file.save(filename)
            # Compute the word frequencies.
            words = extract_file_words(filename)
            word_counts = Counter(words)
        finally:
            # Remove the temporary file even when processing fails.
            os.remove(filename)

        return render_template('result.html', word_counts=word_counts.most_common())

    return render_template('index.html')
if __name__ == '__main__':
app.run(debug=True)

@ -0,0 +1,14 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Upload Text File</title>
</head>
<body>
<h1>Upload a Text File to Count Word Frequencies</h1>
<form action="/" method="post" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Submit">
</form>
</body>
</html>

@ -0,0 +1,16 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Word Frequencies</title>
</head>
<body>
<h1>Top Word Frequencies:</h1>
<ul>
{% for word, count in word_counts %}
<li>{{ word }}: {{ count }}</li>
{% endfor %}
</ul>
<a href="{{ url_for('index') }}">Back to Upload</a>
</body>
</html>

@ -23,7 +23,7 @@ top_10_words = calculate_word_frequency(testfilepath)
print_word_freqs(top_10_words) print_word_freqs(top_10_words)
''' '''
python 提供了一种缓存调用函数的机制 Python 提供了一个缓存调用函数的装饰器
import functools import functools
# 使用 functools.lru_cache 缓存结果 # 使用 functools.lru_cache 缓存结果

@ -0,0 +1,34 @@
# 创建对象是消耗资源的,如果发现对象已经存在,可以返回引用,不创造新对象 。设计模式中这个做法叫享元
from cppy.cp_util import *
# Flyweight class
class WordFrequencyController():
    """Flyweight object holding the sorted word frequencies of one file.

    ``controllertype`` is stored as ``number`` and passed to
    ``print_word_freqs`` as its second argument when printing.
    """

    def __init__(self, controllertype, filepath):
        self.number = controllertype
        frequencies = get_frequencies(extract_file_words(filepath))
        self.word_freq = sort_dict(frequencies)

    def print_word_freqs(self):
        """Print the stored frequencies via the module-level helper of the same name."""
        print_word_freqs(self.word_freq, self.number)
# Flyweight factory
class WordFrequencyControllerFactory():
    """Create WordFrequencyController objects on demand and reuse existing ones."""

    def __init__(self):
        # Cache of already created controllers, keyed by (number, file path).
        self.types = {}

    def get_WordFrequencyController(self, number, testfilepath):
        """Return the shared controller for ``number`` and ``testfilepath``.

        A controller's state depends on both arguments, so both are part of
        the cache key. Keying on ``number`` alone would hand back the
        frequencies of a previously requested file.
        """
        key = (number, testfilepath)
        if key not in self.types:
            self.types[key] = WordFrequencyController(number, testfilepath)  # create a new object
            print('new obj: ', '*' * 30, number)
        else:
            print('ref obj: ', '*' * 30, number)
        return self.types[key]  # reuse the existing object
if __name__ == "__main__":
factory = WordFrequencyControllerFactory()
for number in [ 1,3,5,3,5,7 ]:
WordFrequency = factory.get_WordFrequencyController(number,testfilepath)
# print(flush=True)
WordFrequency.print_word_freqs()

@ -0,0 +1,64 @@
'''
入门级示例是用来帮助理解其他例子
把观察者挂到自己的处理队列上
适当时机调用所有队列上的约定的观察者的 update 方法
如果观察者有多个职能参与不同的任务链不一定要统一命名update方法
这是一个示例性质的原型具体环境下需要调整
'''
import collections
from abc import ABC, abstractmethod
from cppy.cp_util import *
# Observer interface; declaring it is not mandatory in Python.
class Observer(ABC):
    """Abstract observer: concrete observers must implement ``update``."""

    @abstractmethod
    def update(self, word):
        """Handle one word pushed by the subject."""
# Concrete observer that counts word frequencies.
class WordFrequencyObserver(Observer):
    """Observer that tallies every word it is notified about."""

    def __init__(self):
        self.word_count = collections.Counter()

    def update(self, word):
        """Increase the count of ``word`` by one."""
        self.word_count.update((word,))
# Subject class
class WordSubject:
    """Subject that forwards every word to all attached observers."""

    def __init__(self):
        self.observers = []

    def attach(self, observer):
        """Add ``observer`` to the notification list."""
        self.observers.append(observer)

    def notify(self, word):
        """Call ``update(word)`` on every attached observer, in attach order."""
        for listener in self.observers:
            listener.update(word)
# Main function
def main(testfilepath, top_n=10):
    """Count the non-stop-words of a file through an observer and print the top ones.

    Args:
        testfilepath: path of the text file to analyse.
        top_n: number of most frequent words to print.
    """
    # Create one observer and attach it to the subject.
    subject = WordSubject()
    observer = WordFrequencyObserver()
    subject.attach(observer)

    # Process the file: every word that is not a stop word is announced.
    stopwords = get_stopwords()
    for word in re_split(read_file(testfilepath)):
        if word in stopwords:
            continue
        subject.notify(word)  # notify

    # Print the N highest word frequencies.
    print_word_freqs(observer.word_count.most_common(top_n))
if __name__ == "__main__":
main( testfilepath )

@ -0,0 +1,69 @@
'''
本例的基本模式还是观察者
基类 Subject 提供注册和提醒注册上的对象提醒机制
因为函数和参数混杂在一起传递使得各个模块的处理结构其实是 case by case
'''
from collections import Counter
from typing import List
from cppy.cp_util import *
class Subject:
    """Base class offering registration of one handler and its invocation."""

    def register_handler(self, handler: callable, *args, **kwargs):
        """Remember ``handler`` and the extra arguments to pass along with the data."""
        self.handler, self.args, self.kwargs = handler, args, kwargs

    def notify(self, *args, **kwargs):
        """Call the registered handler with ``self.data`` plus the registered extras.

        The arguments given to ``notify`` itself are not forwarded.
        """
        callback = self.handler
        callback(self.data, *self.args, **self.kwargs)
# Component 1: TextLoader - loads the words of a text file.
class TextLoader(Subject):
    """Loads the words of a file and passes them on to the registered handler."""

    def load_text(self, filename: str) -> List[str]:
        """Return the words extracted from ``filename``."""
        words = extract_file_words(filename)
        return words

    def notify(self, *args, **kwargs):
        """Load the file named by the first argument, then trigger the handler."""
        self.data = self.load_text(args[0])
        super().notify(self.data, *args, **kwargs)
# Component 2: WordCounter - computes the word frequencies.
class WordCounter(Subject):
    """Counts the received words and passes the Counter on to the registered handler."""

    def count_words(self, words: List[str]) -> dict:
        """Return a Counter of ``words``."""
        tally = Counter(words)
        return tally

    def notify(self, *args, **kwargs):
        """Count the word list given as first argument, then trigger the handler."""
        self.data = self.count_words(args[0])
        super().notify(self.data, *args, **kwargs)
# Component 3: TopWordsPresenter - sorts and prints the top words.
class TopWordsPresenter(Subject):
    """Final stage: prints the most common words of the received counter."""

    def notify(self, words, *args, **kwargs):
        """Print the ``args[0]`` most common entries of ``words``."""
        limit = args[0]
        print_word_freqs(words.most_common(limit))
# Main program logic
def main():
    """Wire loader -> counter -> presenter and start the chain on the test file."""
    loader, counter, presenter = TextLoader(), WordCounter(), TopWordsPresenter()

    # Register the event handlers: each stage hands its data to the next one.
    loader.register_handler(counter.notify)
    counter.register_handler(presenter.notify, 10)

    # Loading the text kicks off the whole flow.
    loader.notify(testfilepath)
if __name__ == "__main__":
main()

@ -0,0 +1,86 @@
################ 待整理
'''
注册者 = 观察者
每个组件提供注册消息接口和注册消息动作
在其它单元上注册自己对于特定事件消息的响应函数
同时负责自己的注册队列的序贯调用
Python 中有一个Callable类型可以用来判断是否是可以回调类型
from typing import Callable
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
from cppy.cp_util import *
#
# event_manager
#
class EventManager:
    """Keeps three handler queues (load, process, end) and runs them in order."""

    def __init__(self):
        self.load_handlers = []     # handlers that load the file
        self.process_handlers = []  # handlers that process the data
        self.end_handlers = []      # handlers that finish the flow

    def register_load_event(self, handler):
        """Queue ``handler`` for the load phase; it is called with the file path."""
        self.load_handlers.append(handler)

    def register_process_event(self, handler):
        """Queue ``handler`` for the process phase; it is called without arguments."""
        self.process_handlers.append(handler)

    def register_end_event(self, handler):
        """Queue ``handler`` for the end phase; it is called without arguments."""
        self.end_handlers.append(handler)

    def run(self, file_path):
        """Run the framework: load handlers first, then process, then end handlers."""
        for load in self.load_handlers:
            load(file_path)
        for process in self.process_handlers:
            process()
        for finish in self.end_handlers:
            finish()
#
# 功能组件
#
# Data storage class: loads the file content and feeds its words to listeners.
class TextData:
    """Loads a text file and announces every non-stop-word to its word handlers."""

    def __init__(self, event_manager):
        # Per-instance handler list. A class-level list would be shared by
        # every TextData object, so handlers registered on one instance
        # would also fire for all the others.
        self._word_event_handlers = []
        self._stop_words = get_stopwords()
        event_manager.register_load_event(self.__load)
        event_manager.register_process_event(self.__process_words)

    def __load(self, path_to_file):
        """Load handler: read the file and split it into words."""
        self._data = re_split(read_file(path_to_file))

    def __process_words(self):
        """Process handler: pass every word that is not a stop word to all word handlers."""
        for word in self._data:
            if word not in self._stop_words:
                for handler in self._word_event_handlers:
                    handler(word)

    def register_word_event(self, handler):
        """Register ``handler`` to be called with each non-stop-word."""
        self._word_event_handlers.append(handler)
class WordFrequencyCounter:
    """Counts the words announced by the data storage and prints them at the end."""

    def __init__(self, event_manager, data_storage):
        self._word_freqs = defaultdict(int)  # word -> frequency
        # Register for the word events and for the end event.
        data_storage.register_word_event(self.__increment_count)
        event_manager.register_end_event(self.__print_freqs)

    def __increment_count(self, word):
        """Word handler: add one to the count of ``word``."""
        self._word_freqs[word] = self._word_freqs[word] + 1

    def __print_freqs(self):
        """End handler: print the sorted frequencies."""
        ordered = sort_dict(self._word_freqs)
        print_word_freqs(ordered)
if __name__ == '__main__':
em = EventManager()
data_storage = TextData(em)
word_freq_counter = WordFrequencyCounter(em, data_storage)
em.run(testfilepath)

@ -0,0 +1,107 @@
################ 待整理
from cppy.cp_util import *
'''
订阅者 = 注册者 = 观察者
注册回调的一个变体
要点是中心化统一化
为了简化消息订阅可能形成的复杂性
提供一个中心消息管理器统一负责消息的订阅和回调
各个功能组件只是完成自己的功能
在中心管理器上订阅消息挂到自己响应的处理函数上
总结相比较的改变
- 注册的时候通过提供一个类型字段标识不同消息
- 其它实体不做注册和做回调统一这两个功能到一个中心单元
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
#################################################
# Event Manager
#################################################
class EventManager:
    """Central message hub: stores subscriptions and dispatches published events."""

    def __init__(self):
        self._subs = defaultdict(list)

    def subscribe(self, event_type, handler):
        """Register ``handler`` for events whose first element is ``event_type``."""
        self._subs[event_type].append(handler)

    def publish(self, event):
        """Pass ``event`` (a tuple starting with its type) to every subscriber of that type."""
        kind = event[0]
        if kind not in self._subs:
            return
        for callback in self._subs[kind]:
            callback(event)
#################################################
# Application Entities
#################################################
class DataStorage:
    """Loads the words of a file on 'load' and publishes them one by one on 'start'."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        for event_type, handler in (('load', self._load),
                                    ('start', self.produce_words)):
            self._event_manager.subscribe(event_type, handler)

    def _load(self, event):
        """'load' handler: ``event[1]`` is the path of the file to read."""
        path_to_file = event[1]
        self._data = extract_file_words(path_to_file)

    def produce_words(self, _):
        """'start' handler: publish a 'word' event per word, then a final 'eof' event."""
        publish = self._event_manager.publish
        for word in self._data:
            publish(('word', word))
        publish(('eof', None))
class StopWordFilter:
    """Re-publishes every 'word' event that is not a stop word as 'valid_word'."""

    def __init__(self, event_manager):
        self._stop_words = set()
        self._event_manager = event_manager
        event_manager.subscribe('load', self.load_stop_words)
        event_manager.subscribe('word', self.filter_word)

    def load_stop_words(self, _):
        """'load' handler: fetch the stop-word list."""
        self._stop_words = set(get_stopwords())

    def filter_word(self, event):
        """'word' handler: forward ``event[1]`` unless it is a stop word."""
        word = event[1]
        if word in self._stop_words:
            return
        self._event_manager.publish(('valid_word', word))
class WordFrequencyCounter:
    """Counts 'valid_word' events and prints the frequencies on 'print'."""

    def __init__(self, event_manager):
        self._word_freqs = {}
        self._event_manager = event_manager
        event_manager.subscribe('valid_word', self.count_word)
        event_manager.subscribe('print', self.print_freqs)

    def count_word(self, event):
        """'valid_word' handler: add one to the count of ``event[1]``."""
        word = event[1]
        if word in self._word_freqs:
            self._word_freqs[word] += 1
        else:
            self._word_freqs[word] = 1

    def print_freqs(self, _):
        """'print' handler: print the sorted frequencies."""
        ordered = sort_dict(self._word_freqs)
        print_word_freqs(ordered)
class WordFrequencyApp:
    """Application driver: turns 'run' into 'load' + 'start' and 'eof' into 'print'."""

    def __init__(self, event_manager):
        self._event_manager = event_manager
        event_manager.subscribe('run', self.start_application)
        event_manager.subscribe('eof', self.stop_application)

    def start_application(self, event):
        """'run' handler: ``event[1]`` is the file path; publish 'load', then 'start'."""
        publish = self._event_manager.publish
        publish(('load', event[1]))
        publish(('start', ))

    def stop_application(self, _):
        """'eof' handler: ask for the result to be printed."""
        self._event_manager.publish(('print', ))
def main():
    """Create the event manager, attach all components and publish 'run'."""
    event_manager = EventManager()
    # Each component subscribes itself to the manager in its constructor.
    for component in (DataStorage, StopWordFilter,
                      WordFrequencyCounter, WordFrequencyApp):
        component(event_manager)
    event_manager.publish(('run', testfilepath))
if __name__ == "__main__":
main()

@ -0,0 +1,9 @@
注册
- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。
- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。
- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。
- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。
- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。

@ -0,0 +1,98 @@
################ 待整理
'''
应用场景针对各个组件的 notify 方法发指令来驱动所有工作
这是一个示例性质的原型具体分布式环境下需要调整
notify 用了四种写法是和本主题无关的测试
'''
from cppy.cp_util import *
from collections import defaultdict
def badmsg():
    """Fallback for unknown actions: always raise "Message not understood".

    Used as the default of the dispatch tables below and as the
    ``default_factory`` of ``fff``.

    Raises:
        Exception: always.
    """
    # The previous implementation exec()'d a source string that started with
    # a blank (IndentationError) and referred to a name ``action`` that is
    # not in scope there, so the intended exception was never raised.
    raise Exception("Message not understood")
class fff:
    """Read-only lookup table whose missing keys trigger ``badmsg``."""

    def __init__(self, d):
        # defaultdict calls badmsg() when a key is missing.
        self._data = defaultdict(badmsg, d)

    def __getitem__(self, key):
        """Return the entry stored under ``key``."""
        entry = self._data[key]
        return entry
class DataStorageMod():
    """Holds the words of the input file; driven through ``notify``."""

    def __init__(self):
        self._data = []

    def notify(self, action, *args):
        """Dispatch ``action`` ('init' or 'words'); unknown actions go to ``badmsg``."""
        handlers = {
            'init': lambda: self._init,
            'words': lambda: self._words
        }
        select = handlers.get(action, badmsg)
        return select()(*args)

    def _init(self, path_to_file):
        """Read ``path_to_file`` and split its content into words."""
        text = read_file(path_to_file)
        self._data = re_split(text)

    def _words(self):
        """Return the stored word list."""
        return self._data
class StopWordMod():
    """Stop-word lookup; driven through ``notify``."""

    _stop_words = []

    def notify(self, action, *args):
        """Dispatch ``action`` ('init' or 'is_stop_word'); unknown actions raise KeyError."""
        table = {
            'init': self._init,
            'is_stop_word': self._is_stop_word
        }
        handler = table[action]
        return handler(*args)

    def _init(self):
        """Load the stop-word list."""
        self._stop_words = get_stopwords()

    def _is_stop_word(self, wordx):
        """Return True when ``wordx`` is a stop word."""
        return wordx in self._stop_words
class WordFrequencyMod():
    """Word-frequency table; driven through ``notify``."""

    def __init__(self):
        # Per-instance table. A class-level dict is mutated in place by
        # _increment_count and would therefore be shared by all instances.
        self._word_freqs = {}

    def notify(self, action, *args):
        """Dispatch ``action`` ('increment_count' or 'sorted') via an ``fff`` table."""
        return fff({
            'increment_count': lambda: self._increment_count,
            'sorted': lambda: self._sorted
        })[action]()(*args)

    def _increment_count(self, word):
        """Add one to the count of ``word``."""
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def _sorted(self):
        """Return the frequencies ordered by ``sort_dict``."""
        return sort_dict(self._word_freqs)
class ScenarioManager():
    """Drives the three modules purely by sending them ``notify`` messages."""

    def notify(self, action, *args):
        """Handle 'init' (with the file path) or 'run'; anything else raises."""
        if action == 'init':
            return self._init(*args)
        if action == 'run':
            return self._run()
        raise Exception("Message not understood " + action)

    def _init(self, path_to_file):
        """Create the modules and initialise storage and stop words."""
        self._storage_manager = DataStorageMod()
        self._stop_word_manager = StopWordMod()
        self._word_freq_manager = WordFrequencyMod()
        self._storage_manager.notify('init', path_to_file)
        self._stop_word_manager.notify('init')

    def _run(self):
        """Count every non-stop-word and print the sorted frequencies."""
        for word in self._storage_manager.notify('words'):
            if self._stop_word_manager.notify('is_stop_word', word):
                continue
            self._word_freq_manager.notify('increment_count', word)

        print_word_freqs(self._word_freq_manager.notify('sorted'))
if __name__ == '__main__':
sm = ScenarioManager()
sm.notify('init', testfilepath)
sm.notify('run')

@ -0,0 +1,24 @@
from cppy.cp_util import *
# 这个例子没有实际意义,是用来帮助理解其他例子
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
def readfile(file_path, func):
    """Read ``file_path`` and pass the text to ``func`` with ``frequencies`` as next step."""
    func(read_file(file_path), frequencies)
def extractwords(str_data, func):
    """Extract the words of ``str_data`` and pass them to ``func`` with ``sort`` as next step."""
    words = extract_str_words(str_data)
    func(words, sort)
def frequencies(word_list, func):
    """Count ``word_list`` and pass the frequencies to ``func`` with ``printall`` as next step."""
    func(get_frequencies(word_list), printall)
def sort(wf, func):
    """Sort the frequencies ``wf`` and pass them to ``func``; there is no further step."""
    ordered = sort_dict(wf)
    func(ordered, None)
def printall(word_freqs, _):
    """Last step of the chain: print ``word_freqs``; the continuation argument is unused."""
    print_word_freqs(word_freqs)
if __name__ == "__main__":
readfile(testfilepath, extractwords)

@ -0,0 +1,102 @@
'''
后续组件挂载到前序组件后续链上
仅提供 self.next_observer 的抽象关系
后续组件接到指令和数据自己决定动作
理论上每个组件可以参与到多个生产队列
本例使用了类来封装消息相对于字符串理论上提供了更丰富的扩展可能
这是一个示例性质的原型具体环境下需要调整
'''
from collections import Counter
from typing import List, Dict
from cppy.cp_util import *
# Message types
class Message:
    """Base message: wraps the payload handed from one stage to the next."""

    def __init__(self, data):
        # Payload carried by the message.
        self.data = data
class TokenizedText(Message):
    """Message produced by the tokenizer stage."""
class FilteredText(Message):
    """Message produced by the stop-word removal stage."""
class WordFrequency(Message):
    """Message produced by the word-frequency stage."""
# Observer interface
class Observer:
    """Base observer; the default ``notify`` ignores the message."""

    def notify(self, message: Message):
        """Receive ``message``; subclasses override this to act on it."""
        return None
# Tokenizer subscriber
class TokenizerSubscriber(Observer):
    """Splits a text payload into words and forwards them as TokenizedText."""

    def __init__(self, next_observer: Observer):
        self.next_observer = next_observer

    def notify(self, message: Message):
        """Tokenize string payloads; messages with any other payload are ignored."""
        payload = message.data
        if isinstance(payload, str):
            self.next_observer.notify(TokenizedText(re_split(payload)))
# Stop-word subscriber
class StopWordsRemoverSubscriber(Observer):
    """Drops stop words and short words, forwarding the rest as FilteredText."""

    def __init__(self, next_observer: Observer, stop_words: List[str]):
        self.stop_words = set(stop_words)
        self.next_observer = next_observer

    def notify(self, message: Message):
        """Filter TokenizedText messages; every other message is ignored."""
        if isinstance(message, TokenizedText):
            kept = []
            for word in message.data:
                # Keep words that are not stop words and longer than two characters.
                if word not in self.stop_words and len(word) > 2:
                    kept.append(word)
            self.next_observer.notify(FilteredText(kept))
# Word-frequency subscriber
class WordFrequencyCalculatorSubscriber(Observer):
    """Counts the words of a FilteredText message and forwards a WordFrequency."""

    def __init__(self, next_observer: Observer):
        self.next_observer = next_observer

    def notify(self, message: Message):
        """Count FilteredText payloads; every other message is ignored."""
        if isinstance(message, FilteredText):
            tally = Counter(message.data)
            self.next_observer.notify(WordFrequency(tally))
# Subscriber that prints the top N words
class TopNWordsDisplaySubscriber(Observer):
    """Final stage: prints the ``n`` most common words of a WordFrequency message."""

    def __init__(self, n: int):
        self.n = n

    def notify(self, message: Message):
        """Print WordFrequency payloads; every other message is ignored."""
        if isinstance(message, WordFrequency):
            top_words = message.data.most_common(self.n)
            print_word_freqs(top_words)
# Simulated publisher
def publish_text(text: str, observers: List[Observer]):
    """Send ``text`` to every observer, each wrapped in its own Message."""
    for subscriber in observers:
        envelope = Message(text)
        subscriber.notify(envelope)
# Main function
def main():
    """Build the subscriber chain and publish the test file's text into it."""
    # cp_util.read_file(path_to_file) has no default path, so the test file
    # exported by cp_util must be passed explicitly.
    text = read_file(testfilepath)
    stop_words = get_stopwords()

    # Build the chain back to front: every stage needs its successor.
    display_subscriber = TopNWordsDisplaySubscriber(n=10)
    freq_subscriber = WordFrequencyCalculatorSubscriber(display_subscriber)
    stop_words_subscriber = StopWordsRemoverSubscriber(freq_subscriber, stop_words)
    tokenizer_subscriber = TokenizerSubscriber(stop_words_subscriber)

    # Publish the text.
    publish_text(text, [tokenizer_subscriber])
if __name__ == "__main__":
main()

@ -1,26 +1,34 @@
################ 待整理
'''
多线程各个模块比较乱的但是协作序贯的完成了数据处理
各个组件完全不能互操作仅依靠队列发消息进行协作
适合环节多数据可分块有IO-计算性能设计考量要求让各个模块自己适应调整
在某些情况下可以避免复杂的控制流设计使代码简洁
'''
from threading import Thread from threading import Thread
from queue import Queue from queue import Queue
from cppy.cp_util import * from cppy.cp_util import *
class ActiveWFObject(Thread): class ThreadObject(Thread):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.queue = Queue() self.queue = Queue()
self._stopMe = False self._over = False
self.start() self.start()
def run(self): def run(self):
while not self._stopMe: while not self._over:
message = self.queue.get() message = self.queue.get()
self._dispatch(message) self._dispatch(message)
if message[0] == 'die': if message[0] == 'over':
self._stopMe = True break
def send(receiver, message): def send(receiver, message):
receiver.queue.put(message) receiver.queue.put(message)
class DataStorageManager(ActiveWFObject): class TxtManager(ThreadObject):
""" Models the contents of the file """
_data = '' _data = ''
def _dispatch(self, message): def _dispatch(self, message):
@ -29,22 +37,20 @@ class DataStorageManager(ActiveWFObject):
elif message[0] == 'send_word_freqs': elif message[0] == 'send_word_freqs':
self._process_words(message[1:]) self._process_words(message[1:])
else: else:
# forward
send(self._stop_word_manager, message) send(self._stop_word_manager, message)
def _init(self, message): def _init(self, message):
path_to_file = message[0] self._data = extract_file_words(message[0])
self._stop_word_manager = message[1] self._stop_word_manager = message[1]
self._data = extract_file_words(path_to_file)
def _process_words(self, message): def _process_words(self, message):
recipient = message[0] recipient = message[0]
for w in self._data: for w in self._data:
send(self._stop_word_manager, ['filter', w]) send(self._stop_word_manager, ['filter', w])
send(self._stop_word_manager, ['top10', recipient]) send(self._stop_word_manager, ['topWord', recipient])
class StopWordManager(ActiveWFObject):
""" Models the stop word filter """ class FilterManager(ThreadObject):
_stop_words = [] _stop_words = []
def _dispatch(self, message): def _dispatch(self, message):
@ -53,7 +59,6 @@ class StopWordManager(ActiveWFObject):
elif message[0] == 'filter': elif message[0] == 'filter':
return self._filter(message[1:]) return self._filter(message[1:])
else: else:
# forward
send(self._word_freqs_manager, message) send(self._word_freqs_manager, message)
def _init(self, message): def _init(self, message):
@ -65,31 +70,29 @@ class StopWordManager(ActiveWFObject):
if word not in self._stop_words: if word not in self._stop_words:
send(self._word_freqs_manager, ['word', word]) send(self._word_freqs_manager, ['word', word])
class WordFrequencyManager(ActiveWFObject): class WFManager(ThreadObject):
""" Keeps the word frequency data """
_word_freqs = {} _word_freqs = {}
def _dispatch(self, message): def _dispatch(self, message):
if message[0] == 'word': if message[0] == 'word':
self._increment_count(message[1:]) self._increment_count(message[1:])
elif message[0] == 'top10': elif message[0] == 'topWord':
self._top10(message[1:]) self._topWord(message[1:])
def _increment_count(self, message): def _increment_count(self, message):
word, = message word, = message
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1 self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
def _top10(self, message): def _topWord(self, message):
recipient = message[0] recipient = message[0]
freqs_sorted = sort_dict ( self._word_freqs ) freqs_sorted = sort_dict ( self._word_freqs )
send(recipient, ['top10', freqs_sorted]) send(recipient, ['topWord', freqs_sorted])
class WordFrequencyController(ActiveWFObject):
class MyController(ThreadObject):
def _dispatch(self, message): def _dispatch(self, message):
if message[0] == 'run': if message[0] == 'run':
self._run(message[1:]) self._run(message[1:])
elif message[0] == 'top10': elif message[0] == 'topWord':
self._display(message[1:]) self._display(message[1:])
else: else:
raise Exception("Message not understood " + message[0]) raise Exception("Message not understood " + message[0])
@ -101,20 +104,20 @@ class WordFrequencyController(ActiveWFObject):
def _display(self, message): def _display(self, message):
word_freqs, = message word_freqs, = message
print_word_freqs( word_freqs) print_word_freqs( word_freqs)
send(self._storage_manager, ['die']) send(self._storage_manager, ['over'])
self._stopMe = True self._over = True
if __name__ == '__main__': if __name__ == '__main__':
word_freq_manager = WordFrequencyManager() word_freq_manager = WFManager()
stop_word_manager = StopWordManager() stop_word_manager = FilterManager()
storage_manager = DataStorageManager() storage_manager = TxtManager()
wfcontroller = MyController()
send(stop_word_manager, ['init', word_freq_manager])
send(storage_manager, ['init', testfilepath, stop_word_manager]) send(storage_manager, ['init', testfilepath, stop_word_manager])
send(stop_word_manager, ['init', word_freq_manager])
wfcontroller = WordFrequencyController()
send(wfcontroller, ['run', storage_manager]) send(wfcontroller, ['run', storage_manager])
# Wait for the active objects to finish # 等待所有管理器完成工作
[t.join() for t in [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]] threads = [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]
for thread in threads: thread.join()

@ -0,0 +1,25 @@
import requests
from cppy.cp_util import *
def main():
    """Run the tokenize -> count -> sort pipeline against the three local services.

    Raises:
        requests.HTTPError: when one of the services answers with an error status.
    """
    # Read the content of the test file. cp_util.read_file(path_to_file)
    # has no default path, so the path must be passed explicitly.
    content = read_file(testfilepath)

    # Tokenize
    tokenize_response = requests.post("http://localhost:7770/tokenize", json={"text": content})
    # Fail with the HTTP error itself instead of a KeyError on the error body.
    tokenize_response.raise_for_status()
    words = tokenize_response.json()["words"]

    # Count the word frequencies
    count_response = requests.post("http://localhost:7771/count", json={"words": words})
    count_response.raise_for_status()
    word_count = count_response.json()["word_count"]

    # Sort
    sort_response = requests.post("http://localhost:7772/sort", json={"word_count": word_count})
    sort_response.raise_for_status()
    top_10_words = sort_response.json()["top_10_words"]

    print("Top 10 words:")
    print_word_freqs(top_10_words)
if __name__ == "__main__":
main()

@ -0,0 +1,14 @@
from fastapi import FastAPI
from collections import Counter
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/count")
async def count(words_list: dict):
    """Count the words of the request body.

    The body has the form {"words": ["word1", "word2", ...]}; the response is
    {"word_count": {word: count, ...}}.
    """
    tally = Counter(words_list["words"])
    response = {"word_count": dict(tally)}
    return response
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7771)

@ -0,0 +1,13 @@
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.post("/sort")
async def sort(word_count_dict: dict):
    """Return the ten most frequent words of the request body.

    The body has the form {"word_count": {word: count, ...}}; the response is
    {"top_10_words": [...]} with the (word, count) pairs in descending count order.
    """
    def by_count(item):
        # item is a (word, count) pair
        return item[1]

    ranked = sorted(word_count_dict["word_count"].items(), key=by_count, reverse=True)
    return {"top_10_words": ranked[:10]}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7772)

@ -0,0 +1,13 @@
from fastapi import FastAPI
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/tokenize")
async def tokenize(payload: dict):
    """Split the text of the request body into words.

    The body has the form {"text": "..."}; the response is {"words": [...]}.

    A bare ``text: str`` parameter is read from the query string by FastAPI,
    while a client that posts JSON sends the text in the body. Taking the
    body as a dict matches such clients and the /count and /sort endpoints.
    """
    words = extract_str_words(payload["text"])
    return {"words": words}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7770)

@ -0,0 +1,5 @@
[Plugins]
;; Options: plugins/f1.pyc, plugins/f2.pyc
frequencies = plugins/f2.pyc

@ -0,0 +1,30 @@
import configparser, importlib.machinery
from cppy.cp_util import *
class PluginManager:
    """Loads the plugins named in ``config.ini`` and hands them out by name."""

    def __init__(self):
        # plugin name -> loaded module
        self.plugins = {}

    def load_plugins(self):
        """Load the compiled 'frequencies' plugin configured in ``config.ini``.

        The working directory is switched to this file's directory so that
        ``config.ini`` and the plugin path stored in it resolve relative to it.
        """
        import importlib.util

        _dir = os.path.dirname(os.path.abspath(__file__))
        os.chdir(_dir)

        config = configparser.ConfigParser()
        config.read("config.ini")

        frequencies_plugin = config.get("Plugins", "frequencies")

        # Load the plugin from its .pyc file. Loader.load_module() is
        # deprecated, so build the module from a spec and execute it.
        loader = importlib.machinery.SourcelessFileLoader('word_freqs_plugin', frequencies_plugin)
        spec = importlib.util.spec_from_loader(loader.name, loader)
        module = importlib.util.module_from_spec(spec)
        loader.exec_module(module)
        self.plugins['word_freqs'] = module

    def get_plugin(self, name):
        """Return the plugin registered under ``name`` or None when it is unknown."""
        return self.plugins.get(name)
# 创建 PluginManager 实例
plugin_manager = PluginManager()
plugin_manager.load_plugins()
wordlist = extract_file_words(testfilepath) # 提取文件中的单词
word_freqs = plugin_manager.get_plugin('word_freqs').top_word(wordlist) # 调用实例方法
print_word_freqs(word_freqs) # 打印词频

@ -0,0 +1,28 @@
import py_compile
import os
import shutil

# Compile both plugin sources to byte code.
for source_name in ('f1.py', 'f2.py'):
    py_compile.compile(source_name)

# Source directory (__pycache__ next to this file) and target directory
# (plugins, one level up).
base_dir = os.path.dirname(__file__)
source_dir = os.path.join(base_dir, '__pycache__')
target_dir = os.path.join(base_dir, '..', 'plugins')

# Make sure the target directory exists.
os.makedirs(target_dir, exist_ok=True)

# Move every .pyc file of the source directory into the target directory.
for filename in os.listdir(source_dir):
    if not filename.endswith('.pyc'):
        continue
    # The new name is made of the first two characters of the file name.
    new_filename = filename[:2]
    # Full paths of the source and the target file.
    source_file = os.path.join(source_dir, filename)
    target_file = os.path.join(target_dir, new_filename + '.pyc')
    # Copy the file, then delete the original.
    shutil.copyfile(source_file, target_file)
    os.remove(source_file)
    print(f"Copied {filename} to {target_file} and removed original file.")

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import operator import operator
def top25(word_list): def top_word(word_list):
word_freqs = {} word_freqs = {}
for w in word_list: for w in word_list:
if w in word_freqs: if w in word_freqs:

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
import collections
def top_word(word_list):
    """Return the ten most frequent words of ``word_list`` as (word, count) pairs."""
    tally = collections.Counter()
    tally.update(word_list)
    return tally.most_common(10)

@ -0,0 +1,16 @@
import cppy.cp_util as util
def extract_words(path_to_file: str) -> list:
    """Return the words of the file at ``path_to_file`` (delegates to cp_util)."""
    words = util.extract_file_words(path_to_file)
    return words
def frequencies(word_list: list) -> dict:
    """Return the frequency of every word in ``word_list`` (delegates to cp_util)."""
    word_freq = util.get_frequencies(word_list)
    return word_freq
def sort(word_freq: dict) -> list:
    """Return the entries of ``word_freq`` ordered by ``util.sort_dict``."""
    ordered = util.sort_dict(word_freq)
    return ordered
if __name__ == "__main__":
word_freqs = sort( frequencies(extract_words( util.testfilepath )) )
util.print_word_freqs(word_freqs)

@ -0,0 +1,36 @@
from cppy.cp_util import *
from dataclasses import dataclass
from collections import Counter
import re
@dataclass
class WordFrequency:
    """Word-frequency calculator for one text.

    ``stop_words`` falls back to ``get_stopwords()`` when it is not supplied.
    """
    text: str
    stop_words: set = None

    def __post_init__(self):
        # No stop-word table given: load the default one.
        if self.stop_words is None:
            self.stop_words = get_stopwords()

    def tokenize(self):
        """Return the lower-cased words of the text without stop words and short words."""
        kept = []
        for word in re.findall(r'\b\w+\b', self.text.lower()):
            # Keep words that are not stop words and longer than two characters.
            if word not in self.stop_words and len(word) > 2:
                kept.append(word)
        return kept

    def get_top_n(self, n=10):
        """Return the ``n`` most common words as (word, count) pairs."""
        tally = Counter(self.tokenize())
        return tally.most_common(n)
# Usage example
if __name__ == '__main__':
    # Create a WordFrequency instance. cp_util.read_file(path_to_file) has no
    # default path, so the test file exported by cp_util is passed explicitly.
    text = read_file(testfilepath)
    word_freq = WordFrequency(text)

    # Fetch and print the word frequencies.
    top_words = word_freq.get_top_n()
    print_word_freqs(top_words)

@ -3,25 +3,25 @@ from cppy.cp_util import *
def extract_words(path_to_file): def extract_words(path_to_file):
assert(type(path_to_file) is str), "I need a string!" assert(type(path_to_file) is str), "Must be a string!"
assert(path_to_file), "I need a non-empty string!" assert(path_to_file), "Must be a non-empty string!"
try: try:
with open(path_to_file,encoding='utf-8') as f: with open(path_to_file,encoding='utf-8') as f:
str_data = f.read() str_data = f.read()
except IOError as e: except IOError as e:
print("I/O error({0}) when opening {1}: {2}! I quit!".format(e.errno, path_to_file, e.strerror)) print("I/O error({0}) when opening {1}: {2}".format(e.errno, path_to_file, e.strerror))
raise e raise e
return re_split(str_data) return re_split(str_data)
def remove_stop_words(word_list): def remove_stop_words(word_list):
assert(type(word_list) is list), "I need a list!" assert(type(word_list) is list), "Must be a list!"
try: try:
stop_words = get_stopwords() stop_words = get_stopwords()
except IOError as e: except IOError as e:
print("I/O error({0}) opening stops_words.txt: {1}! I quit!".format(e.errno, e.strerror)) print("I/O error({0}) opening stops_words.txt: {1}".format(e.errno, e.strerror))
raise e raise e
return [w for w in word_list if not w in stop_words] return [w for w in word_list if not w in stop_words]

@ -0,0 +1,25 @@
from cppy.cp_util import *
def extractWords(path_to_file):
    """Validate the path argument, then extract the words of that file.

    Args:
        path_to_file (str): Non-empty path of the file to read.

    Returns:
        The result of extract_file_words(path_to_file).

    Raises:
        AssertionError: If the argument is not a str or is empty.
    """
    assert type(path_to_file) is str, "Must be a string"
    assert path_to_file != "", "Must be a non-empty string"
    words = extract_file_words(path_to_file)
    return words
def frequencies(word_list):
    """Validate the word list, then compute its word frequencies.

    Args:
        word_list (list): Non-empty list of words.

    Returns:
        The result of get_frequencies(word_list).

    Raises:
        AssertionError: If the argument is not a list or is empty.
    """
    assert type(word_list) is list, "Must be a list"
    assert len(word_list) > 0, "Must be a non-empty list"
    counts = get_frequencies(word_list)
    return counts
def sort(word_freqs):
    """Validate the frequency mapping, then sort it.

    Args:
        word_freqs (dict): Non-empty mapping of word to count.

    Returns:
        The result of sort_dict(word_freqs).

    Raises:
        AssertionError: If the argument is not a dict or is empty.
    """
    assert type(word_freqs) is dict, "Must be a dictionary"
    assert len(word_freqs) > 0, "Must be a non-empty dictionary"
    ordered = sort_dict(word_freqs)
    return ordered
if __name__ == '__main__':
    # Run the whole pipeline; any failure (including the assertion errors
    # raised by the validating wrappers above) is reported as one message.
    try:
        word_freqs = sort(frequencies(extractWords( testfilepath )))
        print_word_freqs(word_freqs)
    except Exception as e:
        print(" Something wrong: {0}".format(e) )

@ -56,5 +56,4 @@ state_machine = WordFrequencyStateMachine(util.testfilepath)
word_frequencies = state_machine.run() word_frequencies = state_machine.run()
# 打印结果 # 打印结果
for word, freq in word_frequencies.most_common(10): util.print_word_freqs(word_frequencies.most_common(10))
print(f"{word}: {freq}")

@ -0,0 +1,192 @@
import site
import os, re, time
import string, operator
################################################################################
# 变量
################################################################################
# Name of the sample input file. The last assignment wins, so the two
# earlier values are effectively disabled alternatives.
testfilename = 'test.txt'
testfilename = 'pride-and-prejudice.txt'
testfilename = 'Prey.txt'
db_filename = "tf.db"
# Choose the base directory among the interpreter's site-packages locations:
# the last entry whose path contains the substring 'package' is used.
site_packages = site.getsitepackages()
for package in site_packages:
    if 'package' in package:
        basePath = package
# NOTE(review): basePath is only bound when some entry matched above;
# presumably the data files are installed under <basePath>/cppy/data - confirm.
stopwordfilepath = os.path.join(basePath, 'cppy', 'data', 'stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy', 'data', testfilename)
################################################################################
# 项目函数
################################################################################
def read_file(path_to_file):
    """Return the complete contents of a UTF-8 encoded text file.

    Args:
        path_to_file (str): Path of the file to read.

    Returns:
        str: Everything the file contains.
    """
    with open(path_to_file, encoding='utf-8') as source:
        return source.read()
def re_split(data):
    """Split a string into lower-case words.

    Every run of non-word characters and underscores is replaced by a single
    space, the result is lower-cased and then split on whitespace.

    Args:
        data (str): Input string.

    Returns:
        list: The words found in the input, in order.
    """
    # Raw string: '\W' inside a normal string literal is an invalid escape
    # sequence, which newer Python versions warn about.
    normalized = re.sub(r'[\W_]+', ' ', data).lower()
    return normalized.split()
def get_stopwords(path_to_file=stopwordfilepath):
    """Load the stop-word list.

    The file is read as UTF-8 and split on commas. Surrounding whitespace
    (for example a trailing newline on the last entry) is stripped and empty
    entries are dropped, because such entries could never match a token.
    The 26 lower-case ASCII letters are appended as additional stop words.

    Args:
        path_to_file (str): Stop-word file path; defaults to stopwordfilepath.

    Returns:
        list: The stop words.
    """
    with open(path_to_file, encoding='utf-8') as f:
        raw_entries = f.read().split(',')
    data = [entry.strip() for entry in raw_entries if entry.strip()]
    data.extend(string.ascii_lowercase)
    return data
def get_chunks(file_path=testfilepath, chunk_size=1000):
    """Split the words of a file into consecutive fixed-size chunks.

    Args:
        file_path (str): File to read; defaults to testfilepath.
        chunk_size (int): Number of words per chunk; defaults to 1000.

    Returns:
        list: Lists of words, each at most chunk_size long.
    """
    words = re_split(read_file(file_path))
    pieces = []
    for start in range(0, len(words), chunk_size):
        pieces.append(words[start:start + chunk_size])
    return pieces
def extract_file_words(path_to_file):
    """Extract the meaningful words of a file.

    Stop words and words shorter than three characters are removed.

    Args:
        path_to_file (str): Path of the file to read.

    Returns:
        list: The remaining words, in their original order.
    """
    word_list = re_split(read_file(path_to_file))
    # Build the set once: testing each word against the stop-word list
    # would scan the whole list for every word.
    stop_words = set(get_stopwords())
    return [w for w in word_list if len(w) >= 3 and w not in stop_words]
def extract_str_words(data_str):
    """Extract the meaningful words of a string.

    Stop words and words shorter than three characters are removed.

    Args:
        data_str (str): Input text.

    Returns:
        list: The remaining words, in their original order.
    """
    word_list = re_split(data_str)
    # Build the set once: testing each word against the stop-word list
    # would scan the whole list for every word.
    stop_words = set(get_stopwords())
    return [w for w in word_list if len(w) >= 3 and w not in stop_words]
def count_word(word, word_freqs, stopwords):
    """Increment the count of one word unless it is a stop word.

    Args:
        word (str): The word to count.
        word_freqs (dict): Word-to-count mapping, updated in place.
        stopwords (list): Words that must not be counted.
    """
    if word in stopwords:
        return
    previous = word_freqs.get(word, 0)
    word_freqs[word] = previous + 1
def get_frequencies(word_list):
    """Count how often each word occurs.

    Args:
        word_list (list): Words to count.

    Returns:
        dict: Word-to-count mapping, keyed in order of first appearance.
    """
    counts = {}
    for token in word_list:
        if token in counts:
            counts[token] += 1
        else:
            counts[token] = 1
    return counts
def sort_dict(word_freq):
    """Order the entries of a frequency mapping by descending count.

    Args:
        word_freq (dict): Word-to-count mapping.

    Returns:
        list: (word, count) tuples, highest count first; entries with equal
        counts keep their relative order from the mapping.
    """
    pairs = word_freq.items()
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)
def print_word_freqs(word_freqs, n=10):
    """Print the leading entries of a frequency list, one per line.

    Args:
        word_freqs (list): (word, count) pairs.
        n (int): How many entries to print; defaults to 10.
    """
    leading = word_freqs[:n]
    for word, count in leading:
        print(word, count, sep=' - ')
################################################################################
# 通用工具
################################################################################
def timing_decorator(func):
    """Decorate a function so that every call prints its running time.

    The elapsed time is printed in milliseconds with two decimals, and the
    wrapped function's return value is passed through unchanged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that keeps the name and docstring of func.
    """
    import functools  # local import: not among this module's top-level imports

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic, high-resolution clock meant for
        # measuring intervals; time.time can jump when the system clock moves.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        print(f"{func.__name__} 运行时间: {run_time*1000:.2f}")
        return result
    return wrapper
def test():
    """Print a fixed welcome line."""
    print('cppy welcome')

@ -0,0 +1,4 @@
## 任务
本项目的主要功能任务:做文本文件的分词,过滤常见词,求词频,并排序输出。

@ -0,0 +1,24 @@
# 目标
本节使用一个书城的各方面业务需求来展示面向对象的常见设计模式 。
# 任务
背景假设为一个综合书城,提供线上线下购买,还经营一个书吧、一个报告厅。
# 说明
面向对象的模式是把编程过程中的一些思路固定化,并给一个名字方便理解 。
它是软件工程中一组经过验证的、可重复使用的代码写法 。
所以,模式不是语法,而是编程思路 。
这样做的好处是,统一大家的代码形式,提高代码可读性、可维护性、可扩展性 。
那为什么面向过程没有这么做呢?
是因为这个思维提炼过程,充分利用了面向对象语言的特性:封装、继承、多态 。
面向过程语言,没有这些特性,所以,面向过程程序设计一般不谈设计模式 。
因为 Python 对象协议的机制,多态、接口概念发生了根本变化 。使得一些C++、Java 的模式没用了 。比如“原型模式(Prototype)”可以使用 copy.deepcopy() 非常简便地实现 。另外,很多模式中继承关系也没必要了。但,下面很多示例中依旧保持了基类 。一是致敬经典,二是起到一个工程上更工整和代码注释的作用 。
# 应用场景
面向对象设计模式在管理信息系统和图形用户界面系统应用比较广泛 。

@ -0,0 +1,17 @@
'''
全局只允许一个实例的办法
在该电商系统中全局只有一个数据库连接使用单例模式确保在整个应用程序内只创建一次数据库连接实例
'''
class DatabaseConnection:
    """Singleton holder for the application's single database connection.

    Every call to DatabaseConnection() returns the same instance, and
    connect_to_db() runs only when that instance is first created.
    """

    _instance = None  # the cached instance, or None before the first successful creation

    def __new__(cls):
        # Compare with None explicitly instead of relying on truthiness.
        if cls._instance is None:
            instance = super().__new__(cls)
            # Connect before caching: if connecting raises, nothing is cached
            # and the next call tries again instead of returning an
            # unconnected instance.
            instance.connect_to_db()
            cls._instance = instance
        return cls._instance

    def connect_to_db(self):
        """Open the database connection (placeholder in this example)."""
        # Code that connects to the database...
        pass

@ -0,0 +1,31 @@
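# When a branching construct decides which class to instantiate, object-oriented design usually wraps that choice in a class of its own; this is the factory pattern.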
# 定义一个ProductFactory类用于创建不同类型的商品实例如电子产品、书籍等。具体的产品由子类实现。
#
class Product:
    """Base product carrying a name and a price."""

    def __init__(self, name, price):
        self.name, self.price = name, price
class Electronic(Product):
    """Product that additionally records a brand."""
    def __init__(self, name, price, brand):
        super().__init__(name, price)
        self.brand = brand
class Book(Product):
    """Product that additionally records an author."""
    def __init__(self, name, price, author):
        super().__init__(name, price)
        self.author = author
class ProductFactory:
    """Factory that builds the product class matching a type name."""

    @staticmethod
    def create_product(product_type, *args, **kwargs):
        """Create a product of the requested type.

        Args:
            product_type: 'electronic' or 'book'.
            *args, **kwargs: Forwarded to the chosen class's constructor.

        Raises:
            ValueError: If product_type is neither of the known names.
        """
        if product_type == 'electronic':
            return Electronic(*args, **kwargs)
        if product_type == 'book':
            return Book(*args, **kwargs)
        raise ValueError("Invalid product type")
# Create a product through the factory
product = ProductFactory.create_product('book', 'Python编程艺术', 50.0, 'Mark Lutz')

@ -0,0 +1,114 @@
'''
建造者模式Builder Pattern允许构建一个复杂对象的各个部分然后一步一步地返回这个对象的完整版本
将建造者模式应用于网购的下单和出库过程时我们可以设计一个Order类来表示订单
以及一个OrderBuilder类来构建订单的各个部分
此外我们还可以引入一个ShoppingCart类来表示购物车以及一个Inventory类来处理库存和出库逻辑
'''
######################################################################
# Order类它包含订单的基本信息如下单时间、用户信息、订单项列表
######################################################################
from datetime import datetime
class OrderItem:
    """One order line: a product id together with a quantity."""
    def __init__(self, product_id, quantity):
        self.product_id = product_id
        self.quantity = quantity
class Order:
    """An order: the buyer, the order lines, the order time and a status."""

    def __init__(self, user_id, order_items, order_time=None):
        """Create an order in the "PLACED" status.

        Args:
            user_id: Identifier of the buyer.
            order_items: Objects exposing product_id and quantity.
            order_time: When the order was placed; defaults to the current time.
        """
        self.user_id = user_id
        self.order_items = order_items
        self.order_time = order_time or datetime.now()
        self.status = "PLACED"  # initial status: placed

    def __str__(self):
        return f"Order for user {self.user_id} placed at {self.order_time}. Status: {self.status}"

    def fulfill(self, inventory):
        """Take every order line out of the inventory.

        The operation is all-or-nothing: when one line cannot be deducted,
        the lines already deducted are returned through inventory.add_stock
        so a failed fulfilment leaves the stock as it was.

        Args:
            inventory: Object providing deduct_stock(product_id, quantity),
                which returns a truthy value on success, and
                add_stock(product_id, quantity).

        Returns:
            bool: True when every line was deducted (status becomes
            "FULFILLED"), False otherwise (status is left untouched).
        """
        deducted = []
        for item in self.order_items:
            if not inventory.deduct_stock(item.product_id, item.quantity):
                # Undo the partial shipment before reporting the failure.
                for done in deducted:
                    inventory.add_stock(done.product_id, done.quantity)
                return False
            deducted.append(item)
        self.status = "FULFILLED"
        return True
######################################################################
# OrderBuilder类用于构建订单
######################################################################
class OrderBuilder:
    """Fluent builder that assembles an Order step by step."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget the user and all items collected so far."""
        self._user_id = None
        self._order_items = []

    def for_user(self, user_id):
        """Set the buyer; returns the builder so calls can be chained."""
        self._user_id = user_id
        return self

    def add_item(self, product_id, quantity):
        """Append one order line; returns the builder so calls can be chained."""
        self._order_items.append(OrderItem(product_id, quantity))
        return self

    def build(self):
        """Create the Order from the collected data.

        Raises:
            ValueError: If no user was set or no item was added.
        """
        if not self._user_id or not self._order_items:
            raise ValueError("Order cannot be built without user and items.")
        # Hand over a copy: otherwise later add_item()/reset() calls on this
        # builder would silently change the items of the order already built.
        return Order(self._user_id, list(self._order_items))
######################################################################
# 购物车和库存类
######################################################################
class ShoppingCart:
    """Shopping cart of one user, mapping product ids to quantities."""

    def __init__(self, user_id):
        self.user_id = user_id
        self.items = {}  # {product_id: quantity}

    def add_to_cart(self, product_id, quantity):
        """Add quantity of a product on top of what the cart already holds."""
        already_in_cart = self.items.get(product_id, 0)
        self.items[product_id] = already_in_cart + quantity

    def checkout(self):
        """Turn the cart content into OrderItem objects and empty the cart."""
        order_items = []
        for product_id, quantity in self.items.items():
            order_items.append(OrderItem(product_id, quantity))
        self.items.clear()  # the cart is empty after checkout
        return order_items
class Inventory:
    """Stock keeping: maps product ids to the quantity on hand."""

    def __init__(self):
        self.stock = {}  # {product_id: quantity}

    def add_stock(self, product_id, quantity):
        """Increase the stock of a product by quantity."""
        on_hand = self.stock.get(product_id, 0)
        self.stock[product_id] = on_hand + quantity

    def deduct_stock(self, product_id, quantity):
        """Remove quantity from stock when enough is available.

        Returns:
            bool: True when the stock was reduced, False when it was insufficient.
        """
        on_hand = self.stock.get(product_id, 0)
        if not (on_hand >= quantity):
            return False
        self.stock[product_id] -= quantity
        return True
######################################################################
# 模拟整个下单和出库过程
######################################################################
# Set up the inventory and the shopping cart
inventory = Inventory()
inventory.add_stock("book1", 10)
inventory.add_stock("book2", 5)
cart = ShoppingCart(user_id="user123")
cart.add_to_cart("book1", 2)
cart.add_to_cart("book2", 1)
# Build the order with OrderBuilder
order_items = cart.checkout() # check out: get the order lines and empty the cart
order_builder = OrderBuilder().for_user("user123")
for item in order_items:
    order_builder.add_item(item.product_id, item.quantity)
order = order_builder.build() # create the order object
print(order) # print the order information
# Ship the order from stock
if order.fulfill(inventory):
    print("Order has been fulfilled.")
else:
    print("Order fulfillment failed due to insufficient stock.")

@ -0,0 +1,55 @@
'''
享元模式Flyweight Pattern可以用来减少对象的创建数量比如对于重复的书籍信息或者频繁请求的书籍分类可以通过享元模式来共享这些信息以提高内存使用效率和系统性能
在下面的代码中BookFlyweight 是享元抽象类它使用了一个类级别的字典 _books 来存储已经创建的书籍对象__new__ 方法被用来在创建新实例之前检查是否已经存在具有相同ISBN的书籍对象如果已经存在就返回那个对象的引用如果不存在就创建一个新对象并将其存储在 _books 字典中
请注意在这个例子中我故意尝试使用相同的ISBN但不同的标题来创建书籍对象以展示不正确的使用方式在真正的享元模式实现中一旦对象被创建并且其内在状态被设置在这个例子中是由ISBN标题和作者定义的就不应该再修改这些状态如果需要处理变化的状态通常会将这部分状态外部化并通过方法的参数传递给享元对象
另外要注意的是享元模式主要适用于大量细粒度对象且这些对象可以共享状态的情况在书籍的例子中ISBN是一个很好的共享状态的键但标题和作者通常不应该在对象创建后被改变因此这个例子更多的是为了展示享元模式的基本结构和原理而不是一个完全贴合实际的实现在实际应用中需要更仔细地设计享元对象的不可变状态和可变状态
'''
# 享元抽象类
class BookFlyweight:
    """Flyweight for books: instances are cached per ISBN in a class-level dict."""
    _books = {}  # isbn -> shared BookFlyweight instance
    def __new__(cls, isbn, title, author):
        # Create the flyweight for this ISBN, or fetch the cached one.
        # NOTE(review): the demo comments in this file expect a second call
        # with the same ISBN to replace the title, whereas applying
        # set_book_info only on first creation keeps the first title -
        # confirm which behaviour is intended.
        if isbn not in cls._books:
            cls._books[isbn] = super(BookFlyweight, cls).__new__(cls)
            cls._books[isbn].set_book_info(title, author)
        return cls._books[isbn]
    def set_book_info(self, title, author):
        """Store the title and author on this instance."""
        self.title = title
        self.author = author
    def get_book_info(self):
        """Return the text "<title> by <author>"."""
        return f"{self.title} by {self.author}"
# 享元工厂类
class BookFactory:
    """Flyweight factory: thin entry point that delegates to BookFlyweight."""
    @staticmethod
    def get_book(isbn, title, author):
        """Return BookFlyweight(isbn, title, author)."""
        return BookFlyweight(isbn, title, author)
# Client code
if __name__ == "__main__":
    # Request two books with the same ISBN; both names should refer to the
    # same object.
    book1 = BookFactory.get_book("123456789", "The Great Gatsby", "F. Scott Fitzgerald")
    book2 = BookFactory.get_book("123456789", "The Same Book With Different Title?", "F. Scott Fitzgerald")
    # Although a different title is passed, the ISBN is identical, so the same
    # object comes back. A flyweight's intrinsic state should be fixed when it
    # is created and not modified afterwards; passing a different title here
    # is a deliberate misuse for demonstration, not typical flyweight usage.
    # NOTE(review): the two expected outputs below are the original author's;
    # which title is printed depends on whether __new__ re-applies the book
    # info for an already cached ISBN - verify by running.
    print(book1.get_book_info()) # expected by the author: The Same Book With Different Title? by F. Scott Fitzgerald
    print(book2.get_book_info()) # expected by the author: The Same Book With Different Title? by F. Scott Fitzgerald
    # A different ISBN should yield a different object
    book3 = BookFactory.get_book("987654321", "1984", "George Orwell")
    print(book3.get_book_info()) # prints: 1984 by George Orwell
    # Check object identity
    print(book1 is book2) # prints: True
    print(book1 is book3) # prints: False

@ -0,0 +1,23 @@
# 装饰器模式允许我们在不修改原有类的基础上,动态地添加额外的功能。
# 就增加功能来说,装饰器模式比生成子类更为灵活。
# 餐吧的顾客可以选择为他们的咖啡添加额外的调料。
class Beverage:
    """A drink with a description and a base price of 0.0."""

    def __init__(self, description):
        self.price = 0.0
        self.description = description

    def cost(self):
        """Return the price of this beverage."""
        return self.price
class CondimentDecorator(Beverage): # the decorating wrapper
    """Wrap a beverage, extending its description and adding to its cost."""
    def __init__(self, beverage, description, price_increase):
        # NOTE(review): Beverage.__init__ is not called, so instances of this
        # class have no `price` attribute of their own; cost() is overridden
        # and does not need it - confirm no caller reads `.price` directly.
        self.beverage = beverage
        self.description = f"{beverage.description}, {description}"
        self.price_increase = price_increase
    def cost(self):
        """Return the wrapped beverage's cost plus this condiment's surcharge."""
        return self.beverage.cost() + self.price_increase
# Using the decorator pattern
coffee = Beverage("Espresso")
coffee_with_chocolate = CondimentDecorator(coffee, "Chocolate", 0.50)

@ -0,0 +1,42 @@
'''
适配器模式Adapter
应用将一个类的接口转换成客户期望的另一个接口使得原本由于接口不兼容而无法一起工作的类能够一起工作
'''
########################################################################
# 定义一个目标接口Target和一个与之不兼容的类Adaptee
############################################################################
# 目标接口
class Target:
    """Interface the client expects; request() does nothing here."""
    def request(self):
        pass
# 需要适配的类
class Adaptee:
    """Existing class whose method name differs from the Target interface."""
    def specific_request(self):
        print("Called Adaptee's specific_request.")
########################################################################
# 定义一个适配器类Adapter它实现了Target接口并且持有Adaptee的实例
# 从而能够在request方法中调用Adaptee的specific_request方法
# 一个继承,一个当参数加入构造函数
############################################################################
# 适配器
class Adapter(Target):
    """Expose an Adaptee through the Target interface."""
    def __init__(self, adaptee):
        self.adaptee = adaptee
    def request(self):
        # Forward the call to the Adaptee's specific_request method
        self.adaptee.specific_request()
if __name__ == "__main__":
    # Create the Adaptee instance
    adaptee = Adaptee()
    # Create the Adapter, handing it the Adaptee instance
    adapter = Adapter(adaptee)
    # The client calls Target's request method; what actually runs is the
    # Adaptee's specific_request method
    adapter.request()

@ -0,0 +1,60 @@
'''
代理模式Proxy Pattern为其他对象提供一种代理以控制对这个对象的访问
在书城的业务背景中代理模式Proxy Pattern可以应用于多种场景例如实现延迟加载访问控制远程代理等
下面示例展示如何使用代理模式来控制对书城中书籍对象的访问
假设我们有一个Book类代表书城中的书籍和一个BookProxy类作为Book的代理类来控制对书籍的访问
'''
# 书籍类
class Book:
    """A book whose content is loaded on first display."""

    def __init__(self, title, author, price):
        self.title = title
        self.author = author
        self.price = price
        self.is_loaded = False  # the content starts out as not loaded

    def load_content(self):
        """Simulate loading the content: print a message and mark it loaded."""
        print(f"Loading content for book '{self.title}' by {self.author}...")
        self.is_loaded = True

    def display(self):
        """Print the book details, loading the content first when needed."""
        if not self.is_loaded:
            self.load_content()
        details = (
            f"Book Title: {self.title}",
            f"Author: {self.author}",
            f"Price: {self.price}",
            "Content is loaded and displayed.",
        )
        for line in details:
            print(line)
# 书籍代理类
class BookProxy:
    """Proxy standing in front of a book object and guarding display()."""
    def __init__(self, book):
        self.book = book
    def display(self):
        # Before showing the book the proxy can perform extra steps, such as
        # checking user permissions or writing an access log.
        # Here a simple access check is simulated.
        print("Checking access permissions...")
        # Assume the check passed and call the real object's display method
        self.book.display()
# 客户端代码
if __name__ == "__main__":
    # Create a book object (its content is not loaded yet)
    book = Book("The Great Gatsby", "F. Scott Fitzgerald", 29.99)
    # Create a proxy for the book
    book_proxy = BookProxy(book)
    # Access the book information through the proxy
    book_proxy.display()
'''
在这个示例中Book类有一个load_content方法来模拟加载书籍内容的过程以及一个display方法来显示书籍的信息
在实际应用中load_content可能会执行更加复杂的操作如从数据库或远程服务器加载数据
BookProxy类作为代理包装了对Book对象的访问
在这个简单的例子中它在调用display方法之前执行了一个模拟的权限检查
在实际应用中代理类可以执行各种操作如缓存懒加载权限验证等
客户端代码通过创建BookProxy对象来间接访问Book对象而不是直接访问
这种方式提供了一种灵活的控制机制使得可以在不修改原始类的情况下增加额外的功能或控制逻辑
'''

@ -0,0 +1,75 @@
'''
在书城的业务背景中外观模式Facade Pattern可以用于提供一个简化的接口以隐藏系统的复杂性
假设书城提供了多种服务如用户认证购物车管理订单处理等外观模式可以将这些服务整合到一个统一的接口中
使客户端能够更方便地使用这些服务
下面是一个简单的实现代码示例展示如何使用外观模式来整合书城的不同服务
'''
# 用户服务类
class UserService:
    """Stand-in user authentication service for the facade demo."""
    def authenticate(self, username, password):
        # Placeholder for the real authentication logic
        print(f"Authenticating user {username}...")
        return True # assume authentication always succeeds
# 购物车服务类
class CartService:
    """Stand-in cart service that only prints what it would do."""
    def add_to_cart(self, user_id, book_id):
        # Placeholder for adding the book to the cart
        print(f"User {user_id} added book {book_id} to the cart.")
    def remove_from_cart(self, user_id, book_id):
        # Placeholder for removing the book from the cart
        print(f"User {user_id} removed book {book_id} from the cart.")
# 订单服务类
class OrderService:
    """Stand-in order service for the facade demo."""
    def create_order(self, user_id, cart_items):
        # Placeholder for the real order creation
        print(f"Creating order for user {user_id} with items {cart_items}...")
        return "OrderID123" # pretend an order id is returned
# 书城外观类
class BookstoreFacade:
    """Single entry point bundling the user, cart and order services."""

    def __init__(self):
        self.user_service = UserService()
        self.cart_service = CartService()
        self.order_service = OrderService()

    def _login(self, username, password):
        """Authenticate, print the outcome and return the authentication result."""
        authenticated = self.user_service.authenticate(username, password)
        print("Login successful." if authenticated else "Login failed.")
        return authenticated

    def login_and_add_to_cart(self, username, password, book_id):
        """Log the user in and, on success, put one book into the cart."""
        if not self._login(username, password):
            return
        # The user id is assumed to be 1; a real application would obtain it
        # from the authentication service.
        user_id = 1
        self.cart_service.add_to_cart(user_id, book_id)

    def checkout(self, username, password):
        """Log the user in and, on success, create an order from the cart."""
        if not self._login(username, password):
            return
        # The user id is assumed to be 1; a real application would obtain it
        # from the authentication service.
        user_id = 1
        cart_items = self.get_cart_items(user_id)
        if not cart_items:
            print("Your cart is empty.")
            return
        order_id = self.order_service.create_order(user_id, cart_items)
        print(f"Order created with ID: {order_id}")

    def get_cart_items(self, user_id):
        """Return the cart content; simplified to a fixed list of book ids."""
        return [1, 2, 3]  # assumed list of book ids
# 客户端代码
if __name__ == "__main__":
    bookstore = BookstoreFacade()
    # Log the user in and add a book to the cart
    bookstore.login_and_add_to_cart("alice", "password123", "book456")
    # Check out and create the order
    bookstore.checkout("alice", "password123")

@ -0,0 +1,85 @@
'''
在书城的业务背景中组合模式Composite Pattern可以用于构建树形结构比如书籍的分类结构
每个分类可以包含子分类也可以包含具体的书籍通过这种方式可以方便地管理和遍历整个书籍分类体系
下面是一个简单的实现代码示例展示如何使用组合模式来构建书城的书籍分类结构
'''
from abc import ABC, abstractmethod
# 组件抽象类
class BookComponent(ABC):
    """Abstract component: the operations shared by categories and books."""
    @abstractmethod
    def add(self, component):
        pass
    @abstractmethod
    def remove(self, component):
        pass
    @abstractmethod
    def display(self, depth):
        pass
# 叶子节点:书籍类
class Book(BookComponent):
    """Leaf node: a single book, which cannot hold children."""
    def __init__(self, title, author):
        self.title = title
        self.author = author
    def add(self, component):
        # A leaf has no children; only a message is printed
        print("Cannot add to a leaf node")
    def remove(self, component):
        # A leaf has no children; only a message is printed
        print("Cannot remove from a leaf node")
    def display(self, depth):
        # One dash per nesting level, then "<title> by <author>"
        print("-" * depth + f" {self.title} by {self.author}")
# 复合节点:书籍分类类
class BookCategory(BookComponent):
    """Composite node: a category holding books and/or other categories."""

    def __init__(self, name):
        self.name = name
        self.children = []

    def add(self, component):
        """Append a child component."""
        self.children.append(component)

    def remove(self, component):
        """Remove a child component."""
        self.children.remove(component)

    def display(self, depth):
        """Print this category, then every child one level deeper."""
        prefix = "-" * depth
        print(prefix + self.name)
        child_depth = depth + 1
        for child in self.children:
            child.display(child_depth)
# 客户端代码
if __name__ == "__main__":
    # Create the categories and the books
    fiction = BookCategory("Fiction")
    non_fiction = BookCategory("Non-Fiction")
    novel = Book("The Great Gatsby", "F. Scott Fitzgerald")
    biography = Book("Steve Jobs", "Walter Isaacson")
    programming = Book("Clean Code", "Robert C. Martin")
    # Assemble the category structure
    fiction.add(novel)
    non_fiction.add(biography)
    non_fiction.add(programming)
    # Create a root category and put the other categories into it
    root = BookCategory("Root")
    root.add(fiction)
    root.add(non_fiction)
    # Show the whole category tree
    root.display(0)
'''
在这个示例中BookComponent 是一个抽象类定义了所有组件无论是分类还是书籍都应该有的方法addremove display
Book 类是叶子节点代表具体的书籍它实现了 BookComponent 接口
add remove 方法对于书籍来说是不适用的因此它们只是打印一条错误消息
BookCategory 类是复合节点代表书籍的分类它可以包含其他分类或书籍因此它实现了 add remove 方法来管理子节点并且实现了 display 方法来显示分类及其子节点的信息
客户端代码创建了一些书籍和分类对象并构建了一个书籍分类结构最后通过调用根分类的 display 方法可以显示整个书籍分类结构
'''

@ -0,0 +1,94 @@
'''
桥接模式Bridge Pattern可以将抽象与实现解耦让它们可以独立变化
这在处理多种分类的书籍时特别有用比如你想在不同的平台上展示这些书籍同时这些书籍还分属不同的分类
下面是一个简单的实现代码示例展示如何使用桥接模式来构建书城的书籍分类与展示平台
'''
# 定义书籍接口
class IBook:
    """Book interface: accessors for title and author (no-ops here)."""
    def get_title(self):
        pass
    def get_author(self):
        pass
# 具体书籍实现
class NovelBook(IBook):
    """Concrete book storing its title and author."""
    def __init__(self, title, author):
        self.title = title
        self.author = author
    def get_title(self):
        return self.title
    def get_author(self):
        return self.author
# 定义抽象分类
class BookCategory:
    """A named category holding a list of books."""
    def __init__(self, name):
        self.name = name
        self.books = []
    def add_book(self, book):
        """Append a book to this category."""
        self.books.append(book)
    def get_books(self):
        """Return the category's own list of books (not a copy)."""
        return self.books
# 定义抽象展示平台
class DisplayPlatform:
    """Base display platform; display() does nothing here."""
    def display(self, book):
        pass
# 具体展示平台实现
class WebDisplayPlatform(DisplayPlatform):
    """Platform that formats a book for the web."""
    def display(self, book):
        """Return the "On the web: ..." text for the book."""
        return f"On the web: {book.get_title()} by {book.get_author()}"
class MobileDisplayPlatform(DisplayPlatform):
    """Platform that formats a book for mobile."""
    def display(self, book):
        """Return the "On mobile: ..." text for the book."""
        return f"On mobile: {book.get_title()} by {book.get_author()}"
# 桥接类,将分类与展示平台连接起来
class BookShop:
    """Bridge joining a book category with a display platform."""

    def __init__(self, category, platform):
        self.category = category
        self.platform = platform

    def show_books(self):
        """Print the platform's rendering of every book in the category."""
        books = self.category.get_books()
        for book in books:
            rendered = self.platform.display(book)
            print(rendered)
# 客户端代码
if __name__ == "__main__":
    # Create the books
    novel1 = NovelBook("The Great Gatsby", "F. Scott Fitzgerald")
    novel2 = NovelBook("1984", "George Orwell")
    # Create the category
    fiction_category = BookCategory("Fiction")
    fiction_category.add_book(novel1)
    fiction_category.add_book(novel2)
    # Create the display platforms
    web_platform = WebDisplayPlatform()
    mobile_platform = MobileDisplayPlatform()
    # Create the book shops and show the books
    web_bookshop = BookShop(fiction_category, web_platform)
    web_bookshop.show_books()
    mobile_bookshop = BookShop(fiction_category, mobile_platform)
    mobile_bookshop.show_books()
'''
在这个示例中
IBook 是一个接口定义了书籍应有的行为比如获取标题和作者
NovelBook 是一个具体书籍类实现了 IBook 接口
BookCategory 是一个书籍分类类它可以包含多个书籍实例
DisplayPlatform 是一个抽象展示平台类定义了如何展示书籍
WebDisplayPlatform MobileDisplayPlatform 是具体展示平台类分别实现了 DisplayPlatform 接口以提供不同的展示方式
BookShop 是一个桥接类它将书籍分类与展示平台连接起来通过 show_books 方法可以展示分类中的所有书籍
'''

@ -0,0 +1,66 @@
# 和工厂模式类似,不过这里的结果只是产生不同的类方法
# 设想书店有多种折扣策略,比如“普通会员折扣”、“金牌会员折扣”和“无折扣”。每种折扣策略都是一个具体的策略实现。
from abc import ABC, abstractmethod
########################################################
# 创建折扣策略接口
########################################################
class DiscountStrategy(ABC):
    """Interface of a discount strategy."""
    @abstractmethod
    def calculate_discount(self, book_price):
        """Return the price to pay for a book with the given list price."""
        pass
########################################################
# 创建实现了DiscountStrategy接口的具体折扣策略类
########################################################
class NoDiscountStrategy(DiscountStrategy):
    """Strategy that applies no discount."""
    def calculate_discount(self, book_price):
        return book_price # no discount: the original price is returned
class RegularMemberDiscountStrategy(DiscountStrategy):
    """Strategy for regular members."""
    def calculate_discount(self, book_price):
        return book_price * 0.9 # regular members pay 90% of the price
class GoldMemberDiscountStrategy(DiscountStrategy):
    """Strategy for gold members."""
    def calculate_discount(self, book_price):
        return book_price * 0.8 # gold members pay 80% of the price
########################################################
# 定义Book类和Bookstore类。Book类包含书籍的信息和价格Bookstore类则使用折扣策略来计算书籍的折后价
########################################################
class Book:
    """A book with a title and a list price."""

    def __init__(self, title, price):
        self.title, self.price = title, price
class Bookstore:
    """Book store that prices books through a replaceable discount strategy."""

    def __init__(self, discount_strategy):
        self.discount_strategy = discount_strategy

    def set_discount_strategy(self, discount_strategy):
        """Swap the strategy used for later price calculations."""
        self.discount_strategy = discount_strategy

    def calculate_final_price(self, book):
        """Return the book's price after the current strategy was applied."""
        strategy = self.discount_strategy
        return strategy.calculate_discount(book.price)
if __name__ == "__main__":
    # Create the book
    book = Book("The Great Gatsby", 30.0)
    # Create the discount strategies
    no_discount = NoDiscountStrategy()
    regular_discount = RegularMemberDiscountStrategy()
    gold_discount = GoldMemberDiscountStrategy()
    # Create the book store and try each discount strategy in turn
    bookstore = Bookstore(no_discount)
    print(f"No Discount: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")
    bookstore.set_discount_strategy(regular_discount)
    print(f"Regular Member Discount: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")
    bookstore.set_discount_strategy(gold_discount)
    print(f"Gold Member Discount: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")

@ -0,0 +1,45 @@
# 观察者模式允许一个对象(观察者)监听另一个对象(主题)的状态变化,并在状态变化时得到通知。
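# When the subject's data changes, it notifies every registered object (passing itself as the argument) so that each one handles the change on its own.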
# 当购物车中的商品数量发生变化时,库存系统和价格计算系统需要实时更新。
# 观察者模式Observer或发布-订阅模式Publish-Subscribe
import abc
class Observer(metaclass=abc.ABCMeta):
    """Abstract observer: receives the cart whenever it notifies."""
    @abc.abstractmethod
    def update(self, cart):
        pass
class InventorySystem(Observer):
    """Observer standing for the inventory system (placeholder body)."""
    def update(self, cart):
        # Stock update logic would go here...
        pass
class PriceCalculator(Observer):
    """Observer standing for the price calculation (placeholder body)."""
    def update(self, cart):
        # Total price recalculation logic would go here...
        pass
class ShoppingCart:
    """Observable shopping cart that notifies observers when an item is added."""

    def __init__(self):
        self._items = {}
        self.observers = []

    def attach(self, observer):
        """Register an observer for later notifications."""
        self.observers.append(observer)

    def _notify(self):
        """Call update(self) on every registered observer, in attach order."""
        for listener in self.observers:
            listener.update(self)

    def add_item(self, item_id, quantity):
        """Store the quantity for an item id, then notify all observers."""
        self._items[item_id] = quantity
        self._notify()
# Create the cart and register the observers
cart = ShoppingCart()
inventory_system = InventorySystem()
price_calculator = PriceCalculator()
cart.attach(inventory_system)
cart.attach(price_calculator)
cart.add_item('item1', 2) # adding an item notifies both the inventory system and the price calculator

@ -0,0 +1,80 @@
'''
状态模式State Pattern允许一个对象在其内部状态改变时改变它的行为
这个模式将状态封装成独立的类并将状态转换的逻辑分散到这些类中从而减少相互间的依赖
以下是一个使用状态模式的简单示例我们将创建一个订单类Order它有几个状态
Placed已下单Paid已支付Fulfilled已履行和Delivered已交付
每个状态都是一个类它们继承自一个抽象状态类OrderState
'''
################################################################################
# 定义抽象状态类和一些具体的状态类
################################################################################
class OrderState:
    """Base order state; handle() does nothing here."""
    def handle(self, order):
        pass
class PlacedState(OrderState):
    """State of a placed order; handling it moves the order to the paid state."""
    def handle(self, order):
        print("Order placed. Waiting for payment...")
        order.set_state(order.get_paid_state())
class PaidState(OrderState):
    """State of a paid order; handling it moves the order to the fulfilled state."""
    def handle(self, order):
        print("Order paid. Preparing for fulfillment...")
        order.set_state(order.get_fulfilled_state())
class FulfilledState(OrderState):
    """State of a fulfilled order; handling it moves the order to the delivered state."""
    def handle(self, order):
        print("Order fulfilled. Preparing for delivery...")
        order.set_state(order.get_delivered_state())
class DeliveredState(OrderState):
    """Final state: handling it prints a message and changes nothing."""
    def handle(self, order):
        print("Order delivered. Process completed.")
################################################################################
# 定义Order类它包含一个对当前状态的引用并且能够通过set_state方法改变其状态
################################################################################
class Order:
    """Order whose behaviour is delegated to its current state object."""

    def __init__(self):
        self._state = None
        # A new order starts in the placed state.
        self.set_state(self.get_placed_state())

    # --- current state -----------------------------------------------------
    def get_state(self):
        return self._state

    def set_state(self, state):
        self._state = state

    # --- state factories used by the state classes ---------------------------
    def get_placed_state(self):
        return PlacedState()

    def get_paid_state(self):
        return PaidState()

    def get_fulfilled_state(self):
        return FulfilledState()

    def get_delivered_state(self):
        return DeliveredState()

    def process(self):
        """Let the current state handle this order."""
        current = self._state
        current.handle(self)
################################################################################
# 创建一个Order对象并模拟其状态转换
################################################################################
if __name__ == "__main__":
    order = Order()
    # Walk the order through its processing flow
    order.process() # starts as Placed; becomes Paid afterwards
    order.process() # currently Paid; becomes Fulfilled afterwards
    order.process() # currently Fulfilled; becomes Delivered afterwards
    order.process() # currently Delivered; stays unchanged because Delivered is the final state
################################################################################
# 这个例子中每个状态类都负责决定下一个状态是什么并在handle方法中触发状态转换。
# Order类不直接知道所有可能的状态转换这些逻辑被封装在状态类中。
# 这使得添加新的状态或修改现有状态的行为变得更加容易因为不需要修改Order类本身。
################################################################################

@ -0,0 +1,71 @@
'''
模板方法模式Template Method
定义算法的骨架而将一些步骤延迟到子类中实现
'''
from abc import ABC, abstractmethod
class AbstractClass(ABC):
    """Skeleton of an algorithm whose individual steps come from subclasses."""

    def template_method(self):
        """Run the steps of the algorithm in their fixed order."""
        step_names = (
            "base_operation1",
            "required_operations1",
            "base_operation2",
            "hook1",
            "required_operations2",
            "base_operation3",
            "hook2",
        )
        for step_name in step_names:
            getattr(self, step_name)()

    # Steps every subclass has to implement.
    @abstractmethod
    def base_operation1(self):
        pass

    @abstractmethod
    def base_operation2(self):
        pass

    @abstractmethod
    def base_operation3(self):
        pass

    @abstractmethod
    def required_operations1(self):
        pass

    @abstractmethod
    def required_operations2(self):
        pass

    # Hooks: optional extension points that do nothing by default.
    def hook1(self):
        pass

    def hook2(self):
        pass
class ConcreteClass(AbstractClass):
    """Concrete implementation supplying every step of the template method."""
    def base_operation1(self):
        print("AbstractClass says: I am doing the bulk of the work")
    def base_operation2(self):
        print("AbstractClass says: But I let subclasses override some operations")
    def base_operation3(self):
        print("AbstractClass says: But I am doing the bulk of the work anyway")
    def required_operations1(self):
        print("ConcreteClass says: Implemented Operation1")
    def required_operations2(self):
        print("ConcreteClass says: Implemented Operation2")
    def hook1(self):
        print("ConcreteClass says: Overridden Hook1")
    def hook2(self):
        # This override is itself a no-op, so it behaves exactly like the
        # empty default hook2 in AbstractClass.
        pass
if __name__ == "__main__":
    # Run the algorithm skeleton with the concrete steps
    concrete_class = ConcreteClass()
    concrete_class.template_method()

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save