You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

108 lines
4.6 KiB

1 year ago
import sys
import json
import pandas as pd
import tushare as ts
import os
from tqdm import tqdm
import time
import random
class DataClean(object):
    """Clean per-stock daily-quote CSV files, repairing bad rows from tushare CSVs.

    Paths and the tushare token are read from ``const.json``. Source files are
    read from the ``day_line_file_prefix.netease`` prefix, repair data from the
    ``day_line_file_prefix.tushare`` prefix, and results are written under the
    ``day_line_file_prefix.netease_clean`` prefix.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, end_date="20120101"):
        """Load ``const.json`` and initialise the counters.

        Args:
            end_date: Cut-off date (``YYYYMMDD``); rows dated earlier than this
                are skipped by :meth:`handle_one`.
        """
        const_path = sys.path[0].replace("\\clean_data", "")
        # Context manager so the config file handle is always closed.
        with open(const_path + "\\const.json", "r", encoding="utf8") as f:
            self.consts = json.loads(f.read())
        self.end_date = end_date
        self.tushare_token = self.consts["tushare"]["token"]
        self.day_line_file_prefix = self.consts["day_line_file_prefix"]["netease"]
        # Instrumentation: number of invalid rows seen across all stocks.
        self.COUNT_INVALID_DATA = 0
        # Instrumentation: number of stocks that had at least one invalid row.
        self.COUNT_INVALID_CODE = 0

    @staticmethod
    def _read_csv_skip_bad_lines(path):
        """Read a GBK-encoded CSV, skipping malformed lines on old and new pandas."""
        try:
            return pd.read_csv(path, encoding="gbk", on_bad_lines="skip")
        except TypeError:
            # Older pandas releases only accept the previous keyword.
            return pd.read_csv(path, encoding="gbk", error_bad_lines=False)

    @staticmethod
    def _is_invalid_value(column, val):
        """Return True when a cell is missing, or zero outside the '涨跌' columns."""
        if pd.isna(val):
            # read_csv may already have converted "NaN"/"None" text to a real NaN.
            return True
        if val == "None" or val == "NaN":
            return True
        # Columns whose name starts with '涨跌' may legitimately be zero.
        return val == 0 and str(column)[:2] != '涨跌'

    def _load_tushare(self, code):
        """Return the tushare DataFrame for ``code``, or None if it cannot be read."""
        path = "%s%s.csv" % (self.consts['day_line_file_prefix']['tushare'], code)
        try:
            return pd.read_csv(path, encoding="gbk")
        except Exception:
            print("打开tushare %s失败" % code)
            return None

    @staticmethod
    def _repair_row(row, tushare_frame, trade_date):
        """Build replacement values for ``row`` from tushare data, or return None.

        ``trade_date`` is the row's date as ``YYYYMMDD``. The returned list is
        positional and follows the order used by the original code (date, code,
        name, close, high, low, open, previous close, change, pct change,
        turnover, volume, amount, total market value, circulating market value).
        """
        if tushare_frame is None or 'trade_date' not in tushare_frame.columns:
            return None
        # trade_date is typically parsed as an integer; compare as text.
        matches = tushare_frame.loc[tushare_frame['trade_date'].astype(str) == trade_date]
        if matches.empty:
            return None
        quote = matches.iloc[0]  # scalar access instead of one-element Series
        # NOTE(review): the original key is 'prev_close'; tushare's documented
        # field name is 'pre_close', so fall back to it when the former is absent.
        prev_close_key = 'prev_close' if 'prev_close' in matches.columns else 'pre_close'
        values = [
            row['日期'],            # date
            row['股票代码'],        # stock code
            row['名称'],            # name
            quote['close'],         # close
            quote['high'],          # high
            quote['low'],           # low
            quote['open'],          # open
            quote[prev_close_key],  # previous close
            quote['change'],        # change amount
            quote['pct_chg'],       # change percentage
            0,                      # turnover rate (not taken from tushare)
            quote['vol'],           # volume
            quote['amount'],        # amount
            row['总市值'],          # total market value
            row['流通市值'],        # circulating market value
        ]
        if len(values) != len(row):
            # Source file does not have the expected column layout; cannot map.
            return None
        return values

    # Process a single stock
    def handle_one(self, code):
        """Clean the CSV of one stock and write the result to the clean prefix.

        Rows dated before ``self.end_date`` are skipped. A row is invalid when
        any cell is missing, or is zero in a column whose name does not start
        with '涨跌'. Invalid rows are replaced with tushare values for the same
        date when available; otherwise they are dropped.

        NOTE(review): the original indentation was lost; dropping unrepairable
        rows is the assumed intent -- confirm against the upstream file.

        Args:
            code: Stock code used to build the input and output file names.
        """
        try:
            df = self._read_csv_skip_bad_lines("%s%s.csv" % (self.day_line_file_prefix, code))
        except Exception:
            print("ERROR While Opening Code %s" % code )
            return

        cutoff = str(self.end_date).replace("-", "")
        kept_rows = []
        tushare_frame = None
        tushare_tried = False  # load the tushare file at most once per stock
        has_invalid = False

        for _, row in df.iterrows():
            # Normalise 'YYYY-MM-DD' to 'YYYYMMDD' so the string comparison with
            # the cut-off is meaningful ('-' sorts before digits otherwise).
            trade_date = str(row['日期']).replace("-", "")
            if trade_date < cutoff:
                continue

            invalid = any(self._is_invalid_value(col, val) for col, val in row.items())
            if not invalid:
                kept_rows.append(row.tolist())
                continue

            has_invalid = True
            self.COUNT_INVALID_DATA += 1  # instrumentation: invalid row count
            if not tushare_tried:
                tushare_frame = self._load_tushare(code)
                tushare_tried = True
            repaired = self._repair_row(row, tushare_frame, trade_date)
            if repaired is not None:
                kept_rows.append(repaired)

        if has_invalid:
            self.COUNT_INVALID_CODE += 1  # instrumentation: stocks with invalid rows

        # Build the frame once instead of appending row by row; object dtype
        # keeps each cell's own value, as the incremental append did.
        new_data = pd.DataFrame(kept_rows, columns=df.columns, dtype=object)
        new_data.to_csv('%s%s.csv'%(self.consts['day_line_file_prefix']['netease_clean'], code), encoding="gbk")

    def handle_all(self):
        """Run :meth:`handle_one` for every file in the source directory.

        The stock code is taken from the first six characters of each file
        name. Prints the two counters and the elapsed time when done.
        """
        time_start = time.time()
        for file_name in tqdm(os.listdir(self.day_line_file_prefix)):
            self.handle_one(file_name[0:6])
            # time.sleep(1)
        print("无效的数据数量:", self.COUNT_INVALID_DATA)
        print("有无效数据的股票数量:", self.COUNT_INVALID_CODE)
        time_end = time.time()
        time_c = time_end - time_start  # elapsed run time
        print('time cost: %s Seconds' % time_c)

    def is_handled(self, code):
        """Return True when a cleaned CSV for ``code`` can be read, else False."""
        path = '%snew\\%s.csv'%(self.consts['day_line_file_prefix']['netease_clean'], code)
        try:
            pd.read_csv(path, encoding="gbk")
        except Exception:
            return False
        return True
if __name__ == "__main__":
    # Script entry point: clean every file with the default cut-off date.
    # To process a single stock instead, call handle_one('000503').
    data_clean = DataClean()
    data_clean.handle_all()