import random
import string
import jieba
from collections import Counter
import time
from selenium import webdriver
from import By
from import WebDriverWait
import requests
import re
from lxml import etree
from nltk.corpus import stopwords
# 爬虫父类
class Crawler:
def __init__(self) -> None:
self.stop_words = self.get_stop_words()
# 一共有三个可以切换的数据源,来自三个不同的网站
self.data_source_qzgy = [
self.data_source_lzwz = [
f'{i}.html' for i in range(102, 125)
self.data_source_gdb = [
for i in range(200, 207)
def get_stop_words(self) -> list:
    """Build the stop-word list used by both frequency counters.

    The list is made of a few hand-picked Chinese function words, the
    English stop words shipped with NLTK and the single lowercase ASCII
    letters.  NLTK must be able to load its ``stopwords`` corpus.

    Returns:
        A list of stop-word strings.
    """
    chinese_stopwords = [
        '的', '是', '一些', '将', '它们', '这个', '用于', '包含', '示例',
    ]
    english_stopwords = stopwords.words('english')
    # The lowercase letters were announced by the original comment but
    # never actually added; include them so stray single letters are not
    # counted as words.
    lowercase_letters = list(string.ascii_lowercase)
    return chinese_stopwords + english_stopwords + lowercase_letters
def get_headers(self) -> dict:
agent_list = [
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 \
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, li\
ke Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0",
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHT\
ML, like Gecko) Chrome/ Safari/537.36 Edg/',
"Mozilla/5.0 (Linux; Android 7.0; SM-G950U Build/NRD90M) AppleWebK\
it/537.36 (KHTML, like Gecko) Chrome/62.0.3202.84 Mobile Safari/53\
"Mozilla/5.0 (Linux; Android 8.0.0; SM-G965U Build/R16NW) AppleWeb\
Kit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.111 Mobile Safari/\
"Mozilla/5.0 (Linux; Android 8.1.0; SM-T837A) AppleWebKit/537.36 (\
KHTML, like Gecko) Chrome/70.0.3538.80 Safari/537.36",
"Mozilla/5.0 (Linux; U; en-us; KFAPWI Build/JDQ39) AppleWebKit/535\
.19 (KHTML, like Gecko) Silk/3.13 Safari/535.19 Silk-Accelerated=t\
"Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia \
550) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob\
ile Safari/537.36 Edge/14.14263",
"Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia \
950) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob\
ile Safari/537.36 Edge/14.14263",
"Mozilla/5.0 (Linux; Android 11; moto g power (2022)) AppleWebKit/\
537.36 (KHTML, like Gecko) Chrome/ Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) AppleWebKit/537.36\
(KHTML, like Gecko) Chrome/ Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 6.0.1; Nexus 10 Build/MOB31T) AppleWe\
bKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36",
"Mozilla/5.0 (Linux; Android 4.4.2; Nexus 4 Build/KOT49H) AppleWeb\
Kit/537.36 (KHTML, like Gecko) Chrome/ Mobile Safari/537.\
"Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKi\
t/537.36 (KHTML, like Gecko) Chrome/ Mobile Safari/537.36\
"Mozilla/5.0 (Linux; Android 8.0.0; Nexus 5X Build/OPR4.170623.006\
) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Mobile S\
headers = {
"user_agent": random.choice(agent_list),
return headers
def get_freqs_of_En(self, content: str) -> list[tuple[str, int]]:
    """Count how often each English word occurs in *content*.

    The text is lower-cased and split on every run of non-word
    characters and underscores.  Words shorter than three characters and
    words listed in ``self.stop_words`` are ignored.

    Args:
        content: Raw text to analyse.

    Returns:
        ``(word, count)`` pairs sorted by descending count; words with
        the same count keep the order of their first appearance.
    """
    # Raw string: '\W' inside a plain literal is an invalid escape.
    tokens = re.sub(r'[\W_]+', ' ', content).lower().split()
    # A set makes the per-token stop-word test O(1) instead of O(n).
    stop_words = set(self.stop_words)
    counts = Counter(
        word for word in tokens
        if len(word) >= 3 and word not in stop_words
    )
    return counts.most_common()
def get_freqs_of_Cn(self, content: str, top_n: int = 10,
                    min_length: int = 1) -> list[tuple[str, int]]:
    """Count the most frequent jieba tokens in *content*.

    Args:
        content: Text to segment with ``jieba.cut``.
        top_n: How many of the most frequent tokens to return.
        min_length: Shortest token, in characters, that is counted.  The
            default keeps every token; pass ``2`` to drop the
            one-character tokens that ``print_freqs`` skips anyway, so
            they cannot use up the ``top_n`` slots.

    Returns:
        Up to ``top_n`` ``(token, count)`` pairs, most frequent first.
        Tokens listed in ``self.stop_words`` are never counted.
    """
    # A set makes the per-token stop-word test O(1) instead of O(n).
    stop_words = set(self.stop_words)
    counts = Counter(
        token for token in jieba.cut(content)
        if len(token) >= min_length and token not in stop_words
    )
    return counts.most_common(top_n)
def print_freqs(self, word_freqs: list[tuple[str, int]],
                limit: int = 10) -> None:
    """Print the leading entries of a frequency table.

    Keywords of a single character are skipped and do not count towards
    the limit.

    Args:
        word_freqs: ``(keyword, count)`` pairs in the order to print.
        limit: Maximum number of entries to print.
    """
    printed = 0
    for keyword, freq in word_freqs:
        if printed >= limit:
            break
        if len(keyword) > 1:
            print(f"{keyword}: {freq}")
            printed += 1
# 青藏高原数据获取词频子类
class QZGYDate(Crawler):
def __init__(self) -> None:
    """Initialise the crawler through the base-class constructor."""
    # The base class sets up ``stop_words`` and the ``data_source_*``
    # lists that the methods of this class read.
    super().__init__()
def crawler_qzgy(self) -> str:
option = webdriver.EdgeOptions()
# 添加实验性选项"detach"并设置为True
option.add_experimental_option("detach", True)
driver = webdriver.Edge(options=option)
self.url = random.choice(self.data_source_qzgy)
# 等待页面加载
element = WebDriverWait(
driver, 10).until(lambda x: driver.find_element(
text_content = element.text
return text_content
except Exception as result:
return ""
def get_data_qzgy(self) -> str:
    """Return whatever text ``crawler_qzgy`` produces for this run."""
    return self.crawler_qzgy()
# 泸州问政数据获取词频子类
class LZWZDataCrawler(Crawler):
def __init__(self) -> None:
    """Initialise the crawler through the base-class constructor."""
    # The base class sets up ``stop_words`` and the ``data_source_*``
    # lists that the methods of this class read.
    super().__init__()
def crawler_lzwz(self, url: str):
response =, headers=self.get_headers())
html = response.content.decode('utf-8')
tree = etree.HTML(html)
text = tree.xpath('/html/body/div/div/div[4]/div[1]/p/text()')[0]
return text
except :
return None
def get_data_lzwz(self) -> str:
    """Fetch every page in ``self.data_source_lzwz`` and join the texts.

    Each URL is requested exactly once.  Pages for which
    ``crawler_lzwz`` returns nothing (``None`` or an empty string) are
    skipped.

    Returns:
        The page texts concatenated in source order, or ``''`` when no
        page yielded any text.
    """
    # The previous version called crawler_lzwz twice per URL and
    # appended the second result, which doubled the traffic and raised
    # TypeError whenever only the second request failed.
    pages = (self.crawler_lzwz(url) for url in self.data_source_lzwz)
    return ''.join(text for text in pages if text)
# 古登堡数据获取词频子类
class GDBDate(Crawler):
def __init__(self):
    """Initialise the crawler through the base-class constructor."""
    # The base class sets up ``stop_words`` and the ``data_source_*``
    # lists that the methods of this class read.
    super().__init__()
def crawler_gdb(self, url: str, timeout: float = 10.0):
    """Download *url* and return its body decoded as UTF-8.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded text, or ``None`` when the request fails, the server
        answers with an HTTP error status, or the body is not valid
        UTF-8.  The failure is reported on standard output.
    """
    try:
        # Without a timeout a stalled server would block the crawl
        # forever.
        response = requests.get(
            url, headers=self.get_headers(), timeout=timeout
        )
        # Treat HTTP error pages as failures so that their markup is
        # not fed into the word count.
        response.raise_for_status()
        return response.content.decode('utf-8')
    except (requests.RequestException, UnicodeDecodeError) as result:
        print(f"发现错误: {result}")
        return None
def get_data_gdb(self):
    """Fetch every page in ``self.data_source_gdb`` and join the texts.

    Each URL is requested exactly once.  Pages for which ``crawler_gdb``
    returns nothing (``None`` or an empty string) are skipped.

    Returns:
        The page texts concatenated in source order, or ``''`` when no
        page yielded any text.
    """
    # The previous version called crawler_gdb twice per URL and
    # appended the second result, which doubled the traffic and raised
    # TypeError whenever only the second request failed.
    pages = (self.crawler_gdb(url) for url in self.data_source_gdb)
    return ''.join(text for text in pages if text)
# 时间装饰器
def timing_decorator(func):
    """Wrap *func* so that each call prints how long it took.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to *func*, prints the
        elapsed time in seconds with two decimals, and returns the
        result of *func*.  Nothing is printed when *func* raises.
    """
    from functools import wraps

    @wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the interval cannot be distorted
        # by system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        print(f"运行时间: {run_time:.2f} 秒")
        return result

    return wrapper