From 90f439e11d3f3c7b74ca9e6dacf6c5bb6da152bf Mon Sep 17 00:00:00 2001 From: pbr4nzfkh <18879212807@163.com> Date: Fri, 22 Mar 2024 23:55:23 +0800 Subject: [PATCH] util --- util.py | 248 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 248 insertions(+) create mode 100644 util.py diff --git a/util.py b/util.py new file mode 100644 index 0000000..909a3e6 --- /dev/null +++ b/util.py @@ -0,0 +1,248 @@ +import random +import string +import jieba +from collections import Counter +import time +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +import requests +import re +from lxml import etree +############################################################################### +# 爬虫父类 +############################################################################### + + +class Crawler: + """爬虫父类,为子类爬虫提供基础变量与函数""" + + def __init__(self) -> None: + + self.stop_words = self.get_stop_words() + + # 一共有三个可以切换的数据源,来自三个不同的网站 + self.data_source_1 = [ + 'https://data.tpdc.ac.cn/zh-hans/data/4a68ebac-571f-4860-82b3-32d397b2ff59', + 'https://data.tpdc.ac.cn/zh-hans/data/19838f9a-9672-46d5-bd98-ca56973eb5fa', + 'https://data.tpdc.ac.cn/zh-hans/data/39018cd4-a3aa-4888-9a07-7316b52aefbc', + 'https://data.tpdc.ac.cn/zh-hans/data/12191fa5-2818-4557-b665-dbae88e557a4', + 'https://data.tpdc.ac.cn/zh-hans/data/5296105f-f1b6-4296-b2ee-8ddb0945d017', + 'https://data.tpdc.ac.cn/zh-hans/data/5c6b66b6-2699-4664-9024-0bfc4d7b6800', + 'https://data.tpdc.ac.cn/zh-hans/data/c4df7e12-ec56-4078-aebb-fb1b142ae622', + 'https://data.tpdc.ac.cn/zh-hans/data/27f4e535-16f8-47ae-be83-499cdf61f522', + 'https://data.tpdc.ac.cn/zh-hans/data/689ab245-cc98-440d-91d5-9784beb0a675', + 'https://data.tpdc.ac.cn/zh-hans/data/dc672fe7-7b00-4626-a9d5-8a0e00d791c3' + ] + + self.data_source_2 = [ + f'https://wen.lzep.cn/wen/186{i}.html' for i in range(100, 130) + ] + + self.data_source_3 = [ + f'https://www.gutenberg.org/cache/epub/73208/pg73{i}.txt' + for i in range(200, 210) + ] + + def get_stop_words(self) -> list: + """返回常用停用词列表""" + # 创建停用词列表 + stop_words = [ + 'a', 'able', 'about', 'across', 'after', 'all', 'almost', 'also', + 'am', 'among', 'an', 'and', 'any', 'are', 'as', 'at', 'be', + 'because', 'been', 'but', 'by', 'can', 'cannot', 'could', 'dear', + 'did', 'do', 'does', 'either', 'else', 'ever', 'every', 'for', + 'from', 'get', 'got', 'had', 'has', 'have', 'he', 'her', 'hers', + 'him', 'his', 'how', 'however', 'i', 'if', 'in', 'into', 'is', + 'it', 'its', 'just', 'least', 'let', 'like', 'likely', 'may', 'me', + 'might', 'most', 'must', 'my', 'neither', 'no', 'nor', 'not', 'of', + 'off', 'often', 'on', 'only', 'or', 'other', 'our', 'own', + 'rather', 'said', 'say', 'says', 'she', 'should', 'since', 'so', + 'some', 'than', 'that', 'the', 'their', 'them', 'then', 'there', + 'these', 'they', 'this', 'tis', 'to', 'too', 'twas', 'us', 'wants', + 'was', 'we', 'were', 'what', 'when', 'where', 'which', 'while', + 'who', 'whom', 'why', 'will', 'with', 'would', 'yet', 'you', + 'your', '的', '是', '一些', '将', '它们', '这个', '用于', '包含', '用于', '示例' + ] + + # 将小写字母添加到停用词列表中 + stop_words.extend(list(string.ascii_lowercase)) + + return stop_words + + def get_headers(self) -> dict: + """返回标头""" + agent_list = [ + "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 \ + Firefox/87.0", + "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, li\ + ke Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0", + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHT\ + ML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0', + "Mozilla/5.0 (Linux; Android 7.0; SM-G950U Build/NRD90M) AppleWebK\ + it/537.36 (KHTML, like Gecko) Chrome/62.0.3202.84 Mobile Safari/53\ + 7.36", + "Mozilla/5.0 (Linux; Android 8.0.0; SM-G965U Build/R16NW) AppleWeb\ + Kit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.111 Mobile Safari/\ + 537.36", + "Mozilla/5.0 (Linux; Android 8.1.0; SM-T837A) AppleWebKit/537.36 (\ + KHTML, like Gecko) Chrome/70.0.3538.80 Safari/537.36", + "Mozilla/5.0 (Linux; U; en-us; KFAPWI Build/JDQ39) AppleWebKit/535\ + .19 (KHTML, like Gecko) Silk/3.13 Safari/535.19 Silk-Accelerated=t\ + rue", + "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia \ + 550) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob\ + ile Safari/537.36 Edge/14.14263", + "Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia \ + 950) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob\ + ile Safari/537.36 Edge/14.14263", + "Mozilla/5.0 (Linux; Android 11; moto g power (2022)) AppleWebKit/\ + 537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) AppleWebKit/537.36\ + (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 10 Build/MOB31T) AppleWe\ + bKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36", + "Mozilla/5.0 (Linux; Android 4.4.2; Nexus 4 Build/KOT49H) AppleWeb\ + Kit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.\ + 36", + "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKi\ + t/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36\ + ", + "Mozilla/5.0 (Linux; Android 8.0.0; Nexus 5X Build/OPR4.170623.006\ + ) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile S\ + afari/537.36", + ] + headers = { + "user_agent": random.choice(agent_list), + } + return headers + + def crawler_1(self) -> str: + """单线程爬取数据1""" + option = webdriver.EdgeOptions() + # 添加实验性选项"detach"并设置为True + option.add_experimental_option("detach", True) + driver = webdriver.Edge(options=option) + self.url = random.choice(self.data_source_1) + try: + driver.get(self.url) + # 等待页面加载 + element = WebDriverWait( + driver, 10).until(lambda x: driver.find_element( + by=By.XPATH, + value= + '//*[@id="app"]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]') + ) + text_content = element.text + return text_content + except Exception as result: + print(f"发现错误:{result}") + return "" + finally: + driver.quit() + + + def crawler_2(self, url: str): + """单线程爬取数据2""" + try: + response = requests.post(url, headers=self.get_headers()) + html = response.content.decode('utf-8') + tree = etree.HTML(html) + text = tree.xpath('/html/body/div/div/div[4]/div[1]/p/text()')[0] + return text + except Exception as result: + # print(f"发现错误: {result}") + # time.sleep(3) + return None + + def get_data_2(self) -> str: + """获取数据源2的一些数据并以字符串的形式收集到content中""" + content = '' + for url in self.data_source_2: + text = self.crawler_2(url) + if text: + content = content + self.crawler_2(url) + return content + + def crawler_3(self, url: str): + """单线程爬取数据3""" + try: + response = requests.get(url, headers=self.get_headers()) + text = response.content.decode('utf-8') + return text + except Exception as result: + print(f"发现错误: {result}") + # time.sleep(3) + return None + + def get_data_3(self) -> str: + """获取数据源3的一些数据并以字符串的形式收集到content中""" + content = '' + for url in self.data_source_3: + text = self.crawler_3(url) + if text: + content = content + self.crawler_3(url) + return content + + def get_freqs_of_En(self, content: str) -> list[tuple[str, int]]: + """获取英文文本的词频""" + # 获取单词 + pattern = re.compile('[\W_]+') + word_list = pattern.sub(' ', content).lower() + word_list = word_list.split() + + # 过滤停用词 + word_list = [ + w for w in word_list if (w not in self.stop_words) and len(w) >= 3 + ] + + # 统计词频 + word_freqs = {} + for word in word_list: + word_freqs[word] = word_freqs.get(word, 0) + 1 + + # 排序 + word_freqs = sorted(word_freqs.items(), + key=lambda x: x[1], + reverse=True) + return word_freqs + + def get_freqs(self, content: str) -> list[tuple[any, int]]: + """获取中文文本content的词频""" + # 分词 + words = jieba.cut(content) + + word_freqs = Counter() + for word in words: + if word not in self.stop_words: + word_freqs[word] += 1 + + word_freqs = word_freqs.most_common(10) + return word_freqs + + def print_freqs(self, word_freqs: list[tuple[any, int]]) -> None: + """打印词频""" + cnt = 1 + for keyword, freq in word_freqs: + if len(keyword) > 1: + print(f"{keyword}: {freq}") + cnt += 1 + if cnt > 10: + return + + +############################################################################### +# 时间装饰器 +############################################################################### +def timing_decorator(func): + """计算函数运行时间的装饰器""" + + def wrapper(*args, **kwargs): + start_time = time.time() # 记录开始时间 + result = func(*args, **kwargs) # 调用原始函数 + end_time = time.time() # 记录结束时间 + run_time = end_time - start_time # 计算运行时间 + print(f"运行时间: {run_time * 1:.2f} 秒") + return result + + return wrapper