diff --git a/vn.trader/ctaAlgo/ctaBacktesting.py b/vn.trader/ctaAlgo/ctaBacktesting.py index d8da0797..3415f94a 100644 --- a/vn.trader/ctaAlgo/ctaBacktesting.py +++ b/vn.trader/ctaAlgo/ctaBacktesting.py @@ -25,6 +25,7 @@ from vtGateway import VtOrderData, VtTradeData from vtFunction import loadMongoSetting import logging import copy +import pandas as pd ######################################################################## class BacktestingEngine(object): @@ -89,7 +90,11 @@ class BacktestingEngine(object): self.limitOrderDict = OrderedDict() # 限价单字典 self.workingLimitOrderDict = OrderedDict() # 活动限价单字典,用于进行撮合用 self.limitOrderCount = 0 # 限价单编号 - + + # 持仓缓存字典 + # key为vtSymbol,value为PositionBuffer对象 + self.posBufferDict = {} + self.tradeCount = 0 # 成交编号 self.tradeDict = OrderedDict() # 成交字典 @@ -420,9 +425,8 @@ class BacktestingEngine(object): def __dataToTick(self, data): """ 数据库查询返回的data结构,转换为tick对象 - added by IncenseLee - """ - tick = CtaTickData() + added by IncenseLee """ + tick = CtaTickData() symbol = data['InstrumentID'] tick.symbol = symbol @@ -775,6 +779,349 @@ class BacktestingEngine(object): cache.close() return True + def runBackTestingWithNonStrArbTickFile(self, leg1MainPath, leg2MainPath, leg1Symbol,leg2Symbol): + """运行套利回测(使用本地tickcsv数据) + 参数: + leg1MainPath: leg1合约所在的市场路径 + leg2MainPath: leg2合约所在的市场路径 + leg1Symbol: leg1合约 + Leg2Symbol:leg2合约 + added by IncenseLee + 原始的tick,分别存放在白天目录1和夜盘目录2中,每天都有各个合约的数据 + Z:\ticks\SHFE\201606\RB\0601\ + RB1610.txt + RB1701.txt + .... + Z:\ticks\SHFE_night\201606\RB\0601 + RB1610.txt + RB1701.txt + .... + + 夜盘目录为自然日,不是交易日。 + + 按照回测的开始日期,到结束日期,循环每一天。 + 每天优先读取日盘数据,再读取夜盘数据。 + 读取eg1(如RB1610),读取Leg2(如RB701),根据两者tick的时间优先顺序,逐一tick灌输到策略的onTick中。 + """ + self.capital = self.initCapital # 更新设置期初资金 + + if not self.dataStartDate: + self.writeCtaLog(u'回测开始日期未设置。') + return + # RB + if len(self.symbol)<1: + self.writeCtaLog(u'回测对象未设置。') + return + + if not self.dataEndDate: + self.dataEndDate = datetime.today() + + #首先根据回测模式,确认要使用的数据类 + if self.mode == self.BAR_MODE: + self.writeCtaLog(u'本回测仅支持tick模式') + return + + testdays = (self.dataEndDate - self.dataStartDate).days + + if testdays < 1: + self.writeCtaLog(u'回测时间不足') + return + + for i in range(0, testdays): + + testday = self.dataStartDate + timedelta(days = i) + + self.output(u'回测日期:{0}'.format(testday)) + + # 加载运行白天数据 + self.__loadNotStdArbTicks(leg1MainPath, leg2MainPath, testday, leg1Symbol,leg2Symbol) + + # 加载运行夜盘数据 + self.__loadNotStdArbTicks(leg1MainPath+'_night', leg2MainPath+'_night', testday, leg1Symbol, leg2Symbol) + + def __loadTicksFromFile(self, filepath, tickDate, vtSymbol): + """从文件中读取tick""" + # 先读取数据到Dict,以日期时间为key + ticks = OrderedDict() + + if not os.path.isfile(filepath): + self.writeCtaLog(u'{0}文件不存在'.format(filepath)) + return ticks + dt = None + csvReadFile = file(filepath, 'rb') + + reader = csv.DictReader(csvReadFile, delimiter=",") + self.writeCtaLog(u'加载{0}'.format(filepath)) + for row in reader: + tick = CtaTickData() + + tick.vtSymbol = vtSymbol + tick.symbol = vtSymbol + + tick.date = tickDate.strftime('%Y%m%d') + tick.tradingDay = tick.date + tick.time = row['Time'] + + try: + tick.datetime = datetime.strptime(tick.date + ' ' + tick.time, '%Y%m%d %H:%M:%S.%f') + except Exception as ex: + self.writeCtaError(u'日期转换错误:{0},{1}:{2}'.format(tick.date + ' ' + tick.time, Exception, ex)) + continue + + # 修正毫秒 + if tick.datetime.replace(microsecond=0) == dt: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick.datetime = tick.datetime.replace(microsecond=500) + tick.time = tick.datetime.strftime('%H:%M:%S.%f') + + else: + tick.datetime = tick.datetime.replace(microsecond=0) + tick.time = tick.datetime.strftime('%H:%M:%S.%f') + + dt = tick.datetime + + tick.lastPrice = float(row['LastPrice']) + tick.volume = int(float(row['LVolume'])) + tick.bidPrice1 = float(row['BidPrice']) # 叫买价(价格低) + tick.bidVolume1 = int(float(row['BidVolume'])) + tick.askPrice1 = float(row['AskPrice']) # 叫卖价(价格高) + tick.askVolume1 = int(float(row['AskVolume'])) + + # 排除涨停/跌停的数据 + if (tick.bidPrice1 == float('1.79769E308') and tick.bidVolume1 == 0) \ + or (tick.askPrice1 == float('1.79769E308') and tick.askVolume1 == 0): + continue + + dtStr = tick.date + ' ' + tick.time + if dtStr in ticks: + self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) + else: + ticks[dtStr] = tick + + return ticks + + def __loadNotStdArbTicks(self, leg1MainPath,leg2MainPath, testday, leg1Symbol, leg2Symbol): + + self.writeCtaLog(u'加载回测日期:{0}的价差tick'.format( testday)) + + leg1File = u'z:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}.txt' \ + .format(leg1MainPath, testday.strftime('%Y%m'), self.symbol, testday.strftime('%m%d'), leg1Symbol) + if not os.path.isfile(leg1File): + self.writeCtaLog(u'{0}文件不存在'.format(leg1File)) + return + + leg2File = u'z:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}.txt' \ + .format(leg2MainPath, testday.strftime('%Y%m'), self.symbol, testday.strftime('%m%d'), leg2Symbol) + if not os.path.isfile(leg2File): + self.writeCtaLog(u'{0}文件不存在'.format(leg2File)) + return + + leg1Ticks = self.__loadTicksFromFile(filepath=leg1File,tickDate= testday, vtSymbol=leg1Symbol) + if len(leg1Ticks) == 0: + self.writeCtaLog(u'{0}读取tick数为空'.format(leg1File)) + return + + leg2Ticks = self.__loadTicksFromFile(filepath=leg2File, tickDate=testday, vtSymbol=leg2Symbol) + if len(leg2Ticks) == 0: + self.writeCtaLog(u'{0}读取tick数为空'.format(leg1File)) + return + + leg1_tick = None + leg2_tick = None + + while not (len(leg1Ticks) == 0 or len(leg2Ticks) == 0): + if leg1_tick is None and len(leg1Ticks) > 0: + leg1_tick = leg1Ticks.popitem(last=False) + if leg2_tick is None and len(leg2Ticks) > 0: + leg2_tick = leg2Ticks.popitem(last=False) + + if leg1_tick is None and leg2_tick is not None: + self.newTick(leg2_tick[1]) + leg2_tick = None + elif leg1_tick is not None and leg2_tick is None: + self.newTick(leg1_tick[1]) + leg1_tick = None + elif leg1_tick is not None and leg2_tick is not None: + leg1 = leg1_tick[1] + leg2 = leg2_tick[1] + if leg1.datetime <= leg2.datetime: + self.newTick(leg1) + leg1_tick = None + else: + self.newTick(leg2) + leg2_tick = None + + def runBackTestingWithNonStrArbTickFile2(self, leg1MainPath, leg2MainPath, leg1Symbol, leg2Symbol): + """运行套利回测(使用本地tickcsv数据,数据从taobao标普购买) + 参数: + leg1MainPath: leg1合约所在的市场路径 + leg2MainPath: leg2合约所在的市场路径 + leg1Symbol: leg1合约 + Leg2Symbol:leg2合约 + added by IncenseLee + 原始的tick,存放在相应市场下每天的目录中,目录包含市场各个合约的数据 + E:\ticks\SQ\201606\20160601\ + RB10.csv + RB01.csv + .... + + 目录为交易日。 + 按照回测的开始日期,到结束日期,循环每一天。 + + 读取eg1(如RB1610),读取Leg2(如RB701),根据两者tick的时间优先顺序,逐一tick灌输到策略的onTick中。 + """ + self.capital = self.initCapital # 更新设置期初资金 + + if not self.dataStartDate: + self.writeCtaLog(u'回测开始日期未设置。') + return + # RB + if len(self.symbol) < 1: + self.writeCtaLog(u'回测对象未设置。') + return + + if not self.dataEndDate: + self.dataEndDate = datetime.today() + + # 首先根据回测模式,确认要使用的数据类 + if self.mode == self.BAR_MODE: + self.writeCtaLog(u'本回测仅支持tick模式') + return + + testdays = (self.dataEndDate - self.dataStartDate).days + + if testdays < 1: + self.writeCtaLog(u'回测时间不足') + return + + for i in range(0, testdays): + testday = self.dataStartDate + timedelta(days=i) + + self.output(u'回测日期:{0}'.format(testday)) + + # 加载运行每天数据 + self.__loadNotStdArbTicks2(leg1MainPath, leg2MainPath, testday, leg1Symbol, leg2Symbol) + + + def __loadTicksFromFile2(self, filepath, tickDate, vtSymbol): + """从csv文件中UnicodeDictReader读取tick""" + # 先读取数据到Dict,以日期时间为key + ticks = OrderedDict() + + if not os.path.isfile(filepath): + self.writeCtaLog(u'{0}文件不存在'.format(filepath)) + return ticks + dt = None + csvReadFile = file(filepath, 'rb') + df = pd.read_csv(filepath, encoding='gbk') + df.columns = ['date', 'time', 'lastPrice', 'lastVolume', 'totalInterest', 'position', + 'bidPrice1', 'bidVolume1', 'bidPrice2', 'bidVolume2', 'bidPrice3', 'bidVolume3', + 'askPrice1', 'askVolume1', 'askPrice2', 'askVolume2', 'askPrice3', 'askVolume3','BS'] + self.writeCtaLog(u'加载{0}'.format(filepath)) + for i in range(0,len(df)): + #日期, 时间, 成交价, 成交量, 总量, 属性(持仓增减), B1价, B1量, B2价, B2量, B3价, B3量, S1价, S1量, S2价, S2量, S3价, S3量, BS + # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 + row = df.iloc[i].to_dict() + + tick = CtaTickData() + + tick.vtSymbol = vtSymbol + tick.symbol = vtSymbol + + tick.date = row['date'] + tick.tradingDay = tickDate.strftime('%Y%m%d') + tick.time = row['time'] + + try: + tick.datetime = datetime.strptime(tick.date + ' ' + tick.time, '%Y-%m-%d %H:%M:%S') + except Exception as ex: + self.writeCtaError(u'日期转换错误:{0},{1}:{2}'.format(tick.date + ' ' + tick.time, Exception, ex)) + continue + + tick.date = tick.datetime.strftime('%Y%m%d') + # 修正毫秒 + if tick.datetime.replace(microsecond=0) == dt: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick.datetime = tick.datetime.replace(microsecond=500) + tick.time = tick.datetime.strftime('%H:%M:%S.%f') + + else: + tick.datetime = tick.datetime.replace(microsecond=0) + tick.time = tick.datetime.strftime('%H:%M:%S.%f') + + dt = tick.datetime + + tick.lastPrice = float(row['lastPrice']) + tick.volume = int(float(row['lastVolume'])) + tick.bidPrice1 = float(row['bidPrice1']) # 叫买价(价格低) + tick.bidVolume1 = int(float(row['bidVolume1'])) + tick.askPrice1 = float(row['askPrice1']) # 叫卖价(价格高) + tick.askVolume1 = int(float(row['askVolume1'])) + + # 排除涨停/跌停的数据 + if (tick.bidPrice1 == float('1.79769E308') and tick.bidVolume1 == 0) \ + or (tick.askPrice1 == float('1.79769E308') and tick.askVolume1 == 0): + continue + + dtStr = tick.date + ' ' + tick.time + if dtStr in ticks: + self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) + else: + ticks[dtStr] = tick + + return ticks + + def __loadNotStdArbTicks2(self, leg1MainPath, leg2MainPath, testday, leg1Symbol, leg2Symbol): + + self.writeCtaLog(u'加载回测日期:{0}的价差tick'.format(testday)) + + # E:\Ticks\SQ\2014\201401\20140102\ag01_20140102.csv + leg1File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \ + .format(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), self.symbol, leg1Symbol[-2:]) + if not os.path.isfile(leg1File): + self.writeCtaLog(u'{0}文件不存在'.format(leg1File)) + return + + leg2File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \ + .format(leg2MainPath,testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), self.symbol, leg2Symbol[-2:]) + if not os.path.isfile(leg2File): + self.writeCtaLog(u'{0}文件不存在'.format(leg2File)) + return + + leg1Ticks = self.__loadTicksFromFile2(filepath=leg1File, tickDate=testday, vtSymbol=leg1Symbol) + if len(leg1Ticks) == 0: + self.writeCtaLog(u'{0}读取tick数为空'.format(leg1File)) + return + + leg2Ticks = self.__loadTicksFromFile2(filepath=leg2File, tickDate=testday, vtSymbol=leg2Symbol) + if len(leg2Ticks) == 0: + self.writeCtaLog(u'{0}读取tick数为空'.format(leg1File)) + return + + leg1_tick = None + leg2_tick = None + + while not (len(leg1Ticks) == 0 or len(leg2Ticks) == 0): + if leg1_tick is None and len(leg1Ticks) > 0: + leg1_tick = leg1Ticks.popitem(last=False) + if leg2_tick is None and len(leg2Ticks) > 0: + leg2_tick = leg2Ticks.popitem(last=False) + + if leg1_tick is None and leg2_tick is not None: + self.newTick(leg2_tick[1]) + leg2_tick = None + elif leg1_tick is not None and leg2_tick is None: + self.newTick(leg1_tick[1]) + leg1_tick = None + elif leg1_tick is not None and leg2_tick is not None: + leg1 = leg1_tick[1] + leg2 = leg2_tick[1] + if leg1.datetime <= leg2.datetime: + self.newTick(leg1) + leg1_tick = None + else: + self.newTick(leg2) + leg2_tick = None #---------------------------------------------------------------------- def runBackTestingWithBarFile(self, filename): """运行回测(使用本地csv数据) @@ -831,6 +1178,9 @@ class BacktestingEngine(object): try: bar = CtaBarData() + bar.symbol = self.symbol + bar.vtSymbol = self.symbol + # 从tb导出的csv文件 #bar.open = float(row['Open']) #bar.high = float(row['High']) @@ -1130,17 +1480,19 @@ class BacktestingEngine(object): sellCrossPrice = self.bar.high # 若卖出方向限价单价格低于该价格,则会成交 buyBestCrossPrice = self.bar.open # 在当前时间点前发出的买入委托可能的最优成交价 sellBestCrossPrice = self.bar.open # 在当前时间点前发出的卖出委托可能的最优成交价 + vtSymbol = self.bar.vtSymbol else: buyCrossPrice = self.tick.askPrice1 sellCrossPrice = self.tick.bidPrice1 buyBestCrossPrice = self.tick.askPrice1 sellBestCrossPrice = self.tick.bidPrice1 + vtSymbol = self.tick.vtSymbol # 遍历限价单字典中的所有限价单 for orderID, order in self.workingLimitOrderDict.items(): # 判断是否会成交 - buyCross = order.direction==DIRECTION_LONG and order.price >= buyCrossPrice - sellCross = order.direction==DIRECTION_SHORT and order.price <= sellCrossPrice + buyCross = order.direction == DIRECTION_LONG and order.price >= buyCrossPrice and vtSymbol == order.vtSymbol + sellCross = order.direction == DIRECTION_SHORT and order.price <= sellCrossPrice and vtSymbol == order.vtSymbol # 如果发生了成交 if buyCross or sellCross: @@ -1200,16 +1552,18 @@ class BacktestingEngine(object): buyCrossPrice = self.bar.high # 若买入方向停止单价格低于该价格,则会成交 sellCrossPrice = self.bar.low # 若卖出方向限价单价格高于该价格,则会成交 bestCrossPrice = self.bar.open # 最优成交价,买入停止单不能低于,卖出停止单不能高于 + vtSymbol = self.bar.vtSymbol else: buyCrossPrice = self.tick.lastPrice sellCrossPrice = self.tick.lastPrice bestCrossPrice = self.tick.lastPrice + vtSymbol = self.tick.vtSymbol # 遍历停止单字典中的所有停止单 for stopOrderID, so in self.workingStopOrderDict.items(): # 判断是否会成交 - buyCross = so.direction==DIRECTION_LONG and so.price <= buyCrossPrice - sellCross = so.direction==DIRECTION_SHORT and so.price >= sellCrossPrice + buyCross = so.direction == DIRECTION_LONG and so.price <= buyCrossPrice and vtSymbol == so.vtSymbol + sellCross = so.direction == DIRECTION_SHORT and so.price >= sellCrossPrice and vtSymbol == so.vtSymbol # 如果发生了成交 if buyCross or sellCross: @@ -1334,9 +1688,13 @@ class BacktestingEngine(object): if len(shortTrade)==0: self.writeCtaError(u'异常,没有开空仓的数据') break - + pop_indexs = [i for i, val in enumerate(shortTrade) if val.vtSymbol == trade.vtSymbol] + if len(pop_indexs) < 1: + self.writeCtaLog(u'没有对应的symbol:{0}开空仓数据'.format(trade.vtSymbol)) + break + pop_index = pop_indexs[0] # 从未平仓的空头交易 - entryTrade = shortTrade.pop(0) + entryTrade = shortTrade.pop(pop_index) # 开空volume,不大于平仓volume if coverVolume >= entryTrade.volume: @@ -1347,6 +1705,7 @@ class BacktestingEngine(object): groupId=gId, fixcommission=self.fixCommission) t = {} + t['vtSymbol'] = entryTrade.vtSymbol t['OpenTime'] = entryTrade.tradeTime t['OpenPrice'] = entryTrade.price t['Direction'] = u'Short' @@ -1406,6 +1765,7 @@ class BacktestingEngine(object): groupId=gId, fixcommission=self.fixCommission) t = {} + t['vtSymbol'] = entryTrade.vtSymbol t['OpenTime'] = entryTrade.tradeTime t['OpenPrice'] = entryTrade.price t['Direction'] = u'Short' @@ -1463,11 +1823,18 @@ class BacktestingEngine(object): self.writeCtaLog(u'多平:{0}'.format(sellVolume)) while sellVolume > 0: - if len(longTrade)==0: + if len(longTrade) == 0: self.writeCtaError(u'异常,没有开多单') break - entryTrade = longTrade.pop(0) + pop_indexs = [i for i, val in enumerate(longTrade) if val.vtSymbol == trade.vtSymbol] + if len(pop_indexs) < 1: + self.writeCtaLog(u'没有对应的symbol{0}开多仓数据'.format(trade.vtSymbol)) + break + + pop_index = pop_indexs[0] + + entryTrade = longTrade.pop(pop_index) # 开多volume,不大于平仓volume if sellVolume >= entryTrade.volume: @@ -1479,6 +1846,7 @@ class BacktestingEngine(object): groupId=gId, fixcommission=self.fixCommission) t = {} + t['vtSymbol'] = entryTrade.vtSymbol t['OpenTime'] = entryTrade.tradeTime t['OpenPrice'] = entryTrade.price t['Direction'] = u'Long' @@ -1534,6 +1902,7 @@ class BacktestingEngine(object): groupId=gId, fixcommission=self.fixCommission) t = {} + t['vtSymbol'] = entryTrade.vtSymbol t['OpenTime'] = entryTrade.tradeTime t['OpenPrice'] = entryTrade.price t['Direction'] = u'Long' @@ -1866,7 +2235,7 @@ class BacktestingEngine(object): import csv csvWriteFile = file(csvOutputFile, 'wb') - fieldnames = ['OpenTime', 'OpenPrice', 'Direction', 'CloseTime', 'ClosePrice', 'Volume', 'Profit'] + fieldnames = ['vtSymbol','OpenTime', 'OpenPrice', 'Direction', 'CloseTime', 'ClosePrice', 'Volume', 'Profit'] writer = csv.DictWriter(f=csvWriteFile, fieldnames=fieldnames, dialect='excel') writer.writeheader()