import sys
import json
import os
import time
import math       # needed by run_by_thread
import threading  # needed by run_by_thread
from collections import deque
from functools import reduce

import pandas as pd
from tqdm import tqdm


class HlyScore(object):
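    """Score day-line stock data and collect trend statistics.

    Reads cleaned NetEase day-line CSVs, gives each trading day a score from
    close-price and volume changes versus the previous day, builds rolling
    score sums and moving averages, and counts how often a given score is
    followed by a rising or falling MA.
    """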
def __init__(self, serial_day_arr=[7], expectation=[5,10,15,20]):
const_path = sys.path[0].replace("\\analyze_data", "")
        with open(const_path + "\\const.json", "r", encoding="utf8") as f:
            self.consts = json.loads(f.read())
self.dayline_file_prefix = self.consts['day_line_file_prefix']['netease_clean']
self.with_my_result = self.consts['path']['result']['hly']
        self.serial_day_arr = serial_day_arr  # window lengths (in days) for the rolling score sums
        self.expectation = expectation  # MA window lengths
self.res_matrix = {}
self.res_matrix_reduce = {}
    # Compute the daily score for a single stock
def calculate_score_one(self,code):
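        """Read one stock's cleaned day-line CSV, add the 'hly_score' column, and write the result CSV."""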
column_name = 'hly_score'
try:
df = pd.read_csv("%s%s.csv" % (self.dayline_file_prefix, code), encoding="gbk", error_bad_lines=False)
        except Exception:
            print("ERROR WHILE OPENING %s.csv" % code)
            return
df = df[::-1]
df[column_name] = ""
last_row = pd.Series([])
for index,row in df.iterrows():
if last_row.empty: score = 0
else: score = self.get_score(last_row,row)
df.loc[index, column_name] = score
last_row = row
df[::-1].to_csv("%s%s.csv" % (self.with_my_result, code), encoding="gbk", index=None)
def calculate_score_all(self):
time_start = time.time()
file_list = os.listdir(self.dayline_file_prefix)
file_count = len(file_list)
for i in tqdm(range(file_count)):
file = file_list[i]
code = file[0:6]
self.calculate_score_one(code)
time_end = time.time()
        time_c = time_end - time_start  # elapsed time
print('time cost: %s Seconds' % time_c)
    # Compute the rolling sum of scores for a single stock
def calculate_sum_one(self, code):
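        """Add a rolling-sum column 'sum_<n>' of hly_score for every window length in self.serial_day_arr."""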
        column_name = "sum_%s"
        try:
            df = pd.read_csv("%s%s.csv" % (self.with_my_result, code), encoding="gbk", error_bad_lines=False)
        except Exception:
            print("ERROR WHILE OPENING %s.csv" % code)
            return
        sum_arr = {}
        for day_sum in self.serial_day_arr:
            df[column_name % day_sum] = ""
            sum_arr[day_sum] = deque([])
        df = df[::-1]
        for index, row in df.iterrows():
            for day in sum_arr:
                sum_arr[day].append(row['hly_score'])
                if len(sum_arr[day]) < int(day):
                    # not enough history yet: record 0 and keep filling the window
                    df.loc[index, column_name % day] = 0
                else:
                    df.loc[index, column_name % day] = reduce(lambda x, y: x + y, sum_arr[day])
                    sum_arr[day].popleft()
        df[::-1].to_csv("%s%s.csv" % (self.with_my_result, code), encoding="gbk", index=None)
def calculate_sum_all(self):
time_start = time.time()
file_list = os.listdir(self.dayline_file_prefix)
file_count = len(file_list)
for i in tqdm(range(file_count)):
file = file_list[i]
code = file[0:6]
self.calculate_sum_one(code)
time_end = time.time()
        time_c = time_end - time_start  # elapsed time
print('time cost: %s Seconds' % time_c)
def get_score(self,yesterday,today):
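        """Score today against yesterday: +2 close up with volume up, +1 close up otherwise,
        -2 close down with volume up, -1 close down otherwise, 0 if the close is unchanged.
        """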
if yesterday['收盘价'] < today['收盘价']:
if yesterday['成交量'] < today['成交量']:
return 2
else:
return 1
elif yesterday['收盘价'] > today['收盘价']:
if yesterday['成交量'] < today['成交量']:
return -2
else:
return -1
return 0
def analyze_one(self, code):
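        """Add a moving-average column 'MA_<n>' of the closing price for every window length in self.expectation."""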
        try:
            df = pd.read_csv("%s%s.csv" % (self.with_my_result, code), encoding="gbk", error_bad_lines=False)
        except Exception:
            print("ERROR WHILE OPENING %s.csv" % code)
            return
        column_name = "MA_%s"
        df = df[::-1]
        ma_arr = {}
        for window in self.expectation:
            df[column_name % window] = ""
            ma_arr[window] = deque([])
        for index, row in df.iterrows():
            for ma in ma_arr:
                ma_arr[ma].append(row['收盘价'])
                if len(ma_arr[ma]) < int(ma):
                    # not enough history yet: record 0 and keep filling the window
                    df.loc[index, column_name % ma] = 0
                else:
                    df.loc[index, column_name % ma] = self.get_ma(ma_arr[ma])
                    ma_arr[ma].popleft()
        df[::-1].to_csv("%s%s.csv" % (self.with_my_result, code), encoding="gbk", index=None)
def analyze_all(self):
time_start = time.time()
file_list = os.listdir(self.dayline_file_prefix)
file_count = len(file_list)
for i in tqdm(range(file_count)):
file = file_list[i]
code = file[0:6]
self.analyze_one(code)
time_end = time.time()
time_c= time_end - time_start
print('time cost: %s Seconds' % time_c)
def get_ma(self, deq):
res = 0
for i in deq:
res += i
return res / len(deq)
def transform_js(self):
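        """Export each result CSV to a JavaScript module (module.exports = [ ... ]) of row dicts,
        dropping moving-average columns.
        """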
codelist = os.listdir(self.with_my_result)
for i in tqdm(range(len(codelist))):
file = codelist[i]
f = open( "%sjs\\%s.js" % (self.with_my_result, file[0:6]),"w", encoding="utf8")
f.write('''module.exports = [
''')
            try:
                df = pd.read_csv(self.with_my_result + file, encoding="gbk", error_bad_lines=False)
            except Exception:
                print("ERROR OPENING %s" % file)
                f.close()
                continue
cols = df.columns
            for col in cols:
                # drop moving-average columns ("MA_*" / "ma_*") from the JS export
                if col[0:2].lower() == "ma":
                    df.drop([col], axis=1, inplace=True)
# print(df)
# return
for index,row in df.iterrows():
f.write('''%s,
'''% row.to_dict())
f.write(''']''')
f.close()
def count_one(self, code):
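        """For every rolling-score column (sum_*) and MA column (MA_*), count whether the MA value
        n days later is greater than today's MA, keyed by the day's score sum; results go to
        res_matrix (rising) and res_matrix_reduce (not rising).
        """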
try:
df = pd.read_csv("%s%s.csv" % (self.with_my_result, code), encoding="gbk", error_bad_lines=False)
except:
print("ERROR OPENING %s.csv " % code)
return
        # 1. Read the header to find the sum and MA columns
sum_arr = []
ma_arr = []
for col in df.columns:
if col[0:3] == "sum": sum_arr.append(col)
elif col[0:2] == "MA": ma_arr.append(col)
        for index, row in df[::-1].iterrows():
            # 2. For each rolling score on this day, check whether each look-ahead MA rises, and update the matrices
            # 2.1 Iterate over the rolling-score columns
            for daysum in sum_arr:
                # 2.2 Whether a score exists for this day:
                # NOTE: days with insufficient history were filled with 0 during calculation,
                # so a score always exists and no check is made here; this may affect the results
                # 2.3 Iterate over the MA columns
                for ma in ma_arr:
                    # 2.4 Check whether the MA value n days later is greater than today's MA
                    # 2.4.1 Skip when there is no data n days ahead yet
                    if index - int(ma[3:]) < 0:
                        continue
                    if df.loc[index - int(ma[3:]), ma] > row[ma]:
                        self.res_plus_plus(daysum, ma, row[daysum])
                    else:
                        self.res_reduce_reduce(daysum, ma, row[daysum])
    # Shared helpers for accumulating the counts
def res_plus_plus(self, sum_key, ma_key, score_key):
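        """Increment the res_matrix counter for the (sum_key, ma_key, score_key) combination."""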
score_key = "_%s" % score_key
if not sum_key in self.res_matrix.keys():
self.res_matrix[sum_key] = {}
if not ma_key in self.res_matrix[sum_key].keys():
self.res_matrix[sum_key][ma_key] = {}
if not score_key in self.res_matrix[sum_key][ma_key]:
self.res_matrix[sum_key][ma_key][score_key] = 0
self.res_matrix[sum_key][ma_key][score_key] += 1
def res_reduce_reduce(self, sum_key, ma_key, score_key):
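        """Increment the res_matrix_reduce counter for the (sum_key, ma_key, score_key) combination."""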
score_key = "_%s" % score_key
if not sum_key in self.res_matrix_reduce.keys():
self.res_matrix_reduce[sum_key] = {}
if not ma_key in self.res_matrix_reduce[sum_key].keys():
self.res_matrix_reduce[sum_key][ma_key] = {}
if not score_key in self.res_matrix_reduce[sum_key][ma_key]:
self.res_matrix_reduce[sum_key][ma_key][score_key] = 0
self.res_matrix_reduce[sum_key][ma_key][score_key] += 1
def count(self):
time_start = time.time()
file_list = os.listdir(self.dayline_file_prefix)
file_count = len(file_list)
for i in tqdm(range(file_count)):
file = file_list[i]
code = file[0:6]
self.count_one(file[0:6])
time_end = time.time()
time_c= time_end - time_start
print('time cost: %s Seconds' % time_c)
print(self.res_matrix)
print(self.res_matrix_reduce)
f = open("%s\\hly_count_res_plus.json" % sys.path[0], "w", encoding="utf-8")
f.write(json.dumps(self.res_matrix, ensure_ascii=False))
f.close()
f = open("%s\\hly_count_res_reduce.json" % sys.path[0], "w", encoding="utf-8")
f.write(json.dumps(self.res_matrix_reduce, ensure_ascii=False))
f.close()
    # Do all the work for one stock in a single pass to reduce I/O
def do_all_job_one(self, code):
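        """Do everything for one stock in a single pass: score each day, add rolling score sums
        and moving averages, update the count matrices, and write the result CSV.
        """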
        # 1. Open the file
try:
df = pd.read_csv("%s%s.csv" % (self.dayline_file_prefix, code), encoding="gbk", error_bad_lines=False, index_col=0)
except:
print("ERROR OPENING %s.csv" % code)
return
        # 2. Compute the daily score
column_name = 'hly_score'
df = df[::-1]
df[column_name] = ""
last_row = pd.Series([])
for index,row in df.iterrows():
if last_row.empty: score = 0
else: score = self.get_score(last_row,row)
df.loc[index, column_name] = score
last_row = row
        # 3. Compute the rolling score sums and the MAs in the same pass
        column_name_sum = "sum_%s"
        column_name_ma = "ma_%s"
        sum_arr = {}  # one queue per sum window length
        ma_arr = {}
        # 3.1 For every window length, create a new, blank column
for day_sum in self.serial_day_arr:
df[column_name_sum % day_sum] = ""
sum_arr[day_sum] = deque([])
for sum_ma in self.expectation:
df[column_name_ma % sum_ma] = ""
ma_arr[sum_ma] = deque([])
for index,row in df.iterrows():
            # 3.2 Compute the rolling sum for each window length
for day in sum_arr:
sum_arr[day].append(row['hly_score'])
if len(sum_arr[day]) < int(day):
df.loc[index,column_name_sum % day] = 0
else:
df.loc[index,column_name_sum % day] = reduce(lambda x,y:x+y,sum_arr[day])
sum_arr[day].popleft()
            # 3.3 Compute the rolling MA for each window length
for ma in ma_arr:
ma_arr[ma].append(row["收盘价"])
if len(ma_arr[ma]) < int(ma):
df.loc[index, column_name_ma % ma] = 0
else:
df.loc[index,column_name_ma % ma] = self.get_ma(ma_arr[ma])
ma_arr[ma].popleft()
        # 4. Collect the statistics
        # 4.1 Read the header to find the sum and MA columns
sum_arr = []
ma_arr = []
for col in df.columns:
if col[0:3] == "sum": sum_arr.append(col)
elif col[0:2] == "ma": ma_arr.append(col)
# print(sum_arr)
# print(ma_arr)
for index, row in df.iterrows():
for daysum in sum_arr:
for ma in ma_arr:
                    # 4.2 Check whether the MA value n days later is greater than today's MA
                    # 4.2.1 Skip when there is no data n days ahead yet
if (index-int(ma[3:])) < 0:
continue
if df.loc[index-int(ma[3:]), ma] > row[ma]:
self.res_plus_plus(daysum, ma, row[daysum])
else:
self.res_reduce_reduce(daysum, ma, row[daysum])
        # 5. All done; save the result
# f = open("%s\\tmp\\%s.json" % (sys.path[0],code), "w", encoding="utf-8")
# f.write(json.dumps(self.res_matrix, ensure_ascii=False))
# f.close()
# print(self.res_matrix)
# print(self.res_matrix_reduce)
df[::-1].to_csv("%s%s.csv"%(self.with_my_result, code),encoding="gbk",index=None)
def yes_run_me(self):
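        """Run do_all_job_one for every day-line file, then dump the count matrices to JSON."""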
time_start = time.time()
file_list = os.listdir(self.dayline_file_prefix)
file_count = len(file_list)
for i in tqdm(range(file_count)):
file = file_list[i]
code = file[0:6]
self.do_all_job_one(file[0:6])
time_end = time.time()
time_c= time_end - time_start
print('time cost: %s Seconds' % time_c)
print(self.res_matrix)
print(self.res_matrix_reduce)
f = open("%s\\hly_count_res_plus.json" % sys.path[0], "w", encoding="utf-8")
f.write(json.dumps(self.res_matrix, ensure_ascii=False))
f.close()
f = open("%s\\hly_count_res_reduce.json" % sys.path[0], "w", encoding="utf-8")
f.write(json.dumps(self.res_matrix_reduce, ensure_ascii=False))
f.close()
def run_by_thread(self, thread_num):
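        """Split the file list into thread_num blocks and run do_block for each block in its own thread,
        then dump the count matrices to JSON. Note: the shared count matrices are updated without a lock.
        """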
time_start = time.time()
file_list = os.listdir(self.dayline_file_prefix)
file_count = len(file_list)
offset = file_count / thread_num
offset = math.ceil(offset)
threads = []
        for i in range(thread_num):
            start = i * offset
            # clamp the last block to the end of the list instead of dropping the final file
            end = (i + 1) * offset if (i + 1) * offset < file_count else file_count
            thread = threading.Thread(target=self.do_block, args=(start, end))
            threads.append(thread)
        # start all threads first, then wait for them, so the blocks actually run in parallel
        for t in threads:
            t.daemon = True
            t.start()
        for t in threads:
            t.join()
time_end = time.time()
time_c= time_end - time_start
print('time cost: %s Seconds' % time_c)
print(self.res_matrix)
print(self.res_matrix_reduce)
f = open("%s\\hly_count_res_plus.json" % sys.path[0], "w", encoding="utf-8")
f.write(json.dumps(self.res_matrix, ensure_ascii=False))
f.close()
f = open("%s\\hly_count_res_reduce.json" % sys.path[0], "w", encoding="utf-8")
f.write(json.dumps(self.res_matrix_reduce, ensure_ascii=False))
f.close()
def do_block(self, start, end):
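        """Run do_all_job_one for every file in file_list[start:end]."""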
file_list = os.listdir(self.dayline_file_prefix)
file_list = file_list[start:end]
for index in tqdm(range(len(file_list))):
code = file_list[index]
code = code[0:6]
self.do_all_job_one(code)
if __name__ == "__main__":
serial_day_arr = [5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
expectation = [5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]
# expectation = [5,10,15,20,30]
# serial_day_arr = [6,7,8,9,10]
hly = HlyScore(serial_day_arr=serial_day_arr,expectation=expectation)
# hly.calculate_score_all()
# # hly.calculate_sum_one('000002')
# hly.calculate_sum_all()
# hly.analyze_all()
# hly.transform_js()
# hly.count()
# hly.yes_run_me()
# hly.run_by_thread(8)