Compare commits

..

24 Commits
dev ... dev

Author SHA1 Message Date
zj3D 3f03b9014d 05
2 weeks ago
zj3D d782d65fb5 Remove .gitignore file from Git tracking
2 weeks ago
zj3D 5e83716c4d 04
2 weeks ago
zj3D d339ef454b 03
2 weeks ago
zj3D 654dabbb69 02
2 weeks ago
zj3D 59ea2b479b 01
2 weeks ago
zj3D 0c531354ce BC01
2 weeks ago
zj3D dc44e0be4b 04
2 weeks ago
zj3D 3ea3cbaa23 031
2 weeks ago
zj3D 0eb30e470f 03
2 weeks ago
zj3D 4abad6851f 02
2 weeks ago
zj3D 129f1aaa3f 01
2 weeks ago
zj3D 617465cec6 07
2 weeks ago
zj3D 35e58525f1 06
2 weeks ago
zj3D e5b9607dce 05
2 weeks ago
zj3D 49bf182cf8 04
2 weeks ago
zj3D ea53899bbd 04
2 weeks ago
zj3D 0606fc586c 03
2 weeks ago
zj3D b77d297f3e 02
2 weeks ago
zj3D 1712e964cf 02
2 weeks ago
zj3D 8b9e813ee2 2501
2 weeks ago
zj3D 7365ebb312 Merge branch 'dev' of https://bdgit.educoder.net/p46318075/CodePattern into dev
2 weeks ago
zj3D 524a65e492 ABC
2 weeks ago
p46318075 94800c4b9e Update readme.MD
2 months ago

4
.gitignore vendored

@ -1,4 +0,0 @@
log.txt
/test
/.venv
__pycache__

@ -48,3 +48,9 @@ for i in range(n):
# 打印频率最高的前10个词 # 打印频率最高的前10个词
for tf in word_freqs[:10]: for tf in word_freqs[:10]:
print(tf[0], '-', tf[1]) print(tf[0], '-', tf[1])
'''
想到哪里写到哪里
用的最基础的编程思想没有使用 Python 高级语法特性数据结构和算法
'''

@ -23,6 +23,7 @@ with open(testfilepath, encoding='utf8') as f:
# 打印前10个最常见的单词 # 打印前10个最常见的单词
for word, freq in word_freqs.most_common(10): for word, freq in word_freqs.most_common(10):
print(f"{word}-{freq}") print(f"{word}-{freq}")
''' '''
相比 A01 相比 A01
使用collections.Counter来计数单词频率从而简化了代码并提高了效率 使用collections.Counter来计数单词频率从而简化了代码并提高了效率

@ -8,6 +8,7 @@ words = re.findall('[a-z]{2,}',
counts = collections.Counter(w for w in words if w not in stopwords) counts = collections.Counter(w for w in words if w not in stopwords)
for (w, c) in counts.most_common(10): for (w, c) in counts.most_common(10):
print(w, '-', c) print(w, '-', c)
''' '''
熟练的软件工程师会如此简单完成任务 熟练的软件工程师会如此简单完成任务
后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法 后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法

@ -0,0 +1,24 @@
# 目标
本节使用一个书城的各方面业务需求来展示面向对象的常见设计模式 。
# 任务
背景假设为一个综合书城,提供线上线下购买,还经营一个书吧、一个报告厅。
# 说明
面向对象的模式是把编程过程中的一些思路固定化,并给一个名字方便理解 。
它是软件工程中一组经过验证的、可重复使用的代码写法 。
所以,模式不是语法,而是编程思路 。
这样做的好处是,统一大家的代码形式,提高代码可读性、可维护性、可扩展性 。
那为啥,面向过程没有这么做
是因为这个思维提炼过程,充分利用了面向对象语言的特性:封装、继承、多态 。
面向过程语言,没有这些特性,所以,面向过程程序设计一般不谈设计模式 。
因为 Python 对象协议的机制,多态、接口概念发生了根本变化 。使得一些C++、Java 的模式没用了 。比如 “ 原型模式Prototype可以使用copy.deepcopy()非常简便来创建 。另外,很多模式中继承关系也没必要了。但,下面很多示例中依旧保持了基类 。一是致敬经典,二是起到一个工程上更工整和代码注释的作用 。
# 应用场景
面向对象设计模式在管理信息系统和图形用户界面系统应用比较广泛 。

@ -1,74 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为普通做法即使用requests库通过Post请求爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行普通爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='普通爬取进度', unit='', ncols=80)
title_list = []
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
data = spider.fetch(config)
title_list += spider.parse(data)
pbar.update(size)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -1,86 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多进程做法即使用多进程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import multiprocessing
import tqdm
lock = multiprocessing.Lock()
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行普通做法")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
title_list = []
pbar = tqdm.tqdm(total=size * 10, desc='多进程爬取进度', unit='', ncols=80)
with multiprocessing.Pool(processes=5) as pool:
results = []
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
results.append(pool.apply_async(spider.fetch, (config, )))
for result in results:
data = result.get()
title_list += spider.parse(data)
lock.acquire()
pbar.update(size)
lock.release()
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -1,89 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多线程做法即使用多线程并行爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import util
import logging
from typing import List
import tqdm
lock = threading.Lock()
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行多线程爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='多线程爬取进度', unit='', ncols=80)
title_list = []
tasks = []
with ThreadPoolExecutor(max_workers=5) as executor:
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
future = executor.submit(spider.fetch, config)
tasks.append(future)
# 更新进度条
lock.acquire()
pbar.update(size)
lock.release()
for future in as_completed(tasks):
data = future.result()
title_list += spider.parse(data)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -1,89 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为协程做法即使用gevent库通过协程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import gevent
from gevent import monkey
# 打补丁使标准库能够与gevent协同工作
monkey.patch_all()
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行协程爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='协程爬取进度', unit='', ncols=80)
title_list = []
def fetch_and_parse(keyword, current):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
data = spider.fetch(config)
titles = spider.parse(data)
title_list.extend(titles)
pbar.update(size)
jobs = [
gevent.spawn(fetch_and_parse, keyword, current) for keyword in keywords
for current in range(1, 11)
]
gevent.joinall(jobs)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -1,85 +0,0 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多线程做法即使用异步并行爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
asyncio.run(
main_async(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10))
```
"""
import asyncio
import util
import logging
from typing import List
import tqdm
@util.timeit_async
async def main_async(keywords: List[str],
begin_date: str,
end_date: str,
size: int = 10):
"""
使用异步方式爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行异步爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='异步爬取进度', unit='', ncols=80)
title_list = []
tasks = []
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
task = asyncio.create_task(spider.fetch_async(config))
tasks.append(task)
for task in asyncio.as_completed(tasks):
data = await task
title_list += spider.parse(data)
pbar.update(size)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
asyncio.run(
main_async(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10))

@ -1,25 +0,0 @@
# 目标
本节使用一个爬虫任务来展示如何追求代码的性能 。
充分理解线程、协程、进程、同步、异步、阻塞、非阻塞等概念,并能够根据具体场景选择合适的并发模型。
主线问题如何解决IO和计算速度不匹配、如何任务分解、分发和协作 。
# 任务
# 讨论分析
普通做法连续进行了五次测试时间分别为34.231s、34.091s、34.164s、34.226s、33.958s平均时间为34.134s
多进程(进程数=5连续进行了五次测试时间分别为7.719s、7.716s、7.690s、7.730s、7.711s平均时间为7.7132s
多线程(线程数=5连续进行了五次测试时间分别为7.185s、7.964s、6.983s、6.969s、7.035s平均时间为7.2272s
协程连续进行了五次测试时间分别为3.775s、3.807s、3.733s、3.824s、3.744s平均时间为3.776s
异步连续进行了五次测试时间分别为6.975s、7.675s、7.018s、7.032s、7.049s平均时间为7.1498s
为保证公平性每一次Post请求后休眠3秒
可以看出,协程的性能最好,普通做法的性能最差,多线程、多进程和异步的性能介于两者之间。
考虑到多进程和多线程是故意开的5个进程和线程而协程是单线程所以协程的性能最好。
另外,异步的性能最差,可能是由于异步的并发模型需要频繁地切换线程,导致性能下降。
总的来说,协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。
# 总结
协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。

@ -1,188 +0,0 @@
"""
"""
import re
import time
import functools
import json
import asyncio
import requests
from typing import Any, Dict, List
class Spider:
"""
爬虫类
Args:
keywords (List[str]): 用于搜索新闻的关键词列表
begin_date (str): 开始日期用于搜索
end_date (str): 结束日期用于搜索
size (int): 一次请求返回的新闻或政策的最大数量
Attributes:
URL (str): 网址
"""
# 天水市人民政府网站
URL = ('https://www.tianshui.gov.cn/aop_component/'
'/webber/search/search/search/queryPage')
def __init__(self, keywords: List[str], begin_date: str, end_date: str,
size: int):
self.keywords = keywords
self.begin_date = begin_date
self.end_date = end_date
self.size = size
def get_config(self, keyword: str, current: int) -> Dict[str, Any]:
"""
获取配置信息
Args:
keyword (str): 关键词
size (int): 一次请求返回的新闻的最大数量
Returns:
Dict[str, Any]: 配置信息
"""
return {
"aliasName": "article_data,open_data,mailbox_data,article_file",
"keyWord": keyword,
"lastkeyWord": keyword,
"searchKeyWord": False,
"orderType": "score",
"searchType": "text",
"searchScope": "3",
"searchOperator": 0,
"searchDateType": "custom",
"searchDateName": f"{self.begin_date}-{self.end_date}",
"beginDate": self.begin_date,
"endDate": self.end_date,
"showId": "c2ee13065aae85d7a998b8a3cd645961",
"auditing": ["1"],
"owner": "1912126876",
"token": "tourist",
"urlPrefix": "/aop_component/",
"page": {
"current": current,
"size": self.size,
"pageSizes": [2, 5, 10, 20, 50, 100],
"total": 0,
"totalPage": 0,
"indexs": []
},
"advance": False,
"advanceKeyWord": "",
"lang": "i18n_zh_CN"
}
def generate_headers(self) -> dict:
"""
生成请求头
Returns:
dict: 请求头
"""
return {
'Authorization':
'tourist',
'User-Agent':
('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit'
'/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari'
'/537.36 Edg/124.0.0.0')
}
def fetch(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""
普通做法
Post请求获取网页内容并返回请求结果
Args:
config (Dict[str, Any]): 配置信息
Returns:
Dict[str, Any]: 请求结果
"""
response = requests.post(self.URL,
headers=self.generate_headers(),
json=config).text
time.sleep(3)
return json.loads(response)
async def fetch_async(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""
异步做法
Post请求获取网页内容并返回请求结果
Args:
config (Dict[str, Any]): 配置信息
Returns:
Dict[str, Any]: 请求结果
"""
response = requests.post(self.URL,
headers=self.generate_headers(),
json=config).text
await asyncio.sleep(3)
return json.loads(response)
def parse(self, data: Dict[str, Any]) -> List[str]:
"""
解析网页内容
Args:
data (Dict[str, Any]): 网页内容
Returns:
List[str]: 标题列表
"""
title_list = []
records = data['data']['page']['records']
for i in range(self.size):
title = records[i]['title']
title = re.sub('<[^>]*>', '', title) # 去除html标签
title_list.append(title)
# print(title)
return title_list
def save(self, title_list: List[str]):
"""
保存数据
"""
pass
# 时间装饰器
def timeit(func):
"""
计算函数运行时间
Args:
func: 函数
Return:
函数
"""
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
print(f'{func.__name__} cost: {time.time() - start}')
return result
return wrapper
def timeit_async(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
result = await func(*args, **kwargs)
print(f'{func.__name__} cost: {time.time() - start}')
return result
return wrapper

@ -1,29 +0,0 @@
# 目标
本节使用一个书城的各种业务环节来展示面向对象的各种设计模式 。
# 任务
背景假设为一个综合书城,提供线上线下购买,还经营一个书吧、一个报告厅。
# 说明
面向对象的模式把编程过程中的一些思路固定化,并给一个名字方便理解 。
它是软件工程中一组经过验证的、可重复使用的代码写法 。
所以,模式不是语法,而是编程思路 。
这样做的好处是,统一大家的代码形式,提高代码可读性、可维护性、可扩展性 。
那为啥,面向过程没有这么做
是因为这个思维提炼过程,充分利用了面向对象语言的特性:封装、继承、多态 。
面向过程语言,没有这些特性,所以,面向过程语言没有面向对象模式 。
因为 Python 对象协议的机制,多态、接口概念发生了根本变化 。
很多模式中,类的继承关系没必要了。下面示例中很多依旧保持了基类 。
一是致敬经典,二是起到一个工程上更工整和强注释的作用 。
另外,Python的动态语言的特性 。使得一些C++、Java 的模式没用了 。
比如 “ 原型模式Prototype可以使用copy.deepcopy()非常简便来创建 。
# 应用场景
面向对象设计模式在管理信息系统和图形用户界面系统应用比较广泛 。

@ -0,0 +1,61 @@
### 内存管理
段:存放全局变量和静态变量
栈:系统自动分配释放,函数参数值,局部变量,返回地址等在此
堆:存放动态分配的数据,由开发人员自行管理
不同操作系统进程和线程实现机制有不同。
虚拟内存技术,把进程虚拟地址空间划分成用户空间和内核空间。
在 32 位操作系统中4GB 的进程地址中用户空间为 03G内核地址空间为 34G
用户不能直接操作内核地址,只有通过系统调用的方式访问。
线程共享虚拟内存和全局变量等资源,线程拥有自己的私有数据比如栈和寄存器。
## 多任务
多任务就是可以同时运行多个任务。分为并行和并发两种。
并行是真在不同CPU核上同时执行并发是轮换在一个核上执行。
## 阻塞/非阻塞
等候消息的过程中能不能干其他事
## 同步/异步
一个任务完成后才能开始下一个任务是同步,
多个任务同时在运行状态是异步 。
通知调用者的三种方式,如下
状态:即监听被调用者的状态,调用者每隔一段时间检查一次是否完成(轮询)。
通知:当被调用者执行完成后,发出通知告知调用者。
回调:当被调用者执行完成后,调用调用者提供的回调函数 。
## 进程、线程
运行一个软件就是开了一个进程
比如,一个游戏启动后为一个进程
但一个游戏需要图形渲染,联网操作能同时运行
所以将其各个部分设计为线程
即一个进程有多个线程
从操作系统层面而言
进程是分配资源的基本单位
进程之间是独立的
一个进程无法访问另一个进程的空间
一个进程运行的失败也不会影响其他进程的运行
因为操作系统可以切换进程,所以并发的进程数会超过核数
当需要创建的子进程数量巨大时,可以创建进程池
进程间常通过消息队列程序实现数据传递
一个进程内可以包含多个线程
线程是程序执行的基本单位
线程是操作系统分配处理器时间的基本单元
线程之间没有单独的地址空间,一个线程死掉就等于整个进程死掉
一个进程下的多个线程可以共享该进程的资源,包括内存。
多个线程同时对同一个全局变量操作,会出现竞争问题,从而数据结果会不正确
解决办法:某个线程要更改数据时,先将其锁定,直到将状态变成“非锁定”,其他的线程才能锁该资源。
如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
可以用一些机制解决死锁,比如超时。

@ -0,0 +1,186 @@
Python的多线程时间切片间隔可以通过 sys.setswitchinterval() 设置。其他切换触发条件
- 当线程等待I/O操作如网络请求或磁盘读写
- 某些函数(如 time.sleep())会触发切换
- 线程主动释放GIL
异步编程通常比多线程控制更精细,但,多线程相对编程简单 。
以下场景更适合使用 **多线程**
### 场景:**GUI 应用程序**
在 GUI图形用户界面应用程序中主线程负责处理用户交互而其他任务如文件读写、网络请求需要在后台运行以避免阻塞主线程导致界面卡顿。多线程可以与 GUI 主线程共享内存方便更新界面状态。线程间通信简单。GUI 框架(如 PyQt、Tkinter通常有自己的事件循环用异步编程容易冲突。
```python
import sys
import requests
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QPushButton, QLabel
from PyQt5.QtCore import QThread, pyqtSignal
# 工作线程:负责下载文件
class DownloadThread(QThread):
# 自定义信号,用于通知主线程下载进度
progress_signal = pyqtSignal(str)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
self.progress_signal.emit("开始下载...")
try:
response = requests.get(self.url, stream=True)
total_size = int(response.headers.get("content-length", 0))
downloaded_size = 0
with open("downloaded_file", "wb") as file:
for chunk in response.iter_content(chunk_size=1024):
file.write(chunk)
downloaded_size += len(chunk)
progress = f"已下载: {downloaded_size / 1024:.2f} KB / {total_size / 1024:.2f} KB"
self.progress_signal.emit(progress)
self.progress_signal.emit("下载完成!")
except Exception as e:
self.progress_signal.emit(f"下载失败: {str(e)}")
#### 主窗口
class MainWindow(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
self.setWindowTitle("多线程下载示例")
self.setGeometry(100, 100, 300, 150)
# 布局
layout = QVBoxLayout()
# 下载按钮
self.download_button = QPushButton("开始下载", self)
self.download_button.clicked.connect(self.start_download)
layout.addWidget(self.download_button)
# 状态标签
self.status_label = QLabel("点击按钮开始下载", self)
layout.addWidget(self.status_label)
self.setLayout(layout)
def start_download(self):
# 禁用按钮,防止重复点击
self.download_button.setEnabled(False)
self.status_label.setText("准备下载...")
# 创建工作线程
self.download_thread = DownloadThread("https://example.com/large_file.zip")
self.download_thread.progress_signal.connect(self.update_status)
self.download_thread.finished.connect(self.on_download_finished)
self.download_thread.start()
def update_status(self, message):
# 更新状态标签
self.status_label.setText(message)
def on_download_finished(self):
# 下载完成后启用按钮
self.download_button.setEnabled(True)
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
```
### 场景:**与阻塞式 API 交互**
某些库或 API 是阻塞式的(如某些数据库驱动、硬件接口库),无法直接使用异步编程。在这种情况下,多线程可以避免阻塞主线程。
```python
import threading
import time
import sqlite3
def query_database():
# 模拟阻塞式数据库查询
conn = sqlite3.connect("example.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
print("查询完成,结果:", results)
conn.close()
def main():
print("主线程开始")
# 创建线程执行数据库查询
thread = threading.Thread(target=query_database)
thread.start()
# 主线程继续执行其他任务
for i in range(5):
print(f"主线程运行中... {i}")
time.sleep(1)
thread.join()
print("主线程结束")
main()
```
### 场景:**任务队列与线程池**
在需要处理大量短期任务的场景中(如 Web 服务器的请求处理),使用线程池可以简单编程实现高效管理任务。
特别有些任务是阻塞式的,不支持异步 。
```python
import concurrent.futures
import time
def process_task(task):
print(f"开始处理任务: {task}")
time.sleep(2) # 模拟任务处理时间
print(f"完成处理任务: {task}")
def main():
print("主线程开始")
tasks = ["task1", "task2", "task3", "task4", "task5"]
# 使用线程池处理任务
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.map(process_task, tasks)
print("主线程结束")
main()
````
### 场景:**与 C/C++ 扩展交互**
某些 Python 库是基于 C/C++ 扩展实现的(如 `numpy`、`pandas`),这些扩展可能释放了 GIL允许在多线程中并行运行。
多线程常常更快 。
```python
import threading
import numpy as np
def compute_task(data):
result = np.sum(data)
print(f"计算结果: {result}")
def main():
print("主线程开始")
data = np.random.rand(1000000) # 生成随机数据
# 创建多个线程并行计算
threads = []
for i in range(4):
thread = threading.Thread(target=compute_task, args=(data,))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print("主线程结束")
main()
```

@ -0,0 +1,48 @@
### **JIT即时编译**
JITJust-In-Time Compilation即时编译是一种在程序运行时将代码编译为机器码的技术。与传统的 **AOTAhead-Of-Time Compilation提前编译** 不同JIT 在程序执行过程中动态编译代码。JIT 跨平台,生成适合当前平台的机器码。
JIT 的工作原理:**解释执行**:程序开始时,代码以解释方式执行(逐行解释字节码)。 **热点检测**JIT 编译器监控代码执行,识别频繁执行的代码段(称为“热点”)。 **动态编译**:将热点代码编译为机器码,后续执行直接运行机器码,避免解释执行的开销。 **优化**JIT 编译器可以根据运行时信息进行优化(如内联函数、消除死代码)。
在 Python 中利用 JIT 加速的方法包括:
- **PyPy**:通用的 Python 实现适合大多数场景。pypy your_script.py
- **Numba**:专注于数值计算,适合科学计算和数据分析。用 `@jit` 装饰器标记需要加速的函数。
- **Cython**:将 Python 代码编译为 C 代码,适合需要极致性能的场景。支持 JIT 和 AOT 编译 。
- **Taichi**:专注于高性能计算,适合图形学、物理仿真等领域。
### 异步编程生态系统中的几个概念
**异步编程**
异步编程是一种编程范式,允许任务并发执行。
在 Python 中,异步编程可以通过协程、回调、事件循环等多种方式实现。
异步编程适合高并发的 I/O 密集型任务(如 Web 服务器、爬虫、实时通信), 特别是大量并发连接的任务。
**协程**
协程是异步编程的一种实现方式,协程是一种在执行过程中可以暂停和恢复的函数。
协程运行在线程之上,协程的调度完全由用户控制 。
同回调等其他异步技术相比,协程维持了正常的代码流程,提高了代码可读性。
**Async**
Async 是 Python 3.5 引入的一个关键字用于定义异步函数即协程。async def 定义的函数可以暂停执行,使用 await 等待其他操作完成,它们构成了 Python 的异步编程语法。
**asyncio**
asyncio 是 Python 标准库中管理协程的框架。
Python 的协程实现历史
- 生成器协程yield/send
- 原生协程,使用 @asyncio.coroutine 和 yield from ,已废
- 原生协程,自 Python 3.5 起async/await 成为标准 。
- 第三方库 gevent 也有不短的历史 。
### 碎片
- 网络系统常用架构 :服务端用线程池,客户端用 asynico - 异步
- 分布式任务队列系统celery ,不用自己造车 lzuDataFactory
- thread 模块是比较底层的模块, threading 模块对 thread 做了一些包装

@ -0,0 +1,48 @@
# 案例研究:股票价格分析
## 案例背景
股票价格分析是金融领域常见任务涉及从互联网获取实时或历史价格数据I/O密集型以及执行复杂计算如傅里叶变换以分析趋势计算密集型
## 任务目标
假设任务是从 Yahoo Finance 获取股票价格数据如AAPL、MSFT、GOOG然后对每组数据执行傅里叶变换以分析频率成分。
各模型的特点和适用场景说明:
** 串行执行 **
特点:顺序执行所有任务, 编程简单。
性能:作为基准。
适用场景:任务量少、无性能要求时。
** 多线程执行 **
特点:线程池并发处理 I/O 任务,但受限于 Python 的 GIL全局解释器锁计算任务无法并行。
性能:在 I/O 密集型任务中显著优于串行执行,但在计算密集型任务中提升有限。
适用场景:网络请求、文件读写等 I/O 密集型任务。
** 多进程执行 **
特点:进程池并行执行任务,可充分利用多核 CPU。
性能:在计算密集型任务(如傅里叶变换)中表现优异,但在 I/O 任务中因进程开销可能不如多线程或异步。
适用场景:数学计算、数据处理等计算密集型任务。
** 异步执行 **
特点:单线程处理并发 I/O 任务 。
性能:在 I/O 密集型任务中效率高,但在计算密集型任务中无优势。
适用场景高并发网络请求、API 调用等 I/O 密集型任务。
** 混合执行 **
数据获取使用异步方法,计算部分使用多进程并行。
结合了异步 I/O 的高效性和多进程计算的并行性。
## 性能比较和分析
多线程和异步在 I/O 密集型任务中大幅缩短时间。
多进程在计算密集型任务中提升性能。
运行结果依据实际时间因网络和硬件而异,并不能直接说明执行方式的特点。
时间消耗测评可以追踪到更细化的步骤进行分析。
通过这个案例,读者可以:
理解 I/O 密集型和计算密集型任务的区别,
了解多线程、多进程和异步编程的实现方法。
## 其它
无代理环境,可以自行查阅资源用 request 直接爬股票数据 。

@ -0,0 +1,109 @@
import yfinance as yf
import numpy as np
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import asyncio
import time
import functools
####################################################################
# 获取股票数据I/O密集型任务
def fetch_stock_data(stock):
print(f"Fetching data for {stock}")
data = yf.download(stock, start="2020-01-01", end="2023-01-01",auto_adjust=False)
return data['Close'].values
# 异步获取股票数据
async def async_fetch_stock_data(stock):
print(f"Fetching data for {stock}")
data = await asyncio.to_thread(yf.download, stock, start="2020-01-01", end="2023-01-01",auto_adjust=False)
return data['Close'].values
# 傅里叶变换(计算密集型任务)
def fourier_transform(data):
print("Performing Fourier Transform")
fft_data = np.fft.fft(data)
return np.abs(fft_data)
def timeit(message):
def decorator(func):
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
print(f"{message}: {end_time - start_time:.2f} seconds")
return result
async def async_wrapper(*args, **kwargs):
start_time = time.time()
result = await func(*args, **kwargs)
end_time = time.time()
print(f"{message}: {end_time - start_time:.2f} seconds")
return result
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
####################################################################
# 股票列表
stocks = ['AAPL', 'MSFT', 'GOOG']
# 串行执行
@timeit("串行执行")
def serial_execution():
for stock in stocks:
data = fetch_stock_data(stock)
fft_data = fourier_transform(data)
# 可视化(等等...
# 多线程执行(优化 I/O 密集型任务)
@timeit("多线程执行")
def threaded_execution():
with ThreadPoolExecutor(max_workers=3) as executor:
data_list = list(executor.map(fetch_stock_data, stocks))
for data in data_list:
fft_data = fourier_transform(data)
# 可视化(等等...
# 多进程执行(优化计算密集型任务)
@timeit("多进程执行")
def multiprocessing_execution():
with ProcessPoolExecutor(max_workers=3) as executor:
data_list = list(executor.map(fetch_stock_data, stocks))
with ProcessPoolExecutor(max_workers=3) as executor:
fft_data_list = list(executor.map(fourier_transform, data_list))
# 可视化(等等...
# 异步执行(优化 I/O 密集型任务)
@timeit("异步执行")
async def async_execution():
tasks = [async_fetch_stock_data(stock) for stock in stocks]
data_list = await asyncio.gather(*tasks)
for data in data_list:
fft_data = fourier_transform(data)
# 可视化(等等...
@timeit("混合执行")
async def mixed_execution():
tasks = [async_fetch_stock_data(stock) for stock in stocks]
data_list = await asyncio.gather(*tasks)
with ProcessPoolExecutor(max_workers=3) as executor:
fft_data_list = list(executor.map(fourier_transform, data_list))
# 可视化(等等...
if __name__ == "__main__":
print("串行执行:"); serial_execution()
print("\n多线程执行:") ; threaded_execution()
print("\n多进程执行:"); multiprocessing_execution()
print("\n异步执行:"); asyncio.run(async_execution())
print("\n混合执行:") ; asyncio.run(mixed_execution())

@ -0,0 +1,41 @@
从计算机系统结构的角度,提高 Python 任务执行速度的核心在于:减少解释器开销(编译/JIT、提升并行性多核/GPU、优化内存访问缓存友好、降低 I/O 瓶颈以及适配硬件特性等。当前主要办法如下:
### 计算单元层面利用多核并行计算
对于 CPU 密集型任务,使用多进程,每个进程拥有独立的 Python 解释器和内存空间,运行在独立的内核上,实现并行计算。
### I/O 层面减少等待时间
- 异步编程针对I/O请求等待手工实现任务切换完成并发执行.
- 多线程解释器自动完成I/O请求的线程切换 。
- 批量处理减少I/O请求数量 。
### 编译层面减少解释器开销
- 使用 JIT 编译器Just-In-TimeJIT编译可以在运行时将Python代码编译成机器码从而提升执行速度 。PyPy 是一种替代 CPython 的实现PyPy 的 JIT 引擎可以分析代码执行路径,优化频繁调用的函数,充分利用处理器架构。
- **Cython 编译**Cython 允许开发者为 Python 代码添加 C 类型注解,并编译为 C 代码,再由 C 编译器生成机器码。Cython 特别适合静态类型优化场景。
### 利用Python的解释器特性
- **使用内置数据类型和函数**:内置的数据类型(如列表、字典、集合等)和函数通常经过高度优化。
- **选择合适的数据结构**:例如,一些类型执行一些操作更快,一些类型更省空间。
- **减少全局变量的使用**:访问全局变量通常比局部变量慢,因为它们需要在更大的作用域中查找。
- **减少函数调用**,可降低堆栈操作开销。
- 使用列表推导式替代循环,降低频繁创建和销毁临时对象的开销。
- 使用生成器而不是列表来处理大数据集,以减少内存占用。
- 使用XX池或预分配资源。
### 使用第三方高性能库
- NumPy、Pandas这些库用 C/C++ 编写并经过优化。
- NumPy 使用连续内存块存储数据向量化操作来代替显式的Python循环更高效 。
- SIMD 指令加速NumPy、Numba、Pandas/SciPy 都使用了 SIMD。Cython 可以直接用 C 代码使用 SIMD 。
- `gzip` 模块可压缩数据,减少网络传输的数据量,提高网络传输速度。
- `mmap` 模块实现内存映射文件在处理超大文件、优化I/O性能以及进程间通信方面具有显著优势。
- `functools.lru_cache` 缓存计算结果,避免重复计算 。
## 总结
具体实施时,应根据任务特点选择合适的策略,并结合性能分析工具(如 cProfile 、timeit或 line_profiler定位瓶颈。
计算设备方面的简单提升办法:使用多机、更快的 CPU、更多核的CPU、更多的内存、更快的存储、增加 GPU/FPGA/TPU 。
此外随着Python社区的发展新的技术和工具不断涌现开发者应持续关注最新进展以便更好地优化自己的代码 。

@ -2,3 +2,13 @@
## 代码为啥要这样写,我要这样写代码 ## 代码为啥要这样写,我要这样写代码
A 代码模式
用一个简单任务,展示各种需求(完成任务简单、可读性强、可复用高、维护成本低等)下的代码写法
B 面向对象设计模式
用一个业务场景复现面向对象的经典设计模式
C 高性能模式
考虑执行时间快,内存占用少的一些办法
Loading…
Cancel
Save