|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
import re
|
|
|
|
|
import time
|
|
|
|
|
import functools
|
|
|
|
|
import json
|
|
|
|
|
import asyncio
|
|
|
|
|
import requests
|
|
|
|
|
from typing import Any, Dict, List
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Spider:
    """
    Crawler for the search API of the Tianshui municipal government website.

    Args:
        keywords (List[str]): Keywords used to search for news.
        begin_date (str): Start date of the search window.
        end_date (str): End date of the search window.
        size (int): Maximum number of news or policy items returned by one
            request.

    Attributes:
        URL (str): Search endpoint that every request is posted to.
        REQUEST_TIMEOUT (float): Seconds to wait for the server before the
            request is abandoned.
        REQUEST_INTERVAL (float): Seconds to pause after each request.
    """
    # Tianshui Municipal People's Government website.
    # NOTE(review): the two literals join into ".../aop_component//webber/..."
    # (double slash). Kept byte-identical on purpose -- confirm with the
    # server before normalising it.
    URL = ('https://www.tianshui.gov.cn/aop_component/'
           '/webber/search/search/search/queryPage')

    # Without a timeout ``requests`` waits forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    # Pause applied after every request (previously a hard-coded 3 seconds).
    REQUEST_INTERVAL = 3

    # Matches any "<...>" markup so it can be stripped from titles.
    _TAG_RE = re.compile(r'<[^>]*>')

    def __init__(self, keywords: List[str], begin_date: str, end_date: str,
                 size: int):
        self.keywords = keywords
        self.begin_date = begin_date
        self.end_date = end_date
        self.size = size

    def get_config(self, keyword: str, current: int) -> Dict[str, Any]:
        """
        Build the JSON payload for one search request.

        Args:
            keyword (str): Keyword to search for.
            current (int): Value sent as ``page.current`` (the requested
                page).

        Returns:
            Dict[str, Any]: Request payload. The dates and page size come
            from the instance; they are passed through unchanged.
        """
        return {
            "aliasName": "article_data,open_data,mailbox_data,article_file",
            "keyWord": keyword,
            "lastkeyWord": keyword,
            "searchKeyWord": False,
            "orderType": "score",
            "searchType": "text",
            "searchScope": "3",
            "searchOperator": 0,
            "searchDateType": "custom",
            "searchDateName": f"{self.begin_date}-{self.end_date}",
            "beginDate": self.begin_date,
            "endDate": self.end_date,
            "showId": "c2ee13065aae85d7a998b8a3cd645961",
            "auditing": ["1"],
            "owner": "1912126876",
            "token": "tourist",
            "urlPrefix": "/aop_component/",
            "page": {
                "current": current,
                "size": self.size,
                "pageSizes": [2, 5, 10, 20, 50, 100],
                "total": 0,
                "totalPage": 0,
                "indexs": []
            },
            "advance": False,
            "advanceKeyWord": "",
            "lang": "i18n_zh_CN"
        }

    def generate_headers(self) -> Dict[str, str]:
        """
        Build the HTTP headers sent with every request.

        Returns:
            Dict[str, str]: Headers containing the ``Authorization`` value
            and a desktop browser ``User-Agent``.
        """
        return {
            'Authorization':
            'tourist',
            'User-Agent':
            ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit'
             '/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari'
             '/537.36 Edg/124.0.0.0')
        }

    def _post(self, config: Dict[str, Any]) -> str:
        """Post ``config`` as JSON to ``URL`` and return the raw body text."""
        return requests.post(self.URL,
                             headers=self.generate_headers(),
                             json=config,
                             timeout=self.REQUEST_TIMEOUT).text

    def fetch(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Synchronous variant.

        Post the payload, pause for ``REQUEST_INTERVAL`` seconds, then
        decode the response body.

        Args:
            config (Dict[str, Any]): Payload built by ``get_config``.

        Returns:
            Dict[str, Any]: Decoded JSON response.

        Raises:
            json.JSONDecodeError: If the response body is not valid JSON.
        """
        body = self._post(config)
        time.sleep(self.REQUEST_INTERVAL)
        return json.loads(body)

    async def fetch_async(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Asynchronous variant.

        Post the payload, pause for ``REQUEST_INTERVAL`` seconds, then
        decode the response body.

        Args:
            config (Dict[str, Any]): Payload built by ``get_config``.

        Returns:
            Dict[str, Any]: Decoded JSON response.

        Raises:
            json.JSONDecodeError: If the response body is not valid JSON.
        """
        # ``requests`` is blocking; run it in the default executor so the
        # event loop keeps serving other tasks while the request is in flight.
        loop = asyncio.get_running_loop()
        body = await loop.run_in_executor(None, self._post, config)
        await asyncio.sleep(self.REQUEST_INTERVAL)
        return json.loads(body)

    def parse(self, data: Dict[str, Any]) -> List[str]:
        """
        Extract the titles from a decoded response.

        Args:
            data (Dict[str, Any]): Decoded response; the records are read
                from ``data['data']['page']['records']``.

        Returns:
            List[str]: Titles with ``<...>`` markup removed, at most
            ``self.size`` of them.
        """
        records = data['data']['page']['records'] or []
        # Slice instead of indexing 0..size-1: a page may hold fewer records
        # than ``size``, which used to raise IndexError.
        return [
            self._TAG_RE.sub('', record['title'])
            for record in records[:self.size]
        ]

    def save(self, title_list: List[str]):
        """
        Save the data.

        Not implemented: this is a placeholder that does nothing.
        """
        pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 时间装饰器
|
|
|
|
|
def timeit(func):
    """
    Measure and print how long a synchronous function takes to run.

    Args:
        func: Function to wrap.

    Returns:
        A wrapper that calls ``func`` with the same arguments, prints
        ``"<name> cost: <seconds>"`` and returns ``func``'s result.
    """

    # Keep the wrapped function's name and docstring on the wrapper.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)

        print(f'{func.__name__} cost: {time.time() - start}')
        return result

    return wrapper
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def timeit_async(func):
    """
    Measure and print how long a coroutine function takes to run.

    Args:
        func: Coroutine function to wrap.

    Returns:
        A coroutine function that awaits ``func`` with the same arguments,
        prints ``"<name> cost: <seconds>"`` and returns the awaited result.
    """

    async def timed(*args, **kwargs):
        began = time.time()
        outcome = await func(*args, **kwargs)

        elapsed = time.time() - began
        print(f'{func.__name__} cost: {elapsed}')
        return outcome

    # Equivalent to decorating with ``@functools.wraps(func)``.
    return functools.wraps(func)(timed)
|