增加卡尔曼均线/周期判断逻辑

This commit is contained in:
msincenselee 2017-12-11 13:29:26 +08:00
parent eefbd4eba0
commit 13eade96f9

View File

@ -7,11 +7,13 @@
from datetime import datetime
import talib as ta
import numpy
import math
import copy,csv
from pykalman import KalmanFilter
from vnpy.trader.app.ctaStrategy.ctaBase import *
from vnpy.trader.vtConstant import *
from vnpy.trader.app.ctaStrategy.ctaPeriod import *
DEBUGCTALOG = True
@ -53,11 +55,14 @@ class CtaLineBar(object):
# 参数列表,保存了参数的名称
paramList = ['vtSymbol']
def __init__(self, strategy, onBarFunc, setting=None,):
def __init__(self, strategy, onBarFunc, setting=None):
# OnBar事件回调函数
self.onBarFunc = onBarFunc
# 周期变更事件回调函数
self.onPeriodChgFunc = None
# 参数列表
self.paramList.append('barTimeInterval')
self.paramList.append('period')
@ -243,6 +248,13 @@ class CtaLineBar(object):
self.lineStateMean = []
self.lineStateCovar = []
# 周期
self.atan = None
self.atan_list = []
self.curPeriod = None # 当前所在周期
self.periods = []
if setting:
self.setParam(setting)
@ -263,7 +275,6 @@ class CtaLineBar(object):
self.writeCtaLog(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman')
self.inputKF = False
def setParam(self, setting):
"""设置参数"""
d = self.__dict__
@ -298,10 +309,17 @@ class CtaLineBar(object):
self.lastTick = tick
# 更新curPeriod的Highlow
if self.curPeriod is not None:
if self.curTick.lastPrice is None:
self.curTick.lastPrice = (self.curTick.askPrice1 + self.curTick.bidPrice1) / 2
self.curPeriod.onPrice(self.curTick.lastPrice)
# 4.执行 bar内计算
self.__recountKdj(countInBar=True)
def addBar(self,bar):
def addBar(self, bar, bar_is_completed=False):
"""予以外部初始化程序增加bar"""
l1 = len(self.lineBar)
@ -313,10 +331,11 @@ class CtaLineBar(object):
# 与最后一个BAR的时间比对判断是否超过K线的周期
lastBar = self.lineBar[-1]
self.curTradingDay = bar.tradingDay
is_new_bar = False
if bar_is_completed:
is_new_bar = True
if self.period == PERIOD_SECOND and (bar.datetime-lastBar.datetime).seconds >= self.barTimeInterval:
is_new_bar = True
@ -327,11 +346,9 @@ class CtaLineBar(object):
elif self.period == PERIOD_HOUR:
if self.barTimeInterval == 1 and bar.datetime.hour != lastBar.datetime.hour :
is_new_bar = True
elif self.barTimeInterval == 2 and bar.datetime.hour != lastBar.datetime.hour \
and bar.datetime.hour in {1, 9, 11, 13, 15, 21, 23}:
is_new_bar = True
elif self.barTimeInterval == 4 and bar.datetime.hour != lastBar.datetime.hour \
and bar.datetime.hour in {1, 9, 13, 21}:
is_new_bar = True
@ -376,6 +393,7 @@ class CtaLineBar(object):
self.__recountMacd()
self.__recountCci()
self.__recountKF()
self.__recoundPeriod(bar)
# 回调上层调用者
self.onBarFunc(bar)
@ -392,7 +410,7 @@ class CtaLineBar(object):
else:
displayBar = self.lineBar[-1]
msg = msg + u'{0} o:{1};h{2};l:{3};c:{4},v:{5}'.\
msg = msg + u'{0} o:{1};h:{2};l:{3};c:{4},v:{5}'.\
format(displayBar.date+' '+displayBar.time, displayBar.open, displayBar.high,
displayBar.low, displayBar.close, displayBar.volume)
@ -1338,7 +1356,6 @@ class CtaLineBar(object):
但是talib中MACD的计算是bar = (dif-dea)*1
"""
if self.inputMacdFastPeriodLen <= EMPTY_INT: return
if self.inputMacdSlowPeriodLen <= EMPTY_INT: return
if self.inputMacdSignalPeriodLen <= EMPTY_INT: return
@ -1430,21 +1447,26 @@ class CtaLineBar(object):
if not self.inputKF or self.kf is None:
return
if len(self.lineBar) < min_len:
# 数量不足时不做滤波处理直接吻合若改为EMA更好
if self.mode == self.TICK_MODE and len(self.lineBar)>1:
self.lineStateMean.append(self.lineBar[-2].close)
else:
self.lineStateMean.append(self.lineBar[-1].close)
return
if len(self.lineStateMean) == 0 or len(self.lineStateCovar) == 0:
listClose = []
# 3、获取前InputN周期(不包含当前周期的K线
if self.mode == self.TICK_MODE:
listClose = [x.close for x in self.lineBar[-min_len - 1:-1]]
if len(self.lineBar)<2:
return
listClose.append(self.lineBar[-2].close)
else:
listClose = [x.close for x in self.lineBar[-min_len:]]
listClose.append(self.lineBar[-1].close)
try:
self.kf = KalmanFilter(transition_matrices=[1],
observation_matrices=[1],
initial_state_mean=listClose[-1],
initial_state_covariance=1,
transition_covariance=0.01)
except:
self.writeCtaLog(u'导入卡尔曼过滤器失败,需先安装 pip install pykalman')
self.inputKF = False
state_means, state_covariances = self.kf.filter(numpy.array(listClose, dtype=float))
m = state_means[-1].item()
@ -1471,6 +1493,240 @@ class CtaLineBar(object):
self.lineStateMean.append(m)
self.lineStateCovar.append(c)
def __recoundPeriod(self, bar):
"""重新计算周期"""
len_rsi = len(self.lineRsi1)
if self.inputKF:
if len(self.lineStateMean) < 7 or len_rsi <=0:
return
listMid = self.lineStateMean[-7:-1]
malist = ta.MA(numpy.array(listMid, dtype=float), 5)
lastMid = self.lineStateMean[-1]
else:
len_boll = len(self.lineMiddleBand)
if len_boll <= 6 or len_rsi <= 0:
return
listMid = self.lineMiddleBand[-7:-1]
lastMid = self.lineMiddleBand[-1]
malist = ta.MA(numpy.array(listMid, dtype=float), 5)
ma5 = malist[-1]
ma5_ref1 = malist[-2]
if ma5 <= 0 or ma5_ref1 <= 0:
self.writeCtaLog(u'boll中轨计算均线异常')
return
self.atan = math.atan((ma5 / ma5_ref1 - 1) * 100 * 180 / math.pi)
#atan2 = math.atan((ma5 / ma5_ref1 - 1) * 100) * 180 / math.pi
#atan3 = math.atan(ma5 / ma5_ref1 - 1)* 100
self.atan = round(self.atan,3)
#self.writeCtaLog(u'{}/{}/{}'.format(self.atan, atan2, atan3))
if self.curPeriod is None:
self.writeCtaLog(u'初始化周期为震荡')
self.curPeriod = CtaPeriod(mode=PERIOD_SHOCK, price=bar.close, pre_mode=PERIOD_INIT, dt=bar.datetime)
self.periods.append(self.curPeriod)
if len(self.atan_list) > 10:
del self.atan_list[0]
self.atan_list.append(self.atan)
if len_rsi < 3:
return
# 当前期趋势是震荡
if self.curPeriod.mode == PERIOD_SHOCK:
# 初始化模式
if self.curPeriod.pre_mode == PERIOD_INIT:
if self.atan < -0.8:
self.curPeriod = CtaPeriod(mode=PERIOD_SHORT_EXTREME, price=bar.close, pre_mode=PERIOD_SHORT,
dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度向下,Atan:{},周期{}=》{}'.
format(bar.datetime, self.atan, self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
return
elif self.atan > 0.8:
self.curPeriod = CtaPeriod(mode=PERIOD_LONG_EXTREME, price=bar.close, pre_mode=PERIOD_LONG,
dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度加速向上,Atan:{},周期:{}=>{}'.
format(bar.datetime, self.atan, self.curPeriod.pre_mode,
self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
return
# 震荡 -》 空
if self.atan <= -0.2:
self.curPeriod = CtaPeriod(mode=PERIOD_SHORT, price=bar.close, pre_mode=PERIOD_SHOCK, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度向下,Atan:{},周期{}=》{}'.
format(bar.datetime,self.atan, self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 震荡 =》 多
elif self.atan >= 0.2:
self.curPeriod = CtaPeriod(mode=PERIOD_LONG, price=bar.close, pre_mode=PERIOD_SHOCK,dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度向上,Atan:{},周期:{}=>{}'.
format(bar.datetime,self.atan, self.curPeriod.pre_mode,
self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 周期维持不变
else:
self.writeCtaLog(u'{} 角度维持Atan:{},周期维持:{}'.
format(bar.datetime, self.atan, self.curPeriod.mode))
return
# 当前期趋势是空
if self.curPeriod.mode == PERIOD_SHORT:
# 空=》空极端
if self.atan <= -0.8 and self.atan_list[-1] < self.atan_list[-2]:
self.curPeriod = CtaPeriod(mode=PERIOD_SHORT_EXTREME, price=bar.close, pre_mode=PERIOD_SHORT, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度极端向下,Atan:{},注意反弹。周期:{}=>{}'.
format(bar.datetime,self.atan, self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 空=》震荡
elif -0.2 < self.atan < 0.2 or (self.atan >= 0.2 and self.atan_list[-2] <= -0.2):
self.curPeriod = CtaPeriod(mode=PERIOD_SHOCK, price=bar.close, pre_mode=PERIOD_SHORT, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度平缓Atan:{},结束下降趋势。周期:{}=>{}'.
format(bar.datetime, self.atan, self.curPeriod.pre_mode,self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
elif self.atan > 0.2 and self.curPeriod.pre_mode == PERIOD_LONG_EXTREME and self.atan_list[-1] > self.atan_list[-2] and bar.close > lastMid:
self.curPeriod = CtaPeriod(mode=PERIOD_SHOCK, price=bar.close, pre_mode=PERIOD_SHORT, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度平缓Atan:{},结束下降趋势。周期:{}=>{}'.
format(bar.datetime, self.atan, self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 周期维持空
else:
self.writeCtaLog(u'{} 角度向下{},周期维持:{}'.
format(bar.datetime, self.atan, self.curPeriod.mode))
return
# 当前期趋势是多
if self.curPeriod.mode == PERIOD_LONG:
# 多=》多极端
if self.atan >= 0.8 and self.atan_list[-1] > self.atan_list[-2]:
self.curPeriod = CtaPeriod(mode=PERIOD_LONG_EXTREME, price=bar.close, pre_mode=PERIOD_LONG, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度加速向上,Atan:{},周期:{}=>{}'.
format(bar.datetime, self.atan, self.curPeriod.pre_mode,
self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 多=》震荡
elif -0.2 < self.atan < 0.2 or (self.atan <= -0.2 and self.atan_list[-2] >= 0.2):
self.curPeriod = CtaPeriod(mode=PERIOD_SHOCK, price=bar.close, pre_mode=PERIOD_LONG, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度平缓,Atan:{},结束上升趋势。周期:{}=>{}'.
format(bar.datetime, self.atan, self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 多=》震荡
elif self.atan < -0.2 and self.curPeriod.pre_mode == PERIOD_SHORT_EXTREME and self.atan_list[-1] < self.atan_list[-2] and bar.close < lastMid:
self.curPeriod = CtaPeriod(mode=PERIOD_SHOCK, price=bar.close, pre_mode=PERIOD_LONG, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度平缓,Atan:{},结束上升趋势。周期:{}=>{}'.
format(bar.datetime, self.atan, self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 周期保持多
else:
self.writeCtaLog(u'{} 角度向上,Atan:{},周期维持:{}'.
format(bar.datetime, self.atan, self.curPeriod.mode))
return
# 当前周期为多极端
if self.curPeriod.mode == PERIOD_LONG_EXTREME:
# 多极端 =》 空
if self.lineRsi1[-1] < self.lineRsi1[-2] \
and max(self.lineRsi1[-5:-2]) >= 50 \
and bar.close < lastMid:
self.curPeriod = CtaPeriod(mode=PERIOD_SHORT, price=bar.close, pre_mode=PERIOD_LONG_EXTREME,
dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度高位反弹向下Atan:{} , RSI {}=》{},{}下穿中轨{},周期:{}=》{}'.
format(bar.datetime, self.atan, self.lineRsi1[-2], self.lineRsi1[-1],
bar.close,lastMid,
self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 多极端 =》多
elif self.lineRsi1[-1] < self.lineRsi1[-2] \
and bar.close > lastMid:
self.curPeriod = CtaPeriod(mode=PERIOD_LONG, price=bar.close, pre_mode=PERIOD_LONG_EXTREME, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度上加速放缓Atan:{}, & RSI{}=>{},周期:{}=》{}'.
format(bar.datetime, self.atan, self.lineRsi1[-2], self.lineRsi1[-1],
self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 当前趋势保持多极端
else:
self.writeCtaLog(u'{} 角度向上加速{},周期维持:{}'.
format(bar.datetime, self.atan, self.curPeriod.mode))
return
# 当前周期为空极端
if self.curPeriod.mode == PERIOD_SHORT_EXTREME:
# 空极端 =》多
if self.lineRsi1[-1] > self.lineRsi1[-2] and min(self.lineRsi1[-5:-2]) <= 50 \
and bar.close > lastMid:
self.curPeriod = CtaPeriod(mode=PERIOD_LONG, price=bar.close, pre_mode=PERIOD_SHORT_EXTREME, dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度下极限低位反弹转折,Atan:{}, RSI:{}=>{},周期:{}=>{}'.
format(bar.datetime, self.atan, self.lineRsi1[-2], self.lineRsi1[-1],
self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 空极端=》空
elif self.lineRsi1[-1] > self.lineRsi1[-2] and bar.close < lastMid:
self.curPeriod = CtaPeriod(mode=PERIOD_SHORT, price=bar.close, pre_mode=PERIOD_SHORT_EXTREME,
dt=bar.datetime)
self.periods.append(self.curPeriod)
self.writeCtaLog(u'{} 角度下加速放缓Atan:{},RSI:{}=>{}, ,周期:{}=>{}'.
format(bar.datetime, self.atan, self.lineRsi1[-2],self.lineRsi1[-1],
self.curPeriod.pre_mode, self.curPeriod.mode))
if self.onPeriodChgFunc is not None:
self.onPeriodChgFunc(self.curPeriod)
# 保持空极端趋势
else:
self.writeCtaLog(u'{} 角度向下加速,Atan:{},周期维持:{}'.
format(bar.datetime, self.atan, self.curPeriod.mode))
return
# ----------------------------------------------------------------------
def writeCtaLog(self, content):
"""记录CTA日志"""
@ -1511,14 +1767,12 @@ class CtaDayBar(object):
self.onBarFunc = onBarFunc
self.lineBar = []
self.currTick = None
self.lastTick = None
self.shortSymbol = EMPTY_STRING # 商品的短代码
self.minDiff = 1 # 商品的最小价格单位
def onTick(self, tick):
"""行情更新"""
@ -1526,10 +1780,8 @@ class CtaDayBar(object):
self.currTick = tick
self.__drawLineBar(tick)
self.lastTick = tick
def addBar(self, bar):
"""予以外部初始化程序增加bar"""
l1 = len(self.lineBar)
@ -1556,7 +1808,6 @@ class CtaDayBar(object):
lastBar.high = max(lastBar.high, bar.high)
lastBar.low = min(lastBar.low, bar.low)
lastBar.mid4 = round((2 * lastBar.close + lastBar.high + lastBar.low) / 4, self.round_n)
lastBar.mid5 = round((2 * lastBar.close + lastBar.open + lastBar.high + lastBar.low) / 5, self.round_n)
@ -1638,7 +1889,6 @@ class CtaDayBar(object):
lastBar.low = min(lastBar.low, tick.lastPrice)
lastBar.close = tick.lastPrice
# 更新Bar的颜色
if lastBar.close > lastBar.open:
lastBar.color = COLOR_RED