import random
import re
import string
import time
from collections import Counter
from typing import Optional

# NOTE: the third-party dependencies (selenium, requests, lxml, jieba) are
# imported inside the methods that use them, so this module can be imported
# without the browser / HTTP / NLP stacks being installed.

###############################################################################
# Crawler base class
###############################################################################


class Crawler:
    """Base crawler that provides shared data sources and helper methods.

    Attributes:
        stop_words: Words ignored when counting word frequencies.
        data_source_1: Page URLs scraped with a Selenium-driven Edge browser.
        data_source_2: Page URLs scraped with requests + lxml.
        data_source_3: Plain-text URLs downloaded with requests.
    """

    # Seconds to wait for an HTTP response before giving up, so that a
    # stalled server cannot block the crawl forever.
    REQUEST_TIMEOUT = 10

    # Pool of browser identities; one is picked at random per request.
    # Adjacent literals are joined by the parser, so no stray whitespace
    # ends up inside the header value.
    _USER_AGENTS = (
        "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 "
        "Firefox/87.0",
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, "
        "like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
        "Mozilla/5.0 (Linux; Android 7.0; SM-G950U Build/NRD90M) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.84 "
        "Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 8.0.0; SM-G965U Build/R16NW) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.111 "
        "Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 8.1.0; SM-T837A) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/70.0.3538.80 Safari/537.36",
        "Mozilla/5.0 (Linux; U; en-us; KFAPWI Build/JDQ39) "
        "AppleWebKit/535.19 (KHTML, like Gecko) Silk/3.13 Safari/535.19 "
        "Silk-Accelerated=true",
        "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia "
        "550) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 "
        "Mobile Safari/537.36 Edge/14.14263",
        "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia "
        "950) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 "
        "Mobile Safari/537.36 Edge/14.14263",
        "Mozilla/5.0 (Linux; Android 11; moto g power (2022)) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 "
        "Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 10 Build/MOB31T) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 "
        "Safari/537.36",
        "Mozilla/5.0 (Linux; Android 4.4.2; Nexus 4 Build/KOT49H) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 "
        "Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 "
        "Mobile Safari/537.36",
        "Mozilla/5.0 (Linux; Android 8.0.0; Nexus 5X Build/OPR4.170623.006) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 "
        "Mobile Safari/537.36",
    )

    def __init__(self) -> None:
        """Build the stop-word list and the three switchable URL lists."""
        self.stop_words = self.get_stop_words()

        # Three interchangeable data sources, each from a different website.
        self.data_source_1 = [
            'https://data.tpdc.ac.cn/zh-hans/data/4a68ebac-571f-4860-82b3-32d397b2ff59',
            'https://data.tpdc.ac.cn/zh-hans/data/19838f9a-9672-46d5-bd98-ca56973eb5fa',
            'https://data.tpdc.ac.cn/zh-hans/data/39018cd4-a3aa-4888-9a07-7316b52aefbc',
            'https://data.tpdc.ac.cn/zh-hans/data/12191fa5-2818-4557-b665-dbae88e557a4',
            'https://data.tpdc.ac.cn/zh-hans/data/5296105f-f1b6-4296-b2ee-8ddb0945d017',
            'https://data.tpdc.ac.cn/zh-hans/data/5c6b66b6-2699-4664-9024-0bfc4d7b6800',
            'https://data.tpdc.ac.cn/zh-hans/data/c4df7e12-ec56-4078-aebb-fb1b142ae622',
            'https://data.tpdc.ac.cn/zh-hans/data/27f4e535-16f8-47ae-be83-499cdf61f522',
            'https://data.tpdc.ac.cn/zh-hans/data/689ab245-cc98-440d-91d5-9784beb0a675',
            'https://data.tpdc.ac.cn/zh-hans/data/dc672fe7-7b00-4626-a9d5-8a0e00d791c3'
        ]

        self.data_source_2 = [
            f'https://wen.lzep.cn/wen/186{i}.html' for i in range(100, 130)
        ]

        self.data_source_3 = [
            f'https://www.gutenberg.org/cache/epub/73208/pg73{i}.txt'
            for i in range(200, 210)
        ]

    def get_stop_words(self) -> list:
        """Return the list of common English and Chinese stop words.

        The list also contains every single lowercase ASCII letter.
        """
        stop_words = [
            'a', 'able', 'about', 'across', 'after', 'all', 'almost', 'also',
            'am', 'among', 'an', 'and', 'any', 'are', 'as', 'at', 'be',
            'because', 'been', 'but', 'by', 'can', 'cannot', 'could', 'dear',
            'did', 'do', 'does', 'either', 'else', 'ever', 'every', 'for',
            'from', 'get', 'got', 'had', 'has', 'have', 'he', 'her', 'hers',
            'him', 'his', 'how', 'however', 'i', 'if', 'in', 'into', 'is',
            'it', 'its', 'just', 'least', 'let', 'like', 'likely', 'may', 'me',
            'might', 'most', 'must', 'my', 'neither', 'no', 'nor', 'not', 'of',
            'off', 'often', 'on', 'only', 'or', 'other', 'our', 'own',
            'rather', 'said', 'say', 'says', 'she', 'should', 'since', 'so',
            'some', 'than', 'that', 'the', 'their', 'them', 'then', 'there',
            'these', 'they', 'this', 'tis', 'to', 'too', 'twas', 'us', 'wants',
            'was', 'we', 'were', 'what', 'when', 'where', 'which', 'while',
            'who', 'whom', 'why', 'will', 'with', 'would', 'yet', 'you',
            'your', '的', '是', '一些', '将', '它们', '这个', '用于', '包含', '用于', '示例'
        ]

        # Single letters are treated as stop words too.
        stop_words.extend(string.ascii_lowercase)

        return stop_words

    def get_headers(self) -> dict:
        """Return request headers carrying a randomly chosen User-Agent."""
        # The key must be the real HTTP header name; a "user_agent" key
        # would be sent as an unrelated header and the default agent kept.
        return {"User-Agent": random.choice(self._USER_AGENTS)}

    def crawler_1(self) -> str:
        """Scrape one randomly chosen page of data source 1 with Edge.

        The chosen URL is stored on ``self.url``.

        Returns:
            The text of the target element, or an empty string when loading
            the page or locating the element fails.
        """
        from selenium import webdriver
        from selenium.webdriver.common.by import By
        from selenium.webdriver.support.ui import WebDriverWait

        option = webdriver.EdgeOptions()
        # Set the experimental "detach" option to True.
        option.add_experimental_option("detach", True)
        driver = webdriver.Edge(options=option)
        self.url = random.choice(self.data_source_1)
        try:
            driver.get(self.url)
            # Wait up to 10 seconds for the content element to appear.
            element = WebDriverWait(driver, 10).until(
                lambda drv: drv.find_element(
                    by=By.XPATH,
                    value='//*[@id="app"]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]',
                )
            )
            return element.text
        except Exception as result:
            print(f"发现错误:{result}")
            return ""
        finally:
            # Always release the browser, whether or not scraping succeeded.
            driver.quit()

    def crawler_2(self, url: str) -> Optional[str]:
        """Fetch one page of data source 2 and extract its first paragraph.

        Args:
            url: Address of the page to fetch.

        Returns:
            The first text node matched by the XPath, or ``None`` when the
            request, decoding or extraction fails (best effort, errors are
            deliberately not reported).
        """
        import requests
        from lxml import etree

        try:
            # NOTE(review): a POST is used to read a page; kept as in the
            # original -- confirm the site does not simply expect a GET.
            response = requests.post(
                url, headers=self.get_headers(), timeout=self.REQUEST_TIMEOUT)
            html = response.content.decode('utf-8')
            tree = etree.HTML(html)
            return tree.xpath('/html/body/div/div/div[4]/div[1]/p/text()')[0]
        except Exception:
            return None

    def get_data_2(self) -> str:
        """Concatenate the text successfully scraped from data source 2."""
        # Each URL is fetched exactly once; failed or empty results are
        # skipped instead of being fetched a second time.
        return ''.join(
            text for text in map(self.crawler_2, self.data_source_2) if text)

    def crawler_3(self, url: str) -> Optional[str]:
        """Download one text file of data source 3.

        Args:
            url: Address of the text file to download.

        Returns:
            The response body decoded as UTF-8, or ``None`` when the request
            or decoding fails (the error is printed).
        """
        import requests

        try:
            response = requests.get(
                url, headers=self.get_headers(), timeout=self.REQUEST_TIMEOUT)
            return response.content.decode('utf-8')
        except Exception as result:
            print(f"发现错误: {result}")
            return None

    def get_data_3(self) -> str:
        """Concatenate the text successfully downloaded from data source 3."""
        # Each URL is fetched exactly once; failed or empty results are
        # skipped instead of being fetched a second time.
        return ''.join(
            text for text in map(self.crawler_3, self.data_source_3) if text)

    def get_freqs_of_En(self, content: str) -> list[tuple[str, int]]:
        """Count word frequencies in English text.

        Non-word characters and underscores act as separators, words are
        lowercased, and stop words and words shorter than three characters
        are dropped.

        Args:
            content: Text to analyse.

        Returns:
            ``(word, count)`` pairs sorted by descending count; words with
            equal counts keep their order of first appearance.
        """
        words = re.sub(r'[\W_]+', ' ', content).lower().split()

        # A set gives constant-time membership tests for every token.
        stop_words = set(self.stop_words)
        counts = Counter(
            word for word in words
            if len(word) >= 3 and word not in stop_words
        )
        return counts.most_common()

    def get_freqs(self, content: str) -> list[tuple[str, int]]:
        """Count word frequencies in Chinese text segmented with jieba.

        Args:
            content: Text to analyse.

        Returns:
            The ten most common ``(word, count)`` pairs after removing
            stop words.
        """
        import jieba

        stop_words = set(self.stop_words)
        counts = Counter(
            word for word in jieba.cut(content) if word not in stop_words)
        return counts.most_common(10)

    def print_freqs(self, word_freqs: list[tuple[str, int]]) -> None:
        """Print at most ten entries whose keyword is longer than one character.

        Args:
            word_freqs: ``(keyword, count)`` pairs, printed in the given order.
        """
        printed = 0
        for keyword, freq in word_freqs:
            if len(keyword) <= 1:
                continue
            print(f"{keyword}: {freq}")
            printed += 1
            if printed >= 10:
                break
###############################################################################
# Timing decorator
###############################################################################
def timing_decorator(func):
    """Wrap ``func`` so that each successful call prints its running time.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to ``func``, prints the elapsed
        time in seconds with two decimals, and returns ``func``'s result.
        Nothing is printed when ``func`` raises; the exception propagates.
    """
    # Imported here so the decorator does not depend on the module's
    # import block providing functools.
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so clock adjustments cannot skew or
        # negate the measured duration.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        print(f"运行时间: {run_time:.2f} 秒")
        return result

    return wrapper