Compare commits

...

124 Commits
dev ... dev

Author SHA1 Message Date
zj3D 74b622ee04 526-7
2 weeks ago
zj3D 27c8e450c9 525-23
2 weeks ago
zj3D 464471282f 525-22
2 weeks ago
zj3D ca6f8d97ee 99
2 weeks ago
zj3D a8200ff225 525-22
2 weeks ago
zj3D 56d6c04878 0525-18
2 weeks ago
zj3D 7d03c9e5f0 524-20
2 weeks ago
zj3D 068c0f9bb8 524-3
2 weeks ago
zj3D 3c919aa5b6 524-2
2 weeks ago
zj3D f2729b414f 0524_1
2 weeks ago
zj3D 2398743ab9 532-18
2 weeks ago
zj3D 6b1604413a 0523-1
2 weeks ago
zj3D c29f01ece2 052123
3 weeks ago
zj3D cc841ffaae 0521-4
3 weeks ago
zj3D a270414ff0 0521-3
3 weeks ago
zj3D 8e7c4a3117 521_2
3 weeks ago
zj3D e7f73ef2f4 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
3 weeks ago
zj3D d727f0cba2 0521
3 weeks ago
p46318075 dd11216128 Update readme.MD
3 weeks ago
zj3D 9845e7f38c g04
3 weeks ago
zj3D 29c7d4a635 g02
3 weeks ago
zj3D aaba98303b grok01
3 weeks ago
zj3D 29f26846a0 426
1 month ago
zj3D 3f03b9014d 05
3 months ago
zj3D d782d65fb5 Remove .gitignore file from Git tracking
3 months ago
zj3D 5e83716c4d 04
3 months ago
zj3D d339ef454b 03
3 months ago
zj3D 654dabbb69 02
3 months ago
zj3D 59ea2b479b 01
3 months ago
zj3D 0c531354ce BC01
3 months ago
zj3D dc44e0be4b 04
3 months ago
zj3D 3ea3cbaa23 031
3 months ago
zj3D 0eb30e470f 03
3 months ago
zj3D 4abad6851f 02
3 months ago
zj3D 129f1aaa3f 01
3 months ago
zj3D 617465cec6 07
3 months ago
zj3D 35e58525f1 06
3 months ago
zj3D e5b9607dce 05
3 months ago
zj3D 49bf182cf8 04
3 months ago
zj3D ea53899bbd 04
3 months ago
zj3D 0606fc586c 03
3 months ago
zj3D b77d297f3e 02
3 months ago
zj3D 1712e964cf 02
3 months ago
zj3D 8b9e813ee2 2501
3 months ago
zj3D 7365ebb312 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
3 months ago
zj3D 524a65e492 ABC
3 months ago
p46318075 94800c4b9e Update readme.MD
4 months ago
p46318075 3d0220d49b Merge pull request 'dev' (#17) from pcz4qfnkl/CodePattern:dev into dev
10 months ago
Yao 36afa1d669 refactor: 优化代码,提高可读性和效率
10 months ago
Yao 15736d7393 refactor(code): 优化代码,提高可读性和效率
10 months ago
Yao f170c936d8 feat: 添加了根据关键词爬取天水市人民政府网站上指定日期内新闻标题的功能,并提供了多线程、多进程、协程和异步四种实现方式。
10 months ago
zj3D ceb9955051 ABC
10 months ago
zj3D 4606a87618 0803
10 months ago
zj3D 2a27a2c748 拉萨饭店
1 year ago
zj3D 26b6f4c88b 结构调整25
1 year ago
zj3D fa3e01dedc 大修 12
1 year ago
zj3D 850a3eb772 注册消息
1 year ago
zj3D 8d5c578da8 插件简化
1 year ago
zj3D 88606f2bce 修订 11
1 year ago
zj3D ebe28f7670 修 12
1 year ago
zj3D c8946209bf 大修 10
1 year ago
zj3D 2d46194636 大修 11
1 year ago
zj3D 5099345721 大修10
1 year ago
zj3D cd8186dd68 大修 10
1 year ago
zj3D 50952795a8 大修 9
1 year ago
zj3D 2cac3f2788 over
1 year ago
zj3D 44b0c00567 大修 9
1 year ago
zj3D 83c156a3d5 大修 8
1 year ago
zj3D 31a4dfc8e5 非中心化
1 year ago
zj3D e27ecadb25 流式调用
1 year ago
zj3D 9d74a5c184 大修 8
1 year ago
zj3D a3bc46dae3 大修6
1 year ago
zj3D f2ff5c8d4e 清理
1 year ago
zj3D 18f3901592 大修改6
1 year ago
zj3D 44c1f9eb1e 大修改4
1 year ago
zj3D b86f626e94 大调整2
1 year ago
zj3D fe94d8ed1b 享元修订
1 year ago
zj3D b15c7505f6 调整
1 year ago
zj3D cab45b3281 词频对象设计模式修订
1 year ago
zj3D 4aa6f8469d 调整
1 year ago
p46318075 f8f3f10d2e Add readme.MD
1 year ago
zj3D 7db531d2fc 大调整
1 year ago
zj3D 41a14b6705 设计模式
1 year ago
zj3D ac7fb13827 设计模式
1 year ago
zj3D 1920e47a1c 设计模型
1 year ago
p46318075 c5932334fa Merge pull request '观察者模式' (#15) from p26zockiw/CodePattern:master into dev
1 year ago
zj3D 239c0188d0 调整
1 year ago
p26zockiw c22c921cf8 观察者模式
1 year ago
zj3D bfcaab3439 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
1 year ago
zj3D f52645e7b2 test
1 year ago
p46318075 b6fc9ef4c3 Merge pull request '修改restful模式' (#14) from pbr4nzfkh/CodePattern:dev into dev
1 year ago
pbr4nzfkh 28f60e8216 restful 服务端
1 year ago
pbr4nzfkh fdf6166100 Delete '基本结构/042 restful/tf-35-app.py'
1 year ago
pbr4nzfkh ffdae7d329 restful 客户端
1 year ago
pbr4nzfkh 0b9d4a63d6 Delete '基本结构/042 restful/tf-35-request.py'
1 year ago
pbr4nzfkh ada14b9a7b restful 服务端
1 year ago
pbr4nzfkh c8cd7bbc0c Delete '基本结构/042 restful/tf-35-app.py'
1 year ago
zj3D c99a655997 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
1 year ago
zj3D 950cb41e08 debug
1 year ago
p46318075 f131c63ff4 Merge pull request '修改map-reduce模式' (#13) from pbr4nzfkh/CodePattern:dev into dev
1 year ago
zj3D 2518a5cd85 结构调整
1 year ago
pbr4nzfkh e993c23ed1 Delete '计算设备/map-reduce/tf-32.py'
1 year ago
pbr4nzfkh fb95636bb1 tf-31 map-reduce模式
1 year ago
pbr4nzfkh 285b016a30 tf-92多进程模式
1 year ago
pbr4nzfkh 1ebf2a45fe tf-91多线程模式
1 year ago
pbr4nzfkh 740f5aabff Delete '计算设备/map-reduce/tf_92.py'
1 year ago
pbr4nzfkh 2288c18e8a Delete '计算设备/map-reduce/tf_91.py'
1 year ago
pbr4nzfkh 028c7ddb07 Delete '计算设备/map-reduce/tf-31.py'
1 year ago
zj3D 9bf690d62c 风格统一
1 year ago
zj3D 041fced368 patch
1 year ago
p46318075 254c11c3c9 Merge pull request '修改restful模式' (#12) from pbr4nzfkh/CodePattern:dev into dev
1 year ago
pbr4nzfkh 29dbff26cc restful_app
1 year ago
pbr4nzfkh 4134d794ab Delete '基本结构/042 restful/tf-35-app.py'
1 year ago
pbr4nzfkh d54c43b459 restful-app
1 year ago
pbr4nzfkh 365c8bb76a Delete '基本结构/042 restful/test.txt'
1 year ago
pbr4nzfkh f8055c0044 restful-request
1 year ago
pbr4nzfkh 726a8795c7 Delete '基本结构/042 restful/tf-34.py'
1 year ago
zj3D b4a280c55c 类型申明
1 year ago
zj3D bfbc1120ec 运行时间装饰器
1 year ago
zj3D 3c439ef8d7 update
1 year ago
zj3D 445088fde8 增加一种终端模式
1 year ago
zj3D 0e55cabe5c print
1 year ago
zj3D 856fdcc1e1 1
1 year ago
zj3D 8545ada6c2 111
1 year ago

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
from cppy.cp_util import stopwordfilepath, testfilepath
# 准备停用词表
with open(stopwordfilepath, encoding='utf-8') as f:
stop_words = f.read().split(',')
for letter in 'abcdefghijklmnopqrstuvwxyz':
stop_words.append(letter)
# 读文件,逐行扫描文本,发现词,确定不是停用词,计数
word_freqs = []
for line in open(testfilepath, encoding='utf-8'):
start_char = None
i = 0
for c in line:
if start_char is None:
if c.isalnum():
# 一个单词开始
start_char = i
else:
if not c.isalnum():
# 一个单词结束
found = False
word = line[start_char:i].lower()
# 跳过停用词
if word not in stop_words:
pair_index = 0
# 单词是否第一次出现
for pair in word_freqs:
if word == pair[0]:
pair[1] += 1
found = True
break
pair_index += 1
if not found:
word_freqs.append([word, 1])
# 重置开始标记
start_char = None
i += 1
# 使用冒泡排序对词频进行排序
n = len(word_freqs)
for i in range(n):
for j in range(0, n - i - 1):
if word_freqs[j][1] < word_freqs[j + 1][1]:
word_freqs[j], word_freqs[j + 1] = word_freqs[j + 1], word_freqs[j]
# 打印频率最高的前10个词
for tf in word_freqs[:10]:
print(tf[0], '-', tf[1])
'''
想到哪里写到哪里只会Python语言最基础的语法C语言编程风格
所有逻辑所有细节一盘大棋摆在面前
这样的代码,很难维护后来的阅读者会进入一个一望无际的泥塘.
'''

@ -1,4 +1,5 @@
from cppy.cp_util import * from cppy.cp_util import stopwordfilepath, testfilepath
import string
from collections import Counter from collections import Counter
# 准备词和停用词表 # 准备词和停用词表
@ -7,7 +8,7 @@ stop_words.update(list(string.ascii_lowercase))
# 读取文件并计算单词频率 # 读取文件并计算单词频率
word_freqs = Counter() word_freqs = Counter()
with open(testfilepath,encoding = 'utf8') as f: with open(testfilepath, encoding='utf8') as f:
for line_num, line in enumerate(f, 1): for line_num, line in enumerate(f, 1):
start_char = None start_char = None
for i, c in enumerate(line): for i, c in enumerate(line):
@ -22,10 +23,9 @@ with open(testfilepath,encoding = 'utf8') as f:
# 打印前10个最常见的单词 # 打印前10个最常见的单词
for word, freq in word_freqs.most_common(10): for word, freq in word_freqs.most_common(10):
print(f"{word}-{freq}") print(f"{word}-{freq}")
''' '''
相比 A01
使用collections.Counter来计数单词频率从而简化了代码并提高了效率 使用collections.Counter来计数单词频率从而简化了代码并提高了效率
使用enumerate来获取行号和行内容使用set来存储停用词都有助于提高代码的性能和可读性 使用enumerate来获取行号和行内容使用set来存储停用词都有助于提高代码的性能和可读性
使用most_common方法来获取最常见的单词使输出更为简洁 使用most_common方法来获取最常见的单词使输出更为简洁
''' '''

@ -0,0 +1,16 @@
import re
import collections
from cppy.cp_util import stopwordfilepath, testfilepath
stopwords = set(open(stopwordfilepath, encoding='utf8').read().split(','))
words = re.findall('[a-z]{2,}',
open(testfilepath, encoding='utf8').read().lower())
counts = collections.Counter(w for w in words if w not in stopwords)
for (w, c) in counts.most_common(10):
print(w, '-', c)
'''
熟练的软件工程师会如此简单完成任务
后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法
我们假设是要解决一个相对复杂的问题
'''

@ -1,9 +1,14 @@
'''
给代码片段命名打包成函数用函数名概括问题求解的过程方便阅读理解
'''
import string import string
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
################################
# data # data
data = [] ################################
data = ''
words = [] words = []
word_freqs = [] word_freqs = []
@ -13,17 +18,12 @@ word_freqs = []
def read_file(path_to_file): def read_file(path_to_file):
global data global data
with open(path_to_file,encoding='utf-8') as f: with open(path_to_file,encoding='utf-8') as f:
data = data + list(f.read()) data = f.read()
def filter_chars_and_normalize(): def extractwords():
global data global data
global words global words
for i in range(len(data)): words = data.lower().split()
data[i] = ' ' if not data[i].isalnum() else data[i].lower()
data_str = ''.join(data)
words = words + data_str.split()
with open(stopwordfilepath) as f: with open(stopwordfilepath) as f:
stop_words = set(f.read().split(',')) stop_words = set(f.read().split(','))
stop_words.update(string.ascii_lowercase) stop_words.update(string.ascii_lowercase)
@ -41,7 +41,7 @@ def sort():
if __name__ == "__main__": if __name__ == "__main__":
read_file( testfilepath ) read_file( testfilepath )
filter_chars_and_normalize() extractwords()
frequencies() frequencies()
sort() sort()

@ -0,0 +1,33 @@
'''
消除全局变量限定函数只有输入输出来和外界打交道
如果消除无用的中间变量那么就是典型的数据处理流水线风格:我要在一个函数中完成某个操作然后调用下一个函数进行处理
流水线风格阅读方便提供栈性能优化的可能
如果需要加参数那么需要柯里化方法即Python里面的闭包函数结构
'''
import re
from cppy.cp_util import *
def extractwords(str_data):
pattern = re.compile('[\W_]+')
word_list = pattern.sub(' ', str_data).lower().split()
stop_words = get_stopwords()
return [w for w in word_list if not w in stop_words]
def frequencies(word_list):
word_freqs = {}
for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs
def sort(word_freq):
return sorted( word_freq.items(), key=lambda x: x[1], reverse=True )
if __name__ == "__main__":
txtcontent = read_file( testfilepath )
word_list = extractwords( txtcontent )
word_freqs = frequencies( word_list )
word_sorts = sort ( word_freqs )
for tf in word_sorts[:10]:
print(tf[0], '-', tf[1])

@ -1,3 +1,8 @@
'''
把一些公共函数分离出去保留中间变量来增加代码可读性
jupyter notebook 做分析常常如此
'''
import re import re
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *

@ -0,0 +1,32 @@
''' 递归是基础编程思维产生的代码结构 。注意递归深度限制只能解决小规模问题 '''
from cppy.cp_util import *
from collections import Counter
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def process_chunks( chunks,word_freqs,x,max ):
next = x + 1
if next < max:
process_chunks(chunks,word_freqs,next,max)
word_list = process_chunk(chunks[x])
word_freqs += Counter(word_list)
# def process_chunks( chunks,word_freqs,x,max ):
# word_list = process_chunk(chunks[x])
# word_freqs += Counter(word_list)
# next = x + 1
# if next < max:
# process_chunks(chunks,word_freqs,next,max)
# 读数据按1000个词一组分片
chunks = get_chunks(testfilepath,2000)
word_freqs = Counter()
process_chunks( chunks,word_freqs,0,len(chunks) )
print_word_freqs( word_freqs.most_common(10) )

@ -0,0 +1,108 @@
'''
对象化的视角看待问题是抽象出问题中的实体
实体有方法属性程序执行过程就是调动这些实体的交互行为
作为彻底的面向对象的风格主逻辑常常也写做一个类init 产生各个工具run 运行逻辑
本例中实体可理解为几个工具
'''
from collections import Counter
from cppy.cp_util import *
class DataStorageManager:
"""
数据模型读取文件内容并将内容分割成单词
Attributes:
_data: 单词列表
Methods:
_words (self): 返回分割后的单词列表
"""
def __init__(self, path_to_file):
self._data = re_split(read_file(path_to_file))
def words(self):
"""返回分割后的单词列表。"""
return self._data
class StopWordManager:
"""
停用词模型
Attributes:
_stop_words: 停用词列表
Methods:
is_stop_word (self, word): 判断给定单词是否为停用词
"""
def __init__(self):
self._stop_words = get_stopwords()
def is_stop_word(self, word):
"""判断给定单词是否为停用词。"""
return word in self._stop_words
class WordFrequencyManager:
"""
词频模型计算并管理单词的频率
Attributes:
_word_freqs: 使用 Counter 存储单词及其出现次数
Methods:
increment_count (self, word): 计算词频
sorted(self): 返回按出现次数排序的单词列表
"""
def __init__(self):
self._word_freqs = Counter()
def increment_count(self, word):
"""计算词频。"""
self._word_freqs[word] += 1
def sorted(self):
"""返回按出现次数排序的单词列表。"""
return self._word_freqs.most_common()
class WordFrequencyController:
"""
控制器控制整个流程读取文件处理停用词计算词频并输出结果
Attributes:
_storage_manager: DataStorageManager 实例用于读取和处理文件内容
_stop_word_manager: StopWordManager 实例用于管理停用词
_word_freq_manager: WordFrequencyManager 实例用于计算和存储单词频率
Methods:
run(self): 运行方法遍历单词列表过滤掉停用词并计算每个单词的频率最后输出结果
"""
def __init__(self, path_to_file):
self._storage_manager = DataStorageManager(path_to_file)
self._stop_word_manager = StopWordManager()
self._word_freq_manager = WordFrequencyManager()
def run(self):
"""运行方法,遍历单词列表,过滤掉停用词,并计算每个单词的频率,最后输出结果。"""
for w in self._storage_manager.words():
if not self._stop_word_manager.is_stop_word(w):
self._word_freq_manager.increment_count(w)
word_freqs = self._word_freq_manager.sorted()
print_word_freqs(word_freqs)
if __name__ == '__main__':
WordFrequencyController(testfilepath).run()
'''
函数输入参数调用后你的马上接住返回值
类输入参数后实例化后你可以需要的时候去访问你需要的数据实例属性
'''

@ -0,0 +1,58 @@
from cppy.cp_util import *
'''
如果认为基于数据和函数的封装就是对象化编程那么类并不是必要的扩展的函数字典也有这个能力
函数利用属性存数据字典有的value是函数有的是value是数据
字典进行对象化程序设计只是缺乏权限控制机制而继承方面可以是通过一个代理模式包装实现
简单使用字典也能满足对象的一些应用场景而且更省资源
'''
def extract_words(obj, path_to_file):
"""
从文件中提取单词并存储在对象的 'data' 字段中
Args:
obj (dict): 存储数据的字典对象
path_to_file (str): 文件路径
"""
obj['data'] = extract_file_words(path_to_file)
def increment_count(obj, w):
"""
增加单词的计数如果单词不存在则将其计数设置为1
参数:
obj (dict): 存储单词频率的字典对象
w (str): 单词
"""
obj['freqs'][w] = 1 if w not in obj['freqs'] else obj['freqs'][w] + 1
# 数据存储对象,包含初始化和获取单词的方法
data_storage_obj = {
'data': [], # 存储单词列表
'init': lambda path_to_file: extract_words(data_storage_obj, path_to_file
), # 初始化方法,提取文件中的单词
'words': lambda: data_storage_obj['data'] # 获取单词列表的方法
}
# 单词频率对象,包含增加计数和排序的方法
word_freqs_obj = {
'freqs': {}, # 存储单词频率的字典
'increment_count':
lambda w: increment_count(word_freqs_obj, w), # 增加单词计数的方法
'sorted': lambda: sort_dict(word_freqs_obj['freqs']) # 获取排序后的单词频率的方法
}
if __name__ == '__main__':
# 初始化数据存储对象,提取文件中的单词
data_storage_obj['init'](testfilepath)
# 遍历单词列表,增加单词的计数
for word in data_storage_obj['words']():
word_freqs_obj['increment_count'](word)
# 获取排序后的单词频率并打印
word_freqs = word_freqs_obj['sorted']()
print_word_freqs(word_freqs)

@ -1,9 +1,13 @@
import abc, re import abc, re
from cppy.cp_util import * from cppy.cp_util import *
# '''
# 接口 如果在基类层面上进行设计这样就能获得多个子类同一层面上的抽象操作
# 在一些语言里面这种模式叫基于接口的编程接口要求纯抽象类
Python 因为对象协议的缘故所以抽象接口设计不是必须
沿用抽象接口设计是为工程性规整
'''
class IDataStorage (metaclass=abc.ABCMeta): class IDataStorage (metaclass=abc.ABCMeta):
@abc.abstractmethod @abc.abstractmethod
def words(self): def words(self):
@ -27,7 +31,7 @@ class IWordFrequencyCounter(metaclass=abc.ABCMeta):
# 类实现 # 类实现
# #
class DataStorageManager1: class DataStorageManager1:
def __init__(self, path_to_file): def __init__(self, path_to_file):
self._data = read_file(path_to_file) self._data = read_file(path_to_file)
self._data = re_split(self._data) self._data = re_split(self._data)
@ -74,8 +78,7 @@ IWordFrequencyCounter.register(subclass=WordFrequencyManager)
# 应用类 # 应用类
# #
class WordFrequencyController: class WordFrequencyController:
def __init__(self, path_to_file): def __init__(self, path_to_file):
# self._storage = DataStorageManager1(path_to_file)
self.storage = DataStorageManager2(path_to_file) self.storage = DataStorageManager2(path_to_file)
self.stop_word_manager = StopWordManager() self.stop_word_manager = StopWordManager()
self.word_freq_counter = WordFrequencyManager() self.word_freq_counter = WordFrequencyManager()

@ -0,0 +1,39 @@
from cppy.cp_util import *
from dataclasses import dataclass
from collections import Counter
import re
# 对象属性是现代 Python编程喜欢的风格
# 这里使用了dataclass来简化代码
@dataclass
class WordFrequency:
text: str
stop_words: set = None
def __post_init__(self):
# 如果未提供停用词表
if self.stop_words is None:
self.stop_words = get_stopwords()
def tokenize(self):
# 分词并去除停用词
words = re.findall(r'\b\w+\b', self.text.lower())
filtered_words = [word for word in words if word not in self.stop_words and len(word)>2]
return filtered_words
def get_top_n(self, n=10):
# 计算词频
word_freqs = Counter(self.tokenize())
return word_freqs.most_common(n)
# 使用示例
if __name__ == '__main__':
# 创建WordFrequency实例
text = read_file()
word_freq = WordFrequency( text )
# 获取并打印词频
top_words = word_freq.get_top_n()
print_word_freqs(top_words)

@ -0,0 +1,26 @@
from cppy.cp_util import *
from collections import Counter
# 定义一个带计数器的元类
class CounterMetaclass(type):
def __new__(mcs, name, bases, attrs):
attrs['_counter'] = Counter()
return super().__new__(mcs, name, bases, attrs)
# 基于元类创建类
class Word( metaclass=CounterMetaclass ):
def __init__(self, word):
self.word = word
self._counter[self.word] += 1
@classmethod
def get_word_freqs(cls,n) -> Counter:
return cls._counter.most_common(n)
for word in extract_file_words ( testfilepath ) : Word(word)
print_word_freqs(Word.get_word_freqs(10))
'''
常用于将依赖项如服务或配置自动注入到类中
'''

@ -0,0 +1,20 @@
from cppy.cp_util import *
#
# 生成器 是一种简单异步实现
#
def non_stop_words(testfilepath):
stopwords = get_stopwords()
data_str = read_file(testfilepath)
wordlist = re_split( data_str )
for word in wordlist:
if word not in stopwords:
yield word # 弹出一个非停用词
freqs = {}
for word in non_stop_words(testfilepath):
freqs[word] = freqs.get(word, 0) + 1
data = sort_dict(freqs)
print_word_freqs(data)

@ -3,6 +3,19 @@ import aiofiles
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
'''
异步指程序运行期间具有挂起然后等待唤醒继续执行的能力
典型应用场景如数据
源源不断产生,比如网络监听
太大,可能大于内存
有连续处理动作当某一环得到想要结果立刻终止后续数据不需要处理
本例读文件的Io还是太快的爬虫
async 是当前版本 Python 官方异步编程的标准库
aiofiles 是一个第三方库提供异步文件操作的功能
'''
async def read_file(file_path): async def read_file(file_path):
async with aiofiles.open(file_path, 'r', encoding='utf-8') as file: async with aiofiles.open(file_path, 'r', encoding='utf-8') as file:
content = await file.read() content = await file.read()
@ -21,7 +34,8 @@ async def main():
top_words = await count_words(text) top_words = await count_words(text)
wordfreqs += top_words wordfreqs += top_words
for word, count in wordfreqs.most_common(10): for word, count in wordfreqs.most_common(10):
print(f"{word}: {count//10}") print(f"{word}: {count//10}") # 突出 Io 的提升价值
# 运行异步主函数 # 运行异步主函数
asyncio.run(main()) asyncio.run(main())

@ -1,5 +1,7 @@
from cppy.cp_util import * from cppy.cp_util import *
# 内省Introspection在编程中是指程序在运行时检查对象变量类型或属性的能力。
# 这是一种动态地了解对象的方法,非常有利于调试、优化代码、实现泛型编程等场景。
class Calculator: class Calculator:
def frequencies(self,word_list): def frequencies(self,word_list):

@ -1,3 +1,8 @@
'''
反射是一种动态地访问或修改对象状态和行为的能力包括其属性方法等
有被 Hacker 攻击的风险
'''
import cppy.cp_util as util import cppy.cp_util as util
import operator import operator

@ -19,19 +19,17 @@ class sortTaskHandler:
# 应用框架 # 应用框架
########################################## ##########################################
def handle_task(task_type,*args): def handle_task(task_type,*args):
handler_class_name = f"{task_type}TaskHandler" # 构建处理器类名 handler_class_name = f"{task_type}TaskHandler" # 构建处理器类名
# 使用globals()获取当前全局符号表
handler_class = globals().get(handler_class_name) handler_class = globals().get(handler_class_name)
if handler_class: if handler_class:
handler = handler_class() # 实例化处理器类 handler = handler_class() # 实例化处理器类
return handler.handle(*args) # 调用处理方法 return handler.handle(*args) # 调用处理方法
else: else:
print(f"No handler found for task type: {task_type}") print(f"No found for task type: {task_type}")
if __name__ == '__main__': word_list = handle_task("words",util.testfilepath)
word_list = handle_task("words",util.testfilepath) word_freq = handle_task("frequencies",word_list)
word_freq = handle_task("frequencies",word_list) word_sort = handle_task("sort",word_freq)
word_sort = handle_task("sort",word_freq) util.print_word_freqs(word_sort)
util.print_word_freqs(word_sort)

@ -0,0 +1,3 @@
from cppy.cp_util import *
print_word_freqs( sort_dict ( get_frequencies ( extract_file_words(testfilepath) )))

@ -0,0 +1,24 @@
from cppy.cp_util import *
# 这个例子没有实际意义,是用来帮助理解其他例子
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
def readfile(file_path, func):
data = read_file(file_path)
func(data, frequencies)
def extractwords(str_data,func):
func(extract_str_words(str_data), sort)
def frequencies(word_list, func):
wf = get_frequencies(word_list)
func(wf, printall)
def sort(wf, func):
func(sort_dict(wf), None)
def printall(word_freqs, _ ):
print_word_freqs(word_freqs)
if __name__ == "__main__":
readfile(testfilepath, extractwords)

@ -0,0 +1,28 @@
from cppy.cp_util import *
# 如果有连续的对数据加工操作,而且总是把共同加工数据对象当第一个参数,可以用本文件夹方法提升阅读体验
# 框架类
class FunBind:
def bind(self, func,*args, **kwargs):
try:
self.data = func(self.data,*args, **kwargs)
except:
self.data = func(*args, **kwargs)
return self
data = FunBind()\
.bind(extract_file_words,testfilepath)\
.bind(get_frequencies)\
.bind(sort_dict)\
.bind(print_word_freqs,10)\
.data
print(data)
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''

@ -0,0 +1,28 @@
from cppy.cp_util import *
'''
函数是自由函数,还是正常的函数写法
使用
- 列举函数名首部参数外的其它参数
- 调用 data 得到最后数据
'''
class FunPipe:
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
def __or__(self, other):
_data = self.func(*self.args, **self.kwargs)
return FunPipe( other.func,_data,*other.args,**other.kwargs)
@property
def data(self):
return self.func(*self.args, **self.kwargs)
# 模仿管道
pipe = FunPipe(extract_file_words,testfilepath) | FunPipe(get_frequencies) | FunPipe(sort_dict) | FunPipe(print_word_freqs, 10)
pipe.data

@ -0,0 +1,29 @@
from cppy.cp_util import *
class Flow:
def extract_file_words(self, filepath):
self.data = extract_file_words(filepath)
return self
def get_frequencies(self):
self.data = get_frequencies(self.data)
return self
def sort_dict(self):
self.data = sort_dict(self.data)
return self
def print_word_freqs(self, n):
print_word_freqs(self.data, n)
return self
# 顺序调用
Flow().extract_file_words(testfilepath).get_frequencies().sort_dict().print_word_freqs(10)
'''
连续方法调用看起来比较舒服
但是需要假设
- 每一个类方法返回 self 否则没法连续
- 类方法默认不写第一个参数数据都在 .data 里面
'''

@ -0,0 +1,50 @@
from cppy.cp_util import *
# 装饰器改写类
# - 找到以f_开头的方法
# - 将方法函数的返回值赋值给对象的data属性
# - 返回对象自身
def return_self_decorator(cls):
def return_self(func):
# 定义一个闭包函数,用于接收参数
def wrapper(self, *args, **kwargs):
self.data = func(self, *args, **kwargs)
return self # 返回类自身
return wrapper
for name, method in cls.__dict__.items():
# 判断属性是否可调用且属性名以f_开头
if callable(method) and name.startswith('f_'):
# 为类改写属性,将封装后的函数赋值
setattr(cls, name, return_self(method))
return cls
@return_self_decorator
class Flow():
def test(self):
return 'test'
def f_extract_file_words(self, filepath):
return extract_file_words(filepath)
def f_get_frequencies(self):
return get_frequencies(self.data)
def f_sort_dict(self):
return sort_dict(self.data)
def f_print_word_freqs(self, n):
print_word_freqs(self.data, n)
# 顺序调用
Flow().f_extract_file_words(testfilepath).f_get_frequencies().f_sort_dict().f_print_word_freqs(10)
'''
改写后参与 function flow 功能的方法
- 需要以 'f_' 开头
- 类方法默认不写第一个参数数据都在 .data 里面
仍旧需要特殊的方法写法
所以还是 12种方法比较自然
'''

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from functools import reduce
stop_words = get_stopwords()
# map - reduce
def process_chunk(chunk): # 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(count1,count2):
return count1 + count2
@timing_decorator
def main():
# 读数据按1000个词一组分片
chunks = get_chunks(testfilepath,1000)
# 使用 map 方法和 process_chunk 函数处理每个分区
counts_list = list(map(process_chunk, chunks))
# 使用 reduce 和 merge_counts 函数统计所有分区的词频
total_counts = (reduce(merge_counts,counts_list))
# 输出最高频的n个词
print_word_freqs(total_counts.most_common(10))
if __name__ == '__main__':
main()
'''
简单的利用函数式风格
'''

@ -0,0 +1 @@
尾调用方式调用函数做连续的数据处理。代码好读方便栈资源优化一些语言已经实现了这种调用方式释放前面的函数栈但Python当前还没做如此优化

@ -0,0 +1 @@
异常主要发生在参数传递和代码块执行过程。一种原则是:软件不能挂掉。检查参数合理性、检查代码块执行可能的错误,并进行合理结果补齐,保持程序继续运行【 1 软件不能挂掉 】,另外一种情况是发生异常就抛出然后终止程序【 2 时间停止在那一刻 】,或者由上层函数接住,集中统一处理。【 3 预判可能的错误 】。

@ -1,27 +1,29 @@
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
#
# 遇到异常,退出程序
#
def extract_words(path_to_file): def extract_words(path_to_file):
assert(type(path_to_file) is str), "I need a string!" assert(type(path_to_file) is str), "Must be a string!"
assert(path_to_file), "I need a non-empty string!" assert(path_to_file), "Must be a non-empty string!"
try: try:
with open(path_to_file,encoding='utf-8') as f: with open(path_to_file,encoding='utf-8') as f:
str_data = f.read() str_data = f.read()
except IOError as e: except IOError as e:
print("I/O error({0}) when opening {1}: {2}! I quit!".format(e.errno, path_to_file, e.strerror)) print("I/O error({0}) when opening {1}: {2}".format(e.errno, path_to_file, e.strerror))
raise e raise e
return re_split(str_data) return re_split(str_data)
def remove_stop_words(word_list): def remove_stop_words(word_list):
assert(type(word_list) is list), "I need a list!" assert(type(word_list) is list), "Must be a list!"
try: try:
stop_words = get_stopwords() stop_words = get_stopwords()
except IOError as e: except IOError as e:
print("I/O error({0}) opening stops_words.txt: {1}! I quit!".format(e.errno, e.strerror)) print("I/O error({0}) opening stops_words.txt: {1}".format(e.errno, e.strerror))
raise e raise e
return [w for w in word_list if not w in stop_words] return [w for w in word_list if not w in stop_words]

@ -2,7 +2,7 @@ import re, operator, string
from cppy.cp_util import * from cppy.cp_util import *
# #
# The functions # 遇到异常,给出一个能让程序继续运行下去的默认值
# #
def extract_words(path_to_file): def extract_words(path_to_file):
try: try:

@ -0,0 +1,27 @@
from cppy.cp_util import *
#
# 用断言从事发点给出出错的准确信息
#
def extractWords(path_to_file):
assert(type(path_to_file) is str), "Must be a string"
assert(path_to_file), "Must be a non-empty string"
return extract_file_words(path_to_file)
def frequencies(word_list):
assert(type(word_list) is list), "Must be a list"
assert(word_list != []), "Must be a non-empty list"
return get_frequencies(word_list)
def sort(word_freqs):
assert(type(word_freqs) is dict), "Must be a dictionary"
assert(word_freqs != {}), "Must be a non-empty dictionary"
return sort_dict(word_freqs)
if __name__ == '__main__':
try:
word_freqs = sort(frequencies(extractWords( testfilepath )))
print_word_freqs(word_freqs)
except Exception as e:
print(" Something wrong: {0}".format(e) )

@ -1,7 +1,11 @@
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
# 缓存系统,使用字典来存储文件路径和对应的词频结果; 可以存到本地磁盘,就可重复利用计算结果 '''
如果函数设计干净没有修改全局变量那么相同的调用参数就会得到必然相同的结果
缓存把每次计算输入输出存起来如果后面有相同的任务调用参数相同就不用重复计算了
'''
word_freq_cache = {} word_freq_cache = {}
def calculate_word_frequency(file_path): def calculate_word_frequency(file_path):
@ -23,9 +27,9 @@ top_10_words = calculate_word_frequency(testfilepath)
print_word_freqs(top_10_words) print_word_freqs(top_10_words)
''' '''
python 提供了一种缓存调用函数的机制 Python 提供了一个缓存调用函数的装饰器
import functools import functools
# 使用 functools.lru_cache 缓存结果 # 使用 functools.lru_cache 缓存结果
@functools.lru_cache(maxsize=None) @functools.lru_cache(maxsize=None)
def calculate_word_frequency(file_path): def calculate_word_frequency(file_path):

@ -0,0 +1,36 @@
# 创建对象是消耗资源的,如果发现对象已经存在,可以返回引用,不创造新对象 。设计模式中这个做法叫享元
# 可以降低资源需求和提升响应速度。更常见的该模式使用场景是各种资源池。
from cppy.cp_util import *
#享元类
class WordFrequencyController():
def __init__(self, controllertype,filepath ):
word_list = extract_file_words(filepath)
word_freq = get_frequencies(word_list)
self.word_freq = sort_dict(word_freq)
self.number = controllertype
def print_word_freqs( self ):
print_word_freqs( self.word_freq,self.number)
#享元工厂
class WordFrequencyControllerFactory():
def __init__(self):
self.types = {}
def get_WordFrequencyController(self, number,testfilepath):
if number not in self.types:
self.types[number] = WordFrequencyController(number,testfilepath) # 创建新的对象
print('new obj: ','*'*30,number)
else:
print('ref obj: ','*'*30,number)
return self.types[number] # 重复使用已存在的对象
if __name__ == "__main__":
factory = WordFrequencyControllerFactory()
for number in [ 1,3,5,3,5,7 ]:
WordFrequency = factory.get_WordFrequencyController(number,testfilepath)
# print(flush=True)
WordFrequency.print_word_freqs()

@ -0,0 +1,64 @@
'''
模板方法模式 (Template Method Pattern)
适用场景定义词频统计的整体流程骨架允许子类自定义某些步骤如分词或输出格式
如何应用
定义一个抽象类 WordFrequencyAnalyzer包含固定的流程读取文件 -> 分词 -> 统计 -> 输出
具体步骤如分词或输出由子类实现
优点保证流程一致子类只关注具体实现
知识点
理解解继承与抽象类的使用
如何定义固定流程允许子类灵活实现强调流程与实现的分离
'''
from abc import ABC, abstractmethod
from typing import List, Dict
from collections import Counter
class WordFrequencyAnalyzer(ABC):
def analyze(self, directory: str, top_n: int = 10):
texts = self.read_texts(directory)
word_counts = self.count_words(texts)
self.print_results(word_counts, top_n)
@abstractmethod
def read_texts(self, directory: str) -> List[str]:
pass
@abstractmethod
def count_words(self, texts: List[str]) -> Dict[str, int]:
pass
@abstractmethod
def print_results(self, word_counts: Dict[str, int], top_n: int):
pass
class SimpleAnalyzer(WordFrequencyAnalyzer):
def read_texts(self, directory: str) -> List[str]:
import os
texts = []
for filename in os.listdir(directory):
if filename.endswith(".txt"):
with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
texts.append(f.read())
return texts
def count_words(self, texts: List[str]) -> Dict[str, int]:
import jieba
word_counts = Counter()
for text in texts:
words = jieba.cut(text)
word_counts.update([word for word in words if word.strip()])
return dict(word_counts)
def print_results(self, word_counts: Dict[str, int], top_n: int):
top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
for word, count in top_words:
print(f"{word}: {count}")
# 使用
analyzer = SimpleAnalyzer()
analyzer.analyze("data", 10)

@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
'''
设计说明
策略模式定义 Tokenizer 接口允许不同分词策略如中文分词英文分词
工厂模式TokenizerFactory 创建分词器隔离具体实现
解耦数据读取DataReader词频统计WordCounter结果输出ResultPrinter分离各自独立
可扩展支持添加新分词策略或输出格式
解耦
DataReader 只负责读取文件与分词和统计无关
WordCounter 依赖抽象的 Tokenizer不关心具体分词实现
ResultPrinter 只处理输出格式可扩展如输出到文件
策略模式
Tokenizer 接口允许切换分词策略如添加英文分词器
ChineseTokenizer 是具体实现易于替换
工厂模式
TokenizerFactory 封装分词器创建逻辑调用方无需知道具体类
简洁性代码结构清晰模块职责单一
扩展性
可添加新分词器如英文 EnglishTokenizer
可扩展 ResultPrinter 支持不同输出格式
实践可尝试添加英文分词器或新输出格式 CSV
'''
import os
import jieba
from collections import Counter
from abc import ABC, abstractmethod
from typing import List, Dict
# 策略接口:分词器
class Tokenizer(ABC):
@abstractmethod
def tokenize(self, text: str) -> List[str]:
pass
# 具体策略:中文分词(使用 jieba
class ChineseTokenizer(Tokenizer):
def tokenize(self, text: str) -> List[str]:
return [word for word in jieba.cut(text) if word.strip()]
# 工厂:创建分词器
class TokenizerFactory:
@staticmethod
def create_tokenizer(language: str = "chinese") -> Tokenizer:
if language == "chinese":
return ChineseTokenizer()
raise ValueError(f"Unsupported language: {language}")
# 数据读取模块
class DataReader:
def __init__(self, directory: str):
self.directory = directory
def read_texts(self) -> List[str]:
texts = []
for filename in os.listdir(self.directory):
if filename.endswith(".txt"):
with open(os.path.join(self.directory, filename), "r", encoding="utf-8") as f:
texts.append(f.read())
return texts
# 词频统计模块
class WordCounter:
def __init__(self, tokenizer: Tokenizer):
self.tokenizer = tokenizer
def count_words(self, texts: List[str]) -> Dict[str, int]:
word_counts = Counter()
for text in texts:
words = self.tokenizer.tokenize(text)
word_counts.update(words)
return dict(word_counts)
# 结果输出模块
class ResultPrinter:
@staticmethod
def print_top_words(word_counts: Dict[str, int], top_n: int = 10):
top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:top_n]
for word, count in top_words:
print(f"{word}: {count}")
# 主程序
def main():
# 配置
data_dir = "data"
top_n = 10
# 创建组件
reader = DataReader(data_dir)
tokenizer = TokenizerFactory.create_tokenizer("chinese")
counter = WordCounter(tokenizer)
printer = ResultPrinter()
# 执行流程
texts = reader.read_texts()
word_counts = counter.count_words(texts)
printer.print_top_words(word_counts, top_n)
if __name__ == "__main__":
main()

@ -0,0 +1,105 @@
'''
把观察者挂到自己的处理队列上
'''
import os
import re
import threading
from queue import Queue
from collections import Counter
from abc import ABC, abstractmethod
# 观察者接口
class Observer(ABC):
@abstractmethod
def update(self, word_counts: Counter):
pass
# 具体观察者:打印前 10 高频词
class PrintTopWordsObserver(Observer):
def update(self, word_counts: Counter):
print("Top 10 高频词:")
for word, count in word_counts.most_common(10):
print(f"{word}: {count}")
# 具体观察者:保存词频到文件
class SaveToFileObserver(Observer):
def __init__(self, output_file):
self.output_file = output_file
def update(self, word_counts: Counter):
try:
with open(self.output_file, 'w', encoding='utf-8') as f:
for word, count in word_counts.most_common(10):
f.write(f"{word}: {count}\n")
print(f"词频已保存到 {self.output_file}")
except Exception as e:
print(f"保存失败: {e}")
# 词频统计器(主题)
class WordFrequencyCounter:
def __init__(self):
self.observers = []
self.counter = Counter()
self.queue = Queue()
self.lock = threading.Lock()
def add_observer(self, observer: Observer):
self.observers.append(observer)
def remove_observer(self, observer: Observer):
self.observers.remove(observer)
def notify_observers(self):
for observer in self.observers:
observer.update(self.counter)
def process_file(self):
while True:
try:
file_path = self.queue.get_nowait()
except:
break
try:
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read().lower()
words = re.findall(r'\b\w+\b', text)
with self.lock:
self.counter.update(words)
except Exception as e:
print(f"Error processing {file_path}: {e}")
finally:
self.queue.task_done()
def count_words(self, files, num_threads=4):
# 将文件路径放入队列
for file_path in files:
self.queue.put(file_path)
# 创建并启动线程
threads = [threading.Thread(target=self.process_file) for _ in range(num_threads)]
for t in threads:
t.start()
for t in threads:
t.join()
# 通知所有观察者
self.notify_observers()
def main():
# 获取文件列表
data_dir = 'data'
files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')]
# 创建词频统计器
counter = WordFrequencyCounter()
# 添加观察者
counter.add_observer(PrintTopWordsObserver())
counter.add_observer(SaveToFileObserver("word_frequency.txt"))
# 统计词频并通知观察者
counter.count_words(files)
if __name__ == '__main__':
main()

@ -0,0 +1,5 @@
[Plugins]
;; Options: plugins/f1.pyc, plugins/f2.pyc
frequencies = plugins/f2.pyc

@ -0,0 +1,34 @@
# 插件模式提供一种个别扩展性开发和系统核心开发无关的松耦合结构。
# 简单说,第三方开发者在没有核心框架源码下也能扩展或者改造系统功能
import configparser, importlib.machinery
from cppy.cp_util import *
class PluginManager:
def __init__(self):
self.plugins = {}
def load_plugins(self):
_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(_dir)
config = configparser.ConfigParser()
config.read("config.ini")
frequencies_plugin = config.get("Plugins", "frequencies")
# 加载插件
self.plugins['word_freqs'] = importlib.machinery.SourcelessFileLoader('', frequencies_plugin).load_module()
def get_plugin(self, name):
return self.plugins.get(name)
# 创建 PluginManager 实例
plugin_manager = PluginManager()
plugin_manager.load_plugins()
wordlist = extract_file_words(testfilepath) # 提取文件中的单词
word_freqs = plugin_manager.get_plugin('word_freqs').top_word(wordlist) # 调用实例方法
print_word_freqs(word_freqs) # 打印词频

@ -0,0 +1,28 @@
import py_compile
py_compile.compile('f1.py')
py_compile.compile('f2.py')
import os
import shutil
# 设置源目录和目标目录
source_dir = os.path.join(os.path.dirname(__file__), '__pycache__') # 当前目录下的 __pycache__ 目录
target_dir = os.path.join(os.path.dirname(__file__), '..', 'plugins') # 上一级目录下的 plugins 目录
# 确保目标目录存在
os.makedirs(target_dir, exist_ok=True)
# 遍历源目录中的所有 .pyc 文件
for filename in os.listdir(source_dir):
if filename.endswith('.pyc'):
# 提取文件名的前两个字符
new_filename = filename[:2]
# 构建源文件和目标文件的完整路径
source_file = os.path.join(source_dir, filename)
target_file = os.path.join(target_dir, new_filename + '.pyc')
# 拷贝文件
shutil.copyfile(source_file, target_file)
# 删除原始文件
os.remove(source_file)
print(f"Copied {filename} to {target_file} and removed original file.")

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import operator import operator
def top25(word_list): def top_word(word_list):
word_freqs = {} word_freqs = {}
for w in word_list: for w in word_list:
if w in word_freqs: if w in word_freqs:

@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
import collections
def top_word(word_list):
counts = collections.Counter( word_list )
return counts.most_common(10)

@ -0,0 +1,80 @@
'''
REST 风格是把数据资源拓展到了互联网环境用Web协议访问
组件可以用任何环境任何语言开发, 只需要遵循共同的 rest 协议
'''
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify, abort
from functools import lru_cache
from cppy.cp_util import *
from functools import cache
app = Flask(__name__)
# 模拟数据库
books_db = []
# 用于缓存用户数据库的装饰器
@lru_cache(maxsize=None)
def get_books_db():
return books_db
#查询所有资源
@app.route('/books', methods=['GET'])
def get_books():
return jsonify(get_books_db())
#查询某个资源
@app.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
book = next((book for book in get_books_db() if book['id'] == book_id), None)
if book is None:
abort(404)
return jsonify(book['content'])
# 创建或更新新资源
@app.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
global books_db
book_to_update = request.json
print(book_to_update)
books_db = get_books_db()
book = next((book for book in books_db if book['id'] == book_id), None)
if book is None:
# 如果资源不存在,创建新资源
books_db.append(book_to_update)
else:
# 如果资源存在,更新资源
book.update(book_to_update)
# 清除缓存的数据库
cache.delete(get_books_db)
return jsonify(books_db), 200
#操作一个资源
@app.route('/books/<int:book_id>/word_frequency', methods=['GET'])
def word_frequency(book_id):
global books_db
book = next((book for book in get_books_db() if book['id'] == book_id), None)
filepath = book['content']
word_list = extract_file_words(filepath)
word_frequency = get_frequencies(word_list)
word_frequency = sort_dict(word_frequency)
print_word_freqs(word_frequency)
return jsonify(word_frequency), 200
@app.route('/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
global books_db
books_db = [book for book in books_db if book['id'] != book_id]
if len(books_db) == len([l for l in books_db if l['id'] == book_id]):
abort(404) # 用户不存在
return jsonify({'message': f'book {book_id} deleted'}), 200
if __name__ == '__main__':
app.run(debug=True)

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
import requests
from cppy.cp_util import *
# 查询资源,得到空列表
url = 'http://127.0.0.1:5000//books'
response = requests.get(url)
print(response.json())
time.sleep(2)
# - 创建一个1号资源
print('创建一个1号资源')
book_1 = {"id": 1, "title": "Python编程:从入门到实践", "content": testfilepath}
url = 'http://127.0.0.1:5000/books/1'
response = requests.put(url,json=book_1)
time.sleep(2)
# - 创建一个2号资源修改testfilepaht变量
print('创建一个2号资源')
testfilepath = testfilepath.replace('Prey.txt','Pride-and-Prejudice.txt')
book_2 = {"id": 2, "title": "深入浅出计算机组成原理", "content": testfilepath}
url = 'http://127.0.0.1:5000/books/2'
response = requests.put(url,json=book_2)
time.sleep(2)
# - 创建一个3号资源修改testfilepaht变量正好有3个文件
print('创建一个3号资源')
testfilepath = testfilepath.replace('Pride-and-Prejudice.txt','test.txt')
book_3 = {"id": 3, "title": "算法导论", "content": testfilepath}
url = 'http://127.0.0.1:5000/books/3'
response = requests.put(url,json=book_3)
time.sleep(2)
# - 查询资源,看到结果
print('查询资源,看到结果')
url = 'http://127.0.0.1:5000//books'
response = requests.get(url)
print(response.json())
time.sleep(2)
# - 操作1号资源得到词频
print('操作1号资源得到词频')
url = 'http://127.0.0.1:5000/books/1/word_frequency'
response = requests.get(url)
print_word_freqs(response.json())

@ -0,0 +1,25 @@
import requests
from cppy.cp_util import *
def main():
# 读测试文件的内容
content = read_file()
# 抽词
tokenize_response = requests.post("http://localhost:7770/tokenize", json={"text": content})
words = tokenize_response.json()["words"]
# 计算词频
count_response = requests.post("http://localhost:7771/count", json={"words": words})
word_count = count_response.json()["word_count"]
# 排序
sort_response = requests.post("http://localhost:7772/sort", json={"word_count": word_count})
top_10_words = sort_response.json()["top_10_words"]
print("Top 10 words:")
print_word_freqs(top_10_words)
if __name__ == "__main__":
main()

@ -0,0 +1,14 @@
from fastapi import FastAPI
from collections import Counter
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/count")
async def count(words_list: dict): # {"words": ["word1", "word2", ...]}
word_count = Counter(words_list["words"])
return {"word_count": dict(word_count)}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7771)

@ -0,0 +1,13 @@
from fastapi import FastAPI
import uvicorn
app = FastAPI()
@app.post("/sort")
async def sort(word_count_dict: dict):
sorted_word_count = sorted(word_count_dict["word_count"].items(), key=lambda x: x[1], reverse=True)
top_10_words = sorted_word_count[:10]
return {"top_10_words": top_10_words}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7772)

@ -0,0 +1,13 @@
from fastapi import FastAPI
from cppy.cp_util import *
import uvicorn
app = FastAPI()
@app.post("/tokenize")
async def tokenize(text: str):
words = extract_str_words(text)
return {"words": words}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port= 7770)

@ -1,23 +1,20 @@
'''
以查询和独立组件并发操作数据为中心的思路建数据库围绕数据库做查询操作
现代开发模式中编程语言很少直接用 SQL 操作数据库了常常有个隔离具体数据库更方便编程的ORM层
用工具把表数据映射为对象(ORM)
createDb 创建数据对象processData 处理数据DataQuery 查询输出数据相对 共享内存数据 它慢一些
'''
import sqlite3, os.path import sqlite3, os.path
from cppy.cp_util import testfilepath,db_filename,extract_file_words from cppy.cp_util import testfilepath,db_filename,extract_file_words
# 数据库表结构 # 数据库表结构
TABLES = { TABLES = {
'documents': '''CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL
)''',
'words': '''CREATE TABLE IF NOT EXISTS words ( 'words': '''CREATE TABLE IF NOT EXISTS words (
doc_id INTEGER NOT NULL, doc_name INTEGER NOT NULL,
value TEXT NOT NULL, value TEXT NOT NULL
FOREIGN KEY (doc_id) REFERENCES documents (id) )''',
)''',
'characters': '''CREATE TABLE IF NOT EXISTS characters (
word_id INTEGER NOT NULL,
value TEXT NOT NULL,
FOREIGN KEY (word_id) REFERENCES words (id)
)'''
} }
@ -33,15 +30,10 @@ def create_db_schema(connection):
def load_file_into_database(path_to_file, connection): def load_file_into_database(path_to_file, connection):
words = extract_file_words( path_to_file ) words = extract_file_words( path_to_file )
c = connection.cursor() doc_name = os.path.basename(testfilepath).split('.')[0]
c.execute("INSERT INTO documents (name) VALUES (?)", (path_to_file,)) c = connection.cursor()
doc_id = c.lastrowid
for w in words: for w in words:
c.execute("INSERT INTO words (doc_id, value) VALUES (?, ?)", (doc_id, w)) c.execute("INSERT INTO words (doc_name, value) VALUES (?, ?)", (doc_name, w))
word_id = c.lastrowid
for char in w:
c.execute("INSERT INTO characters (word_id, value) VALUES (?, ?)", (word_id, char))
connection.commit() connection.commit()
c.close() c.close()
@ -49,11 +41,9 @@ def load_file_into_database(path_to_file, connection):
# 建数据库,处理数据入库 # 建数据库,处理数据入库
####################################################### #######################################################
# 获取当前文件所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 构造数据库文件的完整路径 # 构造数据库文件的完整路径
current_dir = os.path.dirname(os.path.abspath(__file__))
db_file_path = os.path.join(current_dir, db_filename) db_file_path = os.path.join(current_dir, db_filename)
if os.path.exists(db_file_path): if os.path.exists(db_file_path):
os.remove(db_file_path) os.remove(db_file_path)
@ -68,4 +58,11 @@ with sqlite3.connect(db_file_path) as connection:
c = connection.cursor() c = connection.cursor()
c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10") c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10")
for row in c.fetchall(): for row in c.fetchall():
print(row[0], '-', row[1]) print(row[0], '-', row[1])
'''
也可以把数据库看做解决共享数据的竞争死锁的办法
不过本例中的计算太快
用数据库共享数据成本太高
'''

@ -1,26 +1,33 @@
'''
每个对象运行于独立线程每个对象只暴露一个队伍消息队列的接口
各个组件完全不能互操作仅依靠队列发消息进行协作/
多线程模式下消息队列来共享数据空间相比较传统数据库共享方式访问更快
这种模式改造下适合跨进程系统的后台响应对象设计.
'''
from threading import Thread from threading import Thread
from queue import Queue from queue import Queue
from cppy.cp_util import * from cppy.cp_util import *
class ActiveWFObject(Thread): class ThreadObject(Thread):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.queue = Queue() self.queue = Queue()
self._stopMe = False self._over = False
self.start() self.start()
def run(self): def run(self):
while not self._stopMe: while not self._over:
message = self.queue.get() message = self.queue.get()
self._dispatch(message) self._dispatch(message)
if message[0] == 'die': if message[0] == 'over':
self._stopMe = True break
def send(receiver, message): def send(receiver, message):
receiver.queue.put(message) receiver.queue.put(message)
class DataStorageManager(ActiveWFObject): class TxtManager(ThreadObject):
""" Models the contents of the file """
_data = '' _data = ''
def _dispatch(self, message): def _dispatch(self, message):
@ -28,23 +35,21 @@ class DataStorageManager(ActiveWFObject):
self._init(message[1:]) self._init(message[1:])
elif message[0] == 'send_word_freqs': elif message[0] == 'send_word_freqs':
self._process_words(message[1:]) self._process_words(message[1:])
else: else:
# forward
send(self._stop_word_manager, message) send(self._stop_word_manager, message)
def _init(self, message): def _init(self, message):
path_to_file = message[0] self._data = extract_file_words(message[0])
self._stop_word_manager = message[1] self._stop_word_manager = message[1]
self._data = extract_file_words(path_to_file)
def _process_words(self, message): def _process_words(self, message):
recipient = message[0] recipient = message[0]
for w in self._data: for w in self._data:
send(self._stop_word_manager, ['filter', w]) send(self._stop_word_manager, ['filter', w])
send(self._stop_word_manager, ['top10', recipient]) send(self._stop_word_manager, ['topWord', recipient])
class StopWordManager(ActiveWFObject): class FilterManager(ThreadObject):
""" Models the stop word filter """
_stop_words = [] _stop_words = []
def _dispatch(self, message): def _dispatch(self, message):
@ -52,8 +57,7 @@ class StopWordManager(ActiveWFObject):
self._init(message[1:]) self._init(message[1:])
elif message[0] == 'filter': elif message[0] == 'filter':
return self._filter(message[1:]) return self._filter(message[1:])
else: else:
# forward
send(self._word_freqs_manager, message) send(self._word_freqs_manager, message)
def _init(self, message): def _init(self, message):
@ -65,31 +69,29 @@ class StopWordManager(ActiveWFObject):
if word not in self._stop_words: if word not in self._stop_words:
send(self._word_freqs_manager, ['word', word]) send(self._word_freqs_manager, ['word', word])
class WordFrequencyManager(ActiveWFObject): class WFManager(ThreadObject):
""" Keeps the word frequency data """
_word_freqs = {} _word_freqs = {}
def _dispatch(self, message): def _dispatch(self, message):
if message[0] == 'word': if message[0] == 'word':
self._increment_count(message[1:]) self._increment_count(message[1:])
elif message[0] == 'top10': elif message[0] == 'topWord':
self._top10(message[1:]) self._topWord(message[1:])
def _increment_count(self, message): def _increment_count(self, message):
word, = message word, = message
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1 self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
def _top10(self, message): def _topWord(self, message):
recipient = message[0] recipient = message[0]
freqs_sorted = sort_dict ( self._word_freqs ) freqs_sorted = sort_dict ( self._word_freqs )
send(recipient, ['top10', freqs_sorted]) send(recipient, ['topWord', freqs_sorted])
class WordFrequencyController(ActiveWFObject):
class MyController(ThreadObject):
def _dispatch(self, message): def _dispatch(self, message):
if message[0] == 'run': if message[0] == 'run':
self._run(message[1:]) self._run(message[1:])
elif message[0] == 'top10': elif message[0] == 'topWord':
self._display(message[1:]) self._display(message[1:])
else: else:
raise Exception("Message not understood " + message[0]) raise Exception("Message not understood " + message[0])
@ -101,20 +103,20 @@ class WordFrequencyController(ActiveWFObject):
def _display(self, message): def _display(self, message):
word_freqs, = message word_freqs, = message
print_word_freqs( word_freqs) print_word_freqs( word_freqs)
send(self._storage_manager, ['die']) send(self._storage_manager, ['over'])
self._stopMe = True self._over = True
if __name__ == '__main__': if __name__ == '__main__':
word_freq_manager = WordFrequencyManager() word_freq_manager = WFManager()
stop_word_manager = StopWordManager() stop_word_manager = FilterManager()
storage_manager = DataStorageManager() storage_manager = TxtManager()
wfcontroller = MyController()
send(stop_word_manager, ['init', word_freq_manager])
send(storage_manager, ['init', testfilepath, stop_word_manager]) send(storage_manager, ['init', testfilepath, stop_word_manager])
send(stop_word_manager, ['init', word_freq_manager])
wfcontroller = WordFrequencyController()
send(wfcontroller, ['run', storage_manager]) send(wfcontroller, ['run', storage_manager])
# Wait for the active objects to finish # 等待所有管理器完成工作
[t.join() for t in [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]] threads = [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]
for thread in threads: thread.join()

@ -0,0 +1,52 @@
'''
装饰器模式 (Decorator Pattern)
适用场景动态为词频统计添加功能如过滤停用词忽略标点或添加词性标注
如何应用
定义核心 WordCounter 接口负责基本词频统计
使用装饰器类动态添加功能 StopWordFilterDecorator 过滤停用词
优点功能扩展不修改核心逻辑符合开闭原则
知识点
如何动态扩展功能保持核心代码不变
体会组合优于继承的原则
'''
from abc import ABC, abstractmethod
from typing import List, Dict
from collections import Counter
class WordCounter(ABC):
@abstractmethod
def count_words(self, texts: List[str]) -> Dict[str, int]:
pass
class BasicWordCounter(WordCounter):
def count_words(self, texts: List[str]) -> Dict[str, int]:
import jieba
word_counts = Counter()
for text in texts:
words = jieba.cut(text)
word_counts.update([word for word in words if word.strip()])
return dict(word_counts)
class WordCounterDecorator(WordCounter, ABC):
def __init__(self, counter: WordCounter):
self.counter = counter
class StopWordFilterDecorator(WordCounterDecorator):
def __init__(self, counter: WordCounter, stop_words: List[str]):
super().__init__(counter)
self.stop_words = stop_words
def count_words(self, texts: List[str]) -> Dict[str, int]:
word_counts = self.counter.count_words(texts)
return {word: count for word, count in word_counts.items() if word not in self.stop_words}
# 使用
counter = StopWordFilterDecorator(BasicWordCounter(), stop_words=["", "", ""])
word_counts = counter.count_words(["这是一段测试文本。"])
print(word_counts)

@ -1,7 +1,10 @@
# 自己设计类装饰器,类方法做进行类型申明和检查
from collections import Counter from collections import Counter
from cppy.cp_util import * from cppy.cp_util import *
class AcceptTypes: class TypesCheck:
def __init__(self, *args): def __init__(self, *args):
self._args = args self._args = args
@ -9,22 +12,23 @@ class AcceptTypes:
def wrapped_f(*args, **kwargs): def wrapped_f(*args, **kwargs):
for i, arg_type in enumerate(self._args): for i, arg_type in enumerate(self._args):
if not isinstance(args[i], arg_type): if not isinstance(args[i], arg_type):
raise TypeError(f"Argument {i} expected {arg_type}, got {type(args[i])}") raise TypeError(f" {i} expected {arg_type}, got {type(args[i])}")
return f(*args, **kwargs) return f(*args, **kwargs)
return wrapped_f return wrapped_f
@AcceptTypes(str) @TypesCheck(str)
def extract_words_(path_to_file): def extract_words_(path_to_file):
return extract_file_words(path_to_file) return extract_file_words(path_to_file)
@AcceptTypes(list) @TypesCheck(list)
def frequencies_(word_list): def frequencies_(word_list):
return Counter(word_list) return Counter(word_list)
@AcceptTypes(Counter) @TypesCheck(Counter)
def sort_(word_freq): def sort_(word_freq):
return word_freq.most_common() return word_freq.most_common()
if __name__ == '__main__': if __name__ == '__main__':
word_freqs = sort_(frequencies_(extract_words_( testfilepath ))) word_freqs = sort_(frequencies_(extract_words_( testfilepath )))
print_word_freqs(word_freqs) print_word_freqs(word_freqs)

@ -1,6 +1,9 @@
import time import time
import cppy.cp_util as util import cppy.cp_util as util
'''
用反射实现装饰器的效果
'''
# 工具函数 # 工具函数
def extract_words(path_to_file): def extract_words(path_to_file):

@ -0,0 +1,20 @@
# Python 作为弱类型语言希望拥有强类型语言类似的规范工整工程性的优点,牺牲一些代码的自由度。
# 可以理解为更好的代码注释和更多的工程约束 。
import cppy.cp_util as util
def extract_words(path_to_file:str) -> list:
return util.extract_file_words(path_to_file)
def frequencies( word_list:list ) -> dict :
return util.get_frequencies(word_list)
def sort(word_freq:dict) -> list :
return util.sort_dict(word_freq)
if __name__ == "__main__":
word_freqs = sort( frequencies(extract_words( util.testfilepath )) )
util.print_word_freqs(word_freqs)

@ -0,0 +1,48 @@
import sys
import re
from collections import Counter
# 提供带参数的运行命令行环境
# 使用 python command_line_1.py testfilepath 10
# 清洗文本,移除标点符号并转换为小写
def clean_text(text):
return re.sub(r'[^\w\s]', '', text).lower()
# 统计词频
def count_frequencies(text):
return Counter(word for word in clean_text(text).split())
# 主函数
def main():
# 检查命令行参数数量
if len(sys.argv) != 3:
print("Usage: python command_line_1.py <file_path> <n>")
sys.exit(1)
file_path = sys.argv[1]
n = int(sys.argv[2])
try:
# 打开文件并读取内容
with open(file_path, 'r', encoding='utf-8') as file:
text = file.read()
# 统计词频
frequencies = count_frequencies(text)
# 获取前n个最常见的单词
most_common = frequencies.most_common(n)
# 输出结果
for word, freq in most_common:
print(f"{word}: {freq}")
except FileNotFoundError:
print(f"File not found: {file_path}")
except ValueError as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()

@ -0,0 +1,51 @@
import re
from collections import Counter
# 提供一个命令行交互方式来驱动程序运行
# 清洗文本,移除标点符号并转换为小写
def clean_text(text):
return re.sub(r'[^\w\s]', '', text).lower()
# 统计词频
def count_frequencies(text):
return Counter(word for word in clean_text(text).split())
# 交互式提示用户输入文件路径和前n个单词的数量
def interactive_mode():
file_path = input("请输入文件路径 >> ")
try:
n = int(input("请输入你想要输出的前n个最常见单词的数量 >> "))
if n <= 0:
raise ValueError("数量必须大于0。")
except ValueError as e:
print(f"输入错误:{e}")
return
try:
# 打开文件并读取内容
with open(file_path, 'r', encoding='utf-8') as file:
text = file.read()
# 统计词频
frequencies = count_frequencies(text)
# 获取前n个最常见的单词
most_common = frequencies.most_common(n)
# 输出结果
for word, freq in most_common:
print(f"{word}: {freq}")
except FileNotFoundError:
print(f"文件未找到: {file_path}")
except Exception as e:
print(f"发生错误: {e}")
# 主函数
def main():
print("欢迎使用词频统计工具。")
interactive_mode()
if __name__ == "__main__":
main()

@ -1,3 +1,7 @@
# 命令行菜单驱动程序
# while 循环体中出现一个 case 菜单,每个菜单调用对应函数
import os import os
import cppy.cp_util as util import cppy.cp_util as util

@ -1,3 +1,6 @@
# 应用基于一个窗口,窗口基于操作系统的消息机制
import sys import sys
from PyQt5.QtWidgets import QApplication, QWidget, QPushButton, QVBoxLayout, QTextEdit, QFileDialog from PyQt5.QtWidgets import QApplication, QWidget, QPushButton, QVBoxLayout, QTextEdit, QFileDialog
import cppy.cp_util as util import cppy.cp_util as util

@ -1,3 +1,13 @@
'''
Web的MVC三层结构是互联网最火的应用框架
系统需要切分为模型视图控制三层
Python 中最简单的 Web服务器是 flask
app.py - Flask应用的主文件控制器Controller
models.py - 模型Model文件包含词频统计的逻辑
templates/index.html - 视图View文件用于展示结果
'''
from flask import Flask, render_template, request from flask import Flask, render_template, request
from models import WordFrequencyModel from models import WordFrequencyModel

@ -0,0 +1,30 @@
from flask import Flask, render_template, request, redirect, url_for
from collections import Counter
from cppy.cp_util import *
import os
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
# 获取上传的文件
file = request.files['file']
# 保存临时文件并读取内容
filename = os.path.join('/temp', file.filename)
file.save(filename)
# 计算词频
words = extract_file_words(filename)
word_counts = Counter(words)
# 删除临时文件
os.remove(filename)
return render_template('result.html', word_counts=word_counts.most_common())
return render_template('index.html')
if __name__ == '__main__':
app.run(debug=True)

@ -0,0 +1,14 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Upload Text File</title>
</head>
<body>
<h1>Upload a Text File to Count Word Frequencies</h1>
<form action="/" method="post" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Submit">
</form>
</body>
</html>

@ -0,0 +1,16 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Word Frequencies</title>
</head>
<body>
<h1>Top Word Frequencies:</h1>
<ul>
{% for word, count in word_counts %}
<li>{{ word }}: {{ count }}</li>
{% endfor %}
</ul>
<a href="{{ url_for('index') }}">Back to Upload</a>
</body>
</html>

@ -1,60 +1,65 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import cppy.cp_util as util import cppy.cp_util as util
from collections import Counter from collections import Counter
'''
class WordFrequencyStateMachine: 状态机是计算机程序运行的基础理论
def __init__(self, file_path): 使用状态机的风格来处理文件并计算词频我们可以将整个过程分解为一系列状态转移
self.file_path = file_path 每个状态代表处理过程中的一个阶段比如读取文件分割单词计算词频
self.content = None 这种方法在Python中并不常见但它展示了如何使用状态机来管理程序的状态和流程
self.words = None '''
self.word_freq = None
self.state = 'IDLE' class WordFrequencyStateMachine:
def __init__(self, file_path):
def transition_to_read_file(self): self.file_path = file_path
try: self.content = None
with open(self.file_path, 'r', encoding='utf-8') as file: self.words = None
self.content = file.read() self.word_freq = None
self.state = 'WORDS_SPLIT' self.state = 'IDLE'
except FileNotFoundError:
print(f"文件 {self.file_path} 未找到。") def transition_to_read_file(self):
except Exception as e: try:
print(f"读取文件时发生错误: {e}") with open(self.file_path, 'r', encoding='utf-8') as file:
self.content = file.read()
def transition_to_split_words(self): self.state = 'WORDS_SPLIT'
if self.content is not None: except FileNotFoundError:
self.words = util.extract_str_words(self.content) print(f"文件 {self.file_path} 未找到。")
self.state = 'CALCULATE_FREQ' except Exception as e:
else: print(f"读取文件时发生错误: {e}")
print("文件内容为空,无法分割单词。")
def transition_to_split_words(self):
def transition_to_calculate_freq(self): if self.content is not None:
if self.words is not None: self.words = util.extract_str_words(self.content)
self.word_freq = Counter(self.words) self.state = 'CALCULATE_FREQ'
self.state = 'DONE' else:
else: print("文件内容为空,无法分割单词。")
print("单词列表为空,无法计算词频。")
def transition_to_calculate_freq(self):
def run(self): if self.words is not None:
while self.state != 'DONE': self.word_freq = Counter(self.words)
if self.state == 'IDLE': self.state = 'DONE'
self.transition_to_read_file() else:
elif self.state == 'WORDS_SPLIT': print("单词列表为空,无法计算词频。")
self.transition_to_split_words()
elif self.state == 'CALCULATE_FREQ': def run(self):
self.transition_to_calculate_freq() while self.state != 'DONE':
else: if self.state == 'IDLE':
print(f"未知状态: {self.state}") self.transition_to_read_file()
break elif self.state == 'WORDS_SPLIT':
self.transition_to_split_words()
return self.word_freq elif self.state == 'CALCULATE_FREQ':
self.transition_to_calculate_freq()
# 使用状态机计算词频 else:
print(f"未知状态: {self.state}")
break
state_machine = WordFrequencyStateMachine(util.testfilepath)
word_frequencies = state_machine.run() return self.word_freq
# 打印结果 # 使用状态机计算词频
for word, freq in word_frequencies.most_common(10):
print(f"{word}: {freq}")
state_machine = WordFrequencyStateMachine(util.testfilepath)
word_frequencies = state_machine.run()
# 打印结果
util.print_word_freqs(word_frequencies.most_common(10))

@ -0,0 +1,192 @@
import site
import os, re, time
import string, operator
################################################################################
# 变量
################################################################################
testfilename = 'test.txt'
testfilename = 'pride-and-prejudice.txt'
testfilename = 'Prey.txt'
db_filename = "tf.db"
site_packages = site.getsitepackages()
for package in site_packages:
if 'package' in package:
basePath = package
stopwordfilepath = os.path.join(basePath, 'cppy', 'data', 'stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy', 'data', testfilename)
################################################################################
# 项目函数
################################################################################
def read_file(path_to_file):
"""
读取指定文件的内容
Args:
path_to_file (str): 文件路径
Returns:
str: 文件内容
"""
with open(path_to_file, encoding='utf-8') as f:
data = f.read()
return data
def re_split(data):
"""
使用正则表达式分割字符串将非字母字符替换为空格并将所有字符转换为小写
Args:
data (str): 输入字符串
Returns:
list: 分割后的单词列表
"""
pattern = re.compile('[\W_]+')
data = pattern.sub(' ', data).lower()
return data.split()
def get_stopwords(path_to_file=stopwordfilepath):
"""
获取停用词列表
Args:
path_to_file (str): 停用词文件路径默认为 stopwordfilepath
Returns:
list: 停用词列表
"""
with open(path_to_file, encoding='utf-8') as f:
data = f.read().split(',')
data.extend(list(string.ascii_lowercase))
return data
def get_chunks(file_path=testfilepath, chunk_size=1000):
"""
将文件内容分割成多个块
Args:
file_path (str): 文件路径默认为 testfilepath
chunk_size (int): 每个块的大小默认为 1000
Returns:
list: 分割后的块列表
"""
content = re_split(read_file(file_path))
chunks = [
content[i:i + chunk_size] for i in range(0, len(content), chunk_size)
]
return chunks
def extract_file_words(path_to_file):
"""
提取文件中的单词去除停用词和长度小于3的单词
Args:
path_to_file (str): 文件路径
Returns:
list: 提取后的单词列表
"""
word_list = re_split(read_file(path_to_file))
stop_words = get_stopwords()
return [w for w in word_list if (w not in stop_words) and len(w) >= 3]
def extract_str_words(data_str):
"""
提取字符串中的单词去除停用词和长度小于3的单词
Args:
data_str (str): 输入字符串
Returns:
list: 提取后的单词列表
"""
word_list = re_split(data_str)
stop_words = get_stopwords()
return [w for w in word_list if (w not in stop_words) and len(w) >= 3]
def count_word(word, word_freqs, stopwords):
"""
统计单词频率
Args:
word (str): 单词
word_freqs (dict): 单词频率字典
stopwords (list): 停用词列表
"""
if word not in stopwords:
word_freqs[word] = word_freqs.get(word, 0) + 1
def get_frequencies(word_list):
"""
获取单词频率
Args:
word_list (list): 单词列表
Returns:
dict: 单词频率字典
"""
word_freqs = {}
for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs
def sort_dict(word_freq):
"""
对字典进行排序
Args:
word_freq (dict): 单词频率字典
Returns:
list: 排序后的单词频率列表
"""
return sorted(word_freq.items(), key=operator.itemgetter(1), reverse=True)
def print_word_freqs(word_freqs, n=10):
"""
打印单词频率
Args:
word_freqs (list): 单词频率列表
n (int): 打印的单词数量默认为 10
"""
for (w, c) in word_freqs[:n]:
print(w, '-', c)
################################################################################
# 通用工具
################################################################################
def timing_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time() # 记录开始时间
result = func(*args, **kwargs) # 调用原始函数
end_time = time.time() # 记录结束时间
run_time = end_time - start_time # 计算运行时间
print(f"{func.__name__} 运行时间: {run_time*1000:.2f}")
return result
return wrapper
def test():
print('cppy welcome')

@ -0,0 +1,8 @@
## 任务
本项目的主要功能任务对指定文件读取文本在停用词过滤后按降序输出前10个高频词
我们研究这个问题的解法在各种情景下的表现形式 。
将 cppy整个目录转移到 anaconda3\Lib\site-packages 下面。里面放了一些共同的代码片段(函数),这样使得模式代码更短小简洁,便于更能集中理解各种模式的核心思想 。

@ -0,0 +1,24 @@
# 目标
本节使用一个书城的各方面业务需求来展示面向对象的常见设计模式 。
# 任务
背景假设为一个综合书城,提供线上线下购买,还经营一个书吧、一个报告厅。
# 说明
面向对象的模式是把编程过程中的一些思路固定化,并给一个名字方便理解 。
它是软件工程中一组经过验证的、可重复使用的代码写法 。
所以,模式不是语法,而是编程思路 。
这样做的好处是,统一大家的代码形式,提高代码可读性、可维护性、可扩展性 。
那为啥,面向过程没有这么做
是因为这个思维提炼过程,充分利用了面向对象语言的特性:封装、继承、多态 。
面向过程语言,没有这些特性,所以,面向过程程序设计一般不谈设计模式 。
因为 Python 对象协议的机制,多态、接口概念发生了根本变化 。使得一些C++、Java 的模式没用了 。比如 “ 原型模式Prototype可以使用copy.deepcopy()非常简便来创建 。另外,很多模式中继承关系也没必要了。但,下面很多示例中依旧保持了基类 。一是致敬经典,二是起到一个工程上更工整和代码注释的作用 。
# 应用场景
面向对象设计模式在管理信息系统和图形用户界面系统应用比较广泛 。

@ -0,0 +1,17 @@
'''
全局只允许一个实例的办法
在该电商系统中全局只有一个数据库连接使用单例模式确保在整个应用程序内只创建一次数据库连接实例
'''
class DatabaseConnection:
_instance = None
def __new__(cls):
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance.connect_to_db()
return cls._instance
def connect_to_db(self):
# 连接到数据库的代码...
pass

@ -0,0 +1,31 @@
# 如果有一个选择结构来决定实现不同的类,再面向对象设计里面一般把这个选择做成一个类,叫做工厂模式
# 定义一个ProductFactory类用于创建不同类型的商品实例如电子产品、书籍等。具体的产品由子类实现。
#
class Product:
def __init__(self, name, price):
self.name = name
self.price = price
class Electronic(Product):
def __init__(self, name, price, brand):
super().__init__(name, price)
self.brand = brand
class Book(Product):
def __init__(self, name, price, author):
super().__init__(name, price)
self.author = author
class ProductFactory:
@staticmethod
def create_product(product_type, *args, **kwargs):
if product_type == 'electronic':
return Electronic(*args, **kwargs)
elif product_type == 'book':
return Book(*args, **kwargs)
else:
raise ValueError("Invalid product type")
# 使用工厂方法创建产品
product = ProductFactory.create_product('book', 'Python编程艺术', 50.0, 'Mark Lutz')

@ -0,0 +1,114 @@
'''
建造者模式Builder Pattern允许构建一个复杂对象的各个部分然后一步一步地返回这个对象的完整版本
将建造者模式应用于网购的下单和出库过程时我们可以设计一个Order类来表示订单
以及一个OrderBuilder类来构建订单的各个部分
此外我们还可以引入一个ShoppingCart类来表示购物车以及一个Inventory类来处理库存和出库逻辑
'''
######################################################################
# Order类它包含订单的基本信息如下单时间、用户信息、订单项列表
######################################################################
from datetime import datetime
class OrderItem:
def __init__(self, product_id, quantity):
self.product_id = product_id
self.quantity = quantity
class Order:
def __init__(self, user_id, order_items, order_time=None):
self.user_id = user_id
self.order_items = order_items
self.order_time = order_time or datetime.now()
self.status = "PLACED" # 初始状态为已下单
def __str__(self):
return f"Order for user {self.user_id} placed at {self.order_time}. Status: {self.status}"
def fulfill(self, inventory):
# 出库逻辑,这里简化处理
for item in self.order_items:
if not inventory.deduct_stock(item.product_id, item.quantity):
return False
self.status = "FULFILLED"
return True
######################################################################
# OrderBuilder类用于构建订单
######################################################################
class OrderBuilder:
def __init__(self):
self.reset()
def reset(self):
self._user_id = None
self._order_items = []
def for_user(self, user_id):
self._user_id = user_id
return self
def add_item(self, product_id, quantity):
self._order_items.append(OrderItem(product_id, quantity))
return self
def build(self):
if not self._user_id or not self._order_items:
raise ValueError("Order cannot be built without user and items.")
return Order(self._user_id, self._order_items)
######################################################################
# 购物车和库存类
######################################################################
class ShoppingCart:
def __init__(self, user_id):
self.user_id = user_id
self.items = {} # {product_id: quantity}
def add_to_cart(self, product_id, quantity):
self.items[product_id] = self.items.get(product_id, 0) + quantity
def checkout(self):
order_items = [OrderItem(product_id, quantity) for product_id, quantity in self.items.items()]
self.items.clear() # 清空购物车
return order_items
class Inventory:
def __init__(self):
self.stock = {} # {product_id: quantity}
def add_stock(self, product_id, quantity):
self.stock[product_id] = self.stock.get(product_id, 0) + quantity
def deduct_stock(self, product_id, quantity):
if self.stock.get(product_id, 0) >= quantity:
self.stock[product_id] -= quantity
return True
return False
######################################################################
# 模拟整个下单和出库过程
######################################################################
# 初始化库存和购物车
inventory = Inventory()
inventory.add_stock("book1", 10)
inventory.add_stock("book2", 5)
cart = ShoppingCart(user_id="user123")
cart.add_to_cart("book1", 2)
cart.add_to_cart("book2", 1)
# 使用OrderBuilder构建订单
order_items = cart.checkout() # 结账,获取订单项列表并清空购物车
order_builder = OrderBuilder().for_user("user123")
for item in order_items:
order_builder.add_item(item.product_id, item.quantity)
order = order_builder.build() # 构建订单对象
print(order) # 输出订单信息
# 出库处理
if order.fulfill(inventory):
print("Order has been fulfilled.")
else:
print("Order fulfillment failed due to insufficient stock.")

@ -0,0 +1,55 @@
'''
享元模式Flyweight Pattern可以用来减少对象的创建数量比如对于重复的书籍信息或者频繁请求的书籍分类可以通过享元模式来共享这些信息以提高内存使用效率和系统性能
在下面的代码中BookFlyweight 是享元抽象类它使用了一个类级别的字典 _books 来存储已经创建的书籍对象__new__ 方法被用来在创建新实例之前检查是否已经存在具有相同ISBN的书籍对象如果已经存在就返回那个对象的引用如果不存在就创建一个新对象并将其存储在 _books 字典中
请注意在这个例子中我故意尝试使用相同的ISBN但不同的标题来创建书籍对象以展示不正确的使用方式在真正的享元模式实现中一旦对象被创建并且其内在状态被设置在这个例子中是由ISBN标题和作者定义的就不应该再修改这些状态如果需要处理变化的状态通常会将这部分状态外部化并通过方法的参数传递给享元对象
另外要注意的是享元模式主要适用于大量细粒度对象且这些对象可以共享状态的情况在书籍的例子中ISBN是一个很好的共享状态的键但标题和作者通常不应该在对象创建后被改变因此这个例子更多的是为了展示享元模式的基本结构和原理而不是一个完全贴合实际的实现在实际应用中需要更仔细地设计享元对象的不可变状态和可变状态
'''
# 享元抽象类
class BookFlyweight:
_books = {}
def __new__(cls, isbn, title, author):
# 根据ISBN创建或获取书籍享元对象
if isbn not in cls._books:
cls._books[isbn] = super(BookFlyweight, cls).__new__(cls)
cls._books[isbn].set_book_info(title, author)
return cls._books[isbn]
def set_book_info(self, title, author):
self.title = title
self.author = author
def get_book_info(self):
return f"{self.title} by {self.author}"
# 享元工厂类
class BookFactory:
@staticmethod
def get_book(isbn, title, author):
return BookFlyweight(isbn, title, author)
# 客户端代码
if __name__ == "__main__":
# 使用相同的ISBN创建书籍对象它们应该是同一个对象的引用
book1 = BookFactory.get_book("123456789", "The Great Gatsby", "F. Scott Fitzgerald")
book2 = BookFactory.get_book("123456789", "The Same Book With Different Title?", "F. Scott Fitzgerald")
# 尽管我们试图设置不同的标题但因为ISBN相同所以它们是同一个对象
# 实际上,在这个实现中,我们应该确保在创建对象时就设置好所有必要的属性,并且之后不再修改它们。
# 这里为了演示,我们错误地修改了标题,这不是享元模式的典型用法。
# 在实际应用中,应该避免在享元对象创建后修改其内在状态(除了可能的状态复位)。
print(book1.get_book_info()) # 输出The Same Book With Different Title? by F. Scott Fitzgerald
print(book2.get_book_info()) # 输出The Same Book With Different Title? by F. Scott Fitzgerald
# 使用不同的ISBN创建书籍对象它们应该是不同的对象
book3 = BookFactory.get_book("987654321", "1984", "George Orwell")
print(book3.get_book_info()) # 输出1984 by George Orwell
# 验证是否是同一个对象
print(book1 is book2) # 输出True
print(book1 is book3) # 输出False

@ -0,0 +1,23 @@
# 装饰器模式允许我们在不修改原有类的基础上,动态地添加额外的功能。
# 就增加功能来说,装饰器模式比生成子类更为灵活。
# 餐吧的顾客可以选择为他们的咖啡添加额外的调料。
class Beverage:
def __init__(self, description):
self.description = description
self.price = 0.0
def cost(self):
return self.price
class CondimentDecorator(Beverage): # 进行装饰
def __init__(self, beverage, description, price_increase):
self.beverage = beverage
self.description = f"{beverage.description}, {description}"
self.price_increase = price_increase
def cost(self):
return self.beverage.cost() + self.price_increase
# 使用装饰器模式
coffee = Beverage("Espresso")
coffee_with_chocolate = CondimentDecorator(coffee, "Chocolate", 0.50)

@ -0,0 +1,42 @@
'''
适配器模式Adapter
应用将一个类的接口转换成客户期望的另一个接口使得原本由于接口不兼容而无法一起工作的类能够一起工作
'''
########################################################################
# 定义一个目标接口Target和一个与之不兼容的类Adaptee
############################################################################
# 目标接口
class Target:
def request(self):
pass
# 需要适配的类
class Adaptee:
def specific_request(self):
print("Called Adaptee's specific_request.")
########################################################################
# 定义一个适配器类Adapter它实现了Target接口并且持有Adaptee的实例
# 从而能够在request方法中调用Adaptee的specific_request方法
# 一个继承,一个当参数加入构造函数
############################################################################
# 适配器
class Adapter(Target):
def __init__(self, adaptee):
self.adaptee = adaptee
def request(self):
# 调用Adaptee的specific_request方法
self.adaptee.specific_request()
if __name__ == "__main__":
# 创建Adaptee实例
adaptee = Adaptee()
# 创建Adapter实例将Adaptee实例作为参数传递
adapter = Adapter(adaptee)
# 客户端调用Target的request方法实际上调用的是Adaptee的specific_request方法
adapter.request()

@ -0,0 +1,60 @@
'''
代理模式Proxy Pattern为其他对象提供一种代理以控制对这个对象的访问
在书城的业务背景中代理模式Proxy Pattern可以应用于多种场景例如实现延迟加载访问控制远程代理等
下面示例展示如何使用代理模式来控制对书城中书籍对象的访问
假设我们有一个Book类代表书城中的书籍和一个BookProxy类作为Book的代理类来控制对书籍的访问
'''
# 书籍类
class Book:
def __init__(self, title, author, price):
self.title = title
self.author = author
self.price = price
self.is_loaded = False # 假设书籍内容初始时是未加载的
def load_content(self):
# 模拟加载书籍内容的过程,这里仅打印一条消息
print(f"Loading content for book '{self.title}' by {self.author}...")
self.is_loaded = True
def display(self):
if not self.is_loaded:
self.load_content()
print(f"Book Title: {self.title}")
print(f"Author: {self.author}")
print(f"Price: {self.price}")
print("Content is loaded and displayed.")
# 书籍代理类
class BookProxy:
def __init__(self, book):
self.book = book
def display(self):
# 在显示书籍信息之前,代理可以控制一些额外的操作
# 比如检查用户权限、记录访问日志等
# 这里我们模拟一个简单的访问控制
print("Checking access permissions...")
# 假设权限检查通过调用实际对象的display方法
self.book.display()
# 客户端代码
if __name__ == "__main__":
# 创建一个书籍对象(假设内容尚未加载)
book = Book("The Great Gatsby", "F. Scott Fitzgerald", 29.99)
# 创建一个书籍代理对象
book_proxy = BookProxy(book)
# 通过代理来访问书籍信息
book_proxy.display()
'''
在这个示例中Book类有一个load_content方法来模拟加载书籍内容的过程以及一个display方法来显示书籍的信息
在实际应用中load_content可能会执行更加复杂的操作如从数据库或远程服务器加载数据
BookProxy类作为代理包装了对Book对象的访问
在这个简单的例子中它在调用display方法之前执行了一个模拟的权限检查
在实际应用中代理类可以执行各种操作如缓存懒加载权限验证等
客户端代码通过创建BookProxy对象来间接访问Book对象而不是直接访问
这种方式提供了一种灵活的控制机制使得可以在不修改原始类的情况下增加额外的功能或控制逻辑
'''

@ -0,0 +1,75 @@
'''
在书城的业务背景中外观模式Facade Pattern可以用于提供一个简化的接口以隐藏系统的复杂性
假设书城提供了多种服务如用户认证购物车管理订单处理等外观模式可以将这些服务整合到一个统一的接口中
使客户端能够更方便地使用这些服务
下面是一个简单的实现代码示例展示如何使用外观模式来整合书城的不同服务
'''
# 用户服务类
class UserService:
def authenticate(self, username, password):
# 这里是用户认证的实现代码
print(f"Authenticating user {username}...")
return True # 假设认证总是成功
# 购物车服务类
class CartService:
def add_to_cart(self, user_id, book_id):
# 这里是将书籍添加到购物车的实现代码
print(f"User {user_id} added book {book_id} to the cart.")
def remove_from_cart(self, user_id, book_id):
# 这里是从购物车中移除书籍的实现代码
print(f"User {user_id} removed book {book_id} from the cart.")
# 订单服务类
class OrderService:
def create_order(self, user_id, cart_items):
# 这里是创建订单的实现代码
print(f"Creating order for user {user_id} with items {cart_items}...")
return "OrderID123" # 假设返回一个订单ID
# 书城外观类
class BookstoreFacade:
def __init__(self):
self.user_service = UserService()
self.cart_service = CartService()
self.order_service = OrderService()
def login_and_add_to_cart(self, username, password, book_id):
if self.user_service.authenticate(username, password):
print("Login successful.")
# 假设用户ID为1实际应用中应该通过认证服务获取
user_id = 1
self.cart_service.add_to_cart(user_id, book_id)
else:
print("Login failed.")
def checkout(self, username, password):
if self.user_service.authenticate(username, password):
print("Login successful.")
# 假设用户ID为1实际应用中应该通过认证服务获取
user_id = 1
# 假设获取购物车项目的方法存在(实际应用中需要实现)
cart_items = self.get_cart_items(user_id)
if cart_items:
order_id = self.order_service.create_order(user_id, cart_items)
print(f"Order created with ID: {order_id}")
else:
print("Your cart is empty.")
else:
print("Login failed.")
def get_cart_items(self, user_id):
# 这里应该有一个方法来获取购物车中的项目,但为了简化示例,我们直接返回一个列表
return [1, 2, 3] # 假设的书籍ID列表
# 客户端代码
if __name__ == "__main__":
bookstore = BookstoreFacade()
# 用户登录并添加书籍到购物车
bookstore.login_and_add_to_cart("alice", "password123", "book456")
# 用户结账创建订单
bookstore.checkout("alice", "password123")

@ -0,0 +1,85 @@
'''
在书城的业务背景中组合模式Composite Pattern可以用于构建树形结构比如书籍的分类结构
每个分类可以包含子分类也可以包含具体的书籍通过这种方式可以方便地管理和遍历整个书籍分类体系
下面是一个简单的实现代码示例展示如何使用组合模式来构建书城的书籍分类结构
'''
from abc import ABC, abstractmethod
# 组件抽象类
class BookComponent(ABC):
@abstractmethod
def add(self, component):
pass
@abstractmethod
def remove(self, component):
pass
@abstractmethod
def display(self, depth):
pass
# 叶子节点:书籍类
class Book(BookComponent):
def __init__(self, title, author):
self.title = title
self.author = author
def add(self, component):
print("Cannot add to a leaf node")
def remove(self, component):
print("Cannot remove from a leaf node")
def display(self, depth):
print("-" * depth + f" {self.title} by {self.author}")
# 复合节点:书籍分类类
class BookCategory(BookComponent):
def __init__(self, name):
self.name = name
self.children = []
def add(self, component):
self.children.append(component)
def remove(self, component):
self.children.remove(component)
def display(self, depth):
print("-" * depth + self.name)
for child in self.children:
child.display(depth + 1)
# 客户端代码
if __name__ == "__main__":
# 创建书籍分类和书籍对象
fiction = BookCategory("Fiction")
non_fiction = BookCategory("Non-Fiction")
novel = Book("The Great Gatsby", "F. Scott Fitzgerald")
biography = Book("Steve Jobs", "Walter Isaacson")
programming = Book("Clean Code", "Robert C. Martin")
# 构建书籍分类结构
fiction.add(novel)
non_fiction.add(biography)
non_fiction.add(programming)
# 创建一个根分类,并将其他分类添加到其中
root = BookCategory("Root")
root.add(fiction)
root.add(non_fiction)
# 显示整个书籍分类结构
root.display(0)
'''
在这个示例中BookComponent 是一个抽象类定义了所有组件无论是分类还是书籍都应该有的方法addremove display
Book 类是叶子节点代表具体的书籍它实现了 BookComponent 接口
add remove 方法对于书籍来说是不适用的因此它们只是打印一条错误消息
BookCategory 类是复合节点代表书籍的分类它可以包含其他分类或书籍因此它实现了 add remove 方法来管理子节点并且实现了 display 方法来显示分类及其子节点的信息
客户端代码创建了一些书籍和分类对象并构建了一个书籍分类结构最后通过调用根分类的 display 方法可以显示整个书籍分类结构
'''

@ -0,0 +1,94 @@
'''
桥接模式Bridge Pattern可以将抽象与实现解耦让它们可以独立变化
这在处理多种分类的书籍时特别有用比如你想在不同的平台上展示这些书籍同时这些书籍还分属不同的分类
下面是一个简单的实现代码示例展示如何使用桥接模式来构建书城的书籍分类与展示平台
'''
# 定义书籍接口
class IBook:
def get_title(self):
pass
def get_author(self):
pass
# 具体书籍实现
class NovelBook(IBook):
def __init__(self, title, author):
self.title = title
self.author = author
def get_title(self):
return self.title
def get_author(self):
return self.author
# 定义抽象分类
class BookCategory:
def __init__(self, name):
self.name = name
self.books = []
def add_book(self, book):
self.books.append(book)
def get_books(self):
return self.books
# 定义抽象展示平台
class DisplayPlatform:
def display(self, book):
pass
# 具体展示平台实现
class WebDisplayPlatform(DisplayPlatform):
def display(self, book):
return f"On the web: {book.get_title()} by {book.get_author()}"
class MobileDisplayPlatform(DisplayPlatform):
def display(self, book):
return f"On mobile: {book.get_title()} by {book.get_author()}"
# 桥接类,将分类与展示平台连接起来
class BookShop:
def __init__(self, category, platform):
self.category = category
self.platform = platform
def show_books(self):
for book in self.category.get_books():
print(self.platform.display(book))
# 客户端代码
if __name__ == "__main__":
# 创建书籍
novel1 = NovelBook("The Great Gatsby", "F. Scott Fitzgerald")
novel2 = NovelBook("1984", "George Orwell")
# 创建分类
fiction_category = BookCategory("Fiction")
fiction_category.add_book(novel1)
fiction_category.add_book(novel2)
# 创建展示平台
web_platform = WebDisplayPlatform()
mobile_platform = MobileDisplayPlatform()
# 创建书城并展示书籍
web_bookshop = BookShop(fiction_category, web_platform)
web_bookshop.show_books()
mobile_bookshop = BookShop(fiction_category, mobile_platform)
mobile_bookshop.show_books()
'''
在这个示例中
IBook 是一个接口定义了书籍应有的行为比如获取标题和作者
NovelBook 是一个具体书籍类实现了 IBook 接口
BookCategory 是一个书籍分类类它可以包含多个书籍实例
DisplayPlatform 是一个抽象展示平台类定义了如何展示书籍
WebDisplayPlatform MobileDisplayPlatform 是具体展示平台类分别实现了 DisplayPlatform 接口以提供不同的展示方式
BookShop 是一个桥接类它将书籍分类与展示平台连接起来通过 show_books 方法可以展示分类中的所有书籍
'''

@ -0,0 +1,66 @@
# 和工厂模式类似,不过这里的结果只是产生不同的类方法
# 设想书店有多种折扣策略,比如“普通会员折扣”、“金牌会员折扣”和“无折扣”。每种折扣策略都是一个具体的策略实现。
from abc import ABC, abstractmethod
########################################################
# 创建折扣策略接口
########################################################
class DiscountStrategy(ABC):
@abstractmethod
def calculate_discount(self, book_price):
pass
########################################################
# 创建实现了DiscountStrategy接口的具体折扣策略类
########################################################
class NoDiscountStrategy(DiscountStrategy):
def calculate_discount(self, book_price):
return book_price # 无折扣,原价返回
class RegularMemberDiscountStrategy(DiscountStrategy):
def calculate_discount(self, book_price):
return book_price * 0.9 # 普通会员9折
class GoldMemberDiscountStrategy(DiscountStrategy):
def calculate_discount(self, book_price):
return book_price * 0.8 # 金牌会员8折
########################################################
# 定义Book类和Bookstore类。Book类包含书籍的信息和价格Bookstore类则使用折扣策略来计算书籍的折后价
########################################################
class Book:
def __init__(self, title, price):
self.title = title
self.price = price
class Bookstore:
def __init__(self, discount_strategy):
self.discount_strategy = discount_strategy
def set_discount_strategy(self, discount_strategy):
self.discount_strategy = discount_strategy
def calculate_final_price(self, book):
discounted_price = self.discount_strategy.calculate_discount(book.price)
return discounted_price
if __name__ == "__main__":
# 创建书籍对象
book = Book("The Great Gatsby", 30.0)
# 创建折扣策略对象
no_discount = NoDiscountStrategy()
regular_discount = RegularMemberDiscountStrategy()
gold_discount = GoldMemberDiscountStrategy()
# 创建书店对象,并设置不同的折扣策略
bookstore = Bookstore(no_discount)
print(f"No Discount: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")
bookstore.set_discount_strategy(regular_discount)
print(f"Regular Member Discount: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")
bookstore.set_discount_strategy(gold_discount)
print(f"Gold Member Discount: The final price of '{book.title}' is {bookstore.calculate_final_price(book)}")

@ -0,0 +1,45 @@
# 观察者模式允许一个对象(观察者)监听另一个对象(主题)的状态变化,并在状态变化时得到通知。
# 主类信息发生变化,通知登记的各个对象(把自己当当参数传过去)自行处理变化
# 当购物车中的商品数量发生变化时,库存系统和价格计算系统需要实时更新。
# 观察者模式Observer或发布-订阅模式Publish-Subscribe
import abc
class Observer(metaclass=abc.ABCMeta):
@abc.abstractmethod
def update(self, cart):
pass
class InventorySystem(Observer):
def update(self, cart):
# 更新库存逻辑...
pass
class PriceCalculator(Observer):
def update(self, cart):
# 重新计算总价逻辑...
pass
class ShoppingCart:
def __init__(self):
self._items = {}
self.observers = []
def add_item(self, item_id, quantity):
# 添加商品到购物车并通知所有观察者
self._items[item_id] = quantity
for observer in self.observers:
observer.update(self)
def attach(self, observer):
self.observers.append(observer)
# 创建购物车并添加观察者
cart = ShoppingCart()
inventory_system = InventorySystem()
price_calculator = PriceCalculator()
cart.attach(inventory_system)
cart.attach(price_calculator)
cart.add_item('item1', 2) # 当添加商品时,库存系统和价格计算器都会收到更新通知

@ -0,0 +1,80 @@
'''
状态模式State Pattern允许一个对象在其内部状态改变时改变它的行为
这个模式将状态封装成独立的类并将状态转换的逻辑分散到这些类中从而减少相互间的依赖
以下是一个使用状态模式的简单示例我们将创建一个订单类Order它有几个状态
Placed已下单Paid已支付Fulfilled已履行和Delivered已交付
每个状态都是一个类它们继承自一个抽象状态类OrderState
'''
################################################################################
# 定义抽象状态类和一些具体的状态类
################################################################################
class OrderState:
def handle(self, order):
pass
class PlacedState(OrderState):
def handle(self, order):
print("Order placed. Waiting for payment...")
order.set_state(order.get_paid_state())
class PaidState(OrderState):
def handle(self, order):
print("Order paid. Preparing for fulfillment...")
order.set_state(order.get_fulfilled_state())
class FulfilledState(OrderState):
def handle(self, order):
print("Order fulfilled. Preparing for delivery...")
order.set_state(order.get_delivered_state())
class DeliveredState(OrderState):
def handle(self, order):
print("Order delivered. Process completed.")
################################################################################
# 定义Order类它包含一个对当前状态的引用并且能够通过set_state方法改变其状态
################################################################################
class Order:
def __init__(self):
self._state = None
self.set_state(self.get_placed_state())
def set_state(self, state):
self._state = state
def get_state(self):
return self._state
def get_placed_state(self):
return PlacedState()
def get_paid_state(self):
return PaidState()
def get_fulfilled_state(self):
return FulfilledState()
def get_delivered_state(self):
return DeliveredState()
def process(self):
self._state.handle(self)
################################################################################
# 创建一个Order对象并模拟其状态转换
################################################################################
if __name__ == "__main__":
order = Order()
# 模拟订单处理流程
order.process() # 初始状态为Placed执行后将变为Paid
order.process() # 当前状态为Paid执行后将变为Fulfilled
order.process() # 当前状态为Fulfilled执行后将变为Delivered
order.process() # 当前状态为Delivered执行后不会改变因为Delivered是最终状态
################################################################################
# 这个例子中每个状态类都负责决定下一个状态是什么并在handle方法中触发状态转换。
# Order类不直接知道所有可能的状态转换这些逻辑被封装在状态类中。
# 这使得添加新的状态或修改现有状态的行为变得更加容易因为不需要修改Order类本身。
################################################################################

@ -0,0 +1,71 @@
'''
模板方法模式Template Method
定义算法的骨架而将一些步骤延迟到子类中实现
'''
from abc import ABC, abstractmethod
class AbstractClass(ABC):
def template_method(self):
# 这是一个模板方法,它定义了一个算法的骨架
self.base_operation1()
self.required_operations1()
self.base_operation2()
self.hook1()
self.required_operations2()
self.base_operation3()
self.hook2()
@abstractmethod
def base_operation1(self):
pass
@abstractmethod
def base_operation2(self):
pass
@abstractmethod
def base_operation3(self):
pass
@abstractmethod
def required_operations1(self):
pass
@abstractmethod
def required_operations2(self):
pass
def hook1(self):
pass # 钩子操作,子类可以选择是否覆盖
def hook2(self):
pass # 另一个钩子操作
class ConcreteClass(AbstractClass):
def base_operation1(self):
print("AbstractClass says: I am doing the bulk of the work")
def base_operation2(self):
print("AbstractClass says: But I let subclasses override some operations")
def base_operation3(self):
print("AbstractClass says: But I am doing the bulk of the work anyway")
def required_operations1(self):
print("ConcreteClass says: Implemented Operation1")
def required_operations2(self):
print("ConcreteClass says: Implemented Operation2")
def hook1(self):
print("ConcreteClass says: Overridden Hook1")
def hook2(self):
# 没有覆盖hook2所以它将执行AbstractClass中的空实现
pass
if __name__ == "__main__":
concrete_class = ConcreteClass()
concrete_class.template_method()

@ -0,0 +1,86 @@
'''
中介者模式Mediator Pattern是一种行为型设计模式它定义了一个中介对象来封装一系列对象之间的交互
中介者使各对象不需要显式地相互引用从而使其耦合松散而且可以独立地改变它们之间的交互
设想书店管理系统其中包含了多个组件比如库存管理订单处理顾客管理等
这些组件之间需要相互通信来协同工作我们可以使用中介者模式来减少这些组件之间的直接依赖
在这个示例中BookstoreMediator 充当了中介者的角色它协调了库存管理订单处理和顾客管理之间的交互
当某个事件发生时比如顾客下订单相应的组件会通过中介者发送通知中介者再根据事件类型协调其他组件的响应
'''
from abc import ABC, abstractmethod
#定义一个中介者接口和具体的中介者实现
# 中介者接口
class Mediator(ABC):
@abstractmethod
def notify(self, sender, event):
pass
# 具体的中介者实现
class BookstoreMediator(Mediator):
def __init__(self, inventory, order, customer):
self.inventory = inventory
self.order = order
self.customer = customer
def notify(self, sender, event):
if event == 'book_ordered':
print("BookstoreMediator: Order notification received. Checking inventory...")
if self.inventory.has_stock():
print("BookstoreMediator: Inventory has stock. Processing order...")
self.order.process_order()
self.customer.notify_order_success()
else:
print("BookstoreMediator: Inventory out of stock. Cancelling order...")
self.order.cancel_order()
self.customer.notify_order_failure()
# 可以添加更多的事件处理逻辑
# 库存管理组件
class Inventory:
def has_stock(self):
# 这里简化逻辑,直接返回有库存
return True
# 订单处理组件
class Order:
def process_order(self):
print("Order: Order is being processed...")
def cancel_order(self):
print("Order: Order is being cancelled...")
# 顾客管理组件
class Customer:
def notify_order_success(self):
print("Customer: Order successful. Notifying customer...")
def notify_order_failure(self):
print("Customer: Order failed. Notifying customer...")
# 组件之间的交互通过中介者进行
class BookstoreComponent:
def __init__(self, mediator):
self.mediator = mediator
def send_notification(self, event):
self.mediator.notify(self, event)
# 示例使用
if __name__ == "__main__":
# 创建组件
inventory = Inventory()
order = Order()
customer = Customer()
# 创建中介者并注入组件
mediator = BookstoreMediator(inventory, order, customer)
# 创建与中介者关联的书店组件(这里以订单为例)
order_component = BookstoreComponent(mediator)
# 触发事件(比如:下订单)
order_component.send_notification('book_ordered')

@ -0,0 +1,96 @@
'''
职责链模式Chain of Responsibility Pattern
一种行为设计模式它允许你将请求沿着处理者链进行发送
收到请求后每个处理者均可对请求进行处理或将其传递给链上的下个处理者
在书城的业务背景中可以想象有这样的场景
客户下了一个订单该订单需要经历多个处理步骤比如验证库存计算价格生成发货通知等
这些处理步骤按照一定的顺序组织成一个处理链
'''
class Order:
def __init__(self, book_id, quantity):
self.book_id = book_id
self.quantity = quantity
class Handler:
def __init__(self, successor=None):
self.successor = successor
def handle_request(self, order):
raise NotImplementedError("Subclasses must implement handle_request()")
def successor_handle_request(self, order):
if self.successor is not None:
return self.successor.handle_request(order)
class InventoryHandler(Handler):
def handle_request(self, order):
if check_inventory(order.book_id, order.quantity):
print("InventoryHandler: Inventory checked, enough stock.")
return self.successor_handle_request(order)
else:
print("InventoryHandler: Inventory check failed, not enough stock.")
return False
class PricingHandler(Handler):
def handle_request(self, order):
price = calculate_price(order.book_id, order.quantity)
print(f"PricingHandler: Price calculated, total: {price}")
return self.successor_handle_request(order)
class NotificationHandler(Handler):
def handle_request(self, order):
send_notification(order)
print("NotificationHandler: Notification sent to customer.")
# Since this is the end of the chain, no successor to call.
return True
# Dummy functions to simulate the bookstore logic
def check_inventory(book_id, quantity):
return True # Simulate enough stock
def calculate_price(book_id, quantity):
return 10.0 * quantity # Simulate a fixed price per book
def send_notification(order):
pass # Simulate sending a notification to the customer
# Setting up the Chain of Responsibility
inventory_handler = InventoryHandler()
pricing_handler = PricingHandler()
notification_handler = NotificationHandler()
inventory_handler.successor = pricing_handler
pricing_handler.successor = notification_handler
# Client code
order = Order(book_id="12345", quantity=2)
result = inventory_handler.handle_request(order)
if result:
print("Order processing completed successfully.")
else:
print("Order processing failed.")
'''
在这个例子中我们有一个Handler基类它定义了一个处理请求的基本框架
具体的处理逻辑则在子类InventoryHandlerPricingHandler和NotificationHandler中实现
这些子类形成了职责链通过successor引用链接在一起
客户下了一个订单后这个订单请求会从库存处理器InventoryHandler开始处理
如果库存足够请求将被传递给价格处理器PricingHandler来计算总价
然后价格处理器再将请求传递给通知处理器NotificationHandler
最后发送通知给客户
如果任何处理器无法处理请求例如库存不足处理将在那里终止并且返回相应的结果
这个模式的好处是可以很容易地改变处理的顺序或者增加/删除处理器而不需要修改已有的代码
'''

@ -0,0 +1,75 @@
'''
命令模式Command Pattern是一种行为设计模式它封装了一个请求作为一个对象从而让你使用不同的请求把客户端与服务端操作解耦在书城的业务背景中命令模式可以用来实现例如添加购物车项结算订单发送通知等操作
下面是一个简单的使用Python实现的命令模式示例以书城的购物车操作为例
'''
# 购物车类
class ShoppingCart:
def __init__(self):
self.items = []
def add_item(self, book_id, quantity):
self.items.append({'book_id': book_id, 'quantity': quantity})
print(f"Added {quantity} of book ID {book_id} to the cart.")
def remove_item(self, book_id):
self.items = [item for item in self.items if item['book_id'] != book_id]
print(f"Removed book ID {book_id} from the cart.")
# 命令接口
class Command:
def execute(self):
pass
# 具体命令类 - 添加购物车项
class AddToCartCommand(Command):
def __init__(self, shopping_cart, book_id, quantity):
self.shopping_cart = shopping_cart
self.book_id = book_id
self.quantity = quantity
def execute(self):
self.shopping_cart.add_item(self.book_id, self.quantity)
# 具体命令类 - 移除购物车项
class RemoveFromCartCommand(Command):
def __init__(self, shopping_cart, book_id):
self.shopping_cart = shopping_cart
self.book_id = book_id
def execute(self):
self.shopping_cart.remove_item(self.book_id)
# 调用者Invoker
class Invoker:
def __init__(self):
self.commands = []
def store_and_execute_command(self, command):
self.commands.append(command)
command.execute()
# 客户端代码
if __name__ == "__main__":
cart = ShoppingCart()
invoker = Invoker()
# 创建添加购物车项命令
add_command = AddToCartCommand(cart, "12345", 2)
invoker.store_and_execute_command(add_command)
# 创建移除购物车项命令
remove_command = RemoveFromCartCommand(cart, "12345")
invoker.store_and_execute_command(remove_command)
'''
在这个例子中我们有一个ShoppingCart类它有两个方法add_item和remove_item用于添加和移除购物车项然后我们定义了一个命令接口Command和一个调用者InvokerInvoker可以存储并执行命令
具体命令类AddToCartCommand和RemoveFromCartCommand实现了Command接口并在execute方法中调用了ShoppingCart的相应操作
客户端代码创建了ShoppingCart实例和Invoker实例并创建了添加和移除购物车项的具体命令对象这些命令对象通过调用者的store_and_execute_command方法被存储和执行
这个命令模式的实现将购物车操作和实际的执行逻辑解耦允许客户端在不知道具体操作的情况下存储和执行命令这对于支持撤销操作记录命令历史排队命令等高级功能非常有用
'''

@ -0,0 +1,75 @@
'''
备忘录模式Memento Pattern
在不破坏封装的前提下捕获一个对象的内部状态并在该对象之外保存这个状态
备忘录模式Memento Pattern可以用来保存和恢复书籍对象的内部状态
例如用户可能想要保存他们当前阅读的书籍状态比如阅读进度高亮部分等以便之后可以恢复到这个状态
'''
# 书籍类,包含阅读状态
class Book:
def __init__(self, title, author):
self.title = title
self.author = author
self.page = 1 # 初始页码
self.highlights = [] # 高亮部分
def read(self, page):
self.page = page
print(f"Reading page {self.page} of '{self.title}' by {self.author}")
def highlight(self, text):
self.highlights.append(text)
print(f"Highlighted text: {text}")
def get_current_state(self):
return BookMemento(self.page, self.highlights[:])
def restore_state(self, memento):
self.page = memento.get_page()
self.highlights = memento.get_highlights()
# 备忘录类,用于保存书籍状态
class BookMemento:
def __init__(self, page, highlights):
self.page = page
self.highlights = highlights
def get_page(self):
return self.page
def get_highlights(self):
return self.highlights
# 客户端代码
if __name__ == "__main__":
# 创建书籍对象
book = Book("The Hobbit", "J.R.R. Tolkien")
# 阅读和高亮文本
book.read(50)
book.highlight("Gollum's precious!")
# 保存当前状态
memento = book.get_current_state()
# 改变状态
book.read(100)
book.highlight("The dragon is coming!")
# 恢复状态
print("Restoring state...")
book.restore_state(memento)
print(f"Current page is {book.page}")
print("Highlighted texts:")
for highlight in book.highlights:
print(highlight)
'''
在这个示例中Book 类代表了一本书包含了页码和高亮部分等状态信息BookMemento 类用于保存书籍的某一时刻的状态Book 类提供了 get_current_state 方法来创建并返回一个包含当前阅读状态的 BookMemento 对象同样地Book 类也提供了 restore_state 方法来从 BookMemento 对象中恢复状态
客户端代码创建了一个书籍对象对其进行了阅读和高亮操作然后保存了当前状态之后改变了书籍的状态并使用之前保存的状态恢复了书籍对象
这样用户就可以在需要的时候回到之前的阅读状态无论是在同一个会话中还是在不同的会话中这种模式在需要提供撤销恢复功能的应用中非常有用
'''

@ -0,0 +1,72 @@
'''
访问者模式Visitor Pattern
表示一个作用于某对象结构中的各元素的操作它使你可以在不改变各元素的类的前提下定义作用于这些元素的新操作
访问者模式Visitor Pattern可以用于将操作逻辑从对象结构中分离出来假设我们有一个书城系统其中包含不同类型的书籍如小说教材等我们想要对不同类型的书籍执行不同的操作如打印价格增加库存等访问者模式允许我们定义一个新的操作而无需改变书籍的类结构
'''
# 书籍类作为元素Element角色
class Book:
def __init__(self, title, price, category):
self.title = title
self.price = price
self.category = category
def accept(self, visitor):
visitor.visit(self)
# 小说类,继承自书籍类
class NovelBook(Book):
def __init__(self, title, price):
super().__init__(title, price, "Novel")
# 教材类,继承自书籍类
class Textbook(Book):
def __init__(self, title, price):
super().__init__(title, price, "Textbook")
# 访问者接口
class IVisitor:
def visit(self, book):
pass
# 具体的访问者类,实现访问者接口
class PricePrinter(IVisitor):
def visit(self, book):
print(f"Price of '{book.title}': ${book.price}")
class StockAdder(IVisitor):
def __init__(self, additional_stock):
self.additional_stock = additional_stock
def visit(self, book):
# 假设这里有一个增加库存的方法,实际上应该通过数据库或其他方式更新
print(f"Adding {self.additional_stock} copies of '{book.title}' to stock")
# 客户端代码
if __name__ == "__main__":
# 创建书籍对象
novel = NovelBook("The Great Gatsby", 19.99)
textbook = Textbook("Introduction to Algorithms", 49.99)
# 创建访问者对象
price_printer = PricePrinter()
stock_adder = StockAdder(5)
# 书籍接受访问者
novel.accept(price_printer)
textbook.accept(price_printer)
# 增加库存操作
novel.accept(stock_adder)
textbook.accept(stock_adder)
'''
在这个示例中Book 类是元素角色它包含一个 accept 方法该方法接受一个访问者对象NovelBook Textbook 是具体元素它们继承自 Book IVisitor 是访问者接口声明了一个 visit 方法PricePrinter StockAdder 是实现了 IVisitor 接口的具体访问者类它们分别用于打印书籍价格和增加书籍库存
客户端代码创建了书籍对象和访问者对象并通过调用书籍的 accept 方法来接受访问者的访问这样访问者就可以对书籍执行相应的操作
需要注意的是访问者模式的一个主要缺点是增加新的操作意味着增加新的访问者类这可能会导致类的数量增加此外如果需要在访问者中访问对象结构的复杂关系代码可能会变得难以理解和维护因此在使用访问者模式时需要权衡其优缺点
'''

@ -0,0 +1,61 @@
### 内存管理
段:存放全局变量和静态变量
栈:系统自动分配释放,函数参数值,局部变量,返回地址等在此
堆:存放动态分配的数据,由开发人员自行管理
不同操作系统进程和线程实现机制有不同。
虚拟内存技术,把进程虚拟地址空间划分成用户空间和内核空间。
在 32 位操作系统中4GB 的进程地址中用户空间为 03G内核地址空间为 34G
用户不能直接操作内核地址,只有通过系统调用的方式访问。
线程共享虚拟内存和全局变量等资源,线程拥有自己的私有数据比如栈和寄存器。
## 多任务
多任务就是可以同时运行多个任务。分为并行和并发两种。
并行是真在不同CPU核上同时执行并发是轮换在一个核上执行。
## 阻塞/非阻塞
等候消息的过程中能不能干其他事
## 同步/异步
一个任务完成后才能开始下一个任务是同步,
多个任务同时在运行状态是异步 。
通知调用者的三种方式,如下
状态:即监听被调用者的状态,调用者每隔一段时间检查一次是否完成(轮询)。
通知:当被调用者执行完成后,发出通知告知调用者。
回调:当被调用者执行完成后,调用调用者提供的回调函数 。
## 进程、线程
运行一个软件就是开了一个进程
比如,一个游戏启动后为一个进程
但一个游戏需要图形渲染,联网操作能同时运行
所以将其各个部分设计为线程
即一个进程有多个线程
从操作系统层面而言
进程是分配资源的基本单位
进程之间是独立的
一个进程无法访问另一个进程的空间
一个进程运行的失败也不会影响其他进程的运行
因为操作系统可以切换进程,所以并发的进程数会超过核数
当需要创建的子进程数量巨大时,可以创建进程池
进程间常通过消息队列程序实现数据传递
一个进程内可以包含多个线程
线程是程序执行的基本单位
线程是操作系统分配处理器时间的基本单元
线程之间没有单独的地址空间,一个线程死掉就等于整个进程死掉
一个进程下的多个线程可以共享该进程的资源,包括内存。
多个线程同时对同一个全局变量操作,会出现竞争问题,从而数据结果会不正确
解决办法:某个线程要更改数据时,先将其锁定,直到将状态变成“非锁定”,其他的线程才能锁该资源。
如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
可以用一些机制解决死锁,比如超时。

@ -0,0 +1,185 @@
Python的多线程时间切片间隔可以通过 sys.setswitchinterval() 设置。其他切换触发条件
- 当线程等待I/O操作如网络请求或磁盘读写
- 某些函数(如 time.sleep())会触发切换
- 线程主动释放GIL
异步编程通常比多线程控制更精细,但,多线程相对编程简单 。
以下场景更适合使用 **多线程**
### 场景:**GUI 应用程序**
在 GUI图形用户界面应用程序中主线程负责处理用户交互而其他任务如文件读写、网络请求需要在后台运行以避免阻塞主线程导致界面卡顿。多线程可以与 GUI 主线程共享内存方便更新界面状态。线程间通信简单。GUI 框架(如 PyQt、Tkinter通常有自己的事件循环用异步编程容易冲突。
```python
import sys
import requests
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QPushButton, QLabel
from PyQt5.QtCore import QThread, pyqtSignal
# 工作线程:负责下载文件
class DownloadThread(QThread):
# 自定义信号,用于通知主线程下载进度
progress_signal = pyqtSignal(str)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
self.progress_signal.emit("开始下载...")
try:
response = requests.get(self.url, stream=True)
total_size = int(response.headers.get("content-length", 0))
downloaded_size = 0
with open("downloaded_file", "wb") as file:
for chunk in response.iter_content(chunk_size=1024):
file.write(chunk)
downloaded_size += len(chunk)
progress = f"已下载: {downloaded_size / 1024:.2f} KB / {total_size / 1024:.2f} KB"
self.progress_signal.emit(progress)
self.progress_signal.emit("下载完成!")
except Exception as e:
self.progress_signal.emit(f"下载失败: {str(e)}")
#### 主窗口
class MainWindow(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
self.setWindowTitle("多线程下载示例")
self.setGeometry(100, 100, 300, 150)
# 布局
layout = QVBoxLayout()
# 下载按钮
self.download_button = QPushButton("开始下载", self)
self.download_button.clicked.connect(self.start_download)
layout.addWidget(self.download_button)
# 状态标签
self.status_label = QLabel("点击按钮开始下载", self)
layout.addWidget(self.status_label)
self.setLayout(layout)
def start_download(self):
# 禁用按钮,防止重复点击
self.download_button.setEnabled(False)
self.status_label.setText("准备下载...")
# 创建工作线程
self.download_thread = DownloadThread("https://example.com/large_file.zip")
self.download_thread.progress_signal.connect(self.update_status)
self.download_thread.finished.connect(self.on_download_finished)
self.download_thread.start()
def update_status(self, message):
# 更新状态标签
self.status_label.setText(message)
def on_download_finished(self):
# 下载完成后启用按钮
self.download_button.setEnabled(True)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
```
### 场景:**与阻塞式 API 交互**
某些库或 API 是阻塞式的(如某些数据库驱动、硬件接口库),无法直接使用异步编程。在这种情况下,多线程可以避免阻塞主线程。
```python
import threading
import time
import sqlite3
def query_database():
# 模拟阻塞式数据库查询
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
print("查询完成,结果:", results)
conn.close()
def main():
print("主线程开始")
# 创建线程执行数据库查询
thread = threading.Thread(target=query_database)
thread.start()
# 主线程继续执行其他任务
for i in range(5):
print(f"主线程运行中... {i}")
time.sleep(1)
thread.join()
print("主线程结束")
main()
```
### 场景:**任务队列与线程池**
在需要处理大量短期任务的场景中(如 Web 服务器的请求处理),使用线程池可以简单编程实现高效管理任务。
特别有些任务是阻塞式的,不支持异步 。
```python
import concurrent.futures
import time
def process_task(task):
print(f"开始处理任务: {task}")
time.sleep(2) # 模拟任务处理时间
print(f"完成处理任务: {task}")
def main():
print("主线程开始")
tasks = ["task1", "task2", "task3", "task4", "task5"]
# 使用线程池处理任务
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.map(process_task, tasks)
print("主线程结束")
main()
````
### 场景:**与 C/C++ 扩展交互**
某些 Python 库是基于 C/C++ 扩展实现的(如 `numpy`、`pandas`),这些扩展可能释放了 GIL允许在多线程中并行运行。
多线程常常更快 。
```python
import threading
import numpy as np
def compute_task(data):
result = np.sum(data)
print(f"计算结果: {result}")
def main():
print("主线程开始")
data = np.random.rand(1000000) # 生成随机数据
# 创建多个线程并行计算
threads = []
for i in range(4):
thread = threading.Thread(target=compute_task, args=(data,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print("主线程结束")
main()
```

@ -0,0 +1,48 @@
### **JIT即时编译**
JITJust-In-Time Compilation即时编译是一种在程序运行时将代码编译为机器码的技术。与传统的 **AOTAhead-Of-Time Compilation提前编译** 不同JIT 在程序执行过程中动态编译代码。JIT 跨平台,生成适合当前平台的机器码。
JIT 的工作原理:**解释执行**:程序开始时,代码以解释方式执行(逐行解释字节码)。 **热点检测**JIT 编译器监控代码执行,识别频繁执行的代码段(称为“热点”)。 **动态编译**:将热点代码编译为机器码,后续执行直接运行机器码,避免解释执行的开销。 **优化**JIT 编译器可以根据运行时信息进行优化(如内联函数、消除死代码)。
在 Python 中利用 JIT 加速的方法包括:
- **PyPy**:通用的 Python 实现适合大多数场景。pypy your_script.py
- **Numba**:专注于数值计算,适合科学计算和数据分析。用 `@jit` 装饰器标记需要加速的函数。
- **Cython**:将 Python 代码编译为 C 代码,适合需要极致性能的场景。支持 JIT 和 AOT 编译 。
- **Taichi**:专注于高性能计算,适合图形学、物理仿真等领域。
### 异步编程生态系统中的几个概念
**异步编程**
异步编程是一种编程范式,允许任务并发执行。
在 Python 中,异步编程可以通过协程、回调、事件循环等多种方式实现。
异步编程适合高并发的 I/O 密集型任务(如 Web 服务器、爬虫、实时通信), 特别是大量并发连接的任务。
**协程**
协程是异步编程的一种实现方式,协程是一种在执行过程中可以暂停和恢复的函数。
协程运行在线程之上,协程的调度完全由用户控制 。
同回调等其他异步技术相比,协程维持了正常的代码流程,提高了代码可读性。
**Async**
Async 是 Python 3.5 引入的一个关键字用于定义异步函数即协程。async def 定义的函数可以暂停执行,使用 await 等待其他操作完成,它们构成了 Python 的异步编程语法。
**asyncio**
asyncio 是 Python 标准库中管理协程的框架。
Python 的协程实现历史
- 生成器协程yield/send
- 原生协程,使用 @asyncio.coroutine 和 yield from ,已废
- 原生协程,自 Python 3.5 起async/await 成为标准 。
- 第三方库 gevent 也有不短的历史 。
### 碎片
- 网络系统常用架构 :服务端用线程池,客户端用 asynico - 异步
- 分布式任务队列系统celery ,不用自己造车 lzuDataFactory
- thread 模块是比较底层的模块, threading 模块对 thread 做了一些包装

@ -0,0 +1,18 @@
队列应用,涉及简单算法到多线程、多进程,以及分布式。
应用场景匹配的相应队列:
- 简单算法或小型任务list列表作为队列,append() 入队pop(0) 出队,简单易用。
- 单线程高性能:用 collections.deque效率最高。
- 异步编程: 用 asyncio.Queue
- 多线程安全:用 queue.Queue
- 多线程优先级任务安全:用 queue.PriorityQueue ,确保高优先级的元素优先被取出。
- 多进程通信:用 multiprocessing.Queue如果进程内部存在线程竞争就需要加锁。
注意事项:
- 非线程安全的实现(如 deque,list在多线程场景中需加锁 。
- multiprocessing.Queue 利用了操作系统提供的进程间通信IPC, Inter-Process Communication机制具体实现取决于不同操作系统的支持。
其它可能用到的相关工具:
Redis作为中间件提供共享存储和跨进程协调机制处理缓存、消息队列、分布式锁、分布式状态共享。
Celery用于管理异步任务或后台任务一般用来做周期性任务或者长时间运行的任务。可用 Redis 做存储。

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save