import magic
import io
import os
import gzip
import time
import json
import requests
import pandas as pd
from typing import Any
import seleniumwire.undetected_chromedriver as webdriver
from datetime import datetime as dt, timedelta
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Cities to crawl; every ordered pair of distinct cities is searched.
crawal_citys = ["上海", "香港", "东京"]
# Crawl date range: start date, formatted like '2023-12-01' (None = derive from start_interval).
begin_date = None
# Crawl date range: end date, formatted like '2023-12-31' (None = no end date).
end_date = None
# Crawl from T+N, i.e. start N days from now (used when begin_date is not set).
start_interval = 1
# Number of days to crawl.
crawal_days = 60
# Pause between crawls (unit: seconds).
crawal_interval = 5
# Step between consecutive crawl dates, in days.
days_interval = 1
# Maximum time to wait for page elements / requests (unit: seconds).
max_wait_time = 10
# Maximum number of retries after an error.
max_retry_time = 5
# Only fetch direct flights (True: direct flights only, False: all flights).
direct_flight = True
# Whether to drop the less important fields.
del_info = False
# Whether to rename the DataFrame columns.
rename_col = True
# Save debugging screenshots.
enable_screenshot = False
# Allow logging in (the site may require a login before it returns data).
login_allowed = True
# Account names (rotated by index when the account is switched).
accounts = ['','']
# Passwords, index-aligned with `accounts`.
passwords = ['','']
# Path of stealth.min.js, which is injected to hide Selenium fingerprints.
stealth_js_path='./stealth.min.js'
def download_stealth_js(file_path, url='https://raw.githubusercontent.com/requireCool/stealth.min.js/main/stealth.min.js', timeout=30):
    """Download stealth.min.js to *file_path* unless the file already exists.

    Args:
        file_path: Destination path of the script.
        url: Location to download the script from.
        timeout: Seconds to wait for the HTTP request before giving up, so a
            stalled connection cannot hang the crawler forever.

    Raises:
        requests.RequestException: The download failed or returned an HTTP
            error status.
        OSError: The file could not be written.
    """
    if os.path.exists(file_path):
        print(f"{file_path} already exists, no need to download.")
        return

    print(f"{file_path} not found, downloading...")
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # make sure the request succeeded

    # Write to a temporary sibling first and rename afterwards: a failure
    # half-way through must not leave a truncated file behind, because the
    # existence check above would then accept it on every later run.
    partial_path = f"{file_path}.part"
    try:
        with open(partial_path, 'w') as file:
            file.write(response.text)
        os.replace(partial_path, file_path)
    finally:
        if os.path.exists(partial_path):
            os.remove(partial_path)
    print(f"{file_path} downloaded.")
def init_driver():
    """Create and return a maximized selenium-wire Chrome driver.

    The browser is started with a fixed list of command-line switches and
    with certificate verification disabled in selenium-wire. Afterwards the
    stealth script is downloaded (if missing) and registered to run on every
    new document; a failure in that step is printed and otherwise ignored.
    """
    # Switches are applied in this exact order (one entry is intentionally
    # listed twice, as in the historical configuration).
    # NOTE(review): "--pageLoadStrategy=eager" looks like a WebDriver
    # capability rather than a Chrome switch - confirm it has any effect.
    chrome_switches = (
        "--incognito",  # incognito (private) mode
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-blink-features",
        "--disable-blink-features=AutomationControlled",
        "--disable-extensions",
        "--pageLoadStrategy=eager",
        "--disable-gpu",
        "--disable-software-rasterizer",
        "--disable-dev-shm-usage",
        "--ignore-certificate-errors",
        "--ignore-certificate-errors-spki-list",
        "--ignore-ssl-errors",
    )
    options = webdriver.ChromeOptions()
    for switch in chrome_switches:
        options.add_argument(switch)

    seleniumwireOptions: dict[str, Any] = {"verify_ssl": False}
    driver = webdriver.Chrome(options=options, seleniumwire_options=seleniumwireOptions)

    try:
        download_stealth_js(stealth_js_path)
        # Read stealth.min.js and have Chrome evaluate it on each new document.
        with open(stealth_js_path, 'r') as file:
            stealth_js = file.read()
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": stealth_js})
    except Exception as e:
        print(e)

    driver.maximize_window()
    return driver
def gen_citys(crawal_citys):
    """Return every ordered [origin, destination] pair of distinct cities.

    Origins follow the input order; for each origin the destinations are
    visited in reverse input order.
    """
    return [
        [origin, destination]
        for origin in crawal_citys
        for destination in reversed(crawal_citys)
        if origin != destination
    ]
def generate_flight_dates(n, begin_date, end_date, start_interval, days_interval):
    """Return the list of flight dates to crawl as 'YYYY-MM-DD' strings.

    Args:
        n: Number of days to cover when no end date is given; dates are
            produced at offsets 0, days_interval, 2*days_interval, ... < n.
        begin_date: First date as 'YYYY-MM-DD', or None/'' to start
            ``start_interval`` days from today.
        end_date: Last date (inclusive) as 'YYYY-MM-DD', or None/''. When
            given, dates run from the first date up to the end date and ``n``
            does not limit the result.
        start_interval: Days from today to the first date when ``begin_date``
            is not set; 0 or None means today.
        days_interval: Step between consecutive dates, in days (>= 1).

    Returns:
        The dates in ascending order; an empty list when the first date lies
        after ``end_date`` or when ``n`` is not positive and no end date is set.

    Raises:
        ValueError: ``days_interval`` is smaller than 1, or a date string does
            not match 'YYYY-MM-DD'.
    """
    date_format = "%Y-%m-%d"
    if days_interval < 1:
        raise ValueError("days_interval must be a positive number of days")

    if begin_date:
        first_day = dt.strptime(begin_date, date_format).date()
    else:
        # start_interval == 0 legitimately means "today", so it must not be
        # treated as "no start configured".
        first_day = (dt.now() + timedelta(days=start_interval or 0)).date()

    if end_date:
        last_day = dt.strptime(end_date, date_format).date()
        step = timedelta(days=days_interval)
        flight_dates = []
        current = first_day
        # Walk forward to the end date (inclusive); yields [] when the first
        # date is already past it.
        while current <= last_day:
            flight_dates.append(current.strftime(date_format))
            current += step
        return flight_dates

    return [
        (first_day + timedelta(days=offset)).strftime(date_format)
        for offset in range(0, n, days_interval)
    ]
# Replacement for expected_conditions.element_to_be_clickable /
# expected_conditions.visibility_of_element_located that works on an
# already-located element instead of a locator.
def element_to_be_clickable(element):
    """Build a wait condition that succeeds once *element* is clickable.

    Args:
        element: An object exposing ``is_enabled()`` and ``is_displayed()``.

    Returns:
        A callable taking the driver (unused). It returns *element* when the
        element is both enabled and displayed, and ``False`` otherwise or when
        querying the element raises an ordinary exception.
    """
    def check_clickable(driver):
        try:
            if element.is_enabled() and element.is_displayed():
                return element  # condition met: hand the element back
            return False
        except Exception:
            # Only ordinary errors mean "not clickable yet"; KeyboardInterrupt
            # and SystemExit must still be able to stop the program.
            return False
    return check_clickable
class DataFetcher(object):
    """Drive the flight-search page and save the intercepted results as CSV.

    The methods form a pipeline in which each stage calls the next one on
    success: get_page -> change_city -> get_data -> decode_data ->
    check_data -> proc_flightSegments / proc_priceList -> mergedata.
    On failure a stage counts the error in ``self.err`` and retries; once
    ``max_retry_time`` is reached the flow restarts from the home page.

    Attributes set by the caller before a run:
        date: Target departure date as 'YYYY-MM-DD'.
        city: Two-item sequence [origin, destination].
    """

    def __init__(self, driver):
        self.driver = driver
        self.date = None
        self.city = None
        self.err = 0  # number of errors since the last success/reset
        self.switch_acc = 0  # account rotation counter

    # ------------------------------------------------------------------
    # small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _stamp():
        """Return the current local time formatted for log lines and file names."""
        return time.strftime("%Y-%m-%d_%H-%M-%S")

    @staticmethod
    def _brief(error):
        """Return the text of *error* without the trailing 'Stacktrace:' part."""
        return str(error).split("Stacktrace:")[0]

    def _screenshot(self):
        """Save a debugging screenshot when screenshots are enabled."""
        if enable_screenshot:
            self.driver.save_screenshot(f'screenshot/screenshot_{self._stamp()}.png')

    def _wait_clickable(self, element):
        """Wait up to max_wait_time for *element* to be clickable and return it."""
        return WebDriverWait(self.driver, max_wait_time).until(
            element_to_be_clickable(element)
        )

    # ------------------------------------------------------------------
    # page housekeeping
    # ------------------------------------------------------------------
    def refresh_driver(self):
        """Refresh the page, retrying while the error budget allows it."""
        try:
            self.driver.refresh()
        except Exception as e:
            self.err += 1
            print(
                f'{self._stamp()} refresh_driver:刷新页面失败,错误类型:{type(e).__name__}, 详细错误信息:{self._brief(e)}'
            )
            self._screenshot()
            if self.err < max_retry_time:
                print(f'{self._stamp()} refresh_driver:刷新页面')
                self.refresh_driver()
            if self.err >= max_retry_time:
                print(
                    f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,refresh_driver:不继续重试'
                )

    def remove_btn(self):
        """Remove notice boxes, the online-service shortcut and share links from the page."""
        selectors = (
            ".notice-box",  # notices
            ".shortcut, .shortcut-link",  # online customer service
            ".shareline",  # share links
        )
        try:
            for selector in selectors:
                self.driver.execute_script(
                    f"document.querySelectorAll('{selector}').forEach(element => element.remove());"
                )
        except Exception as e:
            print(
                f'{self._stamp()} remove_btn:提醒移除失败,错误类型:{type(e).__name__}, 详细错误信息:{self._brief(e)}'
            )

    def check_verification_code(self):
        """Check whether a verification challenge is shown.

        Returns:
            True when no challenge element is present (page overlays are then
            removed). False when a challenge was found - in that case the
            browser is restarted after a long pause, the account counter is
            advanced and the flow is restarted via ``get_page(1)`` - or when
            the check itself failed.
        """
        try:
            challenge_count = len(
                self.driver.find_elements(By.ID, "verification-code")
            ) + len(self.driver.find_elements(By.CLASS_NAME, "alert-title"))
            if challenge_count:
                print(
                    f'{self._stamp()} check_verification_code:验证码被触发verification-code/alert-title,等待{crawal_interval*100}后重试。'
                )
                self.driver.quit()
                time.sleep(crawal_interval * 100)
                self.driver = init_driver()
                self.err = 0
                self.switch_acc += 1
                self.get_page(1)
                return False
            # No challenge: the page loaded normally, clear the overlays.
            self.remove_btn()
            return True
        except Exception as e:
            print(
                f'{self._stamp()} check_verification_code:未知错误,错误类型:{type(e).__name__}, 详细错误信息:{self._brief(e)}'
            )
            # Explicit falsy result: callers must not proceed after a failed check.
            return False

    def login(self):
        """Fill in and submit the login form with the currently selected account.

        Does nothing when ``login_allowed`` is false. On failure the page is
        refreshed and the login retried while the error budget allows it.
        """
        if not login_allowed:
            return
        account = accounts[self.switch_acc % len(accounts)]
        password = passwords[self.switch_acc % len(passwords)]
        try:
            if len(self.driver.find_elements(By.CLASS_NAME, "lg_loginbox_modal")) == 0:
                print(f'{self._stamp()} login:未弹出登录界面')
                WebDriverWait(self.driver, max_wait_time).until(
                    EC.presence_of_element_located(
                        (By.CLASS_NAME, "tl_nfes_home_header_login_wrapper_siwkn")
                    )
                )
                # Open the login dialog from the page header.
                self._wait_clickable(
                    self.driver.find_element(
                        By.CLASS_NAME, "tl_nfes_home_header_login_wrapper_siwkn"
                    )
                ).click()
                # Wait for the dialog to load.
                WebDriverWait(self.driver, max_wait_time).until(
                    EC.presence_of_element_located((By.CLASS_NAME, "lg_loginwrap"))
                )
            else:
                print(f'{self._stamp()} login:已经弹出登录界面')

            self._wait_clickable(
                self.driver.find_elements(By.CLASS_NAME, "r_input.bbz-js-iconable-input")[0]
            ).send_keys(account)
            print(f'{self._stamp()} login:输入账户成功')

            self._wait_clickable(
                self.driver.find_element(
                    By.CSS_SELECTOR,
                    "div[data-testid='accountPanel'] input[data-testid='passwordInput']",
                )
            ).send_keys(password)
            print(f'{self._stamp()} login:输入密码成功')

            self._wait_clickable(
                self.driver.find_element(By.CSS_SELECTOR, '[for="checkboxAgreementInput"]')
            ).click()
            print(f'{self._stamp()} login:勾选同意成功')

            self._wait_clickable(
                self.driver.find_elements(By.CLASS_NAME, "form_btn.form_btn--block")[0]
            ).click()
            print(f'{self._stamp()} login:登录成功')
            self._screenshot()
            time.sleep(crawal_interval * 3)
        except Exception as e:
            self.err += 1
            print(
                f'{self._stamp()} login:页面加载或元素操作失败,错误类型:{type(e).__name__}, 详细错误信息:{self._brief(e)}'
            )
            self._screenshot()
            if self.err < max_retry_time:
                print(f'{self._stamp()} login:刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.login()
            if self.err >= max_retry_time:
                print(
                    f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,login:重新尝试加载页面,这次指定需要重定向到首页'
                )

    # ------------------------------------------------------------------
    # stage 1: reach the search page
    # ------------------------------------------------------------------
    def get_page(self, reset_to_homepage=0):
        """Open the one-way flight search and continue with change_city().

        Args:
            reset_to_homepage: 1 to navigate to the flight home page first.

        A failed attempt is retried from the home page until it succeeds.
        The retry is a loop (not self-recursion) so repeated failures do not
        grow the call stack, and it pauses ``crawal_interval`` seconds between
        attempts instead of hammering the site.
        """
        while True:
            next_stage_flag = False
            try:
                if reset_to_homepage == 1:
                    self.driver.get("https://flights.ctrip.com/online/channel/domestic")
                if self.check_verification_code():
                    WebDriverWait(self.driver, max_wait_time).until(
                        EC.presence_of_element_located((By.CLASS_NAME, "pc_home-jipiao"))
                    )
                    # Flight icon: back to the flight main view.
                    self._wait_clickable(
                        self.driver.find_element(By.CLASS_NAME, "pc_home-jipiao")
                    ).click()
                    # One-way trip.
                    self._wait_clickable(
                        self.driver.find_elements(By.CLASS_NAME, "radio-label")[0]
                    ).click()
                    # Search.
                    self._wait_clickable(
                        self.driver.find_element(By.CLASS_NAME, "search-btn")
                    ).click()
                    next_stage_flag = True
            except Exception as e:
                print(
                    f'{self._stamp()} get_page:页面加载或元素操作失败,错误类型:{type(e).__name__}, 详细错误信息:{self._brief(e)}'
                )
                self._screenshot()
                # Try again, this time forcing a reload of the home page.
                reset_to_homepage = 1
                time.sleep(crawal_interval)
                continue
            break

        if next_stage_flag:
            self.change_city()

    # ------------------------------------------------------------------
    # stage 2: set route and date
    # ------------------------------------------------------------------
    def _city_input(self, index):
        """Return the freshly located origin (0) or destination (1) input box."""
        return self.driver.find_elements(By.CLASS_NAME, "form-input-v3")[index]

    def _type_city(self, index):
        """Type self.city[index] into its input box until the box contains it."""
        while self.city[index] not in self._city_input(index).get_attribute("value"):
            # The box is located again before every action.
            self._wait_clickable(self._city_input(index)).click()
            self._wait_clickable(self._city_input(index)).send_keys(Keys.CONTROL + "a")
            self._wait_clickable(self._city_input(index)).send_keys(self.city[index])
        print(
            f'{self._stamp()} change_city:更换城市【{index}】-{self._city_input(index).get_attribute("value")}'
        )

    def _confirm_city(self, index):
        """Commit the typed city until its input box shows a '(' in its value."""
        while "(" not in self._city_input(index).get_attribute("value"):
            self._wait_clickable(self._city_input(index)).click()
            # Clicking the low-price reminder stands in for pressing Enter.
            self._wait_clickable(
                self.driver.find_elements(By.CLASS_NAME, "low-price-remind")[0]
            ).click()

    def _calendar_number(self, panel_index, label_class):
        """Return the year/month shown by a calendar panel as an int.

        The label text carries one trailing unit character, which is dropped.
        """
        panel = self.driver.find_elements(
            By.CLASS_NAME, "date-picker.date-picker-block"
        )[panel_index]
        return int(panel.find_element(By.CLASS_NAME, label_class).text[:-1])

    def _turn_calendar(self, forward, shown, wanted):
        """Click the next (forward) or previous calendar arrow once and log it."""
        if forward:
            arrow_class, arrow_index = "in-date-picker.icon.next-ico.iconf-right", 1
            relation, action = "小于", "向右点击"
        else:
            arrow_class, arrow_index = "in-date-picker.icon.prev-ico.iconf-left", 0
            relation, action = "大于", "向左点击"
        arrow = self._wait_clickable(
            self.driver.find_elements(By.CLASS_NAME, arrow_class)[arrow_index]
        )
        print(f'{self._stamp()} change_city:更换日期{shown}{relation} {wanted} {action}')
        arrow.click()

    def _click_day(self, year, month, day):
        """Click *day* in the calendar panel showing *year*/*month*.

        Returns True when a day cell was clicked. The search stops right after
        the click so no panel is touched once the picker has reacted.
        """
        for panel in self.driver.find_elements(
            By.CLASS_NAME, "date-picker.date-picker-block"
        ):
            if int(panel.find_element(By.CLASS_NAME, "year").text[:-1]) != year:
                continue
            if int(panel.find_element(By.CLASS_NAME, "month").text[:-1]) != month:
                continue
            for cell in panel.find_elements(By.CLASS_NAME, "date-d"):
                if int(cell.text) == day:
                    self._wait_clickable(cell).click()
                    return True
        return False

    def _select_departure_date(self):
        """Operate the date picker until the date field equals self.date."""
        year, month, day = (int(part) for part in self.date.split("-"))

        def shown_date():
            return self.driver.find_elements(
                By.CSS_SELECTOR, "[aria-label=请选择日期]"
            )[0].get_attribute("value")

        while shown_date() != self.date:
            # Open the date picker.
            self._wait_clickable(
                self.driver.find_element(By.CLASS_NAME, "modifyDate.depart-date")
            ).click()

            # Each check re-reads the calendar because the previous click may
            # have changed what the panels show.
            shown = self._calendar_number(1, "year")
            if shown < year:
                self._turn_calendar(True, shown, year)
            shown = self._calendar_number(0, "year")
            if shown > year:
                self._turn_calendar(False, shown, year)
            if self._calendar_number(0, "year") == year:
                shown = self._calendar_number(0, "month")
                if shown > month:
                    self._turn_calendar(False, shown, month)
            if self._calendar_number(1, "year") == year:
                shown = self._calendar_number(1, "month")
                if shown < month:
                    self._turn_calendar(True, shown, month)

            self._click_day(year, month, day)

        print(f'{self._stamp()} change_city:更换日期-{shown_date()}')

    def change_city(self):
        """Set origin, destination and date on the search form, then fetch data.

        On success continues with get_data(). On failure the error is counted
        and the step is retried (logging in first when the login dialog is
        shown); once the retry limit is reached the flow restarts from the
        home page.
        """
        next_stage_flag = False
        try:
            # Wait for the search form.
            WebDriverWait(self.driver, max_wait_time).until(
                EC.presence_of_element_located((By.CLASS_NAME, "form-input-v3"))
            )
            if self.check_verification_code():
                self._type_city(0)
                self._type_city(1)
                self._select_departure_date()
                self._confirm_city(0)
                self._confirm_city(1)
                next_stage_flag = True
        except Exception as e:
            self.err += 1
            self._screenshot()
            print(
                f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,change_city:更换城市和日期失败,错误类型:{type(e).__name__}, 详细错误信息:{self._brief(e)}'
            )
            if self.check_verification_code():
                if self.err < max_retry_time:
                    if len(self.driver.find_elements(By.CLASS_NAME, "lg_loginbox_modal")):
                        print(f'{self._stamp()} change_city:检测到登录弹窗,需要登录')
                        self.login()
                    print(f'{self._stamp()} change_city:重试')
                    self.change_city()
                if self.err >= max_retry_time:
                    print(
                        f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,change_city:重新尝试加载页面,这次指定需要重定向到首页'
                    )
                    # Drop the captured requests of this attempt.
                    del self.driver.requests
                    self.err = 0
                    self.get_page(1)
        else:
            if next_stage_flag:
                # Log before running the downstream stages so the message
                # appears at the moment the form was actually updated.
                print(
                    f'{self._stamp()} change_city:成功更换城市和日期,当前路线为:{self.city[0]}-{self.city[1]}'
                )
                self.get_data()

    # ------------------------------------------------------------------
    # stage 3: capture and decode the search response
    # ------------------------------------------------------------------
    def get_data(self):
        """Wait for the batch-search request and verify it matches the target.

        The captured request is stored in ``self.predata``. When its first
        flight segment matches the wanted cities and date the flow continues
        with decode_data(); otherwise the form is set again. Errors are
        retried after a page refresh up to the retry limit, after which the
        flow restarts from the home page.
        """
        try:
            self.predata = self.driver.wait_for_request(
                "/international/search/api/search/batchSearch?.*", timeout=max_wait_time
            )
            rb = dict(json.loads(self.predata.body).get("flightSegments")[0])
        except Exception as e:
            self.err += 1
            print(
                f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,get_data:获取数据超时,错误类型:{type(e).__name__}, 错误详细:{self._brief(e)}'
            )
            self._screenshot()
            del self.driver.requests
            if self.err < max_retry_time:
                print(f'{self._stamp()} get_data:刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.get_data()
            if self.err >= max_retry_time:
                print(
                    f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,get_data:重新尝试加载页面,这次指定需要重定向到首页'
                )
                self.err = 0
                self.get_page(1)
        else:
            del self.driver.requests
            request_matches = (
                rb["departureCityName"] == self.city[0]
                and rb["arrivalCityName"] == self.city[1]
                and rb["departureDate"] == self.date
            )
            if request_matches:
                print(f"get_data:城市匹配成功:出发地-{self.city[0]},目的地-{self.city[1]}")
                self.err = 0
                self.decode_data()
                return

            print(f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,get_data:刷新页面')
            self.err += 1
            self._screenshot()
            print(
                f'{self._stamp()} get_data:重新更换城市:{rb["departureCityName"]}-{rb["arrivalCityName"]}-{rb["departureDate"]}'
            )
            if self.err >= max_retry_time:
                # Same limit as every other failure path: stop re-entering the
                # form and restart from the home page instead.
                print(
                    f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,get_data:重新尝试加载页面,这次指定需要重定向到首页'
                )
                self.err = 0
                self.get_page(1)
            elif self.check_verification_code():
                self.change_city()

    def decode_data(self):
        """Decode the captured response body into ``self.dedata`` (parsed JSON).

        Gzip bodies are decompressed, plain JSON bodies are decoded directly,
        anything else is treated as an error. The text is kept in a local
        until parsing succeeded, so data from an earlier response can never
        be mistaken for the current one.
        """
        try:
            body = self.predata.response.body
            # python-magic reports the content type of the raw bytes.
            file_type = magic.Magic().from_buffer(body)
            if "gzip" in file_type:
                text = gzip.decompress(body).decode("UTF-8")
            elif "JSON data" in file_type:
                text = body.decode("UTF-8")
            else:
                raise ValueError(f"未知的压缩格式:{file_type}")
            self.dedata = json.loads(text)
        except Exception as e:
            self.err += 1
            print(
                f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,decode_data:数据解码失败,错误类型:{type(e).__name__}, 错误详细:{self._brief(e)}'
            )
            self._screenshot()
            del self.driver.requests
            if self.err < max_retry_time:
                print(f'{self._stamp()} decode_data:刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.get_data()
            if self.err >= max_retry_time:
                print(
                    f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,decode_data:重新尝试加载页面,这次指定需要重定向到首页'
                )
                self.err = 0
                self.get_page(1)
        else:
            self.err = 0
            self.check_data()

    # ------------------------------------------------------------------
    # stage 4: validate, tabulate and save
    # ------------------------------------------------------------------
    def check_data(self):
        """Validate the decoded response and hand it to the table builders.

        Stores the itinerary list in ``self.flightItineraryList``. When
        ``direct_flight`` is set, itineraries with transfers are removed and
        an empty result ends the run for this route (returns 0). A response
        without an itinerary list is retried, after logging in when the
        response says a login is required.
        """
        try:
            self.flightItineraryList = self.dedata["data"]["flightItineraryList"]
            if direct_flight:
                # Filter in place so the decoded payload stays in sync.
                self.flightItineraryList[:] = [
                    itinerary
                    for itinerary in self.flightItineraryList
                    if itinerary["flightSegments"][0]["transferCount"] == 0
                ]
                if len(self.flightItineraryList) == 0:
                    print(f'{self._stamp()} 不存在直航航班:{self.city[0]}-{self.city[1]}')
                    self.err = 0
                    return 0
        except Exception as e:
            self.err += 1
            print(
                f'{self._stamp()} 数据检查出错:不存在航班,错误类型:{type(e).__name__}, 错误详细:{self._brief(e)}'
            )
            print(self.dedata)
            # The payload may lack "data" entirely; never let the error
            # handler itself fail on that.
            raw_data = self.dedata.get("data") if isinstance(self.dedata, dict) else None
            data_fields = raw_data if isinstance(raw_data, dict) else {}
            if self.err < max_retry_time:
                if 'searchErrorInfo' in data_fields:
                    self.err = 0
                    return 0
                if "'needUserLogin': True" in str(raw_data):
                    print(
                        f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,check_data:必须要登录才能查看数据,这次指定需要重定向到首页'
                    )
                    self.login()
                print(f'{self._stamp()} check_data:刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.get_data()
            if self.err >= max_retry_time:
                print(
                    f'{self._stamp()} 错误次数【{self.err}-{max_retry_time}】,check_data:重新尝试加载页面,这次指定需要重定向到首页'
                )
                self.err = 0
                self.get_page(1)
        else:
            self.err = 0
            self.proc_flightSegments()
            self.proc_priceList()
            self.mergedata()

    def proc_flightSegments(self):
        """Build ``self.flights``: one row per itinerary from its first flight.

        Departure and arrival timestamps are split into day and time columns.
        When ``del_info`` is set, a fixed list of less important fields is
        dropped.
        """
        droppable_fields = (
            "sequenceNo",
            "marketAirlineCode",
            "departureProvinceId",
            "departureCityId",
            "departureCityCode",
            "departureAirportShortName",
            "departureTerminal",
            "arrivalProvinceId",
            "arrivalCityId",
            "arrivalCityCode",
            "arrivalAirportShortName",
            "arrivalTerminal",
            "transferDuration",
            "stopList",
            "leakedVisaTagSwitch",
            "trafficType",
            "highLightPlaneNo",
            "mealType",
            "operateAirlineCode",
            "arrivalDateTime",
            "departureDateTime",
            "operateFlightNo",
            "operateAirlineName",
        )
        frames = []
        for itinerary in self.flightItineraryList:
            flightUnitList = dict(itinerary["flightSegments"][0]["flightList"][0])
            departure_parts = flightUnitList["departureDateTime"].split(" ")
            arrival_parts = flightUnitList["arrivalDateTime"].split(" ")
            split_columns = {
                "departureday": departure_parts[0],
                "departuretime": departure_parts[1],
                "arrivalday": arrival_parts[0],
                "arrivaltime": arrival_parts[1],
            }
            if del_info:
                for field in droppable_fields:
                    flightUnitList.pop(field, None)
            flightUnitList.update(split_columns)
            frames.append(pd.DataFrame.from_dict(flightUnitList, orient="index").T)

        # One concat at the end instead of re-copying the table per row.
        self.flights = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    def proc_priceList(self):
        """Build ``self.prices``: the cheapest economy and business fare per itinerary.

        For cabin "Y" (economy) and "C" (business) the offer with the lowest
        adultPrice + adultTax is selected (the first one on ties) and its
        price, tax, total and miseryIndex are recorded; a cabin without
        offers yields empty strings.
        """
        cabins = (("Y", "economy"), ("C", "bussiness"))
        rows = []
        for itinerary in self.flightItineraryList:
            row = {"flightNo": itinerary["itineraryId"].split("_")[0]}
            for cabin_code, prefix in cabins:
                offers = [
                    price for price in itinerary["priceList"] if price["cabin"] == cabin_code
                ]
                if offers:
                    best = min(offers, key=lambda price: price["adultPrice"] + price["adultTax"])
                    origin, tax = best["adultPrice"], best["adultTax"]
                    total, full = origin + tax, best["miseryIndex"]
                else:
                    origin = tax = total = full = ""
                row[f"{prefix}_origin"] = origin
                row[f"{prefix}_tax"] = tax
                row[f"{prefix}_total"] = total
                row[f"{prefix}_full"] = full
            rows.append(row)
        # Build the table once instead of concatenating row by row.
        self.prices = pd.DataFrame(rows)

    def mergedata(self):
        """Join flights with prices and write the result to a CSV file.

        The file is written to <cwd>/<flight date>/<today>/<origin>-<destination>.csv.
        Any failure is printed and swallowed. Always returns 0.
        """
        try:
            self.df = self.flights.merge(self.prices, on=["flightNo"])
            self.df["dateGetTime"] = dt.now().strftime("%Y-%m-%d")

            if rename_col:
                # Source column -> output column name, in output order.
                columns = {
                    "dateGetTime": "数据获取日期",
                    "flightNo": "航班号",
                    "marketAirlineName": "航空公司",
                    "departureday": "出发日期",
                    "departuretime": "出发时间",
                    "arrivalday": "到达日期",
                    "arrivaltime": "到达时间",
                    "duration": "飞行时长",
                    "departureCountryName": "出发国家",
                    "departureCityName": "出发城市",
                    "departureAirportName": "出发机场",
                    "departureAirportCode": "出发机场三字码",
                    "arrivalCountryName": "到达国家",
                    "arrivalCityName": "到达城市",
                    "arrivalAirportName": "到达机场",
                    "arrivalAirportCode": "到达机场三字码",
                    "aircraftName": "飞机型号",
                    "aircraftSize": "飞机尺寸",
                    "aircraftCode": "飞机型号三字码",
                    "arrivalPunctuality": "到达准点率",
                    "stopCount": "停留次数",
                }
                self.df = self.df.rename(columns=columns)
                if del_info:
                    self.df = self.df[list(columns.values())]

            files_dir = os.path.join(os.getcwd(), self.date, dt.now().strftime("%Y-%m-%d"))
            os.makedirs(files_dir, exist_ok=True)
            filename = os.path.join(files_dir, f"{self.city[0]}-{self.city[1]}.csv")
            self.df.to_csv(filename, encoding="UTF-8", index=False)
            print(f'\n{self._stamp()} 数据爬取完成 {filename}\n')
            return 0
        except Exception as e:
            print(f'{self._stamp()} 合并数据失败 {self._brief(e)}')
            return 0
if __name__ == "__main__":
    # Crawl every city pair for every flight date, skipping results that
    # were already saved today.
    driver = init_driver()
    citys = gen_citys(crawal_citys)
    flight_dates = generate_flight_dates(crawal_days, begin_date, end_date, start_interval, days_interval)
    Flight_DataFetcher = DataFetcher(driver)
    try:
        for city in citys:
            Flight_DataFetcher.city = city
            for flight_date in flight_dates:
                Flight_DataFetcher.date = flight_date
                csv_path = os.path.join(
                    os.getcwd(), flight_date, dt.now().strftime("%Y-%m-%d"), f"{city[0]}-{city[1]}.csv"
                )
                if os.path.exists(csv_path):
                    print(f'{time.strftime("%Y-%m-%d_%H-%M-%S")} 文件已存在:{csv_path}')
                    continue
                # Always go through the fetcher's driver: the fetcher replaces
                # its driver after a verification challenge, which leaves the
                # module-level `driver` pointing at a closed browser.
                elif 'http' not in Flight_DataFetcher.driver.current_url:
                    print(f'{time.strftime("%Y-%m-%d_%H-%M-%S")} 当前的URL是:{Flight_DataFetcher.driver.current_url}')
                    # First run: open the search page.
                    Flight_DataFetcher.get_page(1)
                else:
                    # Later runs only need to change the route and date.
                    Flight_DataFetcher.change_city()
                time.sleep(crawal_interval)
    finally:
        # Close the browser even when the crawl aborts with an error.
        try:
            driver = Flight_DataFetcher.driver
            driver.quit()
        except Exception as e:
            print(f'{time.strftime("%Y-%m-%d_%H-%M-%S")} An error occurred while quitting the driver: {e}')
    print(f'\n{time.strftime("%Y-%m-%d_%H-%M-%S")} 程序运行完成!!!!')