import csv
import random
import time

from DrissionPage import ChromiumOptions
from DrissionPage import ChromiumPage

# Location of the Chrome executable handed to DrissionPage.
path = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
# Runs at import time: sets the browser path on a ChromiumOptions object and saves it.
ChromiumOptions().set_browser_path(path).save()


class DataRequest:
    """Collect Xiaohongshu search results for a keyword into a CSV file.

    Constructing an instance truncates ``data.csv`` and ``notes.csv``, opens
    ``data.csv`` for appending, starts a browser, begins listening for the
    search API packets, loads the search page and performs one initial
    :meth:`dataRequest`. Call :meth:`close` when finished.

    Attributes:
        success: ``'True'`` / ``'False'`` (strings, kept for compatibility)
            describing the outcome of the most recent :meth:`dataRequest`.
        resp: The most recent packet returned by the listener.
        items: Every record collected so far, across all calls.
        f: Open handle on the data CSV file.
        csv_writer: ``csv.DictWriter`` bound to ``f``.
        drive: The ``ChromiumPage`` instance driving the browser.
    """

    # Column order of the rows written to the data CSV.
    _FIELD_NAMES = ['文章ID', '文章标题', '喜欢数量', '图片资源']

    def __init__(self, keyword):
        """Prepare the output files, start the browser and fetch the first page.

        Args:
            keyword: Search keyword; concatenated into the search URL as-is.
        """
        self.success = 'False'
        self.resp = None
        self.data_csv_file_path = 'data.csv'
        self.notes_csv_file_path = 'notes.csv'
        # Start from empty output files.
        self.clear_csv_file()
        # Use the configured path rather than repeating the literal file name.
        self.f = open(self.data_csv_file_path, mode='a+', encoding='utf-8', newline='')
        self.csv_writer = csv.DictWriter(self.f, fieldnames=self._FIELD_NAMES)
        # IDs already written to the CSV. Kept in memory because re-reading the
        # file cannot see rows that are still sitting in the write buffer.
        self._written_ids = set()

        self.keyword = keyword
        self.url = 'https://www.xiaohongshu.com/search_result?keyword=' + self.keyword + '&type=51'
        # Collected records.
        self.items = []
        # Open the browser.
        self.drive = ChromiumPage()
        self.drive.set.scroll.smooth(on_off=True)
        self.drive.set.scroll.wait_complete(on_off=True)
        # Listen for the search API packets.
        self.drive.listen.start('api/sns/web/v1/search/notes')
        # Visit the site and process the first packet.
        self.drive.get(self.url)
        self.dataRequest()

    def clear_csv_file(self):
        """Truncate both CSV files (creating them if they do not exist)."""
        for csv_path in (self.data_csv_file_path, self.notes_csv_file_path):
            # Opening in 'w' mode is enough to empty the file.
            with open(csv_path, 'w'):
                pass

    def dataRequest(self):
        """Scroll to the bottom, wait for one search packet and store its notes.

        Sets ``self.success`` to ``'True'`` when at least one note was collected
        and no note was missing its title, then writes new rows via
        :meth:`sort` and reports via :meth:`check`. If the packet is missing or
        does not have the expected ``data.items`` structure, the failure
        message is printed and nothing is written.
        """
        # Reset so the flag describes this call, not a previous one.
        self.success = 'False'
        # Scrolling to the bottom triggers the next data packet.
        self.drive.scroll.to_bottom()
        # Wait for the packet to arrive.
        # NOTE(review): listen.wait() is assumed to return a falsy value when no
        # packet is captured - confirm against the DrissionPage docs.
        self.resp = self.drive.listen.wait()
        json_data = self.resp.response.body if self.resp else None
        try:
            raw_items = json_data['data']['items']
        except (KeyError, TypeError):
            # Body is absent, not parsed as a mapping, or lacks data/items.
            print('数据获取失败')
            return

        for item in raw_items or ():
            # Entries whose id contains '-' are skipped.
            # NOTE(review): presumably these are non-note entries - confirm.
            if '-' in item['id']:
                continue
            if 'display_title' not in item['note_card']:
                # A note without a title aborts the whole batch.
                self.success = 'False'
                break
            self.success = 'True'
            self.items.append(self._build_record(item))

        if self.success == 'True':
            self.sort()
            self.check()

    @staticmethod
    def _build_record(item):
        """Turn one API item into a row dict keyed by the CSV column names."""
        note_card = item['note_card']
        # First entry of each image's info_list supplies the URL.
        image_urls = [image['info_list'][0]['url'] for image in note_card['image_list']]
        return {
            '文章ID': item['id'],
            '文章标题': note_card['display_title'],
            '喜欢数量': note_card['interact_info']['liked_count'],
            '图片资源': image_urls,
        }

    @staticmethod
    def _liked_count(record):
        """Return the record's like count as an int for sorting.

        Plain integer values are converted exactly. Anything else falls back
        to a tolerant parse and finally to 0 so one odd value cannot abort
        the whole sort.
        """
        raw_value = record['喜欢数量']
        try:
            return int(raw_value)
        except (TypeError, ValueError):
            pass
        # NOTE(review): assumes large counts may be abbreviated like '1.2万' or
        # '10万+' - confirm against real API responses.
        text = str(raw_value).strip().rstrip('+')
        multiplier = 1
        if text.endswith('万'):
            text = text[:-1]
            multiplier = 10000
        try:
            return int(float(text) * multiplier)
        except (ValueError, OverflowError):
            return 0

    def is_in_csv(self, data_to_check):
        """Return True if ``data_to_check`` equals any cell of the data CSV.

        Pending writes are flushed first so the check sees every row written
        through ``self.f``.
        """
        if not self.f.closed:
            self.f.flush()
        with open(self.data_csv_file_path, 'r', encoding='utf-8') as f:
            return any(data_to_check in row for row in csv.reader(f))

    def sort(self):
        """Sort collected items by like count (descending) and write new ones.

        Items whose ID has already been written are skipped, so repeated calls
        only append rows for notes not seen before.
        """
        self.items.sort(key=self._liked_count, reverse=True)
        for record in self.items:
            note_id = record['文章ID']
            if note_id in self._written_ids:
                continue
            self.csv_writer.writerow(record)
            self._written_ids.add(note_id)
        # Push the batch to disk so readers of the file (e.g. check) see it.
        self.f.flush()

    def close(self):
        """Stop the packet listener and close the data CSV file."""
        try:
            self.drive.listen.stop()
        finally:
            # Close the file even if stopping the listener fails.
            self.f.close()

    def check(self):
        """Print whether the data CSV contains at least one line."""
        if not self.f.closed:
            self.f.flush()
        with open(self.data_csv_file_path, mode='r', encoding='utf-8') as file:
            first_line = next(file, None)
        if first_line is None:
            print('数据获取失败')
        else:
            print('数据获取成功')


if __name__ == '__main__':
    scraper = DataRequest('厦门旅游攻略')
    try:
        for _ in range(1):
            scraper.dataRequest()
            # Pause a few seconds between requests.
            time.sleep(random.randint(3, 4))
    finally:
        # Always release the listener and the CSV file handle.
        scraper.close()