diff --git a/vn.trader/ctaAlgo/tools/README.md b/vn.trader/ctaAlgo/tools/README.md index 7b8350f0..67c59f15 100644 --- a/vn.trader/ctaAlgo/tools/README.md +++ b/vn.trader/ctaAlgo/tools/README.md @@ -5,3 +5,6 @@ * 贡献者:李来佳 * WeChat/QQ: 28888502 +### multiTimeFrame +* 简介:基于CTA模块扩展了回测和交易功能,允许策略中引用辅助品种信息(其他时间框架、其他合约),同时提供了一个突破策略的例子 +* 贡献者:周正舟 diff --git a/vn.trader/ctaAlgo/tools/multiTimeFrame/Screen Shot 2016-11-08 at 9.08.47 PM.png b/vn.trader/ctaAlgo/tools/multiTimeFrame/Screen Shot 2016-11-08 at 9.08.47 PM.png new file mode 100644 index 00000000..ca857f31 Binary files /dev/null and b/vn.trader/ctaAlgo/tools/multiTimeFrame/Screen Shot 2016-11-08 at 9.08.47 PM.png differ diff --git a/vn.trader/ctaAlgo/tools/multiTimeFrame/ctaBacktestMultiTF.py b/vn.trader/ctaAlgo/tools/multiTimeFrame/ctaBacktestMultiTF.py new file mode 100644 index 00000000..41a23bce --- /dev/null +++ b/vn.trader/ctaAlgo/tools/multiTimeFrame/ctaBacktestMultiTF.py @@ -0,0 +1,206 @@ +# encoding: UTF-8 + +''' +这个文件加入在CTA回测引擎的基础上加入了辅助品种信息, 保持接口的一致, 可以在原CTA引擎上执行的代码, +也可以在这个引擎上执行 +This file add multi Time Frame functionalities to CTA backtesting engine, the APIs are the +same as CTA engine. Real trading code can be directly used for backtesting. +''' + +from __future__ import division +from vtFunction import loadMongoSetting + +from ctaBacktesting import * + +class BacktestEngineMultiTF(BacktestingEngine): + + def __init__(self): + """Constructor""" + super(BacktestEngineMultiTF, self).__init__() + + self.info_symbols = [] # List, 输入辅助品种的2值tuple, 左边为数据库名, 右边为collection名 + self.InfoCursor = {} # Dict, 放置回测用辅助品种数据库 + self.initInfoCursor = {} # Dict, 放置初始化用辅助品种数据库 + self.infobar = {} # Dict, 放置辅助品种最新一个K线数据 + self.MultiOn = False # Boolean, 判断是否传入了辅助品种 + + # ---------------------------------------------------------------------- + def setDatabase(self, dbName, symbol, **kwargs): + """set database that provide historical data""" + + self.dbName = dbName + + # Set executed symbol and information symbols + self.symbol = symbol + if "info_symbol" in kwargs: + self.info_symbols = kwargs["info_symbol"] + + # Turn on MultiTF switch + if len(self.info_symbols) > 0: + self.MultiOn = True + + # ---------------------------------------------------------------------- + def loadInitData(self, collection, **kwargs): + """Load initializing data""" + # 载入初始化需要用的数据 + # Load initialised data + + # $gte means "greater and equal to" + # $lt means "less than" + flt = {'datetime': {'$gte': self.dataStartDate, + '$lt': self.strategyStartDate}} + self.initCursor = collection.find(flt) + + # 初始化辅助品种数据 + # Initializing information data + if "inf" in kwargs: + for name in kwargs["inf"]: + DB = kwargs["inf"][name] + self.initInfoCursor[name] = DB.find(flt) + + # 将数据从查询指针中读取出,并生成列表 + # Read data from cursor, generate a list + self.initData = [] + + for d in self.initCursor: + data = self.dataClass() + data.__dict__ = d + self.initData.append(data) + + # ---------------------------------------------------------------------- + def loadHistoryData(self): + """载入历史数据""" + """load historical data""" + + host, port = loadMongoSetting() + + self.dbClient = pymongo.MongoClient(host, port) + collection = self.dbClient[self.dbName][self.symbol] + + # Load historical data of information symbols, construct a dictionary of Database + # Values of dictionary are mongo.Client. + info_collection = {} + if self.MultiOn is True: + for DBname, symbol in self.info_symbols: + info_collection[DBname + " " + symbol] = self.dbClient[DBname][symbol] + + self.output("Start loading historical data") + + # 首先根据回测模式,确认要使用的数据类 + # Choose data type based on backtest mode + if self.mode == self.BAR_MODE: + self.dataClass = CtaBarData + self.func = self.newBar + else: + self.dataClass = CtaTickData + self.func = self.newTick + + # Load initializing data + self.loadInitData(collection, inf=info_collection) + + # 载入回测数据 + # Load backtest data (exclude initializing data) + if not self.dataEndDate: + # If "End Date" is not set, retreat data up to today + flt = {'datetime': {'$gte': self.strategyStartDate}} + else: + flt = {'datetime': {'$gte': self.strategyStartDate, + '$lte': self.dataEndDate}} + self.dbCursor = collection.find(flt) + + if self.MultiOn is True: + for db in info_collection: + self.InfoCursor[db] = info_collection[db].find(flt) + self.output( + "Data loading completed, data volumn: %s" % (self.initCursor.count() + self.dbCursor.count() + \ + sum([i.count() for i in self.InfoCursor.values()]))) + else: + self.output("Data loading completed, data volumn: %s" % (self.initCursor.count() + self.dbCursor.count())) + + # ---------------------------------------------------------------------- + def runBacktesting(self): + """运行回测""" + """Run backtesting""" + + # 载入历史数据 + # Load historical data + self.loadHistoryData() + + self.output("Start backtesing!") + + self.strategy.inited = True + self.strategy.onInit() + self.output("Strategy initialsing complete") + + self.strategy.trading = True + self.strategy.onStart() + self.output("Strategy started") + + self.output("Processing historical data...") + + dataClass = self.dataClass + func = self.func + for d in self.dbCursor: + data = dataClass() + data.__dict__ = d + func(data) + + self.output("No more historical data") + + # ---------------------------------------------------------------------- + def checkInformationData(self): + """Update information symbols' data""" + + # If infobar is empty, which means it is the first time calling this method + if self.infobar == {}: + for info_symbol in self.InfoCursor: + try: + self.infobar[info_symbol] = next(self.InfoCursor[info_symbol]) + except StopIteration: + print "Data of information symbols is empty! Input must be a list, not str." + raise + + temp = {} + for info_symbol in self.infobar: + + data = self.infobar[info_symbol] + + # Update data only when Time Stamp is matched + if data['datetime'] <= self.dt: + try: + temp[info_symbol] = CtaBarData() + temp[info_symbol].__dict__ = data + self.infobar[info_symbol] = next(self.InfoCursor[info_symbol]) + except StopIteration: + self.output("No more data in information database.") + else: + temp[info_symbol] = None + + return temp + + # ---------------------------------------------------------------------- + def newBar(self, bar): + """新的K线""" + """new ohlc Bar""" + self.bar = bar + self.dt = bar.datetime + self.updatePosition() # Update total position value based on new Bar + self.crossLimitOrder() # 先撮合限价单 + self.crossStopOrder() # 再撮合停止单 + if self.MultiOn is True: + self.strategy.onBar(bar, infobar=self.checkInformationData()) # 推送K线到策略中 + else: + self.strategy.onBar(bar) # 推送K线到策略中 + + # ---------------------------------------------------------------------- + def newTick(self, tick): + """新的Tick""" + """new Tick""" + self.tick = tick + self.dt = tick.datetime + self.crossLimitOrder() + self.crossStopOrder() + self.strategy.onTick(tick) + +######################################################################## + diff --git a/vn.trader/ctaAlgo/tools/multiTimeFrame/ctaStrategyMultiTF.py b/vn.trader/ctaAlgo/tools/multiTimeFrame/ctaStrategyMultiTF.py new file mode 100644 index 00000000..21eb627e --- /dev/null +++ b/vn.trader/ctaAlgo/tools/multiTimeFrame/ctaStrategyMultiTF.py @@ -0,0 +1,410 @@ +# encoding: UTF-8 +""" +This file tweaks ctaTemplate Module to suit multi-TimeFrame strategies. +""" + +from strategyAtrRsi import * +from ctaBase import * +from ctaTemplate import CtaTemplate + +######################################################################## +class TC11(CtaTemplate): + + # Strategy name and author + className = "TC11" + author = "Zenacon" + + # Set MongoDB DataBase + barDbName = "TestData" + + # Strategy parameters + pGeneric_prd = 21 + pGeneric_on = True + + pATRprd_F = 13 + pATRprd_M = 21 + pATRprd_S = 63 + + pBOSSplus_prd = 98 + pBOSSminus_prd = 22 + + if pGeneric_on == 0: + pRSIprd = 20 + pBBprd = 10 + pBB_ATRprd = 15 + pATRprd = 21 + pDMIprd = 21 + else: + pRSIprd = \ + pBBprd = \ + pBB_ATRprd = \ + pATRprd = \ + pDMIprd = pGeneric_prd + + pBOSS_Mult = 1.75 + + # Strategy variables + vOBO_initialpoint = EMPTY_FLOAT + vOBO_Stretch = EMPTY_FLOAT + vOBO_level_L = EMPTY_FLOAT + vOBO_level_S = EMPTY_FLOAT + + # parameters' list, record names of parameters + paramList = ['name', + 'className', + 'author', + 'vtSymbol'] + + # variables' list, record names of variables + varList = ['inited', + 'trading', + 'pos'] + + def __init__(self, ctaEngine, setting): + """Constructor""" + super(TC11, self).__init__(ctaEngine, setting) + + # ---------------------------------------------------------------------- + def onBar(self, bar, **kwargs): + """收到Bar推送(必须由用户继承实现)""" + # 撤销之前发出的尚未成交的委托(包括限价单和停止单) + for orderID in self.orderList: + self.cancelOrder(orderID) + self.orderList = [] + + # Record new information bar + if "infobar" in kwargs: + for i in kwargs["infobar"]: + if kwargs["infobar"][i] is None: + pass + else: + # print kwargs["infobar"][i]["close"] + self.closeArray[0:self.bufferSize - 1] = self.closeArray[1:self.bufferSize] + self.highArray[0:self.bufferSize - 1] = self.highArray[1:self.bufferSize] + self.lowArray[0:self.bufferSize - 1] = self.lowArray[1:self.bufferSize] + + self.closeArray[-1] = bar.close + self.highArray[-1] = bar.high + self.lowArray[-1] = bar.low + + """ + Record new bar + """ + self.closeArray[0:self.bufferSize - 1] = self.closeArray[1:self.bufferSize] + self.highArray[0:self.bufferSize - 1] = self.highArray[1:self.bufferSize] + self.lowArray[0:self.bufferSize - 1] = self.lowArray[1:self.bufferSize] + + self.closeArray[-1] = bar.close + self.highArray[-1] = bar.high + self.lowArray[-1] = bar.low + + self.bufferCount += 1 + if self.bufferCount < self.bufferSize: + return + + """ + Calculate Indicators + """ + + vOBO_initialpoint = self.dataHTF_filled['Open'] + vOBO_Stretch = self.vATR['htf'].m * self.pBOSS_Mult + + self.atrValue = talib.ATR(self.highArray, + self.lowArray, + self.closeArray, + self.atrLength)[-1] + self.atrArray[0:self.bufferSize - 1] = self.atrArray[1:self.bufferSize] + self.atrArray[-1] = self.atrValue + + self.atrCount += 1 + if self.atrCount < self.bufferSize: + return + + self.atrMa = talib.MA(self.atrArray, + self.atrMaLength)[-1] + self.rsiValue = talib.RSI(self.closeArray, + self.rsiLength)[-1] + + # 判断是否要进行交易 + + # 当前无仓位 + if self.pos == 0: + self.intraTradeHigh = bar.high + self.intraTradeLow = bar.low + + # ATR数值上穿其移动平均线,说明行情短期内波动加大 + # 即处于趋势的概率较大,适合CTA开仓 + if self.atrValue > self.atrMa: + # 使用RSI指标的趋势行情时,会在超买超卖区钝化特征,作为开仓信号 + if self.rsiValue > self.rsiBuy: + # 这里为了保证成交,选择超价5个整指数点下单 + self.buy(bar.close + 5, 1) + + elif self.rsiValue < self.rsiSell: + self.short(bar.close - 5, 1) + + # 持有多头仓位 + elif self.pos > 0: + # 计算多头持有期内的最高价,以及重置最低价 + self.intraTradeHigh = max(self.intraTradeHigh, bar.high) + self.intraTradeLow = bar.low + # 计算多头移动止损 + longStop = self.intraTradeHigh * (1 - self.trailingPercent / 100) + # 发出本地止损委托,并且把委托号记录下来,用于后续撤单 + orderID = self.sell(longStop, 1, stop=True) + self.orderList.append(orderID) + + # 持有空头仓位 + elif self.pos < 0: + self.intraTradeLow = min(self.intraTradeLow, bar.low) + self.intraTradeHigh = bar.high + + shortStop = self.intraTradeLow * (1 + self.trailingPercent / 100) + orderID = self.cover(shortStop, 1, stop=True) + self.orderList.append(orderID) + + # 发出状态更新事件 + self.putEvent() + +######################################################################## +class Prototype(AtrRsiStrategy): + + """ + "infoArray" 字典是用来储存辅助品种信息的, 可以是同品种的不同分钟k线, 也可以是不同品种的价格。 + + 调用的方法: + self.infoArray["数据库名 + 空格 + collection名"]["close"] + self.infoArray["数据库名 + 空格 + collection名"]["high"] + self.infoArray["数据库名 + 空格 + collection名"]["low"] + """ + infoArray = {} + initInfobar = {} + + def __int__(self): + super(Prototype, self).__int__() + + # ---------------------------------------------------------------------- + def onInit(self): + """初始化策略(必须由用户继承实现)""" + self.writeCtaLog(u'%s策略初始化' % self.name) + + # 初始化RSI入场阈值 + self.rsiBuy = 50 + self.rsiEntry + self.rsiSell = 50 - self.rsiEntry + + # 载入历史数据,并采用回放计算的方式初始化策略数值 + initData = self.loadBar(self.initDays) + for bar in initData: + + # 推送新数据, 同时检查是否有information bar需要推送 + # Update new bar, check whether the Time Stamp matching any information bar + ibar = self.checkInfoBar(bar) + self.onBar(bar, infobar=ibar) + + self.putEvent() + + # ---------------------------------------------------------------------- + def checkInfoBar(self, bar): + """在初始化时, 检查辅助品种数据的推送(初始化结束后, 回测时不会调用)""" + + initInfoCursorDict = self.ctaEngine.initInfoCursor + + # 如果"initInfobar"字典为空, 初始化字典, 插入第一个数据 + # If dictionary "initInfobar" is empty, insert first data record + if self.initInfobar == {}: + for info_symbol in initInfoCursorDict: + try: + self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol]) + except StopIteration: + print "Data of information symbols is empty! Input is a list, not str." + raise + + # 若有某一品种的 TimeStamp 和执行报价的 TimeStamp 匹配, 则将"initInfobar"中的数据推送, + # 然后更新该品种的数据 + # If any symbol's TimeStamp is matched with execution symbol's TimeStamp, return data + # in "initInfobar", and update new data. + temp = {} + for info_symbol in self.initInfobar: + + data = self.initInfobar[info_symbol] + + # Update data only when Time Stamp is matched + if data['datetime'] <= bar.datetime: + try: + temp[info_symbol] = CtaBarData() + temp[info_symbol].__dict__ = data + self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol]) + except StopIteration: + self.ctaEngine.output("No more data for initializing %s." % (info_symbol,)) + else: + temp[info_symbol] = None + + return temp + + # ---------------------------------------------------------------------- + def updateInfoArray(self, infobar): + """收到Infomation Data, 更新辅助品种缓存字典""" + + for name in infobar: + + data = infobar[name] + + # Construct empty array + if len(self.infoArray) < len(infobar) : + self.infoArray[name] = { + "close": np.zeros(self.bufferSize), + "high": np.zeros(self.bufferSize), + "low": np.zeros(self.bufferSize) + } + + if data is None: + pass + + else: + self.infoArray[name]["close"][0:self.bufferSize - 1] = \ + self.infoArray[name]["close"][1:self.bufferSize] + self.infoArray[name]["high"][0:self.bufferSize - 1] = \ + self.infoArray[name]["high"][1:self.bufferSize] + self.infoArray[name]["low"][0:self.bufferSize - 1] = \ + self.infoArray[name]["low"][1:self.bufferSize] + + self.infoArray[name]["close"][-1] = data.close + self.infoArray[name]["high"][-1] = data.high + self.infoArray[name]["low"][-1] = data.low + + # ---------------------------------------------------------------------- + def onBar(self, bar, **kwargs): + """收到Bar推送(必须由用户继承实现)""" + # 撤销之前发出的尚未成交的委托(包括限价单和停止单) + for orderID in self.orderList: + self.cancelOrder(orderID) + self.orderList = [] + + # Update infomation data + # "infobar"是由不同时间或不同品种的品种数据组成的字典, 如果和执行品种的 TimeStamp 不匹配, + # 则传入的是"None", 当time stamp和执行品种匹配时, 传入的是"Bar" + self.updateInfoArray(kwargs["infobar"]) + + # 保存K线数据 + self.closeArray[0:self.bufferSize - 1] = self.closeArray[1:self.bufferSize] + self.highArray[0:self.bufferSize - 1] = self.highArray[1:self.bufferSize] + self.lowArray[0:self.bufferSize - 1] = self.lowArray[1:self.bufferSize] + + self.closeArray[-1] = bar.close + self.highArray[-1] = bar.high + self.lowArray[-1] = bar.low + + # 若读取的缓存数据不足, 不考虑交易 + self.bufferCount += 1 + if self.bufferCount < self.bufferSize: + return + + # 计算指标数值 + + # 计算不同时间下的ATR数值 + + # Only trading when information bar changes + # 只有在30min或者1d K线更新后才可以交易 + TradeOn = False + if any([i is not None for i in kwargs["infobar"].values()]): + + TradeOn = True + self.scaledAtrValue1M = talib.ATR(self.highArray, + self.lowArray, + self.closeArray, + self.atrLength)[-1] * (25) ** (0.5) + self.atrValue30M = talib.abstract.ATR(self.infoArray["TestData @GC_30M"])[-1] + self.rsiValue = talib.abstract.RSI(self.infoArray["TestData @GC_30M"], self.rsiLength)[-1] + + self.atrCount += 1 + if self.atrCount < self.bufferSize: + return + + # 判断是否要进行交易 + + # 当前无仓位 + if (self.pos == 0 and TradeOn == True): + self.intraTradeHigh = bar.high + self.intraTradeLow = bar.low + + # 1Min调整后ATR大于30MinATR + # 即处于趋势的概率较大,适合CTA开仓 + if self.atrValue30M < self.scaledAtrValue1M: + # 使用RSI指标的趋势行情时,会在超买超卖区钝化特征,作为开仓信号 + if self.rsiValue > self.rsiBuy: + # 这里为了保证成交,选择超价5个整指数点下单 + self.buy(bar.close+5, 1) + + elif self.rsiValue < self.rsiSell: + self.short(bar.close-5, 1) + + # 下单后, 在下一个30Min K线之前不交易 + TradeOn = False + + # 持有多头仓位 + elif self.pos > 0: + # 计算多头持有期内的最高价,以及重置最低价 + self.intraTradeHigh = max(self.intraTradeHigh, bar.high) + self.intraTradeLow = bar.low + # 计算多头移动止损 + longStop = self.intraTradeHigh * (1 - self.trailingPercent / 100) + # 发出本地止损委托,并且把委托号记录下来,用于后续撤单 + orderID = self.sell(longStop, 1, stop=True) + self.orderList.append(orderID) + + # 持有空头仓位 + elif self.pos < 0: + self.intraTradeLow = min(self.intraTradeLow, bar.low) + self.intraTradeHigh = bar.high + + shortStop = self.intraTradeLow * (1 + self.trailingPercent / 100) + orderID = self.cover(shortStop, 1, stop=True) + self.orderList.append(orderID) + + # 发出状态更新事件 + self.putEvent() + + +if __name__ == '__main__': + # 提供直接双击回测的功能 + # 导入PyQt4的包是为了保证matplotlib使用PyQt4而不是PySide,防止初始化出错 + from ctaBacktestMultiTF import * + from PyQt4 import QtCore, QtGui + import time + + ''' + 创建回测引擎 + 设置引擎的回测模式为K线 + 设置回测用的数据起始日期 + 载入历史数据到引擎中 + 在引擎中创建策略对象 + + Create backtesting engine + Set backtest mode as "Bar" + Set "Start Date" of data range + Load historical data to engine + Create strategy instance in engine + ''' + engine = BacktestEngineMultiTF() + engine.setBacktestingMode(engine.BAR_MODE) + engine.setStartDate('20100101') + engine.setDatabase("TestData", "@GC_1M", info_symbol=[("TestData","@GC_30M")]) + + # Set parameters for strategy + d = {'atrLength': 11} + engine.initStrategy(Prototype, d) + + # 设置产品相关参数 + engine.setSlippage(0.2) # 股指1跳 + engine.setCommission(0.3 / 10000) # 万0.3 + engine.setSize(300) # 股指合约大小 + + # 开始跑回测 + start = time.time() + + engine.runBacktesting() + + # 显示回测结果 + engine.showBacktestingResult() + + print 'Time consumed:%s' % (time.time() - start) \ No newline at end of file diff --git a/vn.trader/ctaAlgo/tools/multiTimeFrame/strategyBreakOut.py b/vn.trader/ctaAlgo/tools/multiTimeFrame/strategyBreakOut.py new file mode 100644 index 00000000..3236ec0b --- /dev/null +++ b/vn.trader/ctaAlgo/tools/multiTimeFrame/strategyBreakOut.py @@ -0,0 +1,317 @@ +# encoding: UTF-8 +""" +This file tweaks ctaTemplate Module to suit multi-TimeFrame strategies. +""" + +from ctaBase import * +from ctaTemplate import CtaTemplate +import numpy as np + +######################################################################## +class BreakOut(CtaTemplate): + + """ + "infoArray" 字典是用来储存辅助品种信息的, 可以是同品种的不同分钟k线, 也可以是不同品种的价格。 + + 调用的方法: + 价格序列: + self.infoArray["数据库名 + 空格 + collection名"]["close"] + self.infoArray["数据库名 + 空格 + collection名"]["high"] + self.infoArray["数据库名 + 空格 + collection名"]["low"] + + 单个价格: + self.infoBar["数据库名 + 空格 + collection名"] + 返回的值为一个ctaBarData 或 None + """ + + #---------------------------------------------------------------------- + def __init__(self, ctaEngine, setting): + """日内突破交易策略, 出场方式非常多, 本文件使用指标出场""" + + className = 'BreakOut' + author = 'Joe' + super(BreakOut, self).__init__(ctaEngine, setting) + + # 设置辅助品种数据字典 + self.infoArray = {} + self.initInfobar = {} + self.infoBar = {} + + # 缓存数据量 + self.bufferSize = 100 + self.bufferCount = 0 + self.initDays = 10 + + # 设置参数 + self.pOBO_Mult = 0.5 # 计算突破点位 + # self.pProtMult = 2 # 止损的ATR倍数 + # self.pProfitMult = 2 # 止盈相对于止损的倍数 + # self.SlTp_On = False # 止损止盈功能 + # self.EODTime = 15 # 设置日内平仓时间 + + self.vOBO_stretch = EMPTY_FLOAT + self.vOBO_initialpoint = EMPTY_FLOAT + self.vOBO_level_L = EMPTY_FLOAT + self.vOBO_level_S = EMPTY_FLOAT + + self.orderList = [] + + # 参数列表,保存了参数的名称 + paramList = ['name', + 'className', + 'author', + 'pOBO_Mult', + 'pProtMult', + 'pProfitMult', + 'SlTp_On', + 'EODTime'] + + # 变量列表,保存了变量的名称 + varList = ['vOBO_stretch', + 'vOBO_initialpoint', + 'vOBO_level_L', + 'vOBO_level_S'] + + # ---------------------------------------------------------------------- + def onInit(self): + """初始化策略(必须由用户继承实现)""" + self.writeCtaLog(u'%s策略初始化' % self.name) + + # 载入历史数据,并采用回放计算的方式初始化策略数值 + initData = self.loadBar(self.initDays) + for bar in initData: + + # 推送新数据, 同时检查是否有information bar需要推送 + # Update new bar, check whether the Time Stamp matching any information bar + ibar = self.checkInfoBar(bar) + self.onBar(bar, infobar=ibar) + + self.putEvent() + + #---------------------------------------------------------------------- + def onStart(self): + """启动策略(必须由用户继承实现)""" + self.writeCtaLog(u'%s策略启动' %self.name) + self.putEvent() + + #---------------------------------------------------------------------- + def onStop(self): + """停止策略(必须由用户继承实现)""" + self.writeCtaLog(u'%s策略停止' %self.name) + self.putEvent() + + # ---------------------------------------------------------------------- + def checkInfoBar(self, bar): + """在初始化时, 检查辅助品种数据的推送(初始化结束后, 回测时不会调用)""" + + initInfoCursorDict = self.ctaEngine.initInfoCursor + + # 如果"initInfobar"字典为空, 初始化字典, 插入第一个数据 + # If dictionary "initInfobar" is empty, insert first data record + if self.initInfobar == {}: + for info_symbol in initInfoCursorDict: + try: + self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol]) + except StopIteration: + print "Data of information symbols is empty! Input is a list, not str." + raise + + # 若有某一品种的 TimeStamp 和执行报价的 TimeStamp 匹配, 则将"initInfobar"中的数据推送, + # 然后更新该品种的数据 + # If any symbol's TimeStamp is matched with execution symbol's TimeStamp, return data + # in "initInfobar", and update new data. + temp = {} + for info_symbol in self.initInfobar: + + data = self.initInfobar[info_symbol] + + # Update data only when Time Stamp is matched + + if (data is not None) and (data['datetime'] <= bar.datetime): + + try: + temp[info_symbol] = CtaBarData() + temp[info_symbol].__dict__ = data + self.initInfobar[info_symbol] = next(initInfoCursorDict[info_symbol]) + except StopIteration: + self.initInfobar[info_symbol] = None + self.ctaEngine.output("No more data for initializing %s." % (info_symbol,)) + else: + temp[info_symbol] = None + + return temp + + # ---------------------------------------------------------------------- + def updateInfoArray(self, infobar): + """收到Infomation Data, 更新辅助品种缓存字典""" + + for name in infobar: + + data = infobar[name] + + # Construct empty array + if len(self.infoArray) < len(infobar) : + self.infoArray[name] = { + "close": np.zeros(self.bufferSize), + "high": np.zeros(self.bufferSize), + "low": np.zeros(self.bufferSize), + "open": np.zeros(self.bufferSize) + } + + if data is None: + pass + + else: + self.infoArray[name]["close"][0:self.bufferSize - 1] = \ + self.infoArray[name]["close"][1:self.bufferSize] + self.infoArray[name]["high"][0:self.bufferSize - 1] = \ + self.infoArray[name]["high"][1:self.bufferSize] + self.infoArray[name]["low"][0:self.bufferSize - 1] = \ + self.infoArray[name]["low"][1:self.bufferSize] + self.infoArray[name]["open"][0:self.bufferSize - 1] = \ + self.infoArray[name]["open"][1:self.bufferSize] + + self.infoArray[name]["close"][-1] = data.close + self.infoArray[name]["high"][-1] = data.high + self.infoArray[name]["low"][-1] = data.low + self.infoArray[name]["open"][-1] = data.open + + # ---------------------------------------------------------------------- + def onBar(self, bar, **kwargs): + """收到Bar推送(必须由用户继承实现)""" + + # Update infomation data + # "infobar"是由不同时间或不同品种的品种数据组成的字典, 如果和执行品种的 TimeStamp 不匹配, + # 则传入的是"None", 当time stamp和执行品种匹配时, 传入的是"Bar" + if "infobar" in kwargs: + self.infoBar = kwargs["infobar"] + self.updateInfoArray(kwargs["infobar"]) + + # 若读取的缓存数据不足, 不考虑交易 + self.bufferCount += 1 + if self.bufferCount < self.bufferSize: + return + + # 计算指标数值 + a = np.sum(self.infoArray["TestData @GC_1D"]["close"]) + if a == 0.0: + return + + # Only updating indicators when information bar changes + # 只有在30min或者1d K线更新后才更新指标 + TradeOn = False + if any([i is not None for i in self.infoBar]): + TradeOn = True + self.vRange = self.infoArray["TestData @GC_1D"]["high"][-1] -\ + self.infoArray["TestData @GC_1D"]["low"][-1] + self.vOBO_stretch = self.vRange * self.pOBO_Mult + self.vOBO_initialpoint = self.infoArray["TestData @GC_1D"]["close"][-1] + self.vOBO_level_L = self.vOBO_initialpoint + self.vOBO_stretch + self.vOBO_level_S = self.vOBO_initialpoint - self.vOBO_stretch + + self.atrValue30M = talib.abstract.ATR(self.infoArray["TestData @GC_30M"])[-1] + + # 判断是否要进行交易 + + # 当前无仓位 + if (self.pos == 0 and TradeOn == True): + + # 撤销之前发出的尚未成交的委托(包括限价单和停止单) + for orderID in self.orderList: + self.cancelOrder(orderID) + self.orderList = [] + + # 若上一个30分钟K线的最高价大于OBO_level_L + # 且当前的价格大于OBO_level_L, 则买入 + if self.infoArray["TestData @GC_30M"]["high"][-1] > self.vOBO_level_L: + + if bar.close > self.vOBO_level_L: + + self.buy(bar.close + 0.5, 1) + + # 下单后, 在下一个30Min K线之前不交易 + TradeOn = False + + # 若上一个30分钟K线的最高价低于OBO_level_S + # 且当前的价格小于OBO_level_S, 则卖出 + elif self.infoArray["TestData @GC_30M"]["low"][-1] < self.vOBO_level_S: + + if bar.close < self.vOBO_level_S: + + self.short(bar.close - 0.5, 1) + + # 下单后, 在下一个30Min K线之前不交易 + TradeOn = False + + # 持有多头仓位 + elif self.pos > 0: + + # 当价格低于initialpoint水平, 出场 + if bar.close < self.vOBO_initialpoint: + self.sell(bar.close - 0.5 , 1) + + # 持有空头仓位 + elif self.pos < 0: + + # 当价格高于initialpoint水平, 出场 + if bar.close > self.vOBO_initialpoint: + self.cover(bar.close + 0.5, 1) + + + # 发出状态更新事件 + self.putEvent() + + # ---------------------------------------------------------------------- + def onOrder(self, order): + """收到委托变化推送(必须由用户继承实现)""" + pass + + # ---------------------------------------------------------------------- + def onTrade(self, trade): + pass + + +if __name__ == '__main__': + # 提供直接双击回测的功能 + # 导入PyQt4的包是为了保证matplotlib使用PyQt4而不是PySide,防止初始化出错 + from ctaBacktestMultiTF import * + from PyQt4 import QtCore, QtGui + import time + + ''' + 创建回测引擎 + 设置引擎的回测模式为K线 + 设置回测用的数据起始日期 + 载入历史数据到引擎中 + 在引擎中创建策略对象 + + Create backtesting engine + Set backtest mode as "Bar" + Set "Start Date" of data range + Load historical data to engine + Create strategy instance in engine + ''' + engine = BacktestEngineMultiTF() + engine.setBacktestingMode(engine.BAR_MODE) + engine.setStartDate('20120101') + engine.setEndDate('20150101') + engine.setDatabase("TestData", "@GC_1M", info_symbol=[("TestData","@GC_30M"), + ("TestData","@GC_1D")]) + + # Set parameters for strategy + engine.initStrategy(BreakOut, {}) + + # 设置产品相关参数 + engine.setSlippage(0.2) # 股指1跳 + engine.setCommission(0.3 / 10000) # 万0.3 + engine.setSize(1) # 股指合约大小 + + # 开始跑回测 + start = time.time() + + engine.runBacktesting() + + # 显示回测结果 + engine.showBacktestingResult() + + print 'Time consumed:%s' % (time.time() - start) \ No newline at end of file