"""Crawl flight listings and prices from Ctrip with Selenium Wire.

For every ordered pair of cities in ``crawal_citys`` and every date produced
by ``generate_flight_dates`` the script drives the Ctrip flight search page,
captures the ``batchSearch`` API exchange recorded by Selenium Wire, and
writes one CSV per route to
``<cwd>/<flight date>/<crawl date>/<departure>-<arrival>.csv``.
"""
import gzip
import io
import json
import os
import time
from datetime import datetime as dt, timedelta
from typing import Any

import magic
import pandas as pd
import requests
import seleniumwire.undetected_chromedriver as webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Cities to crawl; every ordered pair of distinct cities becomes a route.
crawal_citys = ["上海", "香港", "东京"]

# Crawl date range: start date, formatted like '2023-12-01'.
begin_date = None

# Crawl date range: end date, formatted like '2023-12-31'.
end_date = None

# Crawl from T+N, i.e. N days from today (used when begin_date is unset).
start_interval = 1

# Number of days to crawl (passed as ``n`` to generate_flight_dates).
crawal_days = 60

# Pause between crawls (seconds).
crawal_interval = 5

# Step between consecutive crawl dates (days).
days_interval = 1

# Maximum time to wait for a page or element (seconds).
max_wait_time = 10

# Maximum number of error retries.
max_retry_time = 5

# Keep only direct flights (True: direct only, False: all flights).
direct_flight = True

# Whether to drop unimportant fields.
del_info = False

# Whether to rename the DataFrame columns.
rename_col = True

# Save debug screenshots.
enable_screenshot = False

# Allow logging in (data may only be available after login).
login_allowed = True

# Accounts.
accounts = ['', '']

# Passwords (matched to ``accounts`` by position).
passwords = ['', '']

# stealth.min.js is injected to hide Selenium fingerprints.
stealth_js_path = './stealth.min.js'

# CSS class names used to locate elements on the search page.
_CITY_INPUT_CLASS = "form-input-v3"
_DATE_PICKER_CLASS = "date-picker.date-picker-block"
_NEXT_ICON_CLASS = "in-date-picker.icon.next-ico.iconf-right"
_PREV_ICON_CLASS = "in-date-picker.icon.prev-ico.iconf-left"
_LOGIN_MODAL_CLASS = "lg_loginbox_modal"

# Flight fields removed from each row when ``del_info`` is enabled.
_UNIMPORTANT_FLIGHT_FIELDS = (
    "sequenceNo",
    "marketAirlineCode",
    "departureProvinceId",
    "departureCityId",
    "departureCityCode",
    "departureAirportShortName",
    "departureTerminal",
    "arrivalProvinceId",
    "arrivalCityId",
    "arrivalCityCode",
    "arrivalAirportShortName",
    "arrivalTerminal",
    "transferDuration",
    "stopList",
    "leakedVisaTagSwitch",
    "trafficType",
    "highLightPlaneNo",
    "mealType",
    "operateAirlineCode",
    "arrivalDateTime",
    "departureDateTime",
    "operateFlightNo",
    "operateAirlineName",
)

# Original column name -> renamed (Chinese) column name, in output order.
_COLUMN_NAMES = {
    "dateGetTime": "数据获取日期",
    "flightNo": "航班号",
    "marketAirlineName": "航空公司",
    "departureday": "出发日期",
    "departuretime": "出发时间",
    "arrivalday": "到达日期",
    "arrivaltime": "到达时间",
    "duration": "飞行时长",
    "departureCountryName": "出发国家",
    "departureCityName": "出发城市",
    "departureAirportName": "出发机场",
    "departureAirportCode": "出发机场三字码",
    "arrivalCountryName": "到达国家",
    "arrivalCityName": "到达城市",
    "arrivalAirportName": "到达机场",
    "arrivalAirportCode": "到达机场三字码",
    "aircraftName": "飞机型号",
    "aircraftSize": "飞机尺寸",
    "aircraftCode": "飞机型号三字码",
    "arrivalPunctuality": "到达准点率",
    "stopCount": "停留次数",
}


def _timestamp():
    """Return the current local time formatted for log lines."""
    return time.strftime("%Y-%m-%d_%H-%M-%S")


def _error_summary(error):
    """Return the error text with anything after "Stacktrace:" removed."""
    return str(error).split("Stacktrace:")[0]


def _output_dir(flight_date):
    """Return the directory that holds today's CSV files for ``flight_date``."""
    return os.path.join(os.getcwd(), flight_date, dt.now().strftime("%Y-%m-%d"))


def download_stealth_js(file_path, url='https://raw.githubusercontent.com/requireCool/stealth.min.js/main/stealth.min.js'):
    """Download stealth.min.js to ``file_path`` unless the file already exists.

    Args:
        file_path: Where the script is stored.
        url: Location the script is downloaded from.

    Raises:
        requests.RequestException: The download failed, timed out, or
            returned an error status.
    """
    if os.path.exists(file_path):
        print(f"{file_path} already exists, no need to download.")
        return
    print(f"{file_path} not found, downloading...")
    # A timeout keeps a stalled connection from hanging the crawler forever.
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # make sure the request succeeded
    # Explicit UTF-8 so the write does not depend on the platform encoding.
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(response.text)
    print(f"{file_path} downloaded.")


def init_driver():
    """Create and return a maximized Selenium Wire Chrome driver.

    stealth.min.js is downloaded if needed and registered to run on every new
    document; a failure there is printed and the driver is returned anyway.
    """
    options = webdriver.ChromeOptions()
    for argument in (
        "--incognito",  # private browsing mode
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-blink-features",
        "--disable-blink-features=AutomationControlled",
        "--disable-extensions",
        # NOTE(review): this is passed as a Chrome switch; Selenium normally
        # takes the page load strategy via options.page_load_strategy --
        # confirm it has any effect.
        "--pageLoadStrategy=eager",
        "--disable-gpu",
        "--disable-software-rasterizer",
        "--ignore-certificate-errors",
        "--ignore-certificate-errors-spki-list",
        "--ignore-ssl-errors",
    ):
        options.add_argument(argument)
    seleniumwireOptions: dict[str, Any] = {"verify_ssl": False}
    driver = webdriver.Chrome(options=options, seleniumwire_options=seleniumwireOptions)
    try:
        download_stealth_js(stealth_js_path)
        # Read stealth.min.js and inject it into every new document.
        with open(stealth_js_path, 'r', encoding='utf-8') as file:
            stealth_js = file.read()
        driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": stealth_js})
    except Exception as e:
        # Best effort: the crawler still runs without the stealth script.
        print(e)
    driver.maximize_window()
    return driver


def gen_citys(crawal_citys):
    """Return every ordered ``[departure, arrival]`` pair of distinct cities.

    Arrivals are visited in reverse list order for each departure.
    """
    return [
        [departure, arrival]
        for departure in crawal_citys
        for arrival in reversed(crawal_citys)
        if departure != arrival
    ]


def generate_flight_dates(n, begin_date, end_date, start_interval, days_interval):
    """Return the flight dates to crawl as ``'YYYY-MM-DD'`` strings.

    Args:
        n: Number of days to cover when no ``end_date`` is given; dates are
            taken at offsets ``range(0, n, days_interval)`` from the start.
        begin_date: First date (``'YYYY-MM-DD'``), or a falsy value to start
            ``start_interval`` days from today.
        end_date: Last date (``'YYYY-MM-DD'``), or a falsy value. When given,
            dates run from the start up to and including ``end_date`` in
            steps of ``days_interval``.
        start_interval: Days from today to the first date when ``begin_date``
            is not set; a falsy value means today.
        days_interval: Step between consecutive dates, in days.

    Returns:
        The list of dates; empty when the start date is after ``end_date``.

    Raises:
        ValueError: ``days_interval`` is zero, or a date string is malformed.
    """
    if days_interval == 0:
        raise ValueError("days_interval must not be zero")
    if begin_date:
        first_day = dt.strptime(begin_date, "%Y-%m-%d").date()
    else:
        # ``start_interval or 0`` keeps T+0 (today) working.
        first_day = dt.now().date() + timedelta(days=start_interval or 0)
    step = timedelta(days=days_interval)

    days = [first_day + timedelta(days=offset) for offset in range(0, n, days_interval)]
    if end_date:
        last_day = dt.strptime(end_date, "%Y-%m-%d").date()
        # Make sure the generated dates do not go past the end date ...
        days = [day for day in days if day <= last_day]
        # ... and keep generating until the end date is reached.
        while days and days_interval > 0 and days[-1] + step <= last_day:
            days.append(days[-1] + step)
    return [day.strftime("%Y-%m-%d") for day in days]


def element_to_be_clickable(element):
    """Build a WebDriverWait condition for an already-located element.

    Stands in for ``expected_conditions.element_to_be_clickable`` /
    ``visibility_of_element_located`` when the caller holds the element
    rather than a locator.

    Returns:
        A callable taking the driver that returns ``element`` once it is
        enabled and displayed, and ``False`` otherwise (including when the
        check itself raises).
    """
    def check_clickable(driver):
        try:
            if element.is_enabled() and element.is_displayed():
                return element  # condition met: hand the element back
            return False
        except Exception:
            return False

    return check_clickable


class DataFetcher(object):
    """Drive the Ctrip search page and save one route/date result as CSV.

    Set ``city`` to ``[departure, arrival]`` and ``date`` to ``'YYYY-MM-DD'``,
    then call ``get_page(1)`` (first run) or ``change_city()`` (later runs).
    The stages chain into each other:
    get_page -> change_city -> get_data -> decode_data -> check_data ->
    proc_flightSegments / proc_priceList / mergedata.
    """

    def __init__(self, driver):
        self.driver = driver
        self.date = None
        self.city = None
        self.err = 0  # number of errors since the last success
        self.switch_acc = 0  # selects which account is used for login
        self.predata = None  # captured batchSearch request
        self.dedata = None  # decoded JSON payload of its response

    # ------------------------------------------------------------------ helpers

    def _save_screenshot(self):
        """Save a timestamped screenshot when debug screenshots are enabled."""
        if not enable_screenshot:
            return
        # The target directory must exist for the screenshot to be written.
        os.makedirs("screenshot", exist_ok=True)
        self.driver.save_screenshot(f'screenshot/screenshot_{_timestamp()}.png')

    def _wait_clickable(self, element):
        """Wait until ``element`` is enabled and displayed, then return it."""
        return WebDriverWait(self.driver, max_wait_time).until(
            element_to_be_clickable(element)
        )

    def _wait_present(self, class_name):
        """Wait until an element with ``class_name`` is present in the DOM."""
        return WebDriverWait(self.driver, max_wait_time).until(
            EC.presence_of_element_located((By.CLASS_NAME, class_name))
        )

    def _refresh_and_refetch(self, stage):
        """Refresh the page and run get_data again unless a captcha showed up."""
        print(f'{_timestamp()} {stage}:刷新页面')
        self.refresh_driver()
        # Check notices and the verification code before retrying.
        if self.check_verification_code():
            self.get_data()

    def _restart_if_exhausted(self, stage, clear_requests=False):
        """Restart from the homepage once the retry budget is used up."""
        if self.err < max_retry_time:
            return
        print(
            f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,{stage}:重新尝试加载页面,这次指定需要重定向到首页'
        )
        if clear_requests:
            # Drop the requests captured so far.
            del self.driver.requests
        self.err = 0
        self.get_page(1)

    # ------------------------------------------------------------------- stages

    def refresh_driver(self):
        """Refresh the current page, retrying until the error budget runs out."""
        try:
            self.driver.refresh()
        except Exception as e:
            self.err += 1
            print(
                f'{_timestamp()} refresh_driver:刷新页面失败,错误类型:{type(e).__name__}, 详细错误信息:{_error_summary(e)}'
            )
            self._save_screenshot()
            if self.err < max_retry_time:
                print(f'{_timestamp()} refresh_driver:刷新页面')
                self.refresh_driver()
            if self.err >= max_retry_time:
                print(
                    f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,refresh_driver:不继续重试'
                )

    def remove_btn(self):
        """Remove notice boxes, the support shortcut and share links from the page."""
        try:
            # In order: notices, online customer service, share links.
            # (Removing every <dl> element was tried previously and is disabled.)
            for selector in (".notice-box", ".shortcut, .shortcut-link", ".shareline"):
                self.driver.execute_script(
                    f"document.querySelectorAll('{selector}').forEach(element => element.remove());"
                )
        except Exception as e:
            print(
                f'{_timestamp()} remove_btn:提醒移除失败,错误类型:{type(e).__name__}, 详细错误信息:{_error_summary(e)}'
            )

    def check_verification_code(self):
        """Return True when the page shows no verification prompt.

        When a prompt is found the browser is closed, the crawler sleeps for
        ``crawal_interval * 100`` seconds, a fresh driver is started, the
        account index advances, the flow restarts from the homepage, and
        False is returned. False is also returned if the check itself fails.
        """
        try:
            captcha_hits = len(self.driver.find_elements(By.ID, "verification-code")) + len(
                self.driver.find_elements(By.CLASS_NAME, "alert-title")
            )
            if captcha_hits:
                print(
                    f'{_timestamp()} check_verification_code:验证码被触发verification-code/alert-title,等待{crawal_interval*100}后重试。'
                )
                self.driver.quit()
                time.sleep(crawal_interval * 100)
                self.driver = init_driver()
                self.err = 0
                self.switch_acc += 1
                self.get_page(1)
                return False
            # No verification element: the page loaded normally, so clear the
            # notices that get in the way of clicks.
            self.remove_btn()
            return True
        except Exception as e:
            print(
                f'{_timestamp()} check_verification_code:未知错误,错误类型:{type(e).__name__}, 详细错误信息:{_error_summary(e)}'
            )
            return False

    def login(self):
        """Log in with the account selected by ``switch_acc``.

        Does nothing when ``login_allowed`` is off or no credentials are
        configured. On failure the page is refreshed and the login retried
        until ``max_retry_time`` errors have accumulated.
        """
        if not login_allowed:
            return
        if not accounts or not passwords:
            # Avoid a modulo-by-zero when the credential lists are empty.
            print(f'{_timestamp()} login:未配置账号或密码,跳过登录')
            return
        account = accounts[self.switch_acc % len(accounts)]
        password = passwords[self.switch_acc % len(passwords)]
        try:
            if len(self.driver.find_elements(By.CLASS_NAME, _LOGIN_MODAL_CLASS)) == 0:
                print(f'{_timestamp()} login:未弹出登录界面')
                # Open the login form from the header entry.
                self._wait_present("tl_nfes_home_header_login_wrapper_siwkn")
                self._wait_clickable(
                    self.driver.find_element(By.CLASS_NAME, "tl_nfes_home_header_login_wrapper_siwkn")
                ).click()
                # Wait for the login form to load.
                self._wait_present("lg_loginwrap")
            else:
                print(f'{_timestamp()} login:已经弹出登录界面')

            self._wait_clickable(
                self.driver.find_elements(By.CLASS_NAME, "r_input.bbz-js-iconable-input")[0]
            ).send_keys(account)
            print(f'{_timestamp()} login:输入账户成功')

            self._wait_clickable(
                self.driver.find_element(
                    By.CSS_SELECTOR, "div[data-testid='accountPanel'] input[data-testid='passwordInput']"
                )
            ).send_keys(password)
            print(f'{_timestamp()} login:输入密码成功')

            self._wait_clickable(
                self.driver.find_element(By.CSS_SELECTOR, '[for="checkboxAgreementInput"]')
            ).click()
            print(f'{_timestamp()} login:勾选同意成功')

            self._wait_clickable(
                self.driver.find_elements(By.CLASS_NAME, "form_btn.form_btn--block")[0]
            ).click()
            print(f'{_timestamp()} login:登录成功')
            self._save_screenshot()
            time.sleep(crawal_interval * 3)
        except Exception as e:
            self.err += 1
            print(
                f'{_timestamp()} login:页面加载或元素操作失败,错误类型:{type(e).__name__}, 详细错误信息:{_error_summary(e)}'
            )
            self._save_screenshot()
            if self.err < max_retry_time:
                print(f'{_timestamp()} login:刷新页面')
                self.refresh_driver()
                if self.check_verification_code():
                    self.login()
            if self.err >= max_retry_time:
                # Only reported here; the caller decides what happens next.
                print(
                    f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,login:重新尝试加载页面,这次指定需要重定向到首页'
                )

    def get_page(self, reset_to_homepage=0):
        """Open the flight search page and continue with change_city.

        Args:
            reset_to_homepage: Pass 1 to load the Ctrip flights homepage first.

        Any failure restarts from the homepage; there is no retry limit here.
        """
        next_stage_flag = False
        try:
            if reset_to_homepage == 1:
                self.driver.get("https://flights.ctrip.com/online/channel/domestic")
            if self.check_verification_code():
                self._wait_present("pc_home-jipiao")
                # Click the flight icon to get back to the main view.
                self._wait_clickable(
                    self.driver.find_element(By.CLASS_NAME, "pc_home-jipiao")
                ).click()
                # One-way trip.
                self._wait_clickable(
                    self.driver.find_elements(By.CLASS_NAME, "radio-label")[0]
                ).click()
                # Search.
                self._wait_clickable(
                    self.driver.find_element(By.CLASS_NAME, "search-btn")
                ).click()
                next_stage_flag = True
        except Exception as e:
            print(
                f'{_timestamp()} get_page:页面加载或元素操作失败,错误类型:{type(e).__name__}, 详细错误信息:{_error_summary(e)}'
            )
            self._save_screenshot()
            self.get_page(1)
        else:
            if next_stage_flag:
                self.change_city()

    def _city_inputs(self):
        """Return the city input elements (index 0 departure, 1 arrival)."""
        return self.driver.find_elements(By.CLASS_NAME, _CITY_INPUT_CLASS)

    def _fill_city(self, index):
        """Type ``self.city[index]`` into city input ``index`` until it sticks."""
        target = self.city[index]
        while target not in self._city_inputs()[index].get_attribute("value"):
            self._wait_clickable(self._city_inputs()[index]).click()
            # Select the existing text so the new name replaces it.
            self._wait_clickable(self._city_inputs()[index]).send_keys(Keys.CONTROL + "a")
            self._wait_clickable(self._city_inputs()[index]).send_keys(target)
        print(
            f'{_timestamp()} change_city:更换城市【{index}】-{self._city_inputs()[index].get_attribute("value")}'
        )

    def _confirm_city(self, index):
        """Commit city input ``index`` until its value contains "("."""
        while "(" not in self._city_inputs()[index].get_attribute("value"):
            self._wait_clickable(self._city_inputs()[index]).click()
            # Per the original note, clicking the low-price reminder stands
            # in for pressing Enter.
            self._wait_clickable(
                self.driver.find_elements(By.CLASS_NAME, "low-price-remind")[0]
            ).click()

    def _date_field_value(self):
        """Return the text currently shown in the departure date field."""
        return self.driver.find_elements(By.CSS_SELECTOR, "[aria-label=请选择日期]")[
            0
        ].get_attribute("value")

    def _picker_value(self, index, part):
        """Return the year or month shown by calendar block ``index`` as int.

        The trailing unit character of the label is stripped before parsing.
        """
        block = self.driver.find_elements(By.CLASS_NAME, _DATE_PICKER_CLASS)[index]
        return int(block.find_element(By.CLASS_NAME, part).text[:-1])

    def _step_calendar(self, part, forward):
        """Click the next (``forward``) or previous calendar arrow and log it.

        Forward steps use the second arrow/calendar block, backward steps the
        first one.
        """
        index = 1 if forward else 0
        icon_class = _NEXT_ICON_CLASS if forward else _PREV_ICON_CLASS
        relation, direction = ("小于", "向右点击") if forward else ("大于", "向左点击")
        target = int(self.date[:4]) if part == "year" else int(self.date[5:7])
        arrow = self._wait_clickable(
            self.driver.find_elements(By.CLASS_NAME, icon_class)[index]
        )
        print(
            f'{_timestamp()} change_city:更换日期{self._picker_value(index, part)}{relation} {target} {direction}'
        )
        arrow.click()

    def _click_day(self, year, month, day):
        """Click ``day`` in the calendar block showing ``year``/``month``, if any."""
        for block in self.driver.find_elements(By.CLASS_NAME, _DATE_PICKER_CLASS):
            if int(block.find_element(By.CLASS_NAME, "year").text[:-1]) != year:
                continue
            if int(block.find_element(By.CLASS_NAME, "month").text[:-1]) != month:
                continue
            for cell in block.find_elements(By.CLASS_NAME, "date-d"):
                if int(cell.text) == day:
                    self._wait_clickable(cell).click()
                    # Stop here so no element is touched after the click.
                    return

    def _select_date(self):
        """Move the calendar to ``self.date`` and click it until the field matches."""
        year, month, day = int(self.date[:4]), int(self.date[5:7]), int(self.date[-2:])
        while self._date_field_value() != self.date:
            # Open the date picker.
            self._wait_clickable(
                self.driver.find_element(By.CLASS_NAME, "modifyDate.depart-date")
            ).click()
            if self._picker_value(1, "year") < year:
                self._step_calendar("year", forward=True)
            if self._picker_value(0, "year") > year:
                self._step_calendar("year", forward=False)
            if self._picker_value(0, "year") == year and self._picker_value(0, "month") > month:
                self._step_calendar("month", forward=False)
            if self._picker_value(1, "year") == year and self._picker_value(1, "month") < month:
                self._step_calendar("month", forward=True)
            self._click_day(year, month, day)
        print(f'{_timestamp()} change_city:更换日期-{self._date_field_value()}')

    def change_city(self):
        """Set departure, arrival and date on the search form, then fetch data.

        On failure the step is retried (logging in first when the login dialog
        is showing); after ``max_retry_time`` errors the flow restarts from
        the homepage.
        """
        next_stage_flag = False
        try:
            # Wait for the page to finish loading.
            self._wait_present(_CITY_INPUT_CLASS)
            if self.check_verification_code():
                self._fill_city(0)
                self._fill_city(1)
                self._select_date()
                self._confirm_city(0)
                self._confirm_city(1)
                next_stage_flag = True
        except Exception as e:
            self.err += 1
            self._save_screenshot()
            print(
                f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,change_city:更换城市和日期失败,错误类型:{type(e).__name__}, 详细错误信息:{_error_summary(e)}'
            )
            if self.check_verification_code():
                if self.err < max_retry_time:
                    if len(self.driver.find_elements(By.CLASS_NAME, _LOGIN_MODAL_CLASS)):
                        print(f'{_timestamp()} change_city:检测到登录弹窗,需要登录')
                        self.login()
                    print(f'{_timestamp()} change_city:重试')
                    self.change_city()
                self._restart_if_exhausted("change_city", clear_requests=True)
        else:
            if next_stage_flag:
                self.get_data()
                print(
                    f'{_timestamp()} change_city:成功更换城市和日期,当前路线为:{self.city[0]}-{self.city[1]}'
                )

    def get_data(self):
        """Wait for the batchSearch request and check it matches the target.

        The request's first flight segment must carry the requested departure
        city, arrival city and date; otherwise the form is filled in again.
        """
        try:
            # Wait for the search response to be captured.
            self.predata = self.driver.wait_for_request(
                "/international/search/api/search/batchSearch?.*", timeout=max_wait_time
            )
            rb = dict(json.loads(self.predata.body).get("flightSegments")[0])
        except Exception as e:
            self.err += 1
            print(
                f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,get_data:获取数据超时,错误类型:{type(e).__name__}, 错误详细:{_error_summary(e)}'
            )
            self._save_screenshot()
            # Drop the requests captured so far.
            del self.driver.requests
            if self.err < max_retry_time:
                self._refresh_and_refetch("get_data")
            self._restart_if_exhausted("get_data")
        else:
            # Drop the captured requests so the next wait sees fresh traffic.
            del self.driver.requests
            if (
                rb["departureCityName"] == self.city[0]
                and rb["arrivalCityName"] == self.city[1]
                and rb["departureDate"] == self.date
            ):
                print(f"get_data:城市匹配成功:出发地-{self.city[0]},目的地-{self.city[1]}")
                self.err = 0
                self.decode_data()
            else:
                print(f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,get_data:刷新页面')
                self.err += 1
                self._save_screenshot()
                print(
                    f'{_timestamp()} get_data:重新更换城市:{rb["departureCityName"]}-{rb["arrivalCityName"]}-{rb["departureDate"]}'
                )
                if self.check_verification_code():
                    self.change_city()

    def decode_data(self):
        """Decode the captured response body into ``self.dedata`` (parsed JSON).

        gzip bodies are decompressed first. Bodies of any other detected type
        are decoded as UTF-8 text, so a body that is not JSON fails parsing
        and takes the retry path instead of reusing an earlier payload.
        """
        try:
            body = self.predata.response.body
            # Use python-magic to identify the body type.
            file_type = magic.Magic().from_buffer(body)
            if "gzip" in file_type:
                with gzip.GzipFile(fileobj=io.BytesIO(body)) as gf:
                    text = gf.read().decode("UTF-8")
            elif "JSON data" in file_type:
                text = body.decode("UTF-8")
                print(text)
            else:
                print(f'{_timestamp()} 未知的压缩格式:{file_type}')
                text = body.decode("UTF-8")
            self.dedata = json.loads(text)
        except Exception as e:
            self.err += 1
            print(
                f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,decode_data:数据解码失败,错误类型:{type(e).__name__}, 错误详细:{_error_summary(e)}'
            )
            self._save_screenshot()
            # Drop the requests captured so far.
            del self.driver.requests
            if self.err < max_retry_time:
                self._refresh_and_refetch("decode_data")
            self._restart_if_exhausted("decode_data")
        else:
            self.err = 0
            self.check_data()

    def check_data(self):
        """Validate the decoded payload, then build and save the result table.

        Transfer itineraries are dropped when ``direct_flight`` is set.

        Returns:
            0 when there is nothing to save for this route (no flights, or
            the payload carries ``searchErrorInfo``); otherwise None.
        """
        try:
            self.flightItineraryList = self.dedata["data"]["flightItineraryList"]
            if direct_flight:
                # Drop itineraries that involve a transfer (in place).
                self.flightItineraryList[:] = [
                    itinerary
                    for itinerary in self.flightItineraryList
                    if itinerary["flightSegments"][0]["transferCount"] == 0
                ]
            if len(self.flightItineraryList) == 0:
                if direct_flight:
                    print(f'{_timestamp()} 不存在直航航班:{self.city[0]}-{self.city[1]}')
                else:
                    print(f'{_timestamp()} 不存在航班:{self.city[0]}-{self.city[1]}')
                self.err = 0
                return 0
        except Exception as e:
            self.err += 1
            print(
                f'{_timestamp()} 数据检查出错:不存在航班,错误类型:{type(e).__name__}, 错误详细:{_error_summary(e)}'
            )
            print(self.dedata)
            # The payload may lack "data" (or hold None); never raise in here.
            data = self.dedata.get("data") if isinstance(self.dedata, dict) else None
            if self.err < max_retry_time:
                if isinstance(data, dict) and 'searchErrorInfo' in data:
                    self.err = 0
                    return 0
                if "'needUserLogin': True" in str(data):
                    print(
                        f'{_timestamp()} 错误次数【{self.err}-{max_retry_time}】,check_data:必须要登录才能查看数据,这次指定需要重定向到首页'
                    )
                    self.login()
                self._refresh_and_refetch("check_data")
            self._restart_if_exhausted("check_data")
        else:
            self.err = 0
            self.proc_flightSegments()
            self.proc_priceList()
            self.mergedata()

    def proc_flightSegments(self):
        """Build ``self.flights``: one row per itinerary from its first flight."""
        frames = []
        for itinerary in self.flightItineraryList:
            flightUnitList = dict(itinerary["flightSegments"][0]["flightList"][0])
            departure = flightUnitList["departureDateTime"].split(" ")
            arrival = flightUnitList["arrivalDateTime"].split(" ")
            departureday, departuretime = departure[0], departure[1]
            arrivalday, arrivaltime = arrival[0], arrival[1]
            if del_info:
                # Drop fields that are not needed in the output.
                for field in _UNIMPORTANT_FLIGHT_FIELDS:
                    flightUnitList.pop(field, None)
            # Store date and time separately.
            flightUnitList.update(
                {
                    "departureday": departureday,
                    "departuretime": departuretime,
                    "arrivalday": arrivalday,
                    "arrivaltime": arrivaltime,
                }
            )
            frames.append(pd.DataFrame.from_dict(flightUnitList, orient="index").T)
        self.flights = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    def proc_priceList(self):
        """Build ``self.prices``: the cheapest economy/business fare per itinerary.

        Cabin "Y" is economy and "C" is business. The fare with the lowest
        price-plus-tax total is kept; fields of a cabin with no fares are "".
        """
        frames = []
        for itinerary in self.flightItineraryList:
            flightNo = itinerary["itineraryId"].split("_")[0]
            # cabin -> list of (total, price, tax, miseryIndex)
            fares = {"Y": [], "C": []}
            for price in itinerary["priceList"]:
                adultPrice = price["adultPrice"]
                adultTax = price["adultTax"]
                miseryIndex = price["miseryIndex"]
                cabin = price["cabin"]
                if cabin in fares:
                    fares[cabin].append(
                        (adultPrice + adultTax, adultPrice, adultTax, miseryIndex)
                    )

            price_info = {"flightNo": flightNo}
            for prefix, cabin in (("economy", "Y"), ("bussiness", "C")):
                if fares[cabin]:
                    # min() keeps the first fare among equal totals.
                    total, origin, tax, full = min(fares[cabin], key=lambda fare: fare[0])
                else:
                    total = origin = tax = full = ""
                price_info[f"{prefix}_origin"] = origin
                price_info[f"{prefix}_tax"] = tax
                price_info[f"{prefix}_total"] = total
                price_info[f"{prefix}_full"] = full
            frames.append(pd.DataFrame(price_info, index=[0]))
        self.prices = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    def mergedata(self):
        """Merge flights with prices on ``flightNo`` and write the route CSV.

        Returns:
            0 in every case; a failure is printed rather than raised.
        """
        try:
            self.df = self.flights.merge(self.prices, on=["flightNo"])
            self.df["dateGetTime"] = dt.now().strftime("%Y-%m-%d")
            if rename_col:
                self.df = self.df.rename(columns=_COLUMN_NAMES)
            if del_info:
                # Select by whichever names the columns carry right now.
                # NOTE(review): the price columns are not in this list, so
                # they are dropped here -- confirm that is intended.
                kept = list(_COLUMN_NAMES.values()) if rename_col else list(_COLUMN_NAMES)
                self.df = self.df[kept]
            files_dir = _output_dir(self.date)
            os.makedirs(files_dir, exist_ok=True)
            filename = os.path.join(files_dir, f"{self.city[0]}-{self.city[1]}.csv")
            self.df.to_csv(filename, encoding="UTF-8", index=False)
            print(f'\n{_timestamp()} 数据爬取完成 {filename}\n')
            return 0
        except Exception as e:
            print(f'{_timestamp()} 合并数据失败 {_error_summary(e)}')
            return 0


def main():
    """Crawl every configured route and date, skipping CSVs that already exist."""
    citys = gen_citys(crawal_citys)
    flight_dates = generate_flight_dates(
        crawal_days, begin_date, end_date, start_interval, days_interval
    )
    Flight_DataFetcher = DataFetcher(init_driver())
    try:
        for city in citys:
            Flight_DataFetcher.city = city
            for flight_date in flight_dates:
                Flight_DataFetcher.date = flight_date
                csv_path = os.path.join(_output_dir(flight_date), f"{city[0]}-{city[1]}.csv")
                if os.path.exists(csv_path):
                    print(f'{_timestamp()} 文件已存在:{csv_path}')
                    continue
                # The fetcher may have replaced its driver after a captcha, so
                # always go through Flight_DataFetcher.driver.
                if 'http' not in Flight_DataFetcher.driver.current_url:
                    print(f'{_timestamp()} 当前的URL是:{Flight_DataFetcher.driver.current_url}')
                    # First run: open the search page.
                    Flight_DataFetcher.get_page(1)
                else:
                    # Later runs only change the route and date.
                    Flight_DataFetcher.change_city()
                time.sleep(crawal_interval)
    finally:
        # Close the browser even when the crawl is interrupted.
        try:
            Flight_DataFetcher.driver.quit()
        except Exception as e:
            print(f'{_timestamp()} An error occurred while quitting the driver: {e}')
    print(f'\n{_timestamp()} 程序运行完成!!!!')


if __name__ == "__main__":
    main()