You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

120 lines
4.0 KiB

6 months ago
import json
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
class Optimize:
    """Maintain a pool of HTTP(S) proxies for `requests`.

    Candidate proxies are loaded from 'ip代理池.json' (a JSON array of
    requests-style mappings, e.g. {'http': 'http://host:port'}), tested in
    parallel, and the survivors are cached on disk (one JSON object per line)
    so later runs can skip re-testing.
    """

    def __init__(self):
        # Candidate proxies, each a requests-style dict, e.g. {'http': 'http://ip:port'}.
        self.proxies = []
        # Proxies that passed the most recent connectivity test.
        self.working_proxies = []
        # File used to persist working proxies between runs (JSON, one object per line).
        self.proxy_cache_file = 'working_proxies.cache'
        # Populate self.proxies from the proxy-pool file.
        self.load_proxy_pool()

    def load_proxy_pool(self):
        """Load candidate proxies from 'ip代理池.json' into self.proxies.

        A missing pool file is treated the same as an empty one instead of
        crashing with FileNotFoundError.
        """
        try:
            with open('ip代理池.json', 'r', encoding='utf-8') as f:
                content = json.load(f)
        except FileNotFoundError:
            print("ip代理池为空")
            return
        if content:
            self.proxies.extend(content)
        else:
            print("ip代理池为空")

    def load_working_proxies_from_cache(self):
        """Return proxies previously saved to the cache file.

        Returns an empty list when the cache file does not exist. Lines that
        are not valid JSON (e.g. written by an older version that stored the
        Python repr of the dict) are skipped rather than returned as useless
        strings.
        """
        try:
            with open(self.proxy_cache_file, 'r', encoding='utf-8') as f:
                proxies = []
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        proxies.append(json.loads(line))
                    except json.JSONDecodeError:
                        # Legacy/corrupt entry — ignore it.
                        continue
                return proxies
        except FileNotFoundError:
            return []

    def save_working_proxies_to_cache(self, proxies):
        """Persist working proxies to the cache file, one JSON object per line.

        JSON (not str(dict)) is used so load_working_proxies_from_cache can
        round-trip the entries back into dicts usable by `requests`.
        """
        with open(self.proxy_cache_file, 'w', encoding='utf-8') as f:
            for proxy in proxies:
                f.write(json.dumps(proxy) + '\n')

    def test_proxy(self, proxy):
        """Return True if `proxy` can fetch the test URL within 5 seconds."""
        test_url = 'https://www.baidu.com/'
        try:
            response = requests.get(url=test_url, proxies=proxy, timeout=5)
            return response.status_code == 200
        except requests.RequestException:
            return False

    def refresh_working_proxies(self):
        """Re-test every candidate in parallel and rebuild the working list.

        The working list is reset first so repeated refreshes do not
        accumulate duplicate entries; the result is saved to the cache file.
        """
        self.working_proxies = []
        with ThreadPoolExecutor(max_workers=20) as executor:
            futures = {executor.submit(self.test_proxy, proxy): proxy
                       for proxy in self.proxies}
            # Collect results as they finish rather than in submission order.
            for future in as_completed(futures):
                if future.result():
                    self.working_proxies.append(futures[future])
        self.save_working_proxies_to_cache(self.working_proxies)

    def get_random_working_proxy(self):
        """Return a random working proxy dict, or None if none is available.

        Resolution order: in-memory list, then disk cache, then a full
        refresh of the candidate pool.
        """
        if not self.working_proxies:
            self.working_proxies = self.load_working_proxies_from_cache()
        if not self.working_proxies:
            self.refresh_working_proxies()
        if self.working_proxies:
            return random.choice(self.working_proxies)
        print("没有找到有效的代理")
        return None
def startup():
    """Build the proxy pool, refresh it, and return one working proxy.

    Returns the chosen proxy dict, or None when no proxy is usable.
    """
    pool = Optimize()
    # Test every candidate and persist the survivors to the cache.
    pool.refresh_working_proxies()
    # Draw one proxy at random from the refreshed pool.
    proxy = pool.get_random_working_proxy()
    if not proxy:
        print("没有可用的代理")
        return None
    print("获取到的代理是:", proxy)
    return proxy
if __name__ == '__main__':
    # startup() performs exactly this flow (build pool, refresh, pick and
    # print a random proxy), so delegate instead of duplicating its body.
    startup()