[Del]移除CTA策略模块下的tools目录

This commit is contained in:
vn.py 2017-11-05 13:41:26 +08:00
parent eac2665249
commit 934f472f14
6 changed files with 0 additions and 1784 deletions

View File

@ -1,10 +0,0 @@
# CTA策略开发相关的工具代码
### ctaLineBar.py
* 简介CTA策略开发中常用的K线类可以基于tick自动生成K线并提供EMA、DMI、ATR、RSI等常用技术指标的计算
* 贡献者:李来佳
* WeChat/QQ: 28888502
### multiTimeFrame
* 简介基于CTA模块扩展了回测和交易功能允许策略中引用辅助品种信息其他时间框架、其他合约同时提供了一个突破策略的例子
* 贡献者:周正舟

View File

@ -1,842 +0,0 @@
# encoding: UTF-8
# AUTHOR:李来佳
# WeChat/QQ: 28888502
from vtConstant import *
from vnpy.trader.vtObject import VtBarData
from datetime import datetime
import talib as ta
import numpy
import copy,csv
DEBUGCTALOG = True
class CtaLineBar(object):
"""CTA K线"""
""" 使用方法:
1在策略构造函数__init()中初始化
self.lineM = None # 1分钟K线
lineMSetting = {}
lineMSetting['name'] = u'M1'
lineMSetting['barTimeInterval'] = 60 # 1分钟对应60秒
lineMSetting['inputEma1Len'] = 7 # EMA线1的周期
lineMSetting['inputEma2Len'] = 21 # EMA线2的周期
lineMSetting['inputBollLen'] = 20 # 布林特线周期
lineMSetting['inputBollStdRate'] = 2 # 布林特线标准差
lineMSetting['minDiff'] = self.minDiff # 最小条
lineMSetting['shortSymbol'] = self.shortSymbol #商品短号
self.lineM = CtaLineBar(self, self.onBar, lineMSetting)
2在onTick()需要导入tick数据
self.lineM.onTick(tick)
self.lineM5.onTick(tick) # 如果你使用2个周期
3在onBar事件中按照k线结束使用其他任何情况下bar内使用通过对象使用即可self.lineM.lineBar[-1].close
"""
# 参数列表,保存了参数的名称
paramList = ['vtSymbol']
def __init__(self, strategy, onBarFunc, setting=None,):
# OnBar事件回调函数
self.onBarFunc = onBarFunc
# 参数列表
self.paramList.append('barTimeInterval')
self.paramList.append('inputPreLen')
self.paramList.append('inputEma1Len')
self.paramList.append('inputEma2Len')
self.paramList.append('inputDmiLen')
self.paramList.append('inputDmiMax')
self.paramList.append('inputAtr1Len')
self.paramList.append('inputAtr2Len')
self.paramList.append('inputAtr3Len')
self.paramList.append('inputVolLen')
self.paramList.append('inputRsiLen')
self.paramList.append('inputCmiLen')
self.paramList.append('inputBollLen')
self.paramList.append('inputBollStdRate')
self.paramList.append('minDiff')
self.paramList.append('shortSymbol')
self.paramList.append('activeDayJump')
self.paramList.append('name')
# 输入参数
self.name = u'LineBar'
self.barTimeInterval = 300
self.inputPreLen = EMPTY_INT #1
self.inputEma1Len = EMPTY_INT # 13
self.inputEma2Len = EMPTY_INT # 21
self.inputDmiLen = EMPTY_INT # 14 # DMI的计算周期
self.inputDmiMax = EMPTY_FLOAT # 30 # Dpi和Mdi的突破阈值
self.inputAtr1Len = EMPTY_INT # 10 # ATR波动率的计算周期(近端)
self.inputAtr2Len = EMPTY_INT # 26 # ATR波动率的计算周期常用
self.inputAtr3Len = EMPTY_INT # 50 # ATR波动率的计算周期远端
self.inputVolLen = EMPTY_INT # 14 # 平均交易量的计算周期
self.inputRsiLen = EMPTY_INT # 7 # RSI 相对强弱指数
self.shortSymbol = EMPTY_STRING # 商品的短代码
self.minDiff = 1 # 商品的最小价格单位
self.activeDayJump = False # 隔夜跳空
# 当前的Tick
self.curTick = None
# K 线服务的策略
self.strategy = strategy
# K线保存数据
self.bar = None # K线数据对象
self.lineBar = [] # K线缓存数据队列
self.barFirstTick =False # K线的第一条Tick数据
# K 线的相关计算结果数据
self.preHigh = [] # K线的前inputPreLen的的最高
self.preLow = [] # K线的前inputPreLen的的最低
self.lineEma1 = [] # K线的EMA1均线周期是InputEmaLen1包含当前bar
self.lineEma1MtmRate = [] # K线的EMA1均线 的momentum(3) 动能
self.lineEma2 = [] # K线的EMA2均线周期是InputEmaLen2包含当前bar
self.lineEma2MtmRate = [] # K线的EMA2均线 的momentum(3) 动能
# K线的DMI( PdiMdiADXAdxr) 计算数据
self.barPdi = EMPTY_FLOAT # bar内的升动向指标即做多的比率
self.barMdi = EMPTY_FLOAT # bar内的下降动向指标即做空的比率
self.linePdi = [] # 升动向指标,即做多的比率
self.lineMdi = [] # 下降动向指标,即做空的比率
self.lineDx = [] # 趋向指标列表最大长度为inputM*2
self.barAdx = EMPTY_FLOAT # Bar内计算的平均趋向指标
self.lineAdx = [] # 平均趋向指标
self.barAdxr = EMPTY_FLOAT # 趋向平均值为当日ADX值与M日前的ADX值的均值
self.lineAdxr = [] # 平均趋向变化指标
# K线的基于DMI、ADX计算的结果
self.barAdxTrend = EMPTY_FLOAT # ADX值持续高于前一周期时市场行情将维持原趋势
self.barAdxrTrend = EMPTY_FLOAT # ADXR值持续高于前一周期时,波动率比上一周期高
self.buyFilterCond = False # 多过滤器条件,做多趋势的判断ADX高于前一天上升动向> inputMM
self.sellFilterCond = False # 空过滤器条件,做空趋势的判断ADXR高于前一天下降动向> inputMM
# K线的ATR技术数据
self.lineAtr1 = [] # K线的ATR1,周期为inputAtr1Len
self.lineAtr2 = [] # K线的ATR2,周期为inputAtr2Len
self.lineAtr3 = [] # K线的ATR3,周期为inputAtr3Len
self.barAtr1 = EMPTY_FLOAT
self.barAtr2 = EMPTY_FLOAT
self.barAtr3 = EMPTY_FLOAT
# K线的交易量平均
self.lineAvgVol = [] # K 线的交易量平均
# K线的RSI计算数据
self.lineRsi = [] # 记录K线对应的RSI数值只保留inputRsiLen*8
self.lowRsi = 30 # RSI的最低线
self.highRsi = 70 # RSI的最高线
self.lineRsiTop = [] # 记录RSI的最高峰只保留 inputRsiLen个
self.lineRsiButtom = [] # 记录RSI的最低谷只保留 inputRsiLen个
self.lastRsiTopButtom = None # 最近的一个波峰/波谷
# K线的CMI计算数据
self.inputCmiLen = EMPTY_INT
self.lineCmi = [] # 记录K线对应的Cmi数值只保留inputCmiLen*8
# K线的布林特计算数据
self.inputBollLen = EMPTY_INT # K线周期
self.inputBollStdRate = 1.5 # 两倍标准差
self.lineUpperBand = [] # 上轨
self.lineMiddleBand = [] # 中线
self.lineLowerBand = [] # 下轨
if setting:
self.setParam(setting)
def setParam(self, setting):
"""设置参数"""
d = self.__dict__
for key in self.paramList:
if key in setting:
d[key] = setting[key]
def onTick(self, tick):
"""行情更新
:type tick: object
"""
# Tick 有效性检查
#if (tick.datetime- datetime.now()).seconds > 10:
# self.writeCtaLog(u'无效的tick时间:{0}'.format(tick.datetime))
# return
if tick.datetime.hour == 8 or tick.datetime.hour == 20:
self.writeCtaLog(u'竞价排名tick时间:{0}'.format(tick.datetime))
return
self.curTick = tick
# 3.生成x K线若形成新Bar则触发OnBar事件
self.__drawLineBar(tick)
def addBar(self,bar):
"""予以外部初始化程序增加bar"""
l1 = len(self.lineBar)
if l1 == 0:
self.lineBar.append(bar)
self.onBar(bar)
return
# 与最后一个BAR的时间比对判断是否超过K线的周期
lastBar = self.lineBar[-1]
if (bar.datetime - lastBar.datetime).seconds >= self.barTimeInterval:
self.lineBar.append(bar)
self.onBar(bar)
return
# 更新最后一个bar
lastBar.close = bar.close
lastBar.high = max(lastBar.high, bar.high)
lastBar.low = min(lastBar.low, bar.low)
lastBar.volume = lastBar.volume + bar.volume
def onBar(self, bar):
"""OnBar事件"""
# 计算相关数据
self.__recountPreHighLow()
self.__recountEma()
self.__recountDmi()
self.__recountAtr()
self.__recountAvgVol()
self.__recountRsi()
self.__recountCmi()
self.__recountBoll()
# 回调上层调用者
self.onBarFunc(bar)
def __firstTick(self,tick):
""" K线的第一个Tick数据"""
self.bar = VtBarData() # 创建新的K线
self.bar.vtSymbol = tick.vtSymbol
self.bar.symbol = tick.symbol
self.bar.exchange = tick.exchange
self.bar.open = tick.lastPrice # O L H C
self.bar.high = tick.lastPrice
self.bar.low = tick.lastPrice
self.bar.close = tick.lastPrice
# K线的日期时间
self.bar.date = tick.date # K线的日期时间去除秒设为第一个Tick的时间
self.bar.time = tick.time # K线的日期时间去除秒设为第一个Tick的时间
self.bar.datetime = tick.datetime
self.bar.volume = tick.volume
self.bar.openInterest = tick.openInterest
self.barFirstTick = True # 标识该Tick属于该Bar的第一个tick数据
self.lineBar.append(self.bar) # 推入到lineBar队列
# ----------------------------------------------------------------------
def __drawLineBar(self, tick):
"""生成 line Bar """
l1 = len(self.lineBar)
# 保存第一个K线数据
if l1 == 0:
self.__firstTick(tick)
self.onBar(self.bar)
return
# 清除8交易小时前的数据
if l1 > 60 * 8:
del self.lineBar[0]
# 与最后一个BAR的时间比对判断是否超过5分钟
lastBar = self.lineBar[-1]
# 专门处理隔夜跳空。隔夜跳空会造成开盘后EMA和ADX的计算错误。
if len(self.lineAtr2) < 1:
priceInBar = 5 * self.minDiff
else:
priceInBar = self.lineAtr2[-1]
jumpBars = int(abs(tick.lastPrice - lastBar.close)/priceInBar)
# 开盘时间
if (tick.datetime.hour == 9 or tick.datetime.hour == 21) \
and tick.datetime.minute == 0 and tick.datetime.second == 0 \
and lastBar.datetime.hour != tick.datetime.hour \
and jumpBars > 0 and self.activeDayJump:
priceInYesterday = lastBar.close
self.writeCtaLog(u'line Bar jumpbars:{0}'.format(jumpBars))
if tick.lastPrice > priceInYesterday: # 价格往上跳
# 生成砖块递增K线,减小ATR变动
for i in range(0, jumpBars, 1):
upbar = copy.deepcopy(lastBar)
upbar.open = priceInYesterday + float(i * priceInBar)
upbar.low = upbar.open
upbar.close = priceInYesterday + float((i+1) * priceInBar)
upbar.high = upbar.close
upbar.volume = 0
self.lineBar.append(upbar)
self.onBar(upbar)
else: # 价格往下跳
# 生成递减K线,减小ATR变动
for i in range(0, jumpBars, 1):
downbar = copy.deepcopy(lastBar)
downbar.open = priceInYesterday - float(i * priceInBar)
downbar.high = downbar.open
downbar.close = priceInYesterday - float((i+1) * priceInBar)
downbar.low = downbar.close
downbar.volume = 0
self.lineBar.append(downbar)
self.onBar(downbar)
# 生成平移K线减小PdiMdi、ADX变动
for i in range(0, jumpBars*2, 1):
equalbar=copy.deepcopy(self.lineBar[-1])
equalbar.volume = 0
self.lineBar.append(equalbar)
self.onBar(equalbar)
# 重新指定为最后一个Bar
lastBar = self.lineBar[-1]
# 处理日内的间隔时段最后一个tick如10:15分11:30分15:00 和 2:30分
endtick = False
if (tick.datetime.hour == 10 and tick.datetime.minute == 15 ) \
or (tick.datetime.hour == 11 and tick.datetime.minute == 30 ) \
or (tick.datetime.hour == 15 and tick.datetime.minute == 00 ) \
or (tick.datetime.hour == 2 and tick.datetime.minute == 30 ):
endtick = True
if self.shortSymbol in NIGHT_MARKET_SQ2 and tick.datetime.hour == 1 and tick.datetime.minute == 00:
endtick = True
if self.shortSymbol in NIGHT_MARKET_SQ3 and tick.datetime.hour == 23 and tick.datetime.minute == 00:
endtick = True
if self.shortSymbol in NIGHT_MARKET_ZZ or self.shortSymbol in NIGHT_MARKET_DL:
if tick.datetime.hour == 23 and tick.datetime.minute == 30:
endtick = True
# 满足时间要求
if (tick.datetime-lastBar.datetime).seconds >= self.barTimeInterval and not endtick:
# 创建并推入新的Bar
self.__firstTick(tick)
# 触发OnBar事件
self.onBar(lastBar)
else:
# 更新当前最后一个bar
self.barFirstTick = False
# 更新最高价、最低价、收盘价、成交量
lastBar.high = max(lastBar.high, tick.lastPrice)
lastBar.low = min(lastBar.low, tick.lastPrice)
lastBar.close = tick.lastPrice
lastBar.volume = lastBar.volume + tick.volume
# 更新Bar的颜色
if lastBar.close > lastBar.open:
lastBar.color = COLOR_RED
elif lastBar.close < lastBar.open:
lastBar.color = COLOR_BLUE
else:
lastBar.color = COLOR_EQUAL
# ----------------------------------------------------------------------
def __recountPreHighLow(self):
"""计算 K线的前周期最高和最低"""
if self.inputPreLen <= 0: # 不计算
return
# 1、lineBar满足长度才执行计算
if len(self.lineBar) < self.inputPreLen:
self.writeCtaLog(u'数据未充分,当前Bar数据数量{0}计算High、Low需要{1}'.
format(len(self.lineBar), self.inputPreLen))
return
# 2.计算前inputPreLen周期内(不包含当前周期的Bar高点和低点
preHigh = EMPTY_FLOAT
preLow = EMPTY_FLOAT
for i in range(len(self.lineBar)-2, len(self.lineBar)-2-self.inputPreLen, -1):
if self.lineBar[i].high > preHigh or preHigh == EMPTY_FLOAT:
preHigh = self.lineBar[i].high # 前InputPreLen周期高点
if self.lineBar[i].low < preLow or preLow == EMPTY_FLOAT:
preLow = self.lineBar[i].low # 前InputPreLen周期低点
# 保存
if len(self.preHigh) > self.inputPreLen * 8:
del self.preHigh[0]
self.preHigh.append(preHigh)
# 保存
if len(self.preLow)> self.inputPreLen * 8:
del self.preLow[0]
self.preLow.append(preLow)
#----------------------------------------------------------------------
def __recountEma(self):
"""计算K线的EMA1 和EMA2"""
l = len(self.lineBar)
# 1、lineBar满足长度才执行计算
if len(self.lineBar) < max(7, self.inputEma1Len, self.inputEma2Len)+2:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算EMA需要{1}'.
format(len(self.lineBar), max(7, self.inputEma1Len, self.inputEma2Len)+2))
return
# 计算第一条EMA均线
if self.inputEma1Len > 0:
if self.inputEma1Len > l:
ema1Len = l
else:
ema1Len = self.inputEma1Len
# 3、获取前InputN周期(不包含当前周期)的自适应均线
listClose=[x.close for x in self.lineBar[-ema1Len - 1:-1]]
barEma1 = ta.EMA(numpy.array(listClose, dtype=float), ema1Len)[-1]
barEma1 = round(float(barEma1), 3)
if len(self.lineEma1) > self.inputEma1Len*8:
del self.lineEma1[0]
self.lineEma1.append(barEma1)
# 计算第二条EMA均线
if self.inputEma2Len > 0:
if self.inputEma2Len > l:
ema2Len = l
else:
ema2Len = self.inputEma2Len
# 3、获取前InputN周期(不包含当前周期)的自适应均线
listClose=[x.close for x in self.lineBar[-ema2Len - 1:-1]]
barEma2 = ta.EMA(numpy.array(listClose, dtype=float), ema2Len)[-1]
barEma2 = round(float(barEma2), 3)
if len(self.lineEma2) > self.inputEma1Len*8:
del self.lineEma2[0]
self.lineEma2.append(barEma2)
def __recountDmi(self):
"""计算K线的DMI数据和条件"""
if self.inputDmiLen <= 0: # 不计算
return
# 1、lineMx满足长度才执行计算
if len(self.lineBar) < self.inputDmiLen+1:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算DMI需要{1}'.format(len(self.lineBar), self.inputDmiLen+1))
return
# 2、根据当前HighLow(不包含当前周期重新计算TR1PDMMDM和ATR
barTr1 = EMPTY_FLOAT # 获取InputP周期内的价差最大值之和
barPdm = EMPTY_FLOAT # InputP周期内的做多价差之和
barMdm = EMPTY_FLOAT # InputP周期内的做空价差之和
for i in range(len(self.lineBar)-2, len(self.lineBar)-2-self.inputDmiLen, -1): # 周期 inputDmiLen
# 3.1、计算TR1
# 当前周期最高与最低的价差
high_low_spread = self.lineBar[i].high - self.lineBar[i].low
# 当前周期最高与昨收价的价差
high_preclose_spread = abs(self.lineBar[i].high - self.lineBar[i - 1].close)
# 当前周期最低与昨收价的价差
low_preclose_spread = abs(self.lineBar[i].low - self.lineBar[i - 1].close)
# 最大价差
max_spread = max(high_low_spread, high_preclose_spread, low_preclose_spread)
barTr1 = barTr1 + float(max_spread)
# 今高与昨高的价差
high_prehigh_spread = self.lineBar[i].high - self.lineBar[i - 1].high
# 昨低与今低的价差
low_prelow_spread = self.lineBar[i - 1].low - self.lineBar[i].low
# 3.2、计算周期内的做多价差之和
if high_prehigh_spread > 0 and high_prehigh_spread > low_prelow_spread:
barPdm = barPdm + high_prehigh_spread
# 3.3、计算周期内的做空价差之和
if low_prelow_spread > 0 and low_prelow_spread > high_prehigh_spread:
barMdm = barMdm + low_prelow_spread
# 6、计算上升动向指标即做多的比率
if barTr1 == 0:
self.barPdi = 0
else:
self.barPdi = barPdm * 100 / barTr1
if len(self.linePdi) > self.inputDmiLen+1:
del self.linePdi[0]
self.linePdi.append(self.barPdi)
# 7、计算下降动向指标即做空的比率
if barTr1 == 0:
self.barMdi = 0
else:
self.barMdi = barMdm * 100 / barTr1
# 8、计算平均趋向指标 AdxAdxr
if self.barMdi + self.barPdi == 0:
dx = 0
else:
dx = 100 * abs(self.barMdi - self.barPdi) / (self.barMdi + self.barPdi)
if len(self.lineMdi) > self.inputDmiLen+1:
del self.lineMdi[0]
self.lineMdi.append(self.barMdi)
if len(self.lineDx) > self.inputDmiLen+1:
del self.lineDx[0]
self.lineDx.append(dx)
# 平均趋向指标MA计算
if len(self.lineDx) < self.inputDmiLen+1:
self.barAdx = dx
else:
self.barAdx = ta.EMA(numpy.array(self.lineDx, dtype=float), self.inputDmiLen)[-1]
# 保存Adx值
if len(self.lineAdx) > self.inputDmiLen+1:
del self.lineAdx[0]
self.lineAdx.append(self.barAdx)
# 趋向平均值为当日ADX值与1周期前的ADX值的均值
if len(self.lineAdx) == 1:
self.barAdxr = self.lineAdx[-1]
else:
self.barAdxr = (self.lineAdx[-1] + self.lineAdx[-2]) / 2
# 保存Adxr值
if len(self.lineAdxr) > self.inputDmiLen+1:
del self.lineAdxr[0]
self.lineAdxr.append(self.barAdxr)
# 7、计算AADX值持续高于前一周期时市场行情将维持原趋势
if len(self.lineAdx) < 2:
self.barAdxTrend = False
elif self.lineAdx[-1] > self.lineAdx[-2]:
self.barAdxTrend = True
else:
self.barAdxTrend = False
# ADXR值持续高于前一周期时,波动率比上一周期高
if len(self.lineAdxr) < 2:
self.barAdxrTrend = False
elif self.lineAdxr[-1] > self.lineAdxr[-2]:
self.barAdxrTrend = True
else:
self.barAdxrTrend = False
# 多过滤器条件,做多趋势ADX高于前一天上升动向> inputDmiMax
if self.barPdi > self.barMdi and self.barAdxTrend and self.barAdxrTrend and self.barPdi >= self.inputDmiMax:
self.buyFilterCond = True
self.writeCtaLog(u'{0}[DEBUG]Buy Signal On Bar,Pdi:{1}>Mdi:{2},adx[-1]:{3}>Adx[-2]:{4}'
.format(self.curTick.datetime, self.barPdi, self.barMdi, self.lineAdx[-1], self.lineAdx[-2]))
else:
self.buyFilterCond = False
# 空过滤器条件 做空趋势ADXR高于前一天下降动向> inputMM
if self.barPdi < self.barMdi and self.barAdxTrend and self.barAdxrTrend and self.barMdi >= self.inputDmiMax:
self.sellFilterCond = True
self.writeCtaLog(u'{0}[DEBUG]Short Signal On Bar,Pdi:{1}<Mdi:{2},adx[-1]:{3}>Adx[-2]:{4}'
.format(self.curTick.datetime, self.barPdi, self.barMdi, self.lineAdx[-1], self.lineAdx[-2]))
else:
self.sellFilterCond = False
def __recountAtr(self):
"""计算Mx K线的各类数据和条件"""
# 1、lineMx满足长度才执行计算
maxAtrLen = max(self.inputAtr1Len, self.inputAtr2Len, self.inputAtr3Len)
if maxAtrLen <= 0: # 不计算
return
if len(self.lineBar) < maxAtrLen+1:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算ATR需要{1}'.
format(len(self.lineBar), maxAtrLen+1))
return
# 首次计算
if (self.inputAtr1Len > 0 and len(self.lineAtr1) < 1) \
or (self.inputAtr2Len > 0 and len(self.lineAtr2) < 1) \
or (self.inputAtr3Len > 0 and len(self.lineAtr3) < 1):
# 根据当前HighLow(不包含当前周期重新计算TR1和ATR
barTr1 = EMPTY_FLOAT # 获取inputAtr1Len周期内的价差最大值之和
barTr2 = EMPTY_FLOAT # 获取inputAtr2Len周期内的价差最大值之和
barTr3 = EMPTY_FLOAT # 获取inputAtr3Len周期内的价差最大值之和
j = 0
for i in range(len(self.lineBar)-2, len(self.lineBar)-2-maxAtrLen, -1): # 周期 inputP
# 3.1、计算TR
# 当前周期最高与最低的价差
high_low_spread = self.lineBar[i].high - self.lineBar[i].low
# 当前周期最高与昨收价的价差
high_preclose_spread = abs(self.lineBar[i].high - self.lineBar[i - 1].close)
# 当前周期最低与昨收价的价差
low_preclose_spread = abs(self.lineBar[i].low - self.lineBar[i - 1].close)
# 最大价差
max_spread = max(high_low_spread, high_preclose_spread, low_preclose_spread)
if j < self.inputAtr1Len:
barTr1 = barTr1 + float(max_spread)
if j < self.inputAtr2Len:
barTr2 = barTr2 + float(max_spread)
if j < self.inputAtr3Len:
barTr3 = barTr3 + float(max_spread)
j = j + 1
else: # 只计算一个
# 当前周期最高与最低的价差
high_low_spread = self.lineBar[-2].high - self.lineBar[-2].low
# 当前周期最高与昨收价的价差
high_preclose_spread = abs(self.lineBar[-2].high - self.lineBar[-3].close)
# 当前周期最低与昨收价的价差
low_preclose_spread = abs(self.lineBar[-2].low - self.lineBar[-3].close)
# 最大价差
barTr1 = max(high_low_spread, high_preclose_spread, low_preclose_spread)
barTr2 = barTr1
barTr3 = barTr1
# 计算 ATR
if self.inputAtr1Len > 0:
if len(self.lineAtr1) < 1:
self.barAtr1 = round(barTr1 / self.inputAtr1Len, 3)
else:
self.barAtr1 = round((self.lineAtr1[-1]*(self.inputAtr1Len -1) + barTr1) / self.inputAtr1Len, 3)
if len(self.lineAtr1) > self. inputAtr1Len+1 :
del self.lineAtr1[0]
self.lineAtr1.append(self.barAtr1)
if self.inputAtr2Len > 0:
if len(self.lineAtr2) < 1:
self.barAtr2 = round(barTr2 / self.inputAtr2Len, 3)
else:
self.barAtr2 = round((self.lineAtr2[-1]*(self.inputAtr2Len -1) + barTr2) / self.inputAtr2Len, 3)
if len(self.lineAtr2) > self. inputAtr2Len+1:
del self.lineAtr2[0]
self.lineAtr2.append(self.barAtr2)
if self.inputAtr3Len > 0:
if len(self.lineAtr3) < 1:
self.barAtr3 = round(barTr3 / self.inputAtr3Len, 3)
else:
self.barAtr3 = round((self.lineAtr3[-1]*(self.inputAtr3Len -1) + barTr3) / self.inputAtr3Len, 3)
if len(self.lineAtr3) > self. inputAtr3Len+1:
del self.lineAtr3[0]
self.lineAtr3.append(self.barAtr3)
#----------------------------------------------------------------------
def __recountAvgVol(self):
"""计算平均成交量"""
# 1、lineBar满足长度才执行计算
if self.inputVolLen <= 0: # 不计算
return
if len(self.lineBar) < self.inputVolLen+1:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算Avg Vol需要{1}'.
format(len(self.lineBar), self.inputVolLen+1))
return
listVol = [x.volume for x in self.lineBar[-self.inputVolLen-1: -1]]
sumVol = ta.SUM(numpy.array(listVol, dtype=float), timeperiod=self.inputVolLen)[-1]
avgVol = round(sumVol/self.inputVolLen, 0)
self.lineAvgVol.append(avgVol)
# ----------------------------------------------------------------------
def __recountRsi(self):
"""计算K线的RSI"""
if self.inputRsiLen <= 0: return
# 1、lineBar满足长度才执行计算
if len(self.lineBar) < self.inputRsiLen+2:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算RSI需要{1}'.
format(len(self.lineBar), self.inputRsiLen+2))
return
# 3、inputRsiLen(包含当前周期)的相对强弱
listClose=[x.close for x in self.lineBar[-self.inputRsiLen - 2:]]
barRsi = ta.RSI(numpy.array(listClose, dtype=float), self.inputRsiLen)[-1]
barRsi = round(float(barRsi), 3)
l = len(self.lineRsi)
if l > self.inputRsiLen*8:
del self.lineRsi[0]
self.lineRsi.append(barRsi)
if l > 3:
# 峰
if self.lineRsi[-1] < self.lineRsi[-2] and self.lineRsi[-3] < self.lineRsi[-2]:
t={}
t["Type"] = u'T'
t["RSI"] = self.lineRsi[-2]
t["Close"] = self.lineBar[-2].close
if len(self.lineRsiTop) > self.inputRsiLen:
del self.lineRsiTop[0]
self.lineRsiTop.append( t )
self.lastRsiTopButtom = self.lineRsiTop[-1]
# 谷
elif self.lineRsi[-1] > self.lineRsi[-2] and self.lineRsi[-3] > self.lineRsi[-2]:
b={}
b["Type"] = u'B'
b["RSI"] = self.lineRsi[-2]
b["Close"] = self.lineBar[-2].close
if len(self.lineRsiButtom) > self.inputRsiLen:
del self.lineRsiButtom[0]
self.lineRsiButtom.append(b)
self.lastRsiTopButtom = self.lineRsiButtom[-1]
def __recountCmi(self):
"""市场波动指数Choppy Market IndexCMI是一个用来判断市场走势类型的技术分析指标。
它通过计算当前收盘价与一定周期前的收盘价的差值与这段时间内价格波动的范围的比值来判断目前的股价走势是趋势还是盘整
市场波动指数CMI的计算公式
CMI=(Abs(Close-ref(close,(n-1)))*100/(HHV(high,n)-LLV(low,n))
其中Abs是绝对值
n是周期数例如30
市场波动指数CMI的使用方法
这个指标的重要用途是来区分目前的股价走势类型盘整趋势当CMI指标小于市场走势是盘整当CMI指标大于市场在趋势期
CMI指标还可以用于预测股价走势类型的转变因为物极必反当CMI长期处于附近此时股价走势很可能从盘整转为趋势当CMI长期处于附近此时股价趋势很可能变弱形成盘整
"""
if self.inputCmiLen <= EMPTY_INT: return
# 1、lineBar满足长度才执行计算
if len(self.lineBar) < self.inputCmiLen:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算CMI需要{1}'.
format(len(self.lineBar), self.inputCmiLen))
return
listClose =[x.close for x in self.lineBar[-self.inputCmiLen:]]
hhv = max(listClose)
llv = min(listClose)
if hhv==llv:
cmi = 100
else:
cmi = abs(self.lineBar[-1].close-self.lineBar[-2].close)*100/(hhv-llv)
cmi = round(cmi, 2)
if len(self.lineCmi) > self.inputCmiLen:
del self.lineCmi[0]
self.lineCmi.append(cmi)
def __recountBoll(self):
"""布林特线"""
if self.inputBollLen < EMPTY_INT: return
l = len(self.lineBar)
if l < min(7, self.inputBollLen)+1:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算Boll需要{1}'.
format(len(self.lineBar), min(7, self.inputBollLen)+1))
return
if l < self.inputBollLen+2:
bollLen = l-1
else:
bollLen = self.inputBollLen
# 不包含当前最新的Bar
listClose=[x.close for x in self.lineBar[-bollLen - 1:-1]]
#
upper, middle, lower = ta.BBANDS(numpy.array(listClose, dtype=float),
timeperiod=bollLen, nbdevup=self.inputBollStdRate,
nbdevdn=self.inputBollStdRate, matype=0)
self.lineUpperBand.append(upper[-1])
self.lineMiddleBand.append(middle[-1])
self.lineLowerBand.append(lower[-1])
# ----------------------------------------------------------------------
def writeCtaLog(self, content):
"""记录CTA日志"""
self.strategy.writeCtaLog(u'['+self.name+u']'+content)
def debugCtaLog(self,content):
"""记录CTA日志"""
if DEBUGCTALOG:
self.strategy.writeCtaLog(u'['+self.name+u'-DEBUG]'+content)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 382 KiB

View File

@ -1,204 +0,0 @@
# encoding: UTF-8
'''
这个文件加入在CTA回测引擎的基础上加入了辅助品种信息, 保持接口的一致, 可以在原CTA引擎上执行的代码,
也可以在这个引擎上执行
This file add multi Time Frame functionalities to CTA backtesting engine, the APIs are the
same as CTA engine. Real trading code can be directly used for backtesting.
'''
from __future__ import division
from vnpy.trader.vtObject import VtTickData, VtBarData
from ctaBacktesting import *
class BacktestEngineMultiTF(BacktestingEngine):
def __init__(self):
"""Constructor"""
super(BacktestEngineMultiTF, self).__init__()
self.info_symbols = [] # List, 输入辅助品种的2值tuple, 左边为数据库名, 右边为collection名
self.InfoCursor = {} # Dict, 放置回测用辅助品种数据库
self.initInfoCursor = {} # Dict, 放置初始化用辅助品种数据库
self.infobar = {} # Dict, 放置辅助品种最新一个K线数据
self.MultiOn = False # Boolean, 判断是否传入了辅助品种
# ----------------------------------------------------------------------
def setDatabase(self, dbName, symbol, **kwargs):
"""set database that provide historical data"""
self.dbName = dbName
# Set executed symbol and information symbols
self.symbol = symbol
if "info_symbol" in kwargs:
self.info_symbols = kwargs["info_symbol"]
# Turn on MultiTF switch
if len(self.info_symbols) > 0:
self.MultiOn = True
# ----------------------------------------------------------------------
def loadInitData(self, collection, **kwargs):
"""Load initializing data"""
# 载入初始化需要用的数据
# Load initialised data
# $gte means "greater and equal to"
# $lt means "less than"
flt = {'datetime': {'$gte': self.dataStartDate,
'$lt': self.strategyStartDate}}
self.initCursor = collection.find(flt)
# 初始化辅助品种数据
# Initializing information data
if "inf" in kwargs:
for name in kwargs["inf"]:
DB = kwargs["inf"][name]
self.initInfoCursor[name] = DB.find(flt)
# 将数据从查询指针中读取出,并生成列表
# Read data from cursor, generate a list
self.initData = []
for d in self.initCursor:
data = self.dataClass()
data.__dict__ = d
self.initData.append(data)
# ----------------------------------------------------------------------
def loadHistoryData(self):
"""载入历史数据"""
self.dbClient = pymongo.MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'])
collection = self.dbClient[self.dbName][self.symbol]
# Load historical data of information symbols, construct a dictionary of Database
# Values of dictionary are mongo.Client.
info_collection = {}
if self.MultiOn is True:
for DBname, symbol in self.info_symbols:
info_collection[DBname + " " + symbol] = self.dbClient[DBname][symbol]
self.output("Start loading historical data")
# 首先根据回测模式,确认要使用的数据类
# Choose data type based on backtest mode
if self.mode == self.BAR_MODE:
self.dataClass = VtBarData
self.func = self.newBar
else:
self.dataClass = VtTickData
self.func = self.newTick
# Load initializing data
self.loadInitData(collection, inf=info_collection)
# 载入回测数据
# Load backtest data (exclude initializing data)
if not self.dataEndDate:
# If "End Date" is not set, retreat data up to today
flt = {'datetime': {'$gte': self.strategyStartDate}}
else:
flt = {'datetime': {'$gte': self.strategyStartDate,
'$lte': self.dataEndDate}}
self.dbCursor = collection.find(flt)
if self.MultiOn is True:
for db in info_collection:
self.InfoCursor[db] = info_collection[db].find(flt)
self.output(
"Data loading completed, data volumn: %s" % (self.initCursor.count() + self.dbCursor.count() + \
sum([i.count() for i in self.InfoCursor.values()])))
else:
self.output("Data loading completed, data volumn: %s" % (self.initCursor.count() + self.dbCursor.count()))
# ----------------------------------------------------------------------
def runBacktesting(self):
"""运行回测"""
"""Run backtesting"""
# 载入历史数据
# Load historical data
self.loadHistoryData()
self.output("Start backtesing!")
self.strategy.inited = True
self.strategy.onInit()
self.output("Strategy initialsing complete")
self.strategy.trading = True
self.strategy.onStart()
self.output("Strategy started")
self.output("Processing historical data...")
dataClass = self.dataClass
func = self.func
for d in self.dbCursor:
data = dataClass()
data.__dict__ = d
func(data)
self.output("No more historical data")
# ----------------------------------------------------------------------
def checkInformationData(self):
"""Update information symbols' data"""
# If infobar is empty, which means it is the first time calling this method
if self.infobar == {}:
for info_symbol in self.InfoCursor:
try:
self.infobar[info_symbol] = next(self.InfoCursor[info_symbol])
except StopIteration:
print "Data of information symbols is empty! Input must be a list, not str."
raise
temp = {}
for info_symbol in self.infobar:
data = self.infobar[info_symbol]
# Update data only when Time Stamp is matched
if (data is not None) and (data['datetime'] <= self.dt):
try:
temp[info_symbol] = VtBarData()
temp[info_symbol].__dict__ = data
self.infobar[info_symbol] = next(self.InfoCursor[info_symbol])
except StopIteration:
self.infobar[info_symbol] = None
self.output("No more data in information database.")
else:
temp[info_symbol] = None
return temp
# ----------------------------------------------------------------------
def newBar(self, bar):
"""新的K线"""
"""new ohlc Bar"""
self.bar = bar
self.dt = bar.datetime
self.updatePosition() # Update total position value based on new Bar
self.crossLimitOrder() # 先撮合限价单
self.crossStopOrder() # 再撮合停止单
if self.MultiOn is True:
self.strategy.onBar(bar, infobar=self.checkInformationData()) # 推送K线到策略中
else:
self.strategy.onBar(bar) # 推送K线到策略中
# ----------------------------------------------------------------------
def newTick(self, tick):
"""新的Tick"""
"""new Tick"""
self.tick = tick
self.dt = tick.datetime
self.crossLimitOrder()
self.crossStopOrder()
self.strategy.onTick(tick)
########################################################################

View File

@ -1,411 +0,0 @@
# encoding: UTF-8
"""
This file tweaks ctaTemplate Module to suit multi-TimeFrame strategies.
"""
from strategyAtrRsi import *
from ctaBase import *
from ctaTemplate import CtaTemplate
########################################################################
class TC11(CtaTemplate):
# Strategy name and author
className = "TC11"
author = "Zenacon"
# Set MongoDB DataBase
barDbName = "TestData"
# Strategy parameters
pGeneric_prd = 21
pGeneric_on = True
pATRprd_F = 13
pATRprd_M = 21
pATRprd_S = 63
pBOSSplus_prd = 98
pBOSSminus_prd = 22
if pGeneric_on == 0:
pRSIprd = 20
pBBprd = 10
pBB_ATRprd = 15
pATRprd = 21
pDMIprd = 21
else:
pRSIprd = \
pBBprd = \
pBB_ATRprd = \
pATRprd = \
pDMIprd = pGeneric_prd
pBOSS_Mult = 1.75
# Strategy variables
vOBO_initialpoint = EMPTY_FLOAT
vOBO_Stretch = EMPTY_FLOAT
vOBO_level_L = EMPTY_FLOAT
vOBO_level_S = EMPTY_FLOAT
# parameters' list, record names of parameters
paramList = ['name',
'className',
'author',
'vtSymbol']
# variables' list, record names of variables
varList = ['inited',
'trading',
'pos']
def __init__(self, ctaEngine, setting):
"""Constructor"""
super(TC11, self).__init__(ctaEngine, setting)
# ----------------------------------------------------------------------
def onBar(self, bar, **kwargs):
"""收到Bar推送必须由用户继承实现"""
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
for orderID in self.orderList:
self.cancelOrder(orderID)
self.orderList = []
# Record new information bar
if "infobar" in kwargs:
for i in kwargs["infobar"]:
if kwargs["infobar"][i] is None:
pass
else:
# print kwargs["infobar"][i]["close"]
self.closeArray[0:self.bufferSize - 1] = self.closeArray[1:self.bufferSize]
self.highArray[0:self.bufferSize - 1] = self.highArray[1:self.bufferSize]
self.lowArray[0:self.bufferSize - 1] = self.lowArray[1:self.bufferSize]
self.closeArray[-1] = bar.close
self.highArray[-1] = bar.high
self.lowArray[-1] = bar.low
"""
Record new bar
"""
self.closeArray[0:self.bufferSize - 1] = self.closeArray[1:self.bufferSize]
self.highArray[0:self.bufferSize - 1] = self.highArray[1:self.bufferSize]
self.lowArray[0:self.bufferSize - 1] = self.lowArray[1:self.bufferSize]
self.closeArray[-1] = bar.close
self.highArray[-1] = bar.high
self.lowArray[-1] = bar.low
self.bufferCount += 1
if self.bufferCount < self.bufferSize:
return
"""
Calculate Indicators
"""
vOBO_initialpoint = self.dataHTF_filled['Open']
vOBO_Stretch = self.vATR['htf'].m * self.pBOSS_Mult
self.atrValue = talib.ATR(self.highArray,
self.lowArray,
self.closeArray,
self.atrLength)[-1]
self.atrArray[0:self.bufferSize - 1] = self.atrArray[1:self.bufferSize]
self.atrArray[-1] = self.atrValue
self.atrCount += 1
if self.atrCount < self.bufferSize:
return
self.atrMa = talib.MA(self.atrArray,
self.atrMaLength)[-1]
self.rsiValue = talib.RSI(self.closeArray,
self.rsiLength)[-1]
# 判断是否要进行交易
# 当前无仓位
if self.pos == 0:
self.intraTradeHigh = bar.high
self.intraTradeLow = bar.low
# ATR数值上穿其移动平均线说明行情短期内波动加大
# 即处于趋势的概率较大适合CTA开仓
if self.atrValue > self.atrMa:
# 使用RSI指标的趋势行情时会在超买超卖区钝化特征作为开仓信号
if self.rsiValue > self.rsiBuy:
# 这里为了保证成交选择超价5个整指数点下单
self.buy(bar.close + 5, 1)
elif self.rsiValue < self.rsiSell:
self.short(bar.close - 5, 1)
# 持有多头仓位
elif self.pos > 0:
# 计算多头持有期内的最高价,以及重置最低价
self.intraTradeHigh = max(self.intraTradeHigh, bar.high)
self.intraTradeLow = bar.low
# 计算多头移动止损
longStop = self.intraTradeHigh * (1 - self.trailingPercent / 100)
# 发出本地止损委托,并且把委托号记录下来,用于后续撤单
orderID = self.sell(longStop, 1, stop=True)
self.orderList.append(orderID)
# 持有空头仓位
elif self.pos < 0:
self.intraTradeLow = min(self.intraTradeLow, bar.low)
self.intraTradeHigh = bar.high
shortStop = self.intraTradeLow * (1 + self.trailingPercent / 100)
orderID = self.cover(shortStop, 1, stop=True)
self.orderList.append(orderID)
# 发出状态更新事件
self.putEvent()
########################################################################
class Prototype(AtrRsiStrategy):
"""
"infoArray" 字典是用来储存辅助品种信息的, 可以是同品种的不同分钟k线, 也可以是不同品种的价格
调用的方法:
self.infoArray["数据库名 + 空格 + collection名"]["close"]
self.infoArray["数据库名 + 空格 + collection名"]["high"]
self.infoArray["数据库名 + 空格 + collection名"]["low"]
"""
infoArray = {}
initInfobar = {}
def __int__(self):
super(Prototype, self).__int__()
# ----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略初始化' % self.name)
# 初始化RSI入场阈值
self.rsiBuy = 50 + self.rsiEntry
self.rsiSell = 50 - self.rsiEntry
# 载入历史数据,并采用回放计算的方式初始化策略数值
initData = self.loadBar(self.initDays)
for bar in initData:
# 推送新数据, 同时检查是否有information bar需要推送
# Update new bar, check whether the Time Stamp matching any information bar
ibar = self.checkInfoBar(bar)
self.onBar(bar, infobar=ibar)
self.putEvent()
# ----------------------------------------------------------------------
def checkInfoBar(self, bar):
"""在初始化时, 检查辅助品种数据的推送(初始化结束后, 回测时不会调用)"""
initInfoCursorDict = self.ctaEngine.initInfoCursor
# 如果"initInfobar"字典为空, 初始化字典, 插入第一个数据
# If dictionary "initInfobar" is empty, insert first data record
if self.initInfobar == {}:
for info_symbol in initInfoCursorDict:
try:
self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol])
except StopIteration:
print "Data of information symbols is empty! Input is a list, not str."
raise
# 若有某一品种的 TimeStamp 和执行报价的 TimeStamp 匹配, 则将"initInfobar"中的数据推送,
# 然后更新该品种的数据
# If any symbol's TimeStamp is matched with execution symbol's TimeStamp, return data
# in "initInfobar", and update new data.
temp = {}
for info_symbol in self.initInfobar:
data = self.initInfobar[info_symbol]
# Update data only when Time Stamp is matched
if data['datetime'] <= bar.datetime:
try:
temp[info_symbol] = CtaBarData()
temp[info_symbol].__dict__ = data
self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol])
except StopIteration:
self.ctaEngine.output("No more data for initializing %s." % (info_symbol,))
else:
temp[info_symbol] = None
return temp
# ----------------------------------------------------------------------
def updateInfoArray(self, infobar):
"""收到Infomation Data, 更新辅助品种缓存字典"""
for name in infobar:
data = infobar[name]
# Construct empty array
if len(self.infoArray) < len(infobar) :
self.infoArray[name] = {
"close": np.zeros(self.bufferSize),
"high": np.zeros(self.bufferSize),
"low": np.zeros(self.bufferSize)
}
if data is None:
pass
else:
self.infoArray[name]["close"][0:self.bufferSize - 1] = \
self.infoArray[name]["close"][1:self.bufferSize]
self.infoArray[name]["high"][0:self.bufferSize - 1] = \
self.infoArray[name]["high"][1:self.bufferSize]
self.infoArray[name]["low"][0:self.bufferSize - 1] = \
self.infoArray[name]["low"][1:self.bufferSize]
self.infoArray[name]["close"][-1] = data.close
self.infoArray[name]["high"][-1] = data.high
self.infoArray[name]["low"][-1] = data.low
# ----------------------------------------------------------------------
def onBar(self, bar, **kwargs):
"""收到Bar推送必须由用户继承实现"""
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
for orderID in self.orderList:
self.cancelOrder(orderID)
self.orderList = []
# Update infomation data
# "infobar"是由不同时间或不同品种的品种数据组成的字典, 如果和执行品种的 TimeStamp 不匹配,
# 则传入的是"None", 当time stamp和执行品种匹配时, 传入的是"Bar"
self.updateInfoArray(kwargs["infobar"])
# 保存K线数据
self.closeArray[0:self.bufferSize - 1] = self.closeArray[1:self.bufferSize]
self.highArray[0:self.bufferSize - 1] = self.highArray[1:self.bufferSize]
self.lowArray[0:self.bufferSize - 1] = self.lowArray[1:self.bufferSize]
self.closeArray[-1] = bar.close
self.highArray[-1] = bar.high
self.lowArray[-1] = bar.low
# 若读取的缓存数据不足, 不考虑交易
self.bufferCount += 1
if self.bufferCount < self.bufferSize:
return
# 计算指标数值
# 计算不同时间下的ATR数值
# Only trading when information bar changes
# 只有在30min或者1d K线更新后才可以交易
TradeOn = False
if any([i is not None for i in kwargs["infobar"].values()]):
TradeOn = True
self.scaledAtrValue1M = talib.ATR(self.highArray,
self.lowArray,
self.closeArray,
self.atrLength)[-1] * (25) ** (0.5)
self.atrValue30M = talib.abstract.ATR(self.infoArray["TestData @GC_30M"])[-1]
self.rsiValue = talib.abstract.RSI(self.infoArray["TestData @GC_30M"], self.rsiLength)[-1]
self.atrCount += 1
if self.atrCount < self.bufferSize:
return
# 判断是否要进行交易
# 当前无仓位
if (self.pos == 0 and TradeOn == True):
self.intraTradeHigh = bar.high
self.intraTradeLow = bar.low
# 1Min调整后ATR大于30MinATR
# 即处于趋势的概率较大适合CTA开仓
if self.atrValue30M < self.scaledAtrValue1M:
# 使用RSI指标的趋势行情时会在超买超卖区钝化特征作为开仓信号
if self.rsiValue > self.rsiBuy:
# 这里为了保证成交选择超价5个整指数点下单
self.buy(bar.close+5, 1)
elif self.rsiValue < self.rsiSell:
self.short(bar.close-5, 1)
# 下单后, 在下一个30Min K线之前不交易
TradeOn = False
# 持有多头仓位
elif self.pos > 0:
# 计算多头持有期内的最高价,以及重置最低价
self.intraTradeHigh = max(self.intraTradeHigh, bar.high)
self.intraTradeLow = bar.low
# 计算多头移动止损
longStop = self.intraTradeHigh * (1 - self.trailingPercent / 100)
# 发出本地止损委托,并且把委托号记录下来,用于后续撤单
orderID = self.sell(longStop, 1, stop=True)
self.orderList.append(orderID)
# 持有空头仓位
elif self.pos < 0:
self.intraTradeLow = min(self.intraTradeLow, bar.low)
self.intraTradeHigh = bar.high
shortStop = self.intraTradeLow * (1 + self.trailingPercent / 100)
orderID = self.cover(shortStop, 1, stop=True)
self.orderList.append(orderID)
# 发出状态更新事件
self.putEvent()
if __name__ == '__main__':
# 提供直接双击回测的功能
# 导入PyQt4的包是为了保证matplotlib使用PyQt4而不是PySide防止初始化出错
from ctaBacktestMultiTF import *
from PyQt4 import QtCore, QtGui
import time
'''
创建回测引擎
设置引擎的回测模式为K线
设置回测用的数据起始日期
载入历史数据到引擎中
在引擎中创建策略对象
Create backtesting engine
Set backtest mode as "Bar"
Set "Start Date" of data range
Load historical data to engine
Create strategy instance in engine
'''
engine = BacktestEngineMultiTF()
engine.setBacktestingMode(engine.BAR_MODE)
engine.setStartDate('20100101')
engine.setDatabase("TestData", "@GC_1M", info_symbol=[("TestData","@GC_30M")])
# Set parameters for strategy
d = {'atrLength': 11}
engine.initStrategy(Prototype, d)
# 设置产品相关参数
engine.setSlippage(0.2) # 股指1跳
engine.setCommission(0.3 / 10000) # 万0.3
engine.setSize(300) # 股指合约大小
# 开始跑回测
start = time.time()
engine.runBacktesting()
# 显示回测结果
engine.showBacktestingResult()
print 'Time consumed%s' % (time.time() - start)

View File

@ -1,317 +0,0 @@
# encoding: UTF-8
"""
This file tweaks ctaTemplate Module to suit multi-TimeFrame strategies.
"""
from ctaBase import *
from ctaTemplate import CtaTemplate
import numpy as np
########################################################################
class BreakOut(CtaTemplate):
"""
"infoArray" 字典是用来储存辅助品种信息的, 可以是同品种的不同分钟k线, 也可以是不同品种的价格
调用的方法:
价格序列:
self.infoArray["数据库名 + 空格 + collection名"]["close"]
self.infoArray["数据库名 + 空格 + collection名"]["high"]
self.infoArray["数据库名 + 空格 + collection名"]["low"]
单个价格:
self.infoBar["数据库名 + 空格 + collection名"]
返回的值为一个ctaBarData None
"""
#----------------------------------------------------------------------
def __init__(self, ctaEngine, setting):
"""日内突破交易策略, 出场方式非常多, 本文件使用指标出场"""
className = 'BreakOut'
author = 'Joe'
super(BreakOut, self).__init__(ctaEngine, setting)
# 设置辅助品种数据字典
self.infoArray = {}
self.initInfobar = {}
self.infoBar = {}
# 缓存数据量
self.bufferSize = 100
self.bufferCount = 0
self.initDays = 10
# 设置参数
self.pOBO_Mult = 0.5 # 计算突破点位
# self.pProtMult = 2 # 止损的ATR倍数
# self.pProfitMult = 2 # 止盈相对于止损的倍数
# self.SlTp_On = False # 止损止盈功能
# self.EODTime = 15 # 设置日内平仓时间
self.vOBO_stretch = EMPTY_FLOAT
self.vOBO_initialpoint = EMPTY_FLOAT
self.vOBO_level_L = EMPTY_FLOAT
self.vOBO_level_S = EMPTY_FLOAT
self.orderList = []
# 参数列表,保存了参数的名称
paramList = ['name',
'className',
'author',
'pOBO_Mult',
'pProtMult',
'pProfitMult',
'SlTp_On',
'EODTime']
# 变量列表,保存了变量的名称
varList = ['vOBO_stretch',
'vOBO_initialpoint',
'vOBO_level_L',
'vOBO_level_S']
# ----------------------------------------------------------------------
def onInit(self):
"""初始化策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略初始化' % self.name)
# 载入历史数据,并采用回放计算的方式初始化策略数值
initData = self.loadBar(self.initDays)
for bar in initData:
# 推送新数据, 同时检查是否有information bar需要推送
# Update new bar, check whether the Time Stamp matching any information bar
ibar = self.checkInfoBar(bar)
self.onBar(bar, infobar=ibar)
self.putEvent()
#----------------------------------------------------------------------
def onStart(self):
"""启动策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略启动' %self.name)
self.putEvent()
#----------------------------------------------------------------------
def onStop(self):
"""停止策略(必须由用户继承实现)"""
self.writeCtaLog(u'%s策略停止' %self.name)
self.putEvent()
# ----------------------------------------------------------------------
def checkInfoBar(self, bar):
"""在初始化时, 检查辅助品种数据的推送(初始化结束后, 回测时不会调用)"""
initInfoCursorDict = self.ctaEngine.initInfoCursor
# 如果"initInfobar"字典为空, 初始化字典, 插入第一个数据
# If dictionary "initInfobar" is empty, insert first data record
if self.initInfobar == {}:
for info_symbol in initInfoCursorDict:
try:
self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol])
except StopIteration:
print "Data of information symbols is empty! Input is a list, not str."
raise
# 若有某一品种的 TimeStamp 和执行报价的 TimeStamp 匹配, 则将"initInfobar"中的数据推送,
# 然后更新该品种的数据
# If any symbol's TimeStamp is matched with execution symbol's TimeStamp, return data
# in "initInfobar", and update new data.
temp = {}
for info_symbol in self.initInfobar:
data = self.initInfobar[info_symbol]
# Update data only when Time Stamp is matched
if (data is not None) and (data['datetime'] <= bar.datetime):
try:
temp[info_symbol] = CtaBarData()
temp[info_symbol].__dict__ = data
self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol])
except StopIteration:
self.initInfobar[info_symbol] = None
self.ctaEngine.output("No more data for initializing %s." % (info_symbol,))
else:
temp[info_symbol] = None
return temp
# ----------------------------------------------------------------------
def updateInfoArray(self, infobar):
"""收到Infomation Data, 更新辅助品种缓存字典"""
for name in infobar:
data = infobar[name]
# Construct empty array
if len(self.infoArray) < len(infobar) :
self.infoArray[name] = {
"close": np.zeros(self.bufferSize),
"high": np.zeros(self.bufferSize),
"low": np.zeros(self.bufferSize),
"open": np.zeros(self.bufferSize)
}
if data is None:
pass
else:
self.infoArray[name]["close"][0:self.bufferSize - 1] = \
self.infoArray[name]["close"][1:self.bufferSize]
self.infoArray[name]["high"][0:self.bufferSize - 1] = \
self.infoArray[name]["high"][1:self.bufferSize]
self.infoArray[name]["low"][0:self.bufferSize - 1] = \
self.infoArray[name]["low"][1:self.bufferSize]
self.infoArray[name]["open"][0:self.bufferSize - 1] = \
self.infoArray[name]["open"][1:self.bufferSize]
self.infoArray[name]["close"][-1] = data.close
self.infoArray[name]["high"][-1] = data.high
self.infoArray[name]["low"][-1] = data.low
self.infoArray[name]["open"][-1] = data.open
# ----------------------------------------------------------------------
def onBar(self, bar, **kwargs):
"""收到Bar推送必须由用户继承实现"""
# Update infomation data
# "infobar"是由不同时间或不同品种的品种数据组成的字典, 如果和执行品种的 TimeStamp 不匹配,
# 则传入的是"None", 当time stamp和执行品种匹配时, 传入的是"Bar"
if "infobar" in kwargs:
self.infoBar = kwargs["infobar"]
self.updateInfoArray(kwargs["infobar"])
# 若读取的缓存数据不足, 不考虑交易
self.bufferCount += 1
if self.bufferCount < self.bufferSize:
return
# 计算指标数值
a = np.sum(self.infoArray["TestData @GC_1D"]["close"])
if a == 0.0:
return
# Only updating indicators when information bar changes
# 只有在30min或者1d K线更新后才更新指标
TradeOn = False
if any([i is not None for i in self.infoBar]):
TradeOn = True
self.vRange = self.infoArray["TestData @GC_1D"]["high"][-1] -\
self.infoArray["TestData @GC_1D"]["low"][-1]
self.vOBO_stretch = self.vRange * self.pOBO_Mult
self.vOBO_initialpoint = self.infoArray["TestData @GC_1D"]["close"][-1]
self.vOBO_level_L = self.vOBO_initialpoint + self.vOBO_stretch
self.vOBO_level_S = self.vOBO_initialpoint - self.vOBO_stretch
self.atrValue30M = talib.abstract.ATR(self.infoArray["TestData @GC_30M"])[-1]
# 判断是否要进行交易
# 当前无仓位
if (self.pos == 0 and TradeOn == True):
# 撤销之前发出的尚未成交的委托(包括限价单和停止单)
for orderID in self.orderList:
self.cancelOrder(orderID)
self.orderList = []
# 若上一个30分钟K线的最高价大于OBO_level_L
# 且当前的价格大于OBO_level_L, 则买入
if self.infoArray["TestData @GC_30M"]["high"][-1] > self.vOBO_level_L:
if bar.close > self.vOBO_level_L:
self.buy(bar.close + 0.5, 1)
# 下单后, 在下一个30Min K线之前不交易
TradeOn = False
# 若上一个30分钟K线的最高价低于OBO_level_S
# 且当前的价格小于OBO_level_S, 则卖出
elif self.infoArray["TestData @GC_30M"]["low"][-1] < self.vOBO_level_S:
if bar.close < self.vOBO_level_S:
self.short(bar.close - 0.5, 1)
# 下单后, 在下一个30Min K线之前不交易
TradeOn = False
# 持有多头仓位
elif self.pos > 0:
# 当价格低于initialpoint水平, 出场
if bar.close < self.vOBO_initialpoint:
self.sell(bar.close - 0.5 , 1)
# 持有空头仓位
elif self.pos < 0:
# 当价格高于initialpoint水平, 出场
if bar.close > self.vOBO_initialpoint:
self.cover(bar.close + 0.5, 1)
# 发出状态更新事件
self.putEvent()
# ----------------------------------------------------------------------
def onOrder(self, order):
"""收到委托变化推送(必须由用户继承实现)"""
pass
# ----------------------------------------------------------------------
def onTrade(self, trade):
pass
if __name__ == '__main__':
# 提供直接双击回测的功能
# 导入PyQt4的包是为了保证matplotlib使用PyQt4而不是PySide防止初始化出错
from ctaBacktestMultiTF import *
from PyQt4 import QtCore, QtGui
import time
'''
创建回测引擎
设置引擎的回测模式为K线
设置回测用的数据起始日期
载入历史数据到引擎中
在引擎中创建策略对象
Create backtesting engine
Set backtest mode as "Bar"
Set "Start Date" of data range
Load historical data to engine
Create strategy instance in engine
'''
engine = BacktestEngineMultiTF()
engine.setBacktestingMode(engine.BAR_MODE)
engine.setStartDate('20120101')
engine.setEndDate('20150101')
engine.setDatabase("TestData", "@GC_1M", info_symbol=[("TestData","@GC_30M"),
("TestData","@GC_1D")])
# Set parameters for strategy
engine.initStrategy(BreakOut, {})
# 设置产品相关参数
engine.setSlippage(0.2) # 股指1跳
engine.setCommission(0.3 / 10000) # 万0.3
engine.setSize(1) # 股指合约大小
# 开始跑回测
start = time.time()
engine.runBacktesting()
# 显示回测结果
engine.showBacktestingResult()
print 'Time consumed%s' % (time.time() - start)