Delete '20 高性能模式/000 普通做法.py'

dev
pbr4nzfkh 11 months ago
parent a885e6b63e
commit dbd4cd1e68

@@ -1,172 +0,0 @@
import re
import requests
import random
from util import Crawler
from lxml import etree
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
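# Plain single-threaded baseline: the three data sources are crawled one
# after another (Selenium drives Edge for source 1; requests fetches sources
# 2 and 3), and each run is timed from the __main__ block at the bottom.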
class CrawlerDataSource1(Crawler):
    """Class holding the plain-approach crawler functions for data source 1."""
def __init__(self) -> None:
super().__init__()
self.driver = self.setup_driver()
self.url = random.choice(self.data_source_1)
    def setup_driver(self):
        """Set up the Edge WebDriver."""
option = webdriver.EdgeOptions()
        # The experimental "detach" option keeps the browser window open
        # after the script ends (unless driver.quit() is called)
option.add_experimental_option("detach", True)
driver = webdriver.Edge(options=option)
return driver
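    # (If no visible browser window is needed, Chromium-based Edge also
    # accepts option.add_argument("--headless") before the driver is created.)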
    def crawler_2(self) -> str:
        """Crawl data source 1 in a single thread."""
driver = self.driver
try:
driver.get(self.url)
            # Wait up to 10 seconds for the target element to load
            element = WebDriverWait(driver, 10).until(
                lambda d: d.find_element(
                    By.XPATH,
                    '//*[@id="app"]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]'))
text_content = element.text
return text_content
        except Exception as result:
            print(f"Error encountered: {result}")
return ""
finally:
driver.quit()
class CrawlerDataSource2(Crawler):
    """Class holding the plain-approach crawler functions for data source 2."""
def __init__(self) -> None:
super().__init__()
    def crawler_2(self, url: str):
        """Crawl one URL of data source 2 in a single thread."""
try:
response = requests.post(url, headers=self.get_headers())
html = response.content.decode('utf-8')
tree = etree.HTML(html)
text = tree.xpath('/html/body/div/div/div[4]/div[1]/p/text()')[0]
return text
        except Exception as result:
            print(f"Error encountered: {result}")
# time.sleep(3)
return None
    def get_data_2(self) -> str:
        """Fetch data from each URL of data source 2 and collect it into one string."""
content = ''
        for url in self.data_source_2:
            text = self.crawler_2(url)
            if text:
                # Reuse the already-fetched text instead of crawling the URL twice
                content = content + text
return content
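# Note: the loop in get_data_2 issues each request on a fresh connection; a
# shared requests.Session() could reuse connections between calls, but the
# plain per-call requests.post() is kept to match this baseline version.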
class CrawlerDataSource3(Crawler):
    """Class holding the plain-approach crawler functions for data source 3."""
def __init__(self) -> None:
super().__init__()
    def crawler_3(self, url: str):
        """Crawl one URL of data source 3 in a single thread."""
try:
response = requests.get(url, headers=self.get_headers())
text = response.content.decode('utf-8')
return text
        except Exception as result:
            print(f"Error encountered: {result}")
# time.sleep(3)
return None
    def get_data_3(self) -> str:
        """Fetch data from each URL of data source 3 and collect it into one string."""
content = ''
        for url in self.data_source_3:
            text = self.crawler_3(url)
            if text:
                # Reuse the already-fetched text instead of crawling the URL twice
                content = content + text
return content
    def get_freqs_of_En(self, content: str) -> list[tuple[str, int]]:
        """Compute word frequencies for an English text."""
        # Extract the words (raw string avoids the invalid "\W" escape warning)
        pattern = re.compile(r'[\W_]+')
word_list = pattern.sub(' ', content).lower()
word_list = word_list.split()
        # Filter out stop words and words shorter than three characters
word_list = [
w for w in word_list if (w not in self.stop_words) and len(w) >= 3
]
        # Count word frequencies
word_freqs = {}
for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1
        # Sort by frequency, descending
word_freqs = sorted(word_freqs.items(),
key=lambda x: x[1],
reverse=True)
return word_freqs
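    # A roughly equivalent sketch of get_freqs_of_En using the standard
    # library (assuming the same self.stop_words set); Counter.most_common()
    # does the counting and the sorting in one step:
    #
    #   from collections import Counter
    #   words = re.sub(r'[\W_]+', ' ', content).lower().split()
    #   words = [w for w in words if w not in self.stop_words and len(w) >= 3]
    #   word_freqs = Counter(words).most_common()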
def work1() -> None:
    """Crawl data source 1 with the plain approach."""
cds1 = CrawlerDataSource1()
content = cds1.crawler_2()
word_freqs = cds1.get_freqs(content)
cds1.print_freqs(word_freqs)
def work2() -> None:
    """Crawl data source 2 with the plain approach."""
cds2 = CrawlerDataSource2()
content = cds2.get_data_2()
word_freqs = cds2.get_freqs(content)
cds2.print_freqs(word_freqs)
def work3() -> None:
    """Crawl data source 3 with the plain approach."""
cds3 = CrawlerDataSource3()
content = cds3.get_data_3()
word_freqs = cds3.get_freqs_of_En(content)
cds3.print_freqs(word_freqs)
if __name__ == '__main__':
    print("Start crawling data source 1...")
    t0 = datetime.now()
    work1()
    t1 = datetime.now()
    print(f"Data source 1 took: {t1 - t0}")
    print("Finished crawling data source 1.\n")
    print("Start crawling data source 2...")
    t0 = datetime.now()
    work2()
    t1 = datetime.now()
    print(f"Data source 2 took: {t1 - t0}")
    print("Finished crawling data source 2.\n")
    print("Start crawling data source 3...")
    t0 = datetime.now()
    work3()
    t1 = datetime.now()
    print(f"Data source 3 took: {t1 - t0}")
    print("Finished crawling data source 3.\n")
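    # The three timed runs above repeat the same pattern; a loop would remove
    # the boilerplate, e.g.:
    #
    #   for name, work in (("data 1", work1), ("data 2", work2), ("data 3", work3)):
    #       t0 = datetime.now()
    #       work()
    #       print(f"{name} took: {datetime.now() - t0}")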