vnpy/vn.training/Public/Public.py

481 lines
13 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import os
import copy
import itertools
import math
import datetime
import logging
import logging.handlers
class IndentLogger:
'''
'''
def __init__(self, logger, indent):
self._logger= logger
self._indent= indent
def indent_levelup(self, level=1):
self._indent= self._indent - level
def indent_leveldown(self, level=1):
self._indent= self._indent + level
def set_indent_level(self, level):
self._indent= level
#---------------------------------------------------------------
def set_critical(self):
self._logger.setLevel(logging.CRITICAL)
def set_error(self):
self._logger.setLevel(logging.ERROR)
def set_warning(self):
self._logger.setLevel(logging.WARNING)
def set_info(self):
self._logger.setLevel(logging.INFO)
def set_debug(self):
self._logger.setLevel(logging.DEBUG)
def set_notset(self):
self._logger.setLevel(logging.NOTSET)
#---------------------------------------------------------------
def critical(self, message):
self._logger.critical('\t' * self._indent + message)
def error(self, message):
self._logger.error('\t' * self._indent + message)
def warning(self, message):
self._logger.warning('\t' * self._indent + message)
def info(self, message):
self._logger.info('\t' * self._indent + message)
def debug(self, message):
self._logger.debug('\t' * self._indent + message)
def noset(self, message):
self._logger.noset('\t' * self._indent + message)
def TempLogger(loggername, filename=None, taskdir=None):
'''
'''
if not taskdir:
taskdir= __dir_tmpfiles__
if not os.path.exists(taskdir):
os.mkdir(taskdir, 0o700)
if not filename:
timestamp= datetime.datetime.now()
filename= os.path.join(taskdir, loggername + '_' + timestamp.strftime('%Y-%m-%d_%H:%M:%S,%f'))
else:
filename= os.path.join(taskdir, filename)
if not os.path.exists(filename):
os.mknod(filename, 0o700)
myformatstr= "%(asctime)s %(levelname)-9s>> %(message)s"
myformatter= logging.Formatter(myformatstr)
myhandler= logging.handlers.RotatingFileHandler(filename=filename, mode='a', encoding='utf-8')
myhandler.setFormatter(myformatter)
mylogger= logging.getLogger(name=loggername)
mylogger.setLevel(level=logging.DEBUG)
mylogger.addHandler(myhandler)
ilogger= IndentLogger(logger=mylogger, indent=0)
return ilogger
def 计算个股换手率(个股行情, 个股股本变更记录):
'''
'''
个股股本变更列表= [rec for rec in 个股股本变更记录 if rec['流通股'] != 0 and rec['变更日期'] <= 个股行情['日期'][-1]]
个股股本变更字典= {}
for rec in 个股股本变更列表:
if rec['变更日期'] in 个股行情['日期']:
个股股本变更字典[rec['变更日期']]= rec
else:
个股股本变更字典[ [ds for ds in 个股行情['日期'] if ds > rec['变更日期']][0] ]= rec
当前流通股= 个股股本变更字典[min(个股股本变更字典.keys())]['流通股']
换手率= []
for ds, vol in zip(个股行情['日期'], 个股行情['成交量']):
if ds in 个股股本变更字典:
当前流通股= 个股股本变更字典[ds]['流通股']
换手率.append( vol*100000/当前流通股 )
个股行情['换手率']= 换手率
def 计算复权行情(个股行情, 均线参数=None):
'''
'''
日期= 个股行情['日期']
复权开盘= copy.copy(个股行情['开盘'])
复权最高= copy.copy(个股行情['最高'])
复权收盘= copy.copy(个股行情['收盘'])
复权最低= copy.copy(个股行情['最低'])
复权开收中= copy.copy(个股行情['开收中'])
复权记录= []
sidx= 1
done= False
while not done:
done= True
for idx, date in enumerate(日期[sidx:], start=sidx):
涨幅= (复权开盘[idx] - 复权收盘[idx-1]) / 复权收盘[idx-1]
if 涨幅 <= -0.12:
复权因子= round(复权收盘[idx-1]/复权开盘[idx], 2)
调整因子= round(复权因子, 1)
if abs(round(复权因子-调整因子, 2)) <= 0.01:
复权因子= 调整因子
复权开盘[:idx]= [nr/复权因子 for nr in 复权开盘[:idx]]
复权最高[:idx]= [nr/复权因子 for nr in 复权最高[:idx]]
复权收盘[:idx]= [nr/复权因子 for nr in 复权收盘[:idx]]
复权最低[:idx]= [nr/复权因子 for nr in 复权最低[:idx]]
复权开收中[:idx]= [nr/复权因子 for nr in 复权开收中[:idx]]
复权记录.append( (date, 复权因子) )
sidx= idx
done= False
break
复权行情= {}
复权行情['复权记录']= 复权记录
复权行情['日期']= copy.copy(日期)
复权行情['开盘']= 复权开盘
复权行情['最高']= 复权最高
复权行情['收盘']= 复权收盘
复权行情['最低']= 复权最低
复权行情['开收中']= 复权开收中
if 均线参数:
复权行情['均线集']= { n : 计算序列加权均线(复权开盘, 复权最高, 复权收盘, 复权最低, n) for n in 均线参数 }
return 复权行情
def 计算序列加权均线(开盘序列, 最高序列, 收盘序列, 最低序列, n):
'''
'''
length= len(开盘序列)
if length < n:
return [None] * length
sumhilo= sum(最高序列[:n]) + sum(最低序列[:n])
sumopen= sum(开盘序列[:n])
sumclose= sum(收盘序列[:n])
输出序列= [ ((sumhilo / 2 + sumopen) / 2 + sumclose) / (2*n) ]
for idx in range(n, length):
sumhilo= sumhilo - 最高序列[idx-n] - 最低序列[idx-n] + 最高序列[idx] + 最低序列[idx]
sumopen= sumopen - 开盘序列[idx-n] + 开盘序列[idx]
sumclose= sumclose - 收盘序列[idx-n] + 收盘序列[idx]
输出序列.append( ((sumhilo / 2 + sumopen) / 2 + sumclose) / (2*n) )
return [None] * (n-1) + 输出序列
def 补全个股行情(完整日期, 个股行情):
'''
'''
代码= 个股行情.pop('代码') if '代码' in 个股行情 else None
日期= 个股行情.pop('日期')
for idx, dstr in enumerate(完整日期):
if dstr not in 日期:
日期.insert(idx, dstr)
for seq in 个股行情.values():
seq.insert(idx, None)
if 代码:
个股行情['代码']= 代码
个股行情['日期']= 日期
def 计算个股行情衍生数据(ilogger, 个股行情, 均线参数=None):
'''
'''
length= len(个股行情['开盘'])
开盘= 个股行情['开盘']
最高= 个股行情['最高']
收盘= 个股行情['收盘']
最低= 个股行情['最低']
if 均线参数 and '均线集' not in 个股行情:
个股行情['均线集']= {n : 计算序列加权均线(开盘, 最高, 收盘, 最低, n) for n in 均线参数}
if '均线集' in 个股行情:
个股行情['均线走势标记集']= {n : [None]*(n-1)+计算走势标记(序列=序列[n-1:]) if length>=n else [None]*length for n, 序列 in 个股行情['均线集'].items()}
开收中= 个股行情['开收中']
开收中线走势标记= 计算走势标记(序列=开收中)
个股行情['开收中线走势标记']= 开收中线走势标记
最小长度= 个股行情.pop('目标偏移') if '目标偏移' in 个股行情 else 0
截去行情头部无效片断(行情数据=个股行情, 最小长度=最小长度)
开收中线走势拐点= 计算走势拐点(目标序列=开收中, 走势标记=开收中线走势标记)
个股行情['开收中线走势拐点']= 开收中线走势拐点
def 截去行情头部无效片断(行情数据, 最小长度):
'''
'''
length= len(行情数据['开盘'])
dirlists= [seq for seq in 行情数据.values() if (type(seq) is list) and (len(seq)==length)]
subkeys= ('均线集', '均线走势标记集')
sublists= [行情数据[key].values() for key in subkeys if key in 行情数据]
cntlist= [seq.count(None) for seq in dirlists]
itor= itertools.chain.from_iterable(seq for seq in sublists)
cntlist.extend( [seq.count(None) for seq in itor] )
截去长度= max(最小长度, max(cntlist))
for seq in dirlists:
del seq[:截去长度]
itor= itertools.chain.from_iterable(seq for seq in sublists)
for seq in itor:
del seq[:截去长度]
def 计算均值(序列):
'''
'''
if not 序列:
return None
长度= len(序列)
均值= sum(序列)/长度
最大值= max(序列)
最小值= min(序列)
标准差= math.sqrt(sum([(nr-均值)**2 for nr in 序列]) / 长度)
return (均值, 最大值, 最小值, 标准差, 长度)
def 计算日内定时均线(价格序列, 调整时间序列, 格点粒度, 间隔点数, 定时点数, 需要规整=True):
'''
'''
日期对象= 调整时间序列[0].date()
datetime_0925= datetime.datetime.combine(日期对象, datetime.time(hour=9, minute=25))
格点序列= [datetime_0925 + datetime.timedelta(seconds=格点粒度*i) for i in range(int((3600*4+1000)/格点粒度))]
if 需要规整:
规整时间序列= []
格点当前位置= 0
for 时间 in 调整时间序列:
while 格点序列[格点当前位置] < 时间:
格点当前位置 += 1
前方格点= 格点序列[格点当前位置]
后方格点= 格点序列[格点当前位置-1]
规整时间= 前方格点 if 前方格点-时间 <= 时间-后方格点 else 后方格点
规整时间序列.append(规整时间)
else:
规整时间序列= 调整时间序列
目标格点序列= [时间 for 时间 in 格点序列 if 时间>=规整时间序列[0] and 时间<=规整时间序列[-1]]
补全价格序列= []
当前价格= 价格序列[0]
for 格点 in 目标格点序列:
if 格点 in 规整时间序列:
当前价格= 价格序列[规整时间序列.index(格点)]
补全价格序列.append(当前价格)
定时均线= {}
for 点数 in 定时点数:
偏移序列= range(点数-1, len(目标格点序列), 间隔点数)
时间序列= [目标格点序列[偏移] for 偏移 in 偏移序列]
均线序列= [ sum(补全价格序列[偏移-点数+1 : 偏移+1]) / 点数 for 偏移 in 偏移序列 ]
定时均线[点数]= {
'时间序列': 时间序列,
'均线序列': 均线序列,
}
return 定时均线
def 计算走势标记(序列):
'''
'''
length= len(序列)
if length < 2:
return ['-'] * length
标记序列= []
当前方向= '/' if 序列[1] > 序列[0] else \
'\\' if 序列[1] < 序列[0] else \
'-'
for idx in range(1, length-1):
sign= '/' if 序列[idx] > 序列[idx-1] and 序列[idx+1] >= 序列[idx] else \
'\\' if 序列[idx] < 序列[idx-1] and 序列[idx+1] <= 序列[idx] else \
'^' if 序列[idx] > 序列[idx-1] and 序列[idx+1] < 序列[idx] else \
'v' if 序列[idx] < 序列[idx-1] and 序列[idx+1] > 序列[idx] else \
'/' if 当前方向 in '/-' and 序列[idx+1] > 序列[idx] else \
'\\' if 当前方向 in '\\-' and 序列[idx+1] < 序列[idx] else \
'^' if 当前方向 == '/' and 序列[idx+1] < 序列[idx] else \
'v' if 当前方向 == '\\' and 序列[idx+1] > 序列[idx] else \
'-'
当前方向= '/' if sign in '/v' else \
'\\' if sign in '\\^' else \
当前方向
标记序列.append(sign)
return ['-'] + 标记序列 + ['/' if 序列[-1] > 序列[-2] else '\\' if 序列[-1] < 序列[-2] else '-']
def 计算走势拐点(目标序列, 走势标记, 扩展=True):
'''
'''
length= len(目标序列)
if length <= 2:
return []
走势拐点= []
for idx, sign in [(i, s) for i, s in enumerate(走势标记) if s in ('^', 'v')]:
拐点记录= {}
拐点记录['偏移']= idx
拐点记录['类型']= sign
if 扩展:
# 计算关键度
拐点记录['关键度']= 计算最新极点关键度(序列=目标序列[:idx+1], 类型=sign)['关键度']
走势拐点.append(拐点记录)
return 走势拐点
def 计算最新极点关键度(序列, 类型=None):
'''
'''
长度= len(序列)
if 类型 is None:
for i in range(1, 长度):
if 序列[-i] > 序列[-(i+1)]:
类型= '^'
break
elif 序列[-i] < 序列[-(i+1)]:
类型= 'v'
break
结果= {
'类型': 类型,
'关键度': 长度,
'偏移': 长度-1,
}
if 长度 < 2:
return 结果
if 类型 == '^':
chunk= [idx for idx, item in enumerate(reversed(序列)) if item > 序列[-1]]
elif 类型 == 'v':
chunk= [idx for idx, item in enumerate(reversed(序列)) if item < 序列[-1]]
else:
return 长度
结果['关键度']= chunk[0] if chunk else 长度
return 结果
def repr_data(data, indent=0):
'''
'''
tlist= (list, dict, set, tuple)
dtype= type(data)
if dtype is list:
head= '\t'*indent + '['
body= ',\n'.join( [repr_data(data=item, indent=indent+1) for item in data] )
tail= '\n' + '\t'*indent + ']'
return head + '\t' + body.lstrip() + tail
elif dtype is dict:
head= '\t'*indent + '{'
body= ',\n'.join( ['\t'*(indent+1) + str(key) + ' :' + ( ('\n' + repr_data(data=val, indent=indent+1)) if type(val) in tlist else ('\t' + str(val)) ) for key, val in sorted(data.items())] )
tail= '\n' + '\t'*indent + '}'
return head + '\t' + body.lstrip() + tail
elif dtype is set:
head= '\t'*indent + '{'
body= ',\n'.join( [repr_data(data=item, indent=indent+1) for item in sorted(data)] )
tail= '\n' + '\t'*indent + '}'
return head + '\t' + body.lstrip() + tail
elif dtype is tuple:
head= '\t'*indent + '('
body= ',\n'.join( [repr_data(data=item, indent=indent+1) for item in data] )
tail= '\n' + '\t'*indent + ')'
return head + '\t' + body.lstrip() + tail
else:
return '\t'*indent + str(data)