dev #17

Merged
p46318075 merged 3 commits from pcz4qfnkl/CodePattern:dev into dev 3 months ago

4
.gitignore vendored

@ -0,0 +1,4 @@
log.txt
/test
/.venv
__pycache__

@ -7,7 +7,6 @@ with open(stopwordfilepath, encoding='utf-8') as f:
for letter in 'abcdefghijklmnopqrstuvwxyz': for letter in 'abcdefghijklmnopqrstuvwxyz':
stop_words.append(letter) stop_words.append(letter)
# 读文件,逐行扫描文本,发现词,确定不是停用词,计数 # 读文件,逐行扫描文本,发现词,确定不是停用词,计数
word_freqs = [] word_freqs = []
for line in open(testfilepath, encoding='utf-8'): for line in open(testfilepath, encoding='utf-8'):
@ -46,7 +45,6 @@ for i in range(n):
if word_freqs[j][1] < word_freqs[j + 1][1]: if word_freqs[j][1] < word_freqs[j + 1][1]:
word_freqs[j], word_freqs[j + 1] = word_freqs[j + 1], word_freqs[j] word_freqs[j], word_freqs[j + 1] = word_freqs[j + 1], word_freqs[j]
# 打印频率最高的前10个词 # 打印频率最高的前10个词
for tf in word_freqs[:10]: for tf in word_freqs[:10]:
print(tf[0], '-', tf[1]) print(tf[0], '-', tf[1])

@ -23,7 +23,6 @@ with open(testfilepath,encoding = 'utf8') as f:
# 打印前10个最常见的单词 # 打印前10个最常见的单词
for word, freq in word_freqs.most_common(10): for word, freq in word_freqs.most_common(10):
print(f"{word}-{freq}") print(f"{word}-{freq}")
''' '''
相比 A01 相比 A01
使用collections.Counter来计数单词频率从而简化了代码并提高了效率 使用collections.Counter来计数单词频率从而简化了代码并提高了效率

@ -1,11 +1,13 @@
import re, collections import re
import collections
from cppy.cp_util import stopwordfilepath, testfilepath from cppy.cp_util import stopwordfilepath, testfilepath
stopwords = set(open(stopwordfilepath, encoding='utf8').read().split(',')) stopwords = set(open(stopwordfilepath, encoding='utf8').read().split(','))
words = re.findall('[a-z]{2,}', open( testfilepath,encoding = 'utf8').read().lower()) words = re.findall('[a-z]{2,}',
open(testfilepath, encoding='utf8').read().lower())
counts = collections.Counter(w for w in words if w not in stopwords) counts = collections.Counter(w for w in words if w not in stopwords)
for (w, c) in counts.most_common(10) : print(w, '-', c) for (w, c) in counts.most_common(10):
print(w, '-', c)
''' '''
熟练的软件工程师会如此简单完成任务 熟练的软件工程师会如此简单完成任务
后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法 后面的例子我们必须变的啰嗦一些不能用这种太 hacker 的写法

@ -3,42 +3,88 @@ from cppy.cp_util import *
class DataStorageManager: class DataStorageManager:
""" 数据模型 """ """
数据模型读取文件内容并将内容分割成单词
Attributes:
_data: 单词列表
Methods:
_words (self): 返回分割后的单词列表
"""
def __init__(self, path_to_file): def __init__(self, path_to_file):
self._data = re_split(read_file(path_to_file)) self._data = re_split(read_file(path_to_file))
def words(self): def words(self):
"""返回分割后的单词列表。"""
return self._data return self._data
class StopWordManager: class StopWordManager:
""" 停用词模型 """ """
停用词模型
Attributes:
_stop_words: 停用词列表
Methods:
is_stop_word (self, word): 判断给定单词是否为停用词
"""
def __init__(self): def __init__(self):
self._stop_words = get_stopwords() self._stop_words = get_stopwords()
def is_stop_word(self, word): def is_stop_word(self, word):
"""判断给定单词是否为停用词。"""
return word in self._stop_words return word in self._stop_words
class WordFrequencyManager: class WordFrequencyManager:
""" 词频模型 """ """
词频模型计算并管理单词的频率
Attributes:
_word_freqs: 使用 Counter 存储单词及其出现次数
Methods:
increment_count (self, word): 计算词频
sorted(self): 返回按出现次数排序的单词列表
"""
def __init__(self): def __init__(self):
self._word_freqs = Counter() self._word_freqs = Counter()
def increment_count(self, word): def increment_count(self, word):
"""计算词频。"""
self._word_freqs[word] += 1 self._word_freqs[word] += 1
def sorted(self): def sorted(self):
"""返回按出现次数排序的单词列表。"""
return self._word_freqs.most_common() return self._word_freqs.most_common()
class WordFrequencyController: class WordFrequencyController:
"""
控制器控制整个流程读取文件处理停用词计算词频并输出结果
Attributes:
_storage_manager: DataStorageManager 实例用于读取和处理文件内容
_stop_word_manager: StopWordManager 实例用于管理停用词
_word_freq_manager: WordFrequencyManager 实例用于计算和存储单词频率
Methods:
run(self): 运行方法遍历单词列表过滤掉停用词并计算每个单词的频率最后输出结果
"""
def __init__(self, path_to_file): def __init__(self, path_to_file):
self._storage_manager = DataStorageManager(path_to_file) self._storage_manager = DataStorageManager(path_to_file)
self._stop_word_manager = StopWordManager() self._stop_word_manager = StopWordManager()
self._word_freq_manager = WordFrequencyManager() self._word_freq_manager = WordFrequencyManager()
def run(self): def run(self):
"""运行方法,遍历单词列表,过滤掉停用词,并计算每个单词的频率,最后输出结果。"""
for w in self._storage_manager.words(): for w in self._storage_manager.words():
if not self._stop_word_manager.is_stop_word(w): if not self._stop_word_manager.is_stop_word(w):
self._word_freq_manager.increment_count(w) self._word_freq_manager.increment_count(w)
@ -47,11 +93,8 @@ class WordFrequencyController:
print_word_freqs(word_freqs) print_word_freqs(word_freqs)
if __name__ == '__main__': if __name__ == '__main__':
WordFrequencyController(testfilepath).run() WordFrequencyController(testfilepath).run()
''' '''
函数输入参数调用后你的马上接住返回值 函数输入参数调用后你的马上接住返回值
类输入参数后实例化后你可以需要的时候去访问你需要的数据实例属性 类输入参数后实例化后你可以需要的时候去访问你需要的数据实例属性

@ -1,29 +1,52 @@
from cppy.cp_util import * from cppy.cp_util import *
def extract_words(obj, path_to_file): def extract_words(obj, path_to_file):
"""
从文件中提取单词并存储在对象的 'data' 字段中
Args:
obj (dict): 存储数据的字典对象
path_to_file (str): 文件路径
"""
obj['data'] = extract_file_words(path_to_file) obj['data'] = extract_file_words(path_to_file)
def increment_count(obj, w): def increment_count(obj, w):
"""
增加单词的计数如果单词不存在则将其计数设置为1
参数:
obj (dict): 存储单词频率的字典对象
w (str): 单词
"""
obj['freqs'][w] = 1 if w not in obj['freqs'] else obj['freqs'][w] + 1 obj['freqs'][w] = 1 if w not in obj['freqs'] else obj['freqs'][w] + 1
# 数据存储对象,包含初始化和获取单词的方法
data_storage_obj = { data_storage_obj = {
'data' : [], 'data': [], # 存储单词列表
'init' : lambda path_to_file : extract_words(data_storage_obj, path_to_file), 'init': lambda path_to_file: extract_words(data_storage_obj, path_to_file
'words' : lambda : data_storage_obj['data'] ), # 初始化方法,提取文件中的单词
'words': lambda: data_storage_obj['data'] # 获取单词列表的方法
} }
# 单词频率对象,包含增加计数和排序的方法
word_freqs_obj = { word_freqs_obj = {
'freqs' : {}, 'freqs': {}, # 存储单词频率的字典
'increment_count' : lambda w : increment_count(word_freqs_obj, w), 'increment_count':
'sorted' : lambda : sort_dict(word_freqs_obj['freqs']) lambda w: increment_count(word_freqs_obj, w), # 增加单词计数的方法
'sorted': lambda: sort_dict(word_freqs_obj['freqs']) # 获取排序后的单词频率的方法
} }
if __name__ == '__main__': if __name__ == '__main__':
# 初始化数据存储对象,提取文件中的单词
data_storage_obj['init'](testfilepath) data_storage_obj['init'](testfilepath)
# 遍历单词列表,增加单词的计数
for word in data_storage_obj['words'](): for word in data_storage_obj['words']():
word_freqs_obj['increment_count'](word) word_freqs_obj['increment_count'](word)
# 获取排序后的单词频率并打印
word_freqs = word_freqs_obj['sorted']() word_freqs = word_freqs_obj['sorted']()
print_word_freqs(word_freqs) print_word_freqs(word_freqs)

@ -0,0 +1,192 @@
import site
import os, re, time
import string, operator
################################################################################
# 变量
################################################################################
testfilename = 'test.txt'
testfilename = 'pride-and-prejudice.txt'
testfilename = 'Prey.txt'
db_filename = "tf.db"
site_packages = site.getsitepackages()
for package in site_packages:
if 'package' in package:
basePath = package
stopwordfilepath = os.path.join(basePath, 'cppy', 'data', 'stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy', 'data', testfilename)
################################################################################
# 项目函数
################################################################################
def read_file(path_to_file):
"""
读取指定文件的内容
Args:
path_to_file (str): 文件路径
Returns:
str: 文件内容
"""
with open(path_to_file, encoding='utf-8') as f:
data = f.read()
return data
def re_split(data):
"""
使用正则表达式分割字符串将非字母字符替换为空格并将所有字符转换为小写
Args:
data (str): 输入字符串
Returns:
list: 分割后的单词列表
"""
pattern = re.compile('[\W_]+')
data = pattern.sub(' ', data).lower()
return data.split()
def get_stopwords(path_to_file=stopwordfilepath):
"""
获取停用词列表
Args:
path_to_file (str): 停用词文件路径默认为 stopwordfilepath
Returns:
list: 停用词列表
"""
with open(path_to_file, encoding='utf-8') as f:
data = f.read().split(',')
data.extend(list(string.ascii_lowercase))
return data
def get_chunks(file_path=testfilepath, chunk_size=1000):
"""
将文件内容分割成多个块
Args:
file_path (str): 文件路径默认为 testfilepath
chunk_size (int): 每个块的大小默认为 1000
Returns:
list: 分割后的块列表
"""
content = re_split(read_file(file_path))
chunks = [
content[i:i + chunk_size] for i in range(0, len(content), chunk_size)
]
return chunks
def extract_file_words(path_to_file):
"""
提取文件中的单词去除停用词和长度小于3的单词
Args:
path_to_file (str): 文件路径
Returns:
list: 提取后的单词列表
"""
word_list = re_split(read_file(path_to_file))
stop_words = get_stopwords()
return [w for w in word_list if (w not in stop_words) and len(w) >= 3]
def extract_str_words(data_str):
"""
提取字符串中的单词去除停用词和长度小于3的单词
Args:
data_str (str): 输入字符串
Returns:
list: 提取后的单词列表
"""
word_list = re_split(data_str)
stop_words = get_stopwords()
return [w for w in word_list if (w not in stop_words) and len(w) >= 3]
def count_word(word, word_freqs, stopwords):
"""
统计单词频率
Args:
word (str): 单词
word_freqs (dict): 单词频率字典
stopwords (list): 停用词列表
"""
if word not in stopwords:
word_freqs[word] = word_freqs.get(word, 0) + 1
def get_frequencies(word_list):
"""
获取单词频率
Args:
word_list (list): 单词列表
Returns:
dict: 单词频率字典
"""
word_freqs = {}
for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs
def sort_dict(word_freq):
"""
对字典进行排序
Args:
word_freq (dict): 单词频率字典
Returns:
list: 排序后的单词频率列表
"""
return sorted(word_freq.items(), key=operator.itemgetter(1), reverse=True)
def print_word_freqs(word_freqs, n=10):
"""
打印单词频率
Args:
word_freqs (list): 单词频率列表
n (int): 打印的单词数量默认为 10
"""
for (w, c) in word_freqs[:n]:
print(w, '-', c)
################################################################################
# 通用工具
################################################################################
def timing_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time() # 记录开始时间
result = func(*args, **kwargs) # 调用原始函数
end_time = time.time() # 记录结束时间
run_time = end_time - start_time # 计算运行时间
print(f"{func.__name__} 运行时间: {run_time*1000:.2f}")
return result
return wrapper
def test():
print('cppy welcome')

@ -1,93 +0,0 @@
import site
import os,re,time
import string,operator
################################################################################
# 变量
################################################################################
testfilename = 'test.txt'
testfilename = 'pride-and-prejudice.txt'
testfilename = 'Prey.txt'
db_filename = "tf.db"
site_packages = site.getsitepackages()
for package in site_packages:
if 'package' in package:
basePath = package
stopwordfilepath = os.path.join(basePath, 'cppy','data','stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy','data',testfilename )
################################################################################
# 项目函数
################################################################################
def read_file(path_to_file):
with open(path_to_file,encoding='utf-8') as f:
data = f.read()
return data
def re_split( data ):
pattern = re.compile('[\W_]+')
data = pattern.sub(' ', data).lower()
return data.split()
def get_stopwords( path_to_file = stopwordfilepath ):
with open(path_to_file,encoding='utf-8') as f:
data = f.read().split(',')
data.extend(list(string.ascii_lowercase))
return data
def get_chunks( file_path = testfilepath, chunk_size = 1000):
# 读取文件内容,分割文件内容为多个块,每个块由一个进程处理
# 可以根据实际情况调整块大小
content = re_split(read_file(file_path))
chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]
return chunks
def extract_file_words(path_to_file):
word_list = re_split( read_file(path_to_file) )
stop_words = get_stopwords()
return [ w for w in word_list if ( not w in stop_words ) and len(w) >= 3 ]
def extract_str_words(data_str):
word_list = re_split( data_str )
stop_words = get_stopwords()
return [ w for w in word_list if ( not w in stop_words ) and len(w) >= 3 ]
def count_word(word, word_freqs, stopwords):
if word not in stopwords:
word_freqs[word] = word_freqs.get(word, 0) + 1
def get_frequencies(word_list):
word_freqs = {}
for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs
def sort_dict (word_freq):
return sorted(word_freq.items(), key=operator.itemgetter(1), reverse=True)
# return sorted( word_freq, key=lambda x: x[1], reverse=True )
def print_word_freqs( word_freqs, n = 10):
for (w, c) in word_freqs[ :n ]:
print( w, '-', c )
################################################################################
# 通用工具
################################################################################
def timing_decorator(func):
def wrapper(*args, **kwargs):
start_time = time.time() # 记录开始时间
result = func(*args, **kwargs) # 调用原始函数
end_time = time.time() # 记录结束时间
run_time = end_time - start_time # 计算运行时间
print(f"{func.__name__} 运行时间: {run_time*1000:.2f}")
return result
return wrapper
def test():
print( 'cppy welcome' )

@ -0,0 +1,74 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为普通做法即使用requests库通过Post请求爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行普通爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='普通爬取进度', unit='', ncols=80)
title_list = []
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
data = spider.fetch(config)
title_list += spider.parse(data)
pbar.update(size)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -0,0 +1,86 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多进程做法即使用多进程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import util
import logging
from typing import List
import multiprocessing
import tqdm
lock = multiprocessing.Lock()
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行普通做法")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
title_list = []
pbar = tqdm.tqdm(total=size * 10, desc='多进程爬取进度', unit='', ncols=80)
with multiprocessing.Pool(processes=5) as pool:
results = []
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
results.append(pool.apply_async(spider.fetch, (config, )))
for result in results:
data = result.get()
title_list += spider.parse(data)
lock.acquire()
pbar.update(size)
lock.release()
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -0,0 +1,89 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多线程做法即使用多线程并行爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import util
import logging
from typing import List
import tqdm
lock = threading.Lock()
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行多线程爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='多线程爬取进度', unit='', ncols=80)
title_list = []
tasks = []
with ThreadPoolExecutor(max_workers=5) as executor:
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
future = executor.submit(spider.fetch, config)
tasks.append(future)
# 更新进度条
lock.acquire()
pbar.update(size)
lock.release()
for future in as_completed(tasks):
data = future.result()
title_list += spider.parse(data)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -0,0 +1,89 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为协程做法即使用gevent库通过协程并发爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)
```
"""
import gevent
from gevent import monkey
# 打补丁使标准库能够与gevent协同工作
monkey.patch_all()
import util
import logging
from typing import List
import tqdm
@util.timeit
def main(keywords: List[str], begin_date: str, end_date: str, size: int = 10):
"""
爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行协程爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='协程爬取进度', unit='', ncols=80)
title_list = []
def fetch_and_parse(keyword, current):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
data = spider.fetch(config)
titles = spider.parse(data)
title_list.extend(titles)
pbar.update(size)
jobs = [
gevent.spawn(fetch_and_parse, keyword, current) for keyword in keywords
for current in range(1, 11)
]
gevent.joinall(jobs)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
main(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10)

@ -0,0 +1,85 @@
"""
根据提供的关键词列表爬取天水市人民政府网站上指定日期内与关键词相关的新闻的标题并将其存储至数据库中
考虑到相关因素因此本代码只爬取前10页的新闻内容即最多100条新闻作为测试
此方法为多线程做法即使用异步并行爬取网页内容再使用json提取新闻内容
注意本代码中的关键词列表默认为['灾害']日期范围默认为2018年1月1日至2018年12月31日
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
Examples:
```
asyncio.run(
main_async(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10))
```
"""
import asyncio
import util
import logging
from typing import List
import tqdm
@util.timeit_async
async def main_async(keywords: List[str],
begin_date: str,
end_date: str,
size: int = 10):
"""
使用异步方式爬取与提供的关键词列表相关的新闻.
Args:
keywords: 用于搜索新闻的关键词列表
begin_date: 开始日期用于搜索
end_date: 结束日期用于搜索
size: 一次请求返回的新闻或政策的最大数量
"""
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
filename='log.txt',
encoding='utf-8')
logging.info("开始运行异步爬取")
spider = util.Spider(keywords=keywords,
begin_date=begin_date,
end_date=end_date,
size=size)
pbar = tqdm.tqdm(total=size * 10, desc='异步爬取进度', unit='', ncols=80)
title_list = []
tasks = []
for keyword in keywords:
for current in range(1, 11):
logging.info(f'keyword: {keyword}, current: {current}')
config = spider.get_config(keyword, current)
task = asyncio.create_task(spider.fetch_async(config))
tasks.append(task)
for task in asyncio.as_completed(tasks):
data = await task
title_list += spider.parse(data)
pbar.update(size)
spider.save(title_list)
pbar.close()
logging.info("爬取完成")
if __name__ == "__main__":
asyncio.run(
main_async(keywords=['灾害'],
begin_date='2018-01-01',
end_date='2018-12-31',
size=10))

@ -9,3 +9,17 @@
# 讨论分析 # 讨论分析
普通做法连续进行了五次测试时间分别为34.231s、34.091s、34.164s、34.226s、33.958s平均时间为34.134s
多进程(进程数=5连续进行了五次测试时间分别为7.719s、7.716s、7.690s、7.730s、7.711s平均时间为7.7132s
多线程(线程数=5连续进行了五次测试时间分别为7.185s、7.964s、6.983s、6.969s、7.035s平均时间为7.2272s
协程连续进行了五次测试时间分别为3.775s、3.807s、3.733s、3.824s、3.744s平均时间为3.776s
异步连续进行了五次测试时间分别为6.975s、7.675s、7.018s、7.032s、7.049s平均时间为7.1498s
为保证公平性每一次Post请求后休眠3秒
可以看出,协程的性能最好,普通做法的性能最差,多线程、多进程和异步的性能介于两者之间。
考虑到多进程和多线程是故意开的5个进程和线程而协程是单线程所以协程的性能最好。
另外,异步的性能最差,可能是由于异步的并发模型需要频繁地切换线程,导致性能下降。
总的来说,协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。
# 总结
协程的性能最好,多线程和多进程的性能介于两者之间,普通做法的性能最差。

@ -1,4 +1,188 @@
"""
################################################################################ """
# 本主题通用代码 import re
################################################################################ import time
import functools
import json
import asyncio
import requests
from typing import Any, Dict, List
class Spider:
"""
爬虫类
Args:
keywords (List[str]): 用于搜索新闻的关键词列表
begin_date (str): 开始日期用于搜索
end_date (str): 结束日期用于搜索
size (int): 一次请求返回的新闻或政策的最大数量
Attributes:
URL (str): 网址
"""
# 天水市人民政府网站
URL = ('https://www.tianshui.gov.cn/aop_component/'
'/webber/search/search/search/queryPage')
def __init__(self, keywords: List[str], begin_date: str, end_date: str,
size: int):
self.keywords = keywords
self.begin_date = begin_date
self.end_date = end_date
self.size = size
def get_config(self, keyword: str, current: int) -> Dict[str, Any]:
"""
获取配置信息
Args:
keyword (str): 关键词
size (int): 一次请求返回的新闻的最大数量
Returns:
Dict[str, Any]: 配置信息
"""
return {
"aliasName": "article_data,open_data,mailbox_data,article_file",
"keyWord": keyword,
"lastkeyWord": keyword,
"searchKeyWord": False,
"orderType": "score",
"searchType": "text",
"searchScope": "3",
"searchOperator": 0,
"searchDateType": "custom",
"searchDateName": f"{self.begin_date}-{self.end_date}",
"beginDate": self.begin_date,
"endDate": self.end_date,
"showId": "c2ee13065aae85d7a998b8a3cd645961",
"auditing": ["1"],
"owner": "1912126876",
"token": "tourist",
"urlPrefix": "/aop_component/",
"page": {
"current": current,
"size": self.size,
"pageSizes": [2, 5, 10, 20, 50, 100],
"total": 0,
"totalPage": 0,
"indexs": []
},
"advance": False,
"advanceKeyWord": "",
"lang": "i18n_zh_CN"
}
def generate_headers(self) -> dict:
"""
生成请求头
Returns:
dict: 请求头
"""
return {
'Authorization':
'tourist',
'User-Agent':
('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit'
'/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari'
'/537.36 Edg/124.0.0.0')
}
def fetch(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""
普通做法
Post请求获取网页内容并返回请求结果
Args:
config (Dict[str, Any]): 配置信息
Returns:
Dict[str, Any]: 请求结果
"""
response = requests.post(self.URL,
headers=self.generate_headers(),
json=config).text
time.sleep(3)
return json.loads(response)
async def fetch_async(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""
异步做法
Post请求获取网页内容并返回请求结果
Args:
config (Dict[str, Any]): 配置信息
Returns:
Dict[str, Any]: 请求结果
"""
response = requests.post(self.URL,
headers=self.generate_headers(),
json=config).text
await asyncio.sleep(3)
return json.loads(response)
def parse(self, data: Dict[str, Any]) -> List[str]:
"""
解析网页内容
Args:
data (Dict[str, Any]): 网页内容
Returns:
List[str]: 标题列表
"""
title_list = []
records = data['data']['page']['records']
for i in range(self.size):
title = records[i]['title']
title = re.sub('<[^>]*>', '', title) # 去除html标签
title_list.append(title)
# print(title)
return title_list
def save(self, title_list: List[str]):
"""
保存数据
"""
pass
# 时间装饰器
def timeit(func):
"""
计算函数运行时间
Args:
func: 函数
Return:
函数
"""
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
print(f'{func.__name__} cost: {time.time() - start}')
return result
return wrapper
def timeit_async(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start = time.time()
result = await func(*args, **kwargs)
print(f'{func.__name__} cost: {time.time() - start}')
return result
return wrapper

Loading…
Cancel
Save