You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
264 lines
10 KiB
264 lines
10 KiB
import random
|
|
import string
|
|
import jieba
|
|
from collections import Counter
|
|
import time
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
import requests
|
|
import re
|
|
from lxml import etree
|
|
from nltk.corpus import stopwords
|
|
|
|
###############################################################################
|
|
# 爬虫父类
|
|
###############################################################################
|
|
class Crawler:
    """Base crawler providing shared data sources and text helpers.

    Subclasses reuse the stop-word list, the randomised request headers and
    the English/Chinese word-frequency helpers defined here.
    """

    # Pool of browser User-Agent strings; one is picked at random per request.
    # The pieces are joined with implicit string concatenation so that the
    # resulting values never depend on source indentation (a backslash
    # continuation inside a literal embeds the next line's leading whitespace).
    _USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 "
        "Firefox/87.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, li"
        "ke Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHT"
        "ML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
        "Mozilla/5.0 (Linux; Android 7.0; SM-G950U Build/NRD90M) AppleWebK"
        "it/537.36 (KHTML, like Gecko) Chrome/62.0.3202.84 Mobile Safari/53"
        "7.36",
        "Mozilla/5.0 (Linux; Android 8.0.0; SM-G965U Build/R16NW) AppleWeb"
        "Kit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.111 Mobile Safari/"
        "537.36",
        "Mozilla/5.0 (Linux; Android 8.1.0; SM-T837A) AppleWebKit/537.36 ("
        "KHTML, like Gecko) Chrome/70.0.3538.80 Safari/537.36",
        "Mozilla/5.0 (Linux; U; en-us; KFAPWI Build/JDQ39) AppleWebKit/535"
        ".19 (KHTML, like Gecko) Silk/3.13 Safari/535.19 Silk-Accelerated=t"
        "rue",
        "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia "
        "550) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob"
        "ile Safari/537.36 Edge/14.14263",
        "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia "
        "950) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob"
        "ile Safari/537.36 Edge/14.14263",
        "Mozilla/5.0 (Linux; Android 11; moto g power (2022)) AppleWebKit/"
        "537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
        # The original literal was missing the space before "(KHTML".
        "Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) AppleWebKit/537.36"
        " (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 10 Build/MOB31T) AppleWe"
        "bKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Linux; Android 4.4.2; Nexus 4 Build/KOT49H) AppleWeb"
        "Kit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537."
        "36",
        "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKi"
        "t/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 8.0.0; Nexus 5X Build/OPR4.170623.006"
        ") AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile S"
        "afari/537.36",
    )

    def __init__(self) -> None:
        """Load the stop words and define the three data-source URL lists."""
        self.stop_words = self.get_stop_words()

        # Three switchable data sources, each from a different website.
        self.data_source_qzgy = [
            'https://data.tpdc.ac.cn/zh-hans/data/4a68ebac-571f-4860-82b3-32d397b2ff59',
            'https://data.tpdc.ac.cn/zh-hans/data/19838f9a-9672-46d5-bd98-ca56973eb5fa',
            'https://data.tpdc.ac.cn/zh-hans/data/39018cd4-a3aa-4888-9a07-7316b52aefbc',
            'https://data.tpdc.ac.cn/zh-hans/data/12191fa5-2818-4557-b665-dbae88e557a4',
            'https://data.tpdc.ac.cn/zh-hans/data/5296105f-f1b6-4296-b2ee-8ddb0945d017',
            'https://data.tpdc.ac.cn/zh-hans/data/5c6b66b6-2699-4664-9024-0bfc4d7b6800',
            'https://data.tpdc.ac.cn/zh-hans/data/c4df7e12-ec56-4078-aebb-fb1b142ae622',
            'https://data.tpdc.ac.cn/zh-hans/data/27f4e535-16f8-47ae-be83-499cdf61f522',
            'https://data.tpdc.ac.cn/zh-hans/data/689ab245-cc98-440d-91d5-9784beb0a675',
            'https://data.tpdc.ac.cn/zh-hans/data/dc672fe7-7b00-4626-a9d5-8a0e00d791c3',
        ]

        self.data_source_lzwz = [
            f'https://wen.lzep.cn/wen/186{i}.html' for i in range(102, 125)
        ]

        # NOTE(review): the directory id is fixed at 73208 while the file id
        # runs 73200-73206, so the two never match. If the site expects the
        # same id in both places these URLs will not resolve -- confirm.
        self.data_source_gdb = [
            f'https://www.gutenberg.org/cache/epub/73208/pg73{i}.txt'
            for i in range(200, 207)
        ]

    def get_stop_words(self) -> list:
        """Return the stop-word list.

        The list is a few common Chinese words, followed by NLTK's English
        stop words, followed by every single lowercase ASCII letter.
        """
        english_stopwords = stopwords.words('english')
        stop_words = [
            '的', '是', '一些', '将', '它们', '这个', '用于', '包含', '用于', '示例'
        ] + english_stopwords

        # Single lowercase letters are treated as stop words too.
        stop_words.extend(string.ascii_lowercase)
        return stop_words

    def get_headers(self) -> dict:
        """Return request headers carrying a randomly chosen User-Agent.

        The key must be the real HTTP header name ``User-Agent``; the previous
        ``user_agent`` key was sent as an unrelated custom header, leaving the
        HTTP client's default User-Agent in place.
        """
        return {"User-Agent": random.choice(self._USER_AGENTS)}

    def get_freqs_of_En(self, content: str) -> list[tuple[str, int]]:
        """Return English word frequencies for ``content``.

        The text is lower-cased and split on runs of non-word characters and
        underscores. Stop words and words shorter than three characters are
        dropped.

        Returns:
            ``(word, count)`` pairs sorted by descending count; words with the
            same count keep their order of first appearance.
        """
        # Raw string: '\W' in a normal literal is an invalid escape sequence.
        words = re.sub(r'[\W_]+', ' ', content).lower().split()

        # Set membership is O(1); the attribute itself stays a list.
        stop_words = set(self.stop_words)
        counts = Counter(
            word for word in words
            if len(word) >= 3 and word not in stop_words
        )
        return counts.most_common()

    def get_freqs_of_Cn(self, content: str) -> list[tuple[str, int]]:
        """Return the 10 most frequent jieba tokens of Chinese ``content``.

        Tokens found in the stop-word list are skipped. Single-character
        tokens are NOT removed here; ``print_freqs`` skips them when printing.
        """
        stop_words = set(self.stop_words)
        counts = Counter(
            token for token in jieba.cut(content) if token not in stop_words
        )
        return counts.most_common(10)

    def print_freqs(self, word_freqs: list[tuple[str, int]]) -> None:
        """Print up to 10 ``keyword: freq`` lines.

        Keywords of length 1 or less are skipped and do not count towards
        the limit of 10.
        """
        printed = 0
        for keyword, freq in word_freqs:
            if len(keyword) <= 1:
                continue
            print(f"{keyword}: {freq}")
            printed += 1
            if printed >= 10:
                return
|
|
|
|
|
|
###############################################################################
|
|
# 青藏高原数据获取词频子类
|
|
###############################################################################
|
|
class QZGYDate(Crawler):
    """Crawler subclass for the Tibetan Plateau data pages (data source 1)."""

    def __init__(self) -> None:
        super().__init__()

    def crawler_qzgy(self) -> str:
        """Fetch the text of one randomly chosen page with an Edge browser.

        The chosen URL is stored on ``self.url``. Returns the text of the
        target element, or an empty string if anything goes wrong while
        loading or locating it. The browser is always closed afterwards.
        """
        edge_options = webdriver.EdgeOptions()
        # Add the experimental "detach" option and set it to True.
        edge_options.add_experimental_option("detach", True)
        browser = webdriver.Edge(options=edge_options)
        self.url = random.choice(self.data_source_qzgy)
        target_xpath = (
            '//*[@id="app"]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]'
        )
        try:
            browser.get(self.url)
            # Wait (up to 10 seconds) until the target element can be found.
            waiter = WebDriverWait(browser, 10)
            node = waiter.until(
                lambda _: browser.find_element(by=By.XPATH, value=target_xpath)
            )
            return node.text
        except Exception as err:
            print(f"发现错误:{err}")
            return ""
        finally:
            browser.quit()

    def get_data_qzgy(self) -> str:
        """Return the crawled page text (delegates to ``crawler_qzgy``)."""
        return self.crawler_qzgy()
|
|
|
|
|
|
###############################################################################
|
|
# 泸州问政数据获取词频子类
|
|
###############################################################################
|
|
class LZWZDataCrawler(Crawler):
    """Crawler subclass for the Luzhou "wen.lzep.cn" pages (data source 2)."""

    def __init__(self) -> None:
        super().__init__()

    def crawler_lzwz(self, url: str):
        """Fetch one page and return the first text node of the target <p>.

        Args:
            url: Page address to request.

        Returns:
            The extracted text, or ``None`` when the request, decoding or
            XPath lookup fails (best-effort: failures are not reported).
        """
        try:
            # NOTE(review): the page is requested with POST, kept as in the
            # original -- confirm the site really requires it over GET.
            # The timeout keeps a stalled server from hanging the crawl.
            response = requests.post(
                url, headers=self.get_headers(), timeout=10
            )
            html = response.content.decode('utf-8')
            tree = etree.HTML(html)
            # [0] raises IndexError when nothing matches; handled below.
            return tree.xpath('/html/body/div/div/div[4]/div[1]/p/text()')[0]
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.
            return None

    def get_data_lzwz(self) -> str:
        """Collect the text of every data-source-2 page into one string.

        Each URL is fetched exactly once. Pages that yield no text are
        skipped. (The previous version fetched every successful URL a second
        time and crashed with ``TypeError`` if that second fetch failed.)
        """
        texts = (self.crawler_lzwz(url) for url in self.data_source_lzwz)
        return ''.join(text for text in texts if text)
|
|
|
|
|
|
###############################################################################
|
|
# 古登堡数据获取词频子类
|
|
###############################################################################
|
|
class GDBDate(Crawler):
    """Crawler subclass for the Project Gutenberg texts (data source 3)."""

    def __init__(self):
        super().__init__()

    def crawler_gdb(self, url: str):
        """Download one text file and return its content as a string.

        Args:
            url: Address of the text file.

        Returns:
            The UTF-8 decoded body, or ``None`` if the request fails, the
            server answers with an HTTP error status, or decoding fails. The
            error is printed in every failure case.
        """
        try:
            # The timeout keeps a stalled server from hanging the crawl.
            response = requests.get(
                url, headers=self.get_headers(), timeout=10
            )
            # Reject 4xx/5xx answers so error pages are not counted as
            # corpus text.
            response.raise_for_status()
            return response.content.decode('utf-8')
        except Exception as result:
            print(f"发现错误: {result}")
            return None

    def get_data_gdb(self):
        """Collect the text of every data-source-3 file into one string.

        Each URL is downloaded exactly once. Failed downloads are skipped.
        (The previous version downloaded every successful URL a second time
        and crashed with ``TypeError`` if that second download failed.)
        """
        texts = (self.crawler_gdb(url) for url in self.data_source_gdb)
        return ''.join(text for text in texts if text)
|
|
|
|
|
|
###############################################################################
|
|
# 时间装饰器
|
|
###############################################################################
|
|
def timing_decorator(func):
    """Decorator that prints how long each call to ``func`` takes.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to ``func``, prints the elapsed
        time in seconds (two decimals) after a successful call, and returns
        ``func``'s result. Nothing is printed if ``func`` raises.
    """
    # Imported locally so the block does not depend on a file-level import.
    import functools

    # functools.wraps keeps func's __name__, __doc__ and __wrapped__ visible
    # on the wrapper, which helps debugging and introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so it is unaffected by clock changes.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)  # call the original function
        run_time = time.perf_counter() - start_time
        print(f"运行时间: {run_time:.2f} 秒")
        return result

    return wrapper
|