You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
GenFlightRec/ctrip_flights_scraper_V3.py

1227 lines
52 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import magic
import io
import os
import gzip
import time
import json
import requests
import pandas as pd
from seleniumwire import webdriver
from datetime import datetime as dt, timedelta
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Cities to crawl; gen_citys() turns this list into ordered (departure, arrival) pairs.
crawal_citys = ["天津", "上海"]
# Crawl date range: start date, formatted like '2023-12-01'.
begin_date = "2024-10-12"
# Crawl date range: end date, formatted like '2023-12-31'.
end_date = "2024-10-15"
# Crawl from T+N, i.e. N days from today (only consulted when begin_date is empty).
start_interval = 1
# Number of days to generate (passed as `n` to generate_flight_dates).
crawal_days = 60
# Pause between crawl steps, in seconds (also scaled for the captcha / login waits).
crawal_interval = 5
# Step between generated dates, in days.
days_interval = 1
# Maximum time to wait for a page element or request, in seconds.
max_wait_time = 10
# Maximum number of retries after an error.
max_retry_time = 5
# Whether to keep only direct flights (True: direct flights only, False: all flights).
direct_flight = True
# Whether to drop the less important fields.
del_info = False
# Whether to rename the DataFrame columns.
rename_col = True
# Save debugging screenshots.
enable_screenshot = False
# Allow logging in (the site may require a login before it returns data).
login_allowed = True
# Account names; DataFetcher.login picks one by `switch_acc % len(accounts)`.
accounts = ['','']
# Passwords, index-aligned with `accounts`.
passwords = ['','']
# Path of the stealth.min.js script used to hide Selenium fingerprints.
stealth_js_path='./stealth.min.js'
def download_stealth_js(file_path, url='https://raw.githubusercontent.com/requireCool/stealth.min.js/main/stealth.min.js'):
    """Download stealth.min.js to *file_path* unless the file already exists.

    Args:
        file_path: Destination path of the script.
        url: Location the script is fetched from.

    Raises:
        requests.RequestException: If the download fails, times out, or the
            server answers with an error status.
    """
    if os.path.exists(file_path):
        print(f"{file_path} already exists, no need to download.")
        return

    print(f"{file_path} not found, downloading...")
    # A timeout keeps start-up from hanging forever on a stalled connection.
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # make sure the request succeeded
    # Write explicitly as UTF-8: the platform default encoding (e.g. GBK on a
    # Chinese Windows install) may not be able to represent the script.
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(response.text)
    print(f"{file_path} downloaded.")
def init_driver():
    """Create and return a maximised Edge driver with stealth.min.js injected.

    Injecting the stealth script is best effort: any failure while downloading,
    reading or registering it is printed and the driver is still returned.
    """
    options = webdriver.EdgeOptions()
    switches = (
        "--incognito",  # private (incognito) mode
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-blink-features",
        "--disable-blink-features=AutomationControlled",
        "--disable-extensions",
        # NOTE(review): this is passed as a browser switch; the page load
        # strategy is normally configured through options.page_load_strategy.
        # Kept unchanged - confirm the intent before altering it.
        "--pageLoadStrategy=eager",
        "--disable-gpu",
        "--disable-software-rasterizer",
        "--ignore-certificate-errors",
        "--ignore-certificate-errors-spki-list",
        "--ignore-ssl-errors",
    )
    for switch in switches:
        options.add_argument(switch)
    # Hide the "controlled by automated test software" banner.
    options.add_experimental_option("excludeSwitches", ["enable-automation"])

    driver = webdriver.Edge(options=options)
    try:
        download_stealth_js(stealth_js_path)
        # Read as UTF-8 explicitly; with the locale default a decode error
        # would be swallowed below and stealth silently disabled.
        with open(stealth_js_path, 'r', encoding='utf-8') as file:
            stealth_js = file.read()
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": stealth_js})
    except Exception as e:
        print(e)
    driver.maximize_window()
    return driver
def gen_citys(crawal_citys):
    """Return every ordered [departure, arrival] pair of two different cities.

    Departures follow the input order; for each departure the arrivals are
    visited in reverse input order.
    """
    arrivals = crawal_citys[::-1]
    return [
        [departure, arrival]
        for departure in crawal_citys
        for arrival in arrivals
        if departure != arrival
    ]
def generate_flight_dates(n, begin_date, end_date, start_interval, days_interval):
    """Return the list of flight dates to crawl as 'YYYY-MM-DD' strings.

    Args:
        n: Size of the day window; dates are taken at offsets
            ``range(0, n, days_interval)`` from the first date.
        begin_date: First date ('YYYY-MM-DD'). When empty, the first date is
            today plus ``start_interval`` days.
        end_date: Optional last date ('YYYY-MM-DD'). When given, dates after
            it are dropped and the series is extended in ``days_interval``
            steps until the next step would pass it.
        start_interval: Day offset from today used when ``begin_date`` is
            empty; ``0`` or ``None`` means today.
        days_interval: Step between consecutive dates, in days.

    Returns:
        The dates in ascending order; empty when ``end_date`` precedes the
        first date.
    """
    if begin_date:
        first_day = dt.strptime(begin_date, "%Y-%m-%d").date()
    else:
        # A zero / missing interval legitimately means "start today".
        first_day = (dt.now() + timedelta(days=start_interval or 0)).date()

    step = timedelta(days=days_interval)
    days = [first_day + timedelta(days=offset) for offset in range(0, n, days_interval)]

    if end_date:
        last_day = dt.strptime(end_date, "%Y-%m-%d").date()
        days = [day for day in days if day <= last_day]
        # Keep stepping until the end date is reached; `days` may be empty when
        # the end date lies before the first date, so guard the index access.
        while days and days[-1] + step <= last_day:
            days.append(days[-1] + step)

    return [day.strftime("%Y-%m-%d") for day in days]
# Stand-in for expected_conditions.element_to_be_clickable /
# expected_conditions.visibility_of_element_located that works on an element
# that has already been located.
def element_to_be_clickable(element):
    """Build a wait condition that is satisfied once *element* can be clicked.

    Returns:
        A callable taking the driver (unused). It returns *element* when the
        element is both enabled and displayed, otherwise ``False``. Errors
        raised while probing the element also yield ``False`` so the wait keeps
        polling.
    """
    def check_clickable(driver):
        try:
            ready = element.is_enabled() and element.is_displayed()
        except Exception:
            # Not a bare `except:` so Ctrl-C / SystemExit still propagate.
            return False
        return element if ready else False

    return check_clickable
class DataFetcher(object):
    """Drive the Ctrip flight search page and save one CSV per route and date.

    The public methods form a chain: get_page -> change_city -> get_data ->
    decode_data -> check_data -> proc_flightSegments / proc_priceList ->
    mergedata. Each stage retries itself on failure and falls back to
    get_page(1) once ``max_retry_time`` errors have accumulated.

    Callers set ``city`` ([departure, arrival]) and ``date`` ('YYYY-MM-DD')
    before starting the chain.
    """

    def __init__(self, driver):
        self.driver = driver
        self.date = None
        self.city = None
        self.err = 0  # consecutive error counter
        self.switch_acc = 0  # index used to rotate through the accounts

    # ------------------------------------------------------------------
    # Small private helpers shared by the stages
    # ------------------------------------------------------------------
    @staticmethod
    def _log(message):
        """Print *message* prefixed with the current timestamp."""
        print(f'{time.strftime("%Y-%m-%d_%H-%M-%S")} {message}')

    @staticmethod
    def _detail(error):
        """Return the text of *error* without the trailing driver stack trace."""
        return str(error).split("Stacktrace:")[0]

    def _screenshot(self):
        """Save a debugging screenshot when screenshots are enabled."""
        if enable_screenshot:
            self.driver.save_screenshot(
                f'screenshot/screenshot_{time.strftime("%Y-%m-%d_%H-%M-%S")}.png'
            )

    def _clickable(self, element):
        """Wait until *element* is enabled and displayed, then return it."""
        return WebDriverWait(self.driver, max_wait_time).until(
            element_to_be_clickable(element)
        )

    def _present(self, class_name):
        """Wait until an element with *class_name* is present in the DOM."""
        return WebDriverWait(self.driver, max_wait_time).until(
            EC.presence_of_element_located((By.CLASS_NAME, class_name))
        )

    @staticmethod
    def _decode_body(body):
        """Decode a response body (optionally gzip-compressed) into JSON.

        Raises:
            ValueError: If the payload is not valid UTF-8 JSON.
            OSError: If a gzip stream is corrupt.
        """
        # 0x1f 0x8b is the gzip magic number.
        if body[:2] == b"\x1f\x8b":
            body = gzip.decompress(body)
        return json.loads(body.decode("UTF-8"))

    @staticmethod
    def _parse_comfort(flight_comfort):
        """Flatten the 'data' object of one getFlightComfort response.

        Raises:
            KeyError / TypeError: If an expected field is missing.
        """
        punctuality = flight_comfort['punctualityInfo']
        plane_info = flight_comfort['planeInfo']
        cabin_info = {cabin['cabin']: cabin for cabin in flight_comfort['cabinInfoList']}
        processed_data = {
            'departure_delay_time': punctuality['departureDelaytime'],
            'departure_bridge_rate': punctuality['departureBridge'],
            'arrival_delay_time': punctuality['arrivalDelaytime'],
            'arrival_bridge_rate': punctuality['arrivalBridge'],
            'plane_type': plane_info['planeTypeName'],
            'plane_width': plane_info['planeWidthCategory'],
            'plane_age': plane_info['planeAge'],
        }
        # Economy (Y) and business (C) cabin details, when reported.
        for cabin_type in ('Y', 'C'):
            cabin = cabin_info.get(cabin_type)
            if cabin is None:
                continue
            processed_data.update({
                f'{cabin_type}_has_meal': cabin['hasMeal'],
                f'{cabin_type}_entertain_equipment': cabin['entertainEquipment'],
                f'{cabin_type}_seat_tilt': cabin['seatTilt']['value'],
                f'{cabin_type}_seat_width': cabin['seatWidth']['value'],
                f'{cabin_type}_seat_pitch': cabin['seatPitch']['value'],
                f'{cabin_type}_meal_msg': cabin['mealMsg'],
            })
            if 'power' in cabin:
                processed_data[f'{cabin_type}_power'] = cabin['power']
        return processed_data

    # ------------------------------------------------------------------
    # Page housekeeping
    # ------------------------------------------------------------------
    def refresh_driver(self):
        """Refresh the page, retrying until it works or the error budget is spent.

        Every failed refresh increments ``self.err``.
        """
        # Loop instead of recursion: same retry order, no stack growth, and the
        # give-up message is printed once.
        while True:
            try:
                self.driver.refresh()
                return
            except Exception as e:
                self.err += 1
                self._log(
                    f'refresh_driver:刷新页面失败,错误类型:{type(e).__name__}, 详细错误信息:{self._detail(e)}'
                )
                self._screenshot()
                if self.err >= max_retry_time:
                    self._log(
                        f'错误次数【{self.err}-{max_retry_time}】,refresh_driver:不继续重试'
                    )
                    return
                self._log('refresh_driver刷新页面')

    def remove_btn(self):
        """Remove overlay elements (notice box, online service, share links)."""
        try:
            for selector in ('.notice-box', '.shortcut, .shortcut-link', '.shareline'):
                self.driver.execute_script(
                    f"document.querySelectorAll('{selector}').forEach(element => element.remove());"
                )
        except Exception as e:
            self._log(
                f'remove_btn:提醒移除失败,错误类型:{type(e).__name__}, 详细错误信息:{self._detail(e)}'
            )

    def check_verification_code(self):
        """Return True when no captcha is showing and the page can be used.

        When a captcha element is found the browser is quit, the crawler sleeps
        for ``crawal_interval * 100`` seconds, a fresh driver is created, the
        account index is advanced and the chain restarts from get_page(1); the
        method then returns False so the caller stops its own work. False is
        also returned when the check itself fails.
        """
        try:
            captcha_hits = len(self.driver.find_elements(By.ID, "verification-code")) + len(
                self.driver.find_elements(By.CLASS_NAME, "alert-title")
            )
            if captcha_hits:
                self._log(
                    f'check_verification_code验证码被触发verification-code/alert-title等待{crawal_interval*100}后重试。'
                )
                self.driver.quit()
                time.sleep(crawal_interval * 100)
                self.driver = init_driver()
                self.err = 0
                self.switch_acc += 1
                self.get_page(1)
                return False
            # No captcha: clear the overlays and report the page as usable.
            self.remove_btn()
            return True
        except Exception as e:
            self._log(
                f'check_verification_code:未知错误,错误类型:{type(e).__name__}, 详细错误信息:{self._detail(e)}'
            )
            # Explicit, so callers never receive an accidental None.
            return False

    def login(self):
        """Log in with the account selected by ``switch_acc`` (if logins are allowed)."""
        if not login_allowed:
            return
        account = accounts[self.switch_acc % len(accounts)]
        password = passwords[self.switch_acc % len(passwords)]
        try:
            if len(self.driver.find_elements(By.CLASS_NAME, "lg_loginbox_modal")) == 0:
                self._log('login:未弹出登录界面')
                self._present("tl_nfes_home_header_login_wrapper_siwkn")
                # Open the login dialog from the page header.
                self._clickable(
                    self.driver.find_element(By.CLASS_NAME, "tl_nfes_home_header_login_wrapper_siwkn")
                ).click()
                # Wait for the dialog to load.
                self._present("lg_loginwrap")
            else:
                self._log('login:已经弹出登录界面')

            self._clickable(
                self.driver.find_elements(By.CLASS_NAME, "r_input.bbz-js-iconable-input")[0]
            ).send_keys(account)
            self._log('login:输入账户成功')

            self._clickable(
                self.driver.find_element(
                    By.CSS_SELECTOR, "div[data-testid='accountPanel'] input[data-testid='passwordInput']"
                )
            ).send_keys(password)
            self._log('login:输入密码成功')

            self._clickable(
                self.driver.find_element(By.CSS_SELECTOR, '[for="checkboxAgreementInput"]')
            ).click()
            self._log('login:勾选同意成功')

            self._clickable(
                self.driver.find_elements(By.CLASS_NAME, "form_btn.form_btn--block")[0]
            ).click()
            self._log('login登录成功')
            self._screenshot()
            time.sleep(crawal_interval * 3)
        except Exception as e:
            self.err += 1
            self._log(
                f'login页面加载或元素操作失败错误类型{type(e).__name__}, 详细错误信息:{self._detail(e)}'
            )
            self._screenshot()
            if self.err < max_retry_time:
                self._log('login刷新页')
                self.refresh_driver()
                if self.check_verification_code():
                    self.login()
            if self.err >= max_retry_time:
                self._log(
                    f'错误次数【{self.err}-{max_retry_time}】,login:重新尝试加载页面,这次指定需要重定向到首页'
                )

    # ------------------------------------------------------------------
    # Stage 1: reach the search page
    # ------------------------------------------------------------------
    def get_page(self, reset_to_homepage=0):
        """Open the one-way flight search page, then continue with change_city.

        Args:
            reset_to_homepage: ``1`` to navigate to the domestic flights home
                page first; any other value works on the current page.

        Failures are retried from the home page until an attempt completes.
        """
        # A loop replaces the original self-recursion so that a long outage
        # cannot exhaust the interpreter's recursion limit.
        while True:
            next_stage_flag = False
            try:
                if reset_to_homepage == 1:
                    self.driver.get("https://flights.ctrip.com/online/channel/domestic")
                if self.check_verification_code():
                    self._present("pc_home-jipiao")
                    # Plane icon: back to the flight search form.
                    self._clickable(self.driver.find_element(By.CLASS_NAME, "pc_home-jipiao")).click()
                    # One-way trip.
                    self._clickable(self.driver.find_elements(By.CLASS_NAME, "radio-label")[0]).click()
                    # Search.
                    self._clickable(self.driver.find_element(By.CLASS_NAME, "search-btn")).click()
                    next_stage_flag = True
            except Exception as e:
                self._log(
                    f'get_page页面加载或元素操作失败错误类型{type(e).__name__}, 详细错误信息:{self._detail(e)}'
                )
                self._screenshot()
                # Try again, this time going through the home page.
                reset_to_homepage = 1
                continue
            break

        if next_stage_flag:
            self.change_city()

    # ------------------------------------------------------------------
    # Stage 2: set route and date
    # ------------------------------------------------------------------
    def _city_input(self, index):
        """Return the departure (0) or arrival (1) city input element."""
        return self.driver.find_elements(By.CLASS_NAME, "form-input-v3")[index]

    def _city_value(self, index):
        """Return the current text of the departure (0) or arrival (1) input."""
        return self._city_input(index).get_attribute("value")

    def _date_value(self):
        """Return the current text of the departure date input."""
        return self.driver.find_elements(By.CSS_SELECTOR, "[aria-label=请选择日期]")[
            0
        ].get_attribute("value")

    def _type_city(self, index):
        """Type the target city into input *index* until the input shows it."""
        while self.city[index] not in self._city_value(index):
            self._clickable(self._city_input(index)).click()
            self._clickable(self._city_input(index)).send_keys(Keys.CONTROL + "a")
            self._clickable(self._city_input(index)).send_keys(self.city[index])
        self._log(f'change_city更换城市【{index}】-{self._city_value(index)}')

    def _panel_number(self, index, part):
        """Read the 'year' or 'month' shown by calendar panel *index* as an int."""
        panel = self.driver.find_elements(By.CLASS_NAME, "date-picker.date-picker-block")[index]
        # The label ends with a one-character unit suffix, which is dropped.
        return int(panel.find_element(By.CLASS_NAME, part).text[:-1])

    def _calendar_arrow(self, forward):
        """Return the clickable next (forward) or previous calendar arrow."""
        if forward:
            arrow = self.driver.find_elements(
                By.CLASS_NAME, "in-date-picker.icon.next-ico.iconf-right"
            )[1]
        else:
            arrow = self.driver.find_elements(
                By.CLASS_NAME, "in-date-picker.icon.prev-ico.iconf-left"
            )[0]
        return self._clickable(arrow)

    def _click_day(self, year, month, day):
        """Click *day* in whichever visible calendar panel shows year/month."""
        for panel in self.driver.find_elements(By.CLASS_NAME, "date-picker.date-picker-block"):
            if int(panel.find_element(By.CLASS_NAME, "year").text[:-1]) != year:
                continue
            if int(panel.find_element(By.CLASS_NAME, "month").text[:-1]) != month:
                continue
            for cell in panel.find_elements(By.CLASS_NAME, "date-d"):
                if int(cell.text) == day:
                    self._clickable(cell).click()
                    # Stop here: the remaining panel cannot match, and its
                    # elements may be stale once the click has been handled.
                    return

    def _select_date(self):
        """Navigate the date picker until the date input equals ``self.date``."""
        year, month, day = int(self.date[:4]), int(self.date[5:7]), int(self.date[-2:])
        while self._date_value() != self.date:
            # Open the date picker.
            self._clickable(
                self.driver.find_element(By.CLASS_NAME, "modifyDate.depart-date")
            ).click()
            # Each check re-reads the calendar because an earlier click in the
            # same pass may already have moved it.
            if self._panel_number(1, "year") < year:
                arrow = self._calendar_arrow(forward=True)
                self._log(
                    f'change_city更换日期{self._panel_number(1, "year")}小于 {year} 向右点击'
                )
                arrow.click()
            if self._panel_number(0, "year") > year:
                arrow = self._calendar_arrow(forward=False)
                self._log(
                    f'change_city更换日期{self._panel_number(0, "year")}大于 {year} 向左点击'
                )
                arrow.click()
            if self._panel_number(0, "year") == year:
                if self._panel_number(0, "month") > month:
                    arrow = self._calendar_arrow(forward=False)
                    self._log(
                        f'change_city更换日期{self._panel_number(0, "month")}大于 {month} 左点击'
                    )
                    arrow.click()
            if self._panel_number(1, "year") == year:
                if self._panel_number(1, "month") < month:
                    arrow = self._calendar_arrow(forward=True)
                    self._log(
                        f'change_city更换日期{self._panel_number(1, "month")}小于 {month} 向右点击'
                    )
                    arrow.click()
            self._click_day(year, month, day)
        self._log(f'change_city更换日期-{self._date_value()}')

    def _confirm_city(self, index):
        """Click away from input *index* until its text contains '('."""
        while "(" not in self._city_value(index):
            self._clickable(self._city_input(index)).click()
            # Clicking the low-price reminder button is used in place of
            # pressing Enter to commit the typed city.
            self._clickable(
                self.driver.find_elements(By.CLASS_NAME, "low-price-remind")[0]
            ).click()

    def change_city(self):
        """Set departure, arrival and date on the search form, then fetch data.

        On failure the method retries itself (logging in first when the login
        dialog is showing) and restarts from the home page once the error
        budget is exhausted.
        """
        next_stage_flag = False
        try:
            # Wait for the search form to load.
            self._present("form-input-v3")
            if self.check_verification_code():
                self._type_city(0)
                self._type_city(1)
                self._select_date()
                self._confirm_city(0)
                self._confirm_city(1)
                next_stage_flag = True
        except Exception as e:
            self.err += 1
            self._screenshot()
            self._log(
                f'错误次数【{self.err}-{max_retry_time}】,change_city更换城市和日期失败错误类型{type(e).__name__}, 详细错误信息:{self._detail(e)}'
            )
            if self.check_verification_code():
                if self.err < max_retry_time:
                    if len(self.driver.find_elements(By.CLASS_NAME, "lg_loginbox_modal")):
                        self._log('change_city检测到登录弹窗需要登录')
                        self.login()
                    self._log('change_city重试')
                    self.change_city()
                if self.err >= max_retry_time:
                    self._log(
                        f'错误次数【{self.err}-{max_retry_time}】,change_city:重新尝试加载页面,这次指定需要重定向到首页'
                    )
                    # Drop the requests captured so far.
                    del self.driver.requests
                    self.err = 0
                    self.get_page(1)
        else:
            if next_stage_flag:
                self.get_data()
                self._log(
                    f'change_city成功更换城市和日期当前路线为{self.city[0]}-{self.city[1]}'
                )

    # ------------------------------------------------------------------
    # Stage 3: capture and decode the search response
    # ------------------------------------------------------------------
    def get_data(self):
        """Wait for the batchSearch request and verify it matches route and date.

        A matching request is passed on to decode_data; a mismatching one sends
        the crawler back to change_city.
        """
        try:
            # Wait for the search request to be captured.
            self.predata = self.driver.wait_for_request(
                "/international/search/api/search/batchSearch?.*", timeout=max_wait_time
            )
            rb = dict(json.loads(self.predata.body).get("flightSegments")[0])
            self.comfort_data = self.capture_flight_comfort_data()
        except Exception as e:
            self.err += 1
            self._log(
                f'错误次数【{self.err}-{max_retry_time}】,get_data:获取数据超时,错误类型:{type(e).__name__}, 错误详细:{self._detail(e)}'
            )
            self._screenshot()
            del self.driver.requests
            if self.err < max_retry_time:
                self._log('get_data刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.get_data()
            if self.err >= max_retry_time:
                self._log(
                    f'错误次数【{self.err}-{max_retry_time}】,get_data:重新尝试加载页面,这次指定需要重定向到首页'
                )
                self.err = 0
                self.get_page(1)
        else:
            del self.driver.requests
            matches = (
                rb["departureCityName"] == self.city[0]
                and rb["arrivalCityName"] == self.city[1]
                and rb["departureDate"] == self.date
            )
            if matches:
                print(f"get_data:城市匹配成功:出发地-{self.city[0]},目的地-{self.city[1]}")
                self.err = 0
                self.decode_data()
            else:
                self._log(f'错误次数【{self.err}-{max_retry_time}】,get_data:刷新页面')
                self.err += 1
                self._screenshot()
                self._log(
                    f'get_data重新更换城市:{rb["departureCityName"]}-{rb["arrivalCityName"]}-{rb["departureDate"]}'
                )
                if self.check_verification_code():
                    self.change_city()

    def decode_data(self):
        """Decode the captured search response into ``self.dedata`` (a dict)."""
        try:
            # Handles both gzip-compressed and plain JSON bodies. The previous
            # implementation never assigned the plain-JSON payload and then
            # parsed whatever self.dedata held from an earlier run.
            self.dedata = self._decode_body(self.predata.response.body)
        except Exception as e:
            self.err += 1
            self._log(
                f'错误次数【{self.err}-{max_retry_time}】,decode_data:数据解码失败,错误类型:{type(e).__name__}, 错误详细:{self._detail(e)}'
            )
            self._screenshot()
            del self.driver.requests
            if self.err < max_retry_time:
                self._log('decode_data刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.get_data()
            if self.err >= max_retry_time:
                self._log(
                    f'错误次数【{self.err}-{max_retry_time}】,decode_data:重新尝试加载页面,这次指定需要重定向到首页'
                )
                self.err = 0
                self.get_page(1)
        else:
            self.err = 0
            self.check_data()

    # ------------------------------------------------------------------
    # Stage 4: validate, flatten and save
    # ------------------------------------------------------------------
    def check_data(self):
        """Validate the decoded payload and, when it has flights, build the CSV.

        Returns:
            ``0`` when there is nothing to save for this route/date, otherwise
            ``None``.
        """
        try:
            itineraries = self.dedata["data"]["flightItineraryList"]
            if direct_flight:
                # Keep direct flights only; the flag used to be ignored and
                # transfers were always dropped.
                itineraries = [
                    itinerary
                    for itinerary in itineraries
                    if itinerary["flightSegments"][0]["transferCount"] == 0
                ]
            self.flightItineraryList = itineraries
            if not itineraries:
                label = "直航航班" if direct_flight else "航班"
                self._log(f'不存在{label}:{self.city[0]}-{self.city[1]}')
                self.err = 0
                return 0
        except Exception as e:
            self.err += 1
            self._log(
                f'数据检查出错:不存在航班,错误类型:{type(e).__name__}, 错误详细:{self._detail(e)}'
            )
            dedata = getattr(self, "dedata", None)
            print(dedata)
            # Read "data" defensively: indexing here used to be able to raise
            # inside the handler and abort the whole crawl.
            data = dedata.get("data") if isinstance(dedata, dict) else None
            if data is None:
                data = {}
            if self.err < max_retry_time:
                if 'searchErrorInfo' in data:
                    self.err = 0
                    return 0
                if "'needUserLogin': True" in str(data):
                    self._log(
                        f'错误次数【{self.err}-{max_retry_time}】,check_data:必须要登录才能查看数据,这次指定需要重定向到首页'
                    )
                    self.login()
                self._log('check_data刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.get_data()
            if self.err >= max_retry_time:
                self._log(
                    f'错误次数【{self.err}-{max_retry_time}】,check_data:重新尝试加载页面,这次指定需要重定向到首页'
                )
                self.err = 0
                self.get_page(1)
        else:
            self.err = 0
            self.proc_flightSegments()
            self.proc_priceList()
            self.mergedata()

    def proc_flightSegments(self):
        """Build ``self.flights``: one row per itinerary from its first flight."""
        droppable = (
            "sequenceNo",
            "marketAirlineCode",
            "departureProvinceId",
            "departureCityId",
            "departureCityCode",
            "departureAirportShortName",
            "departureTerminal",
            "arrivalProvinceId",
            "arrivalCityId",
            "arrivalCityCode",
            "arrivalAirportShortName",
            "arrivalTerminal",
            "transferDuration",
            "stopList",
            "leakedVisaTagSwitch",
            "trafficType",
            "highLightPlaneNo",
            "mealType",
            "operateAirlineCode",
            "arrivalDateTime",
            "departureDateTime",
            "operateFlightNo",
            "operateAirlineName",
        )
        frames = []
        for itinerary in self.flightItineraryList:
            flight = dict(itinerary["flightSegments"][0]["flightList"][0])
            departure = flight["departureDateTime"].split(" ")
            arrival = flight["arrivalDateTime"].split(" ")
            split_fields = {
                "departureday": departure[0],
                "departuretime": departure[1],
                "arrivalday": arrival[0],
                "arrivaltime": arrival[1],
            }
            if del_info:
                # Drop the less important fields; absent keys are fine.
                for key in droppable:
                    flight.pop(key, None)
            # Store date and time as separate columns.
            flight.update(split_fields)
            frames.append(pd.DataFrame.from_dict(flight, orient="index").T)
        # Concatenate once instead of growing the frame row by row.
        self.flights = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    def proc_priceList(self):
        """Build ``self.prices``: cheapest economy and business fare per itinerary.

        For each cabin ('Y' economy, 'C' business) the offer with the lowest
        total (adult price plus estimated tax) is kept; its columns stay empty
        strings when the cabin has no offer.
        """
        rows = []
        for itinerary in self.flightItineraryList:
            offers = {"Y": [], "C": []}
            for price in itinerary["priceList"]:
                adult_price = price["adultPrice"]
                # Tax is estimated from the sort price unless the fare is
                # flagged as free of fuel fee and tax.
                estimated_tax = (
                    price["sortPrice"] - adult_price if not price["freeOilFeeAndTax"] else 0
                )
                misery_index = price["miseryIndex"]
                cabin = price["cabin"]
                if cabin in offers:
                    offers[cabin].append(
                        (adult_price + estimated_tax, adult_price, estimated_tax, misery_index)
                    )

            row = {"flightNo": itinerary["itineraryId"].split("_")[0]}
            for cabin, prefix in (("Y", "economy"), ("C", "bussiness")):
                if offers[cabin]:
                    # min() returns the first of equal totals, like index(min()).
                    total, origin, tax, full = min(offers[cabin], key=lambda offer: offer[0])
                else:
                    total = origin = tax = full = ""
                row[f"{prefix}_origin"] = origin
                row[f"{prefix}_tax"] = tax
                row[f"{prefix}_total"] = total
                row[f"{prefix}_full"] = full
            rows.append(row)
        self.prices = pd.DataFrame(rows)

    def mergedata(self):
        """Merge flights with prices and write the CSV for this route and date.

        The file is written to ``<cwd>/<flight date>/<today>/<from>-<to>.csv``.

        Returns:
            Always ``0``; failures are printed, not raised.
        """
        try:
            self.df = self.flights.merge(self.prices, on=["flightNo"])
            self.df["dateGetTime"] = dt.now().strftime("%Y-%m-%d")
            if rename_col:
                # Original column name -> exported column name.
                columns = {
                    "dateGetTime": "数据获取日期",
                    "flightNo": "航班号",
                    "marketAirlineName": "航空公司",
                    "departureday": "出发日期",
                    "departuretime": "出发时间",
                    "arrivalday": "到达日期",
                    "arrivaltime": "到达时间",
                    "duration": "飞行时长",
                    "departureCountryName": "出发国家",
                    "departureCityName": "出发城市",
                    "departureAirportName": "出发机场",
                    "departureAirportCode": "出发机场三字码",
                    "arrivalCountryName": "到达国家",
                    "arrivalCityName": "到达城市",
                    "arrivalAirportName": "到达机场",
                    "arrivalAirportCode": "到达机场三字码",
                    "aircraftName": "飞机型号",
                    "aircraftSize": "飞机尺寸",
                    "aircraftCode": "飞机型号三字码",
                    "arrivalPunctuality": "到达准点率",
                    "stopCount": "停留次数",
                }
                self.df = self.df.rename(columns=columns)
                if del_info:
                    self.df = self.df[list(columns.values())]
            # Append the comfort data as extra columns when it was captured.
            # NOTE(review): rows are aligned by position, not by flight number -
            # confirm the comfort responses arrive in flight order.
            if hasattr(self, 'comfort_data') and self.comfort_data:
                comfort_df = pd.DataFrame(self.comfort_data)
                self.df = pd.concat([self.df, comfort_df], axis=1)
            files_dir = os.path.join(os.getcwd(), self.date, dt.now().strftime("%Y-%m-%d"))
            os.makedirs(files_dir, exist_ok=True)
            filename = os.path.join(files_dir, f"{self.city[0]}-{self.city[1]}.csv")
            self.df.to_csv(filename, encoding="UTF-8", index=False)
            print(f'\n{time.strftime("%Y-%m-%d_%H-%M-%S")} 数据爬取完成 {filename}\n')
            return 0
        except Exception as e:
            print(f"合并数据失败 {str(e).split('Stacktrace:')[0]}")
            return 0

    def capture_flight_comfort_data(self):
        """Collect the getFlightComfort responses captured by the driver.

        Returns:
            A list with one flattened dict per successfully parsed response, or
            ``None`` when no such request shows up in time or capture fails.
        """
        pattern = "/international/search/api/flight/comfort/getFlightComfort"
        try:
            # wait_for_request returns a single request, so it only serves as a
            # barrier here; the responses are read from driver.requests.
            self.driver.wait_for_request(pattern, timeout=max_wait_time)
            comfort_data = []
            for request in self.driver.requests:
                if pattern not in request.url or not request.response:
                    continue
                try:
                    json_data = self._decode_body(request.response.body)
                except (ValueError, OSError):
                    print(f"无法解析 getFlightComfort 响应的 JSON 数据")
                    continue
                try:
                    if json_data['status'] == 0 and json_data['msg'] == 'success':
                        comfort_data.append(self._parse_comfort(json_data['data']))
                    else:
                        print(f"getFlightComfort 响应状态异常: {json_data['status']}, {json_data['msg']}")
                except (KeyError, TypeError) as e:
                    # One malformed record must not discard the others.
                    print(f"捕获 getFlightComfort 数据时出错:{str(e)}")
            return comfort_data
        except Exception as e:
            print(f"捕获 getFlightComfort 数据时出错:{str(e)}")
            return None
if __name__ == "__main__":
    # Crawl every city pair for every flight date, skipping combinations whose
    # CSV for today's run already exists.
    driver = init_driver()
    citys = gen_citys(crawal_citys)
    flight_dates = generate_flight_dates(crawal_days, begin_date, end_date, start_interval, days_interval)
    Flight_DataFetcher = DataFetcher(driver)
    try:
        for city in citys:
            Flight_DataFetcher.city = city
            for flight_date in flight_dates:
                Flight_DataFetcher.date = flight_date
                csv_path = os.path.join(
                    os.getcwd(), flight_date, dt.now().strftime("%Y-%m-%d"), f"{city[0]}-{city[1]}.csv"
                )
                if os.path.exists(csv_path):
                    print(f'{time.strftime("%Y-%m-%d_%H-%M-%S")} 文件已存在:{csv_path}')
                    continue
                elif 'http' not in Flight_DataFetcher.driver.current_url:
                    # Read the URL from the fetcher's driver: the fetcher swaps
                    # in a new driver after a captcha and quits the old one.
                    print(f'{time.strftime("%Y-%m-%d_%H-%M-%S")} 当前的URL是{Flight_DataFetcher.driver.current_url}')
                    # First run: open the search page.
                    Flight_DataFetcher.get_page(1)
                else:
                    # Later runs only need to change route and date.
                    Flight_DataFetcher.change_city()
                time.sleep(crawal_interval)
    finally:
        # Always close the browser, including when the crawl aborts.
        try:
            driver = Flight_DataFetcher.driver
            driver.quit()
        except Exception as e:
            print(f'{time.strftime("%Y-%m-%d_%H-%M-%S")} An error occurred while quitting the driver: {e}')
    print(f'\n{time.strftime("%Y-%m-%d_%H-%M-%S")} 程序运行完成!!!!')