diff --git a/vn.data/mysqldb.sql b/vn.data/mysqldb.sql new file mode 100644 index 00000000..494bb437 --- /dev/null +++ b/vn.data/mysqldb.sql @@ -0,0 +1,14 @@ +use BackTest; + +create table TB_Trade +( + Id varchar(255), + symbol varchar(20), + orderRef varchar(20), + tradeID varchar(20), + direction varchar(10), + offset varchar(10), + price float, + volume int + +); \ No newline at end of file diff --git a/vn.strategy/strategydemo/backtestingEngine.py b/vn.strategy/strategydemo/backtestingEngine.py index c3d82544..467b5fbe 100644 --- a/vn.strategy/strategydemo/backtestingEngine.py +++ b/vn.strategy/strategydemo/backtestingEngine.py @@ -25,6 +25,9 @@ class LimitOrder(object): self.direction = None self.offset = None + #Modified by Incense Lee + self.orderTime = datetime.now() #下单时间 + ######################################################################## class BacktestingEngine(object): @@ -37,15 +40,15 @@ class BacktestingEngine(object): #---------------------------------------------------------------------- def __init__(self): """Constructor""" - self.eventEngine = EventEngine() + self.eventEngine = EventEngine() # 实例化 # 策略引擎 - self.strategyEngine = None - + self.strategyEngine = None # 通过setStrategyEngine进行设置 + # TICK历史数据列表,由于要使用For循环来实现仿真回放 # 使用list的速度比Numpy和Pandas都要更快 self.listDataHistory = [] - + # 限价单字典 self.dictOrder = {} @@ -60,6 +63,9 @@ class BacktestingEngine(object): # 成交编号 self.tradeID = 0 + + # 回测编号 + self.Id = datetime.now().strftime('%Y%m%d-%H%M%S') #---------------------------------------------------------------------- def setStrategyEngine(self, engine): @@ -114,6 +120,7 @@ class BacktestingEngine(object): #---------------------------------------------------------------------- def loadMysqlDataHistory(self, symbol, startDate, endDate): """从Mysql载入历史TICK数据,""" + #Todo :判断开始和结束时间,如果间隔天过长,数据量会过大,需要批次提取。 try: if self.__mysqlConnected: @@ -147,18 +154,41 @@ class BacktestingEngine(object): self.writeLog(sqlstring) count = cur.execute(sqlstring) + self.writeLog(u'历史TICK数据共{0}条'.format(count)) # 将TICK数据读入内存 - self.listDataHistory = cur.fetchall() + #self.listDataHistory = cur.fetchall() + + fetch_counts = 0 + fetch_size = 10000 + + while True: + results = cur.fetchmany(fetch_size) + + if not results: + break + + fetch_counts = fetch_counts+fetch_size + + if not self.listDataHistory: + + self.listDataHistory =results + + else: + self.listDataHistory = self.listDataHistory + results + + self.writeLog(u'历史TICK数据载入{0}条'.format(fetch_counts)) + + self.writeLog(u'历史TICK数据载入完成,{1}~{2},共{0}条'.format(count,startDate,endDate)) - self.writeLog(u'历史TICK数据载入完成,共{0}条'.format(count)) else: self.writeLog(u'MysqlDB未连接,请检查') except MySQLdb.Error, e: - self.writeLog(u'MysqlDB载入数据失败,请检查.Error {0}: {1}'.format(e.arg[0],e.arg[1])) + self.writeLog(u'MysqlDB载入数据失败,请检查.Error {0}'.format(e)) #---------------------------------------------------------------------- def getMysqlDeltaDate(self,symbol, startDate, decreaseDays): + """从mysql库中获取交易日前若干天""" try: if self.__mysqlConnected: @@ -193,7 +223,10 @@ class BacktestingEngine(object): #---------------------------------------------------------------------- def processLimitOrder(self): - """处理限价单""" + """ + 处理限价单 + 为体现准确性,回测引擎需要真实tick数据的买一或卖一价比对。 + """ for ref, order in self.dictOrder.items(): # 如果是买单,且限价大于等于当前TICK的卖一价,则假设成交 if order.direction == DIRECTION_BUY and \ @@ -206,7 +239,10 @@ class BacktestingEngine(object): #---------------------------------------------------------------------- def executeLimitOrder(self, ref, order, price): - """限价单成交处理""" + """ + 模拟限价单成交处理 + 回测引擎模拟成交 + """ # 成交回报 self.tradeID = self.tradeID + 1 @@ -218,6 +254,7 @@ class BacktestingEngine(object): tradeData['OffsetFlag'] = order.offset tradeData['Price'] = price tradeData['Volume'] = order.volume + tradeData['TradeTime'] = order.insertTime print tradeData @@ -243,10 +280,10 @@ class BacktestingEngine(object): orderEvent = Event() orderEvent.dict_['data'] = orderData self.strategyEngine.updateOrder(orderEvent) - + # 记录该成交到列表中 self.listTrade.append(tradeData) - + # 删除该限价单 del self.dictOrder[ref] @@ -265,16 +302,17 @@ class BacktestingEngine(object): # 记录最新的TICK数据 self.currentData = data - + # 处理限价单 self.processLimitOrder() - + # 推送到策略引擎中 event = Event() event.dict_['data'] = data self.strategyEngine.updateMarketData(event) - - self.saveTradeData() + + #保存到数据库中 + self.saveTradeDataToMysql() t2 = datetime.now() self.writeLog(u'回测结束,{0},耗时:{1}秒'.format(str(t2),(t2-t1).seconds)) @@ -282,13 +320,15 @@ class BacktestingEngine(object): #---------------------------------------------------------------------- - def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset): + def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset, orderTime=datetime.now()): """回测发单""" order = LimitOrder(instrumentid) order.price = price order.direction = direction order.volume = volume order.offset = offset + order.orderTime = orderTime + self.orderRef = self.orderRef + 1 self.dictOrder[str(self.orderRef)] = order @@ -330,4 +370,37 @@ class BacktestingEngine(object): """仿真订阅合约""" pass + #---------------------------------------------------------------------- + def saveTradeDataToMysql(self): + """保存交易记录到mysql,added by Incense Lee""" + if self.__mysqlConnected: + sql='insert into BackTest.TB_Trade values ' + values = '' + + for tradeItem in self.listTrade: + + if len(values) > 0: + values = values + ',' + + values = values + '(\'{0}\',\'{1}\',\'{2}\',\'{3}\',\'{4}\',\'{5}\',{6},{7})'.format( + self.Id, + tradeItem['InstrumentID'], + tradeItem['OrderRef'], + tradeItem['TradeID'], + tradeItem['Direction'], + tradeItem['OffsetFlag'], + tradeItem['Price'], + tradeItem['Volume']) + + cur = self.__mysqlConnection.cursor(MySQLdb.cursors.DictCursor) + + + try: + cur.execute(sql+values) + self.__mysqlConnection.commit() + except Exception, e: + print e + + else: + self.saveTradeData() diff --git a/vn.strategy/strategydemo/demoBacktesting.py b/vn.strategy/strategydemo/demoBacktesting.py index a849bed0..9042b402 100644 --- a/vn.strategy/strategydemo/demoBacktesting.py +++ b/vn.strategy/strategydemo/demoBacktesting.py @@ -24,7 +24,7 @@ if __name__ == '__main__': be.connectMysql() #be.loadMongoDataHistory(symbol, datetime(2015,5,1), datetime.today()) #be.loadMongoDataHistory(symbol, datetime(2012,1,9), datetime(2012,1,14)) - be.loadMysqlDataHistory(symbol, datetime(2012,1,9), datetime(2012,3,30)) + be.loadMysqlDataHistory(symbol, datetime(2012,1,9), datetime(2012,1,30)) # 创建策略对象 setting = {} diff --git a/vn.strategy/strategydemo/demoEngine.py b/vn.strategy/strategydemo/demoEngine.py index cb55ec8d..e36f35b7 100644 --- a/vn.strategy/strategydemo/demoEngine.py +++ b/vn.strategy/strategydemo/demoEngine.py @@ -27,7 +27,7 @@ class MainEngine: def __init__(self): """Constructor""" self.ee = EventEngine() # 创建事件驱动引擎 - print u'demoEngine.MainEngine create EventEngine() success' + print u'demoEngine.MainEngineee create EventEngine() success' self.md = DemoMdApi(self.ee) # 创建API接口 print u'demoEngine.MainEngine create DemoMdApi() success' diff --git a/vn.strategy/strategydemo/demoStrategy.py b/vn.strategy/strategydemo/demoStrategy.py index 4d8d449b..fa605114 100644 --- a/vn.strategy/strategydemo/demoStrategy.py +++ b/vn.strategy/strategydemo/demoStrategy.py @@ -230,7 +230,7 @@ class SimpleEmaStrategy(StrategyTemplate): #---------------------------------------------------------------------- def onBar(self, o, h, l, c, volume, time): - """K线数据更新""" + """K线数据更新,同时进行策略的买入、卖出逻辑计算""" # 保存K线序列数据 self.listOpen.append(o) self.listHigh.append(h) diff --git a/vn.strategy/strategydemo/strategyEngine.py b/vn.strategy/strategydemo/strategyEngine.py index 390da8b3..baa11f8f 100644 --- a/vn.strategy/strategydemo/strategyEngine.py +++ b/vn.strategy/strategydemo/strategyEngine.py @@ -90,6 +90,8 @@ class Trade(object): self.offset = None # 开平 self.price = 0 # 成交价 self.volume = 0 # 成交量 + + self.tradeTime = '' # 成交时间 ######################################################################## @@ -144,8 +146,8 @@ class StrategyEngine(object): #---------------------------------------------------------------------- def __init__(self, eventEngine, mainEngine, backtesting=False): """Constructor""" - self.__eventEngine = eventEngine - self.mainEngine = mainEngine + self.__eventEngine = eventEngine # 引用事件引擎 + self.mainEngine = mainEngine # 主引擎,在回测中,为backtestingEngin,在交易中,为demoEngine self.backtesting = backtesting # 是否在进行回测 # 获取代表今日的datetime @@ -192,7 +194,7 @@ class StrategyEngine(object): #---------------------------------------------------------------------- def createStrategy(self, strategyName, strategySymbol, strategyClass, strategySetting): - """创建策略""" + """创建策略(实例化)""" strategy = strategyClass(strategyName, strategySymbol, self) self.writeLog(u"创建策略:{0}".format(strategyName)) @@ -203,7 +205,7 @@ class StrategyEngine(object): # 订阅合约行情,注意这里因为是CTP,所以ExchangeID可以忽略 self.mainEngine.subscribe(strategySymbol, None) - + # 注册策略监听 self.registerStrategy(strategySymbol, strategy) @@ -225,7 +227,8 @@ class StrategyEngine(object): # symbol = data['InstrumentID'] # data['date'] = self.today # self.__mongoTickDB[symbol].insert(data) - # # + # + # #---------------------------------------------------------------------- # def loadTickFromMongo(self, symbol, startDate, endDate=None): # """从MongoDB中读取Tick数据""" @@ -257,7 +260,8 @@ class StrategyEngine(object): # symbol = data['InstrumentID'] # data['date'] = self.today # self.__mongoTickDB[symbol].insert(data) - pass; + pass + #---------------------------------------------------------------------- def loadTickFromMysql(self, symbol, startDate, endDate=None): """从MysqlDB中读取Tick数据""" @@ -295,7 +299,7 @@ class StrategyEngine(object): cx = cur.fetchall() - self.writeLog(u'历史TICK数据载入完成,共{0}条'.format(count)) + self.writeLog(u'历史TICK数据载入完成,{1}~{2},共{0}条'.format(count,startDate,endDate)) return cx else: @@ -303,6 +307,7 @@ class StrategyEngine(object): #---------------------------------------------------------------------- def getMysqlDeltaDate(self,symbol, startDate, decreaseDays): + """从mysql获取交易日天数差""" try: if self.__mysqlConnected: @@ -329,7 +334,7 @@ class StrategyEngine(object): self.writeLog(u'MysqlDB未连接,请检查') except MySQLdb.Error, e: - self.writeLog(u'MysqlDB载入数据失败,请检查.Error %d: %s'.format(e.arg[0],e.arg[1])) + self.writeLog(u'MysqlDB载入数据失败,请检查.Error %s'.format(e)) td = timedelta(days=3) @@ -432,7 +437,7 @@ class StrategyEngine(object): # 以当日涨停价发出限价单买入 print u'sendOrder({0},{1},{2},{3},{4}'.format(symbol,'Direction_Buy',so.offset,upperLimit,so.volume) ref = self.sendOrder(symbol, DIRECTION_BUY, so.offset, - upperLimit, so.volume, strategy) + upperLimit, so.volume,tick.time, strategy) # 触发策略的止损单发出更新 so.strategy.onStopOrder(ref) @@ -444,7 +449,7 @@ class StrategyEngine(object): elif so.direction == DIRECTION_SELL and lastPrice <= so.price: print u'sendOrder({0},{1},{2},{3},{4}'.format(symbol,'Direction_Sell',so.offset,upperLimit,so.volume) ref = self.sendOrder(symbol, DIRECTION_SELL, so.offset, - lowerLimit, so.volume, strategy) + lowerLimit, so.volume,tick.time, strategy) so.strategy.onStopOrder(ref) @@ -527,7 +532,7 @@ class StrategyEngine(object): #print u'strategyEngine.py updateTrade() end.' #---------------------------------------------------------------------- - def sendOrder(self, symbol, direction, offset, price, volume, strategy): + def sendOrder(self, symbol, direction, offset, price, volume, orderTime, strategy): """ 发单(仅允许限价单) symbol:合约代码 @@ -535,6 +540,7 @@ class StrategyEngine(object): offset:开平,OFFSET_OPEN/OFFSET_CLOSE price:下单价格 volume:下单手数 + orderTime:下单时间(回归测试使用) strategy:策略对象 """ @@ -542,14 +548,15 @@ class StrategyEngine(object): contract = self.mainEngine.selectInstrument(symbol) if contract: - #放入事件引擎 + #调用主引擎的发单函数 ref = self.mainEngine.sendOrder(symbol, contract['ExchangeID'], price, PRICETYPE_LIMIT, volume, direction, - offset) + offset, + orderTime) self.__dictOrderRefStrategy[ref] = strategy @@ -568,6 +575,7 @@ class StrategyEngine(object): contract = self.mainEngine.selectInstrument(symbol) if contract: + #调用主引擎的撤单函数 self.mainEngine.cancelOrder(symbol, contract['ExchangeID'], orderRef, @@ -616,7 +624,7 @@ class StrategyEngine(object): #---------------------------------------------------------------------- def placeStopOrder(self, symbol, direction, offset, price, volume, strategy): """ - 下停止单(运行于本地引擎中) + 下停止单(止损单,运行于本地引擎中) 注意这里的price是停止单的触发价 """ # 创建止损单对象 @@ -741,7 +749,7 @@ class StrategyTemplate(object): raise NotImplementedError #---------------------------------------------------------------------- - def buy(self, price, volume, stopOrder=False): + def buy(self, price, volume, stopOrder=False, orderTime=datetime.now()): """买入开仓""" print u'strategyEngine.py StrategyTemplate({3}) buy() begin. symbol:{0}, price:{1},volume:{2}'.format(self.symbol, price, volume, self.name) @@ -752,7 +760,7 @@ class StrategyTemplate(object): return so else: ref = self.engine.sendOrder(self.symbol, DIRECTION_BUY, - OFFSET_OPEN, price, volume, self) + OFFSET_OPEN, price, volume, orderTime, self) return ref else: return None @@ -760,7 +768,7 @@ class StrategyTemplate(object): #print (u'strategyEngine.py buy() end.') #---------------------------------------------------------------------- - def cover(self, price, volume, stopOrder=False): + def cover(self, price, volume, stopOrder=False, orderTime=datetime.now()): """买入平仓""" print u'strategyEngine.py StrategyTemplate({3}) cover() begin. symbol:{0}, price:{1},volume:{2}'.format(self.symbol, price, volume, self.name) @@ -772,14 +780,14 @@ class StrategyTemplate(object): return so else: ref = self.engine.sendOrder(self.symbol, DIRECTION_BUY, - OFFSET_CLOSE, price, volume, self) + OFFSET_CLOSE, price, volume, orderTime, self) return ref else: return None print (u'strategyEngine.py cover() end.') #---------------------------------------------------------------------- - def sell(self, price, volume, stopOrder=False): + def sell(self, price, volume, stopOrder=False, orderTime=datetime.now()): """卖出平仓""" print u'strategyEngine.py StrategyTemplate({3}) sell() begin. symbol:{0}, price:{1},volume:{2}'.format(self.symbol, price, volume, self.name) @@ -791,14 +799,14 @@ class StrategyTemplate(object): return so else: ref = self.engine.sendOrder(self.symbol, DIRECTION_SELL, - OFFSET_CLOSE, price, volume, self) + OFFSET_CLOSE, price, volume, orderTime, self) return ref else: return None #print u'strategyEngine.py sell() end.' #---------------------------------------------------------------------- - def short(self, price, volume, stopOrder=False): + def short(self, price, volume, stopOrder=False, orderTime=datetime.now()): """卖出开仓""" print u'strategyEngine.py StrategyTemplate({3}) short() begin. symbol:{0}, price:{1},volume:{2}'.format(self.symbol, price, volume, self.name) if self.trading: @@ -808,7 +816,7 @@ class StrategyTemplate(object): return so else: ref = self.engine.sendOrder(self.symbol, DIRECTION_SELL, - OFFSET_OPEN, price, volume, self) + OFFSET_OPEN, price, volume, orderTime, self) return ref else: return None