You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

150 lines
6.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import asyncio  # asynchronous I/O
import json  # JSON decoding of the board API payload
import os  # operating-system helpers
import re  # regular expressions

import aiohttp  # asynchronous HTTP client/server framework
import joblib  # serialization / deserialization of Python objects
import pandas as pd  # data analysis library, aliased as pd
import requests as rq  # HTTP requests, aliased as rq
class getTopSecCom:
    """Scrape the stocks of one Eastmoney board and export their market values.

    The stock list is read from the board endpoint in ``bk_url`` (or from a
    local joblib cache when one exists); each stock's total market value is
    then scraped from its page under ``shares_api`` and can be written out as
    a text file, a CSV file or a SQL script.
    """

    # Cache file holding the filtered ``[code, name]`` list between runs.
    CACHE_PATH = "./useful_sec_com_list"
    # Seconds to wait for a blocking ``requests`` call before giving up.
    REQUEST_TIMEOUT = 10

    # Contents of the first ``[...]`` in the board response (the stock records).
    _BOARD_PAYLOAD_RE = re.compile(r"\[(.*?)\]")
    # Total market value figure (the text before the unit character) on a stock page.
    _MARKET_VALUE_RE = re.compile("<td>总市值:<span>(.*?)亿</span>")
    # A plain decimal number that is safe to emit unquoted into SQL.
    _NUMERIC_RE = re.compile(r"-?\d+(?:\.\d+)?")

    def __init__(self, top=None):
        """Set up request settings and load (or fetch) the stock list.

        Args:
            top: Stored as ``self.top``. NOTE(review): no method of this class
                reads it; its intended meaning is not visible here.
        """
        # Request headers sent with every HTTP call.
        self.headers = {
            "Referer": "http://quote.eastmoney.com/",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36",
        }
        # URL returning the board's stock list (fields f12 and f14).
        self.bk_url = "http://71.push2.eastmoney.com/api/qt/clist/get?cb=jQuery1124034348162124675374_1612595298605&pn=1&pz=85&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&fid=f62&fs=b:BK0655&fields=f12,f14&_=1612595298611"
        # Base URL of the per-stock pages on xueqiu.com.
        self.shares_api = "https://xueqiu.com/S/"
        self.top = top
        if os.path.exists(self.CACHE_PATH):
            # joblib.load unpickles the file: only ever point CACHE_PATH at a
            # file this program wrote itself.
            with open(self.CACHE_PATH, "rb") as fp:
                self.useful_sec_com_list = joblib.load(fp)
        else:
            self.useful_sec_com_list = self.get_sec_com_code()

    @classmethod
    def _parse_sec_com_list(cls, html):
        """Turn the raw board response into a list of ``[prefixed_code, name]``.

        Records whose name contains ``"ST"`` are dropped. Codes starting with
        ``"6"`` get the ``"sh"`` prefix, every other code gets ``"sz"``.

        Raises:
            ValueError: if the response holds no ``[...]`` section or the
                section is not valid JSON.
        """
        found = cls._BOARD_PAYLOAD_RE.search(html)
        if found is None:
            raise ValueError("no stock list found in the board response")
        # The payload comes from a remote server, so it is decoded as JSON
        # rather than executed with eval(); re-wrapping in brackets also keeps
        # a single-record payload a list.
        records = json.loads(f"[{found.group(1)}]")
        parsed = []
        for record in records:
            code, name = record["f12"], record["f14"]
            if "ST" in name:
                continue
            prefix = "sh" if code.startswith("6") else "sz"
            parsed.append([prefix + code, name])
        return parsed

    @classmethod
    def _build_share_row(cls, sec_com, html):
        """Return ``[*sec_com, market_value]`` or ``None`` when the page has no value."""
        found = cls._MARKET_VALUE_RE.search(html)
        if found is None:
            return None
        return [*sec_com, found.group(1)]

    @staticmethod
    def _sql_text(value):
        """Quote *value* as a SQL string literal, doubling embedded single quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    def _resolve_shares(self, all_shares):
        """Return *all_shares*, scraping them first when the caller passed ``None``."""
        if all_shares is None:
            return asyncio.run(self.async_get_all_shares())
        return all_shares

    def get_sec_com_code(self):
        """Fetch the board's stock list, cache it in ``CACHE_PATH`` and return it.

        Returns:
            A list of ``[prefixed_code, name]`` pairs.

        Raises:
            ValueError: if the response cannot be parsed.
        """
        response = rq.get(self.bk_url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        useful_sec_com_list = self._parse_sec_com_list(response.content.decode("utf-8"))
        with open(self.CACHE_PATH, "wb") as fp:
            joblib.dump(useful_sec_com_list, fp)
        return useful_sec_com_list

    async def async_get_shares_details(self, sec_com, url, session=None):
        """Fetch one stock page asynchronously and extract its market value.

        Args:
            sec_com: The ``[code, name]`` pair the page belongs to.
            url: Page URL to download.
            session: Optional ``aiohttp.ClientSession`` to reuse; a temporary
                one is opened (and closed) when omitted.

        Returns:
            ``[code, name, market_value]`` or ``None`` if no value was found.
        """
        if session is None:
            async with aiohttp.ClientSession() as own_session:
                return await self.async_get_shares_details(sec_com, url, own_session)
        async with session.get(url, headers=self.headers) as response:
            html = await response.text()
        return self._build_share_row(sec_com, html)

    async def async_get_all_shares(self):
        """Fetch every stock page concurrently over one shared session.

        Returns:
            The rows for which a market value was found, in the same order as
            ``self.useful_sec_com_list``; an empty list when there are no stocks.
        """
        if not self.useful_sec_com_list:
            return []
        async with aiohttp.ClientSession() as session:
            rows = await asyncio.gather(*(
                self.async_get_shares_details(sec_com, self.shares_api + sec_com[0], session)
                for sec_com in self.useful_sec_com_list
            ))
        return [row for row in rows if row]

    def get_shares_details(self):
        """Blocking counterpart of :meth:`async_get_all_shares` (one request at a time)."""
        all_shares = []
        for sec_com in self.useful_sec_com_list:
            response = rq.get(
                self.shares_api + sec_com[0],
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            row = self._build_share_row(sec_com, response.content.decode("utf-8"))
            if row:
                all_shares.append(row)
        return all_shares

    def save_txt(self, save_path, all_shares=None):
        """Write one ``code, name, value`` line per stock to *save_path*.

        Args:
            save_path: Destination file.
            all_shares: Rows to write; scraped on demand when ``None``.
        """
        rows = self._resolve_shares(all_shares)
        with open(save_path, 'w', encoding='utf-8') as f:
            f.writelines(f"{share[0]}, {share[1]}, {share[2]}\n" for share in rows)

    def save_csv(self, save_path, all_shares=None):
        """Write the stocks to *save_path* as CSV, sorted by market value descending.

        Args:
            save_path: Destination file.
            all_shares: Rows to write; scraped on demand when ``None``.
        """
        rows = self._resolve_shares(all_shares)
        df = pd.DataFrame(rows, columns=["股票代码", "公司", "市值(亿)"])
        df["市值(亿)"] = df["市值(亿)"].astype(float)
        df.sort_values(by="市值(亿)", ascending=False, inplace=True)
        # utf-8-sig writes a BOM at the start of the file.
        df.to_csv(save_path, index=False, encoding='utf-8-sig')

    def save_sql(self, save_path, all_shares=None):
        """Write one ``INSERT INTO shares`` statement per stock to *save_path*.

        Code and name are scraped text, so they are emitted as escaped string
        literals; the market value is emitted bare only when it is a plain
        decimal number and as an escaped string literal otherwise.

        Args:
            save_path: Destination file.
            all_shares: Rows to write; scraped on demand when ``None``.
        """
        rows = self._resolve_shares(all_shares)
        with open(save_path, 'w', encoding='utf-8') as f:
            for share in rows:
                amount = str(share[2])
                if not self._NUMERIC_RE.fullmatch(amount):
                    amount = self._sql_text(amount)
                f.write(
                    f"INSERT INTO shares (股票代码, 公司, 市值亿) VALUES ({self._sql_text(share[0])}, {self._sql_text(share[1])}, {amount});\n")

    def yield_storage(self, save_path, storage_type="all"):
        """Scrape the stocks once and save them in the requested format(s).

        Args:
            save_path: Output path. The matching extension (``.txt``, ``.csv``
                or ``.sql``) is appended unless the path already ends with it.
            storage_type: ``"txt"``, ``"csv"``, ``"sql"`` or ``"all"``.

        Raises:
            ValueError: if *storage_type* is not one of the supported values.
        """
        savers = {"txt": self.save_txt, "csv": self.save_csv, "sql": self.save_sql}
        if storage_type == "all":
            selected = list(savers)
        elif storage_type in savers:
            selected = [storage_type]
        else:
            raise ValueError(f"unsupported storage_type: {storage_type!r}")
        base_path = os.fspath(save_path)
        # Fetch once so that exporting several formats does not repeat the scrape.
        all_shares = asyncio.run(self.async_get_all_shares())
        for kind in selected:
            suffix = f".{kind}"
            target = base_path if base_path.endswith(suffix) else base_path + suffix
            savers[kind](target, all_shares)
def main():
    """Ask the user for an export format and write the board ranking.

    The output base name is ``rank``; ``yield_storage`` appends the file
    extension itself, so the base name is passed without one (passing
    ``rank.txt`` would yield ``rank.txt.txt``).
    """
    save_path = "rank"
    # Menu number -> storage_type understood by getTopSecCom.yield_storage.
    storage_types = {"1": "txt", "2": "csv", "3": "sql", "4": "all"}
    # Let the user choose the storage format.
    storage_option = input("请选择存储方式:\n1. txt\n2. csv\n3. sql\n4. all\n请输入数字:")
    storage_type = storage_types.get(storage_option)
    if storage_type is None:
        # Invalid choice: tell the user and stop before building the scraper,
        # whose constructor may hit the network and write the cache file.
        print("请输入正确的数字选项。")
        return
    # Create the scraper only once the choice is known to be valid.
    m = getTopSecCom()
    m.yield_storage(save_path, storage_type)


if __name__ == "__main__":
    main()