import random
import re
import string
import time
from collections import Counter
from functools import wraps

import jieba
import requests
from lxml import etree
from nltk.corpus import stopwords
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait


###############################################################################
# Crawler base class
###############################################################################
class Crawler:
    """Crawler base class providing shared variables and helpers for the
    subclass crawlers."""

    def __init__(self) -> None:
        self.stop_words = self.get_stop_words()
        # Three switchable data sources, each from a different website.
        self.data_source_qzgy = [
            'https://data.tpdc.ac.cn/zh-hans/data/4a68ebac-571f-4860-82b3-32d397b2ff59',
            'https://data.tpdc.ac.cn/zh-hans/data/19838f9a-9672-46d5-bd98-ca56973eb5fa',
            'https://data.tpdc.ac.cn/zh-hans/data/39018cd4-a3aa-4888-9a07-7316b52aefbc',
            'https://data.tpdc.ac.cn/zh-hans/data/12191fa5-2818-4557-b665-dbae88e557a4',
            'https://data.tpdc.ac.cn/zh-hans/data/5296105f-f1b6-4296-b2ee-8ddb0945d017',
            'https://data.tpdc.ac.cn/zh-hans/data/5c6b66b6-2699-4664-9024-0bfc4d7b6800',
            'https://data.tpdc.ac.cn/zh-hans/data/c4df7e12-ec56-4078-aebb-fb1b142ae622',
            'https://data.tpdc.ac.cn/zh-hans/data/27f4e535-16f8-47ae-be83-499cdf61f522',
            'https://data.tpdc.ac.cn/zh-hans/data/689ab245-cc98-440d-91d5-9784beb0a675',
            'https://data.tpdc.ac.cn/zh-hans/data/dc672fe7-7b00-4626-a9d5-8a0e00d791c3'
        ]
        self.data_source_lzwz = [
            f'https://wen.lzep.cn/wen/186{i}.html' for i in range(102, 125)
        ]
        self.data_source_gdb = [
            f'https://www.gutenberg.org/cache/epub/73208/pg73{i}.txt'
            for i in range(200, 207)
        ]

    def get_stop_words(self) -> list:
        """Return the list of common stop words."""
        # Build the stop-word list: NLTK's English stop words plus a few
        # common Chinese function words.
        english_stopwords = stopwords.words('english')
        stop_words = [
            '的', '是', '一些', '将', '它们', '这个', '用于', '包含', '示例'
        ] + english_stopwords
        # Also treat single lowercase letters as stop words.
        stop_words.extend(list(string.ascii_lowercase))
        return stop_words

    def get_headers(self) -> dict:
        """Return request headers with a randomly chosen User-Agent."""
        agent_list = [
            "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 Firefox/87.0",
            "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
            "Mozilla/5.0 (Linux; Android 7.0; SM-G950U Build/NRD90M) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.84 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 8.0.0; SM-G965U Build/R16NW) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.111 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 8.1.0; SM-T837A) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.80 Safari/537.36",
            "Mozilla/5.0 (Linux; U; en-us; KFAPWI Build/JDQ39) AppleWebKit/535.19 (KHTML, like Gecko) Silk/3.13 Safari/535.19 Silk-Accelerated=true",
            "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia 550) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mobile Safari/537.36 Edge/14.14263",
            "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia 950) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mobile Safari/537.36 Edge/14.14263",
            "Mozilla/5.0 (Linux; Android 11; moto g power (2022)) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 10 Build/MOB31T) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Linux; Android 4.4.2; Nexus 4 Build/KOT49H) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
            "Mozilla/5.0 (Linux; Android 8.0.0; Nexus 5X Build/OPR4.170623.006) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
        ]
        # The HTTP header name is "User-Agent"; the original "user_agent"
        # key would be ignored by servers.
        headers = {
            "User-Agent": random.choice(agent_list),
        }
        return headers

    def get_freqs_of_En(self, content: str) -> list[tuple[str, int]]:
        """Return the word frequencies of the English text `content`."""
        # Split into words: replace every non-word character with a space.
        pattern = re.compile(r'[\W_]+')
        word_list = pattern.sub(' ', content).lower().split()
        # Filter out stop words and words shorter than three characters.
        word_list = [
            w for w in word_list
            if (w not in self.stop_words) and len(w) >= 3
        ]
        # Count frequencies.
        word_freqs = {}
        for word in word_list:
            word_freqs[word] = word_freqs.get(word, 0) + 1
        # Sort by frequency, descending.
        word_freqs = sorted(word_freqs.items(),
                            key=lambda x: x[1],
                            reverse=True)
        return word_freqs

    def get_freqs_of_Cn(self, content: str) -> list[tuple[str, int]]:
        """Return the word frequencies of the Chinese text `content`."""
        # Segment the text with jieba, then count non-stop-word tokens.
        words = jieba.cut(content)
        word_freqs = Counter()
        for word in words:
            if word not in self.stop_words:
                word_freqs[word] += 1
        return word_freqs.most_common(10)

    def print_freqs(self, word_freqs: list[tuple[str, int]]) -> None:
        """Print up to the ten most frequent multi-character keywords."""
        cnt = 1
        for keyword, freq in word_freqs:
            if len(keyword) > 1:
                print(f"{keyword}: {freq}")
                cnt += 1
                if cnt > 10:
                    return
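
# A minimal usage sketch of the frequency helpers above, assuming NLTK's
# stopwords corpus has been downloaded (nltk.download('stopwords')). The
# sample sentence is hypothetical and only illustrates the filtering and
# ranking behaviour; it is not taken from any of the crawled data sources.
def _demo_word_freqs() -> None:
    crawler = Crawler()
    sample = "Data science uses data; data drives science."
    # Stop words and words shorter than three characters are dropped first,
    # so this should print roughly: "data: 3", "science: 2", "uses: 1",
    # "drives: 1".
    crawler.print_freqs(crawler.get_freqs_of_En(sample))
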
###############################################################################
# Qinghai-Tibet Plateau (QZGY) word-frequency subclass
###############################################################################
class QZGYDate(Crawler):

    def __init__(self) -> None:
        super().__init__()

    def crawler_qzgy(self) -> str:
        """Single-threaded crawl of data source 1."""
        option = webdriver.EdgeOptions()
        # Keep the browser window open after the script finishes ("detach").
        option.add_experimental_option("detach", True)
        driver = webdriver.Edge(options=option)
        self.url = random.choice(self.data_source_qzgy)
        try:
            driver.get(self.url)
            # Wait up to ten seconds for the target element to load.
            element = WebDriverWait(driver, 10).until(
                lambda x: driver.find_element(
                    by=By.XPATH,
                    value='//*[@id="app"]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]'
                )
            )
            return element.text
        except Exception as result:
            print(f"Error: {result}")
            return ""
        finally:
            driver.quit()

    def get_data_qzgy(self) -> str:
        return self.crawler_qzgy()


###############################################################################
# Luzhou Wenzheng (LZWZ) word-frequency subclass
###############################################################################
class LZWZDataCrawler(Crawler):

    def __init__(self) -> None:
        super().__init__()

    def crawler_lzwz(self, url: str):
        """Single-threaded crawl of data source 2."""
        try:
            # These are plain article pages, so fetch them with GET (the
            # original used POST) and a timeout.
            response = requests.get(url, headers=self.get_headers(),
                                    timeout=10)
            html = response.content.decode('utf-8')
            tree = etree.HTML(html)
            text = tree.xpath('/html/body/div/div/div[4]/div[1]/p/text()')[0]
            return text
        except Exception:
            return None

    def get_data_lzwz(self) -> str:
        """Collect some data from source 2 into a single string."""
        content = ''
        for url in self.data_source_lzwz:
            text = self.crawler_lzwz(url)
            if text:
                # Reuse the fetched text instead of requesting the page again.
                content += text
        return content


###############################################################################
# Gutenberg (GDB) word-frequency subclass
###############################################################################
class GDBDate(Crawler):
    """Gutenberg word-frequency subclass."""

    def __init__(self):
        super().__init__()

    def crawler_gdb(self, url: str):
        """Single-threaded crawl of data source 3."""
        try:
            response = requests.get(url, headers=self.get_headers(),
                                    timeout=10)
            return response.content.decode('utf-8')
        except Exception as result:
            print(f"Error: {result}")
            # time.sleep(3)
            return None

    def get_data_gdb(self):
        content = ''
        for url in self.data_source_gdb:
            text = self.crawler_gdb(url)
            if text:
                # Reuse the fetched text instead of requesting the page again.
                content += text
        return content
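
# The commented-out time.sleep(3) above hints at rate limiting. Below is a
# hedged sketch of a politer fetch loop; _fetch_all_politely is a
# hypothetical helper added for illustration, not part of the original
# classes, and the delay range is an arbitrary assumption.
def _fetch_all_politely(crawler: GDBDate,
                        delay_range: tuple[float, float] = (1.0, 3.0)) -> str:
    """Fetch every Gutenberg chunk with a random pause between requests."""
    content = ''
    for url in crawler.data_source_gdb:
        text = crawler.crawler_gdb(url)
        if text:
            content += text
        # Sleep a random interval so we do not hammer the server.
        time.sleep(random.uniform(*delay_range))
    return content
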
###############################################################################
# Timing decorator
###############################################################################
def timing_decorator(func):
    """Decorator that reports a function's running time."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()            # record the start time
        result = func(*args, **kwargs)      # call the wrapped function
        end_time = time.time()              # record the end time
        run_time = end_time - start_time    # compute the elapsed time
        print(f"Run time: {run_time:.2f} s")
        return result
    return wrapper
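
# An end-to-end sketch tying the pieces together: wrap one fetch-and-count
# pipeline with timing_decorator and run it. run_gutenberg_pipeline is a
# hypothetical driver added for illustration; it assumes the Gutenberg
# mirror is reachable from the current network.
@timing_decorator
def run_gutenberg_pipeline() -> None:
    crawler = GDBDate()
    content = crawler.get_data_gdb()
    crawler.print_freqs(crawler.get_freqs_of_En(content))


if __name__ == "__main__":
    run_gutenberg_pipeline()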