diff --git a/vn.trader/ctaStrategy/ctaBacktesting.py b/vn.trader/ctaStrategy/ctaBacktesting.py index b2256b69..d5139d6d 100644 --- a/vn.trader/ctaStrategy/ctaBacktesting.py +++ b/vn.trader/ctaStrategy/ctaBacktesting.py @@ -6,6 +6,10 @@ ''' from __future__ import division +import sys +import os +cta_engine_path = os.path.abspath(os.path.dirname(__file__)) + from datetime import datetime, timedelta from collections import OrderedDict from itertools import product @@ -19,7 +23,7 @@ from vtFunction import loadMongoSetting from eventEngine import * -import MySQLdb +#import MySQLdb import json import os import sys @@ -706,7 +710,8 @@ class BacktestingEngine(object): dtStr = tick.date + ' ' + tick.time if dtStr in leg2Ticks: - self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) + pass + #self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) else: leg2Ticks[dtStr] = tick @@ -823,8 +828,152 @@ class BacktestingEngine(object): cache.close() return True + # ---------------------------------------------------------------------- + + def runBackTestingWithArbTickFile2(self, leg1MainPath,leg2MainPath, arbSymbol): + """运行套利回测(使用本地tick csv数据) + 参数:套利代码 SP rb1610&rb1701 + added by IncenseLee + 原始的tick,存放在相应市场下每天的目录中,目录包含市场各个合约的数据 + E:\ticks\SQ\201606\20160601\ + RB10.csv + RB01.csv + .... + + 目录为交易日。 + 按照回测的开始日期,到结束日期,循环每一天。 + + 读取eg1(如RB1610),读取Leg2(如RB701),合并成价差tick,灌输到策略的onTick中。 + """ + self.capital = self.initCapital # 更新设置期初资金 + + if len(arbSymbol) < 1: + self.writeCtaLog(u'套利合约为空') + return + + if not (arbSymbol.upper().index("SP") == 0 and arbSymbol.index(" ") > 0 and arbSymbol.index("&") > 0): + self.writeCtaLog(u'套利合约格式不符合') + return + + # 获得Leg1,leg2 + legs = arbSymbol[arbSymbol.index(" "):] + leg1 = legs[1:legs.index("&")] + leg2 = legs[legs.index("&") + 1:] + self.writeCtaLog(u'Leg1:{0},Leg2:{1}'.format(leg1, leg2)) + + if not self.dataStartDate: + self.writeCtaLog(u'回测开始日期未设置。') + return + # RB + if len(self.symbol) < 1: + self.writeCtaLog(u'回测对象未设置。') + return + + if not self.dataEndDate: + self.dataEndDate = datetime.today() + + # 首先根据回测模式,确认要使用的数据类 + if self.mode == self.BAR_MODE: + self.writeCtaLog(u'本回测仅支持tick模式') + return + + testdays = (self.dataEndDate - self.dataStartDate).days + + if testdays < 1: + self.writeCtaLog(u'回测时间不足') + return + + for i in range(0, testdays): + testday = self.dataStartDate + timedelta(days=i) + + self.output(u'回测日期:{0}'.format(testday)) + + # 白天数据 + self.__loadArbTicks2(leg1MainPath, leg2MainPath, testday, leg1, leg2) + + def __loadArbTicks2(self, leg1MainPath, leg2MainPath, testday, leg1Symbol, leg2Symbol): + """加载taobao csv格式tick产生的价差合约""" + + self.writeCtaLog(u'加载回测日期:{0}\{1}的价差tick'.format(leg1MainPath, testday)) + p = re.compile(r"([A-Z]+)[0-9]+", re.I) + + leg1_shortSymbol = p.match(leg1Symbol) + leg2_shortSymbol = p.match(leg2Symbol) + + if leg1_shortSymbol is None or leg2_shortSymbol is None: + self.writeCtaLog(u'{0},{1}不能正则分解'.format(leg1Symbol, leg2Symbol)) + return + + leg1_shortSymbol = leg1_shortSymbol.group(1) + leg2_shortSymbol = leg2_shortSymbol.group(1) + + arbTicks = [] + + leg1File = os.path.abspath( + os.path.join(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), + '{0}{1}_{2}.csv'.format(leg1_shortSymbol, leg1Symbol[-2:], testday.strftime('%Y%m%d')))) + + if not os.path.isfile(leg1File): + self.writeCtaLog(u'{0}文件不存在'.format(leg1File)) + return + + leg2File = os.path.abspath( + os.path.join(leg2MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), + '{0}{1}_{2}.csv'.format(leg2_shortSymbol, leg2Symbol[-2:], testday.strftime('%Y%m%d')))) + + if not os.path.isfile(leg2File): + self.writeCtaLog(u'{0}文件不存在'.format(leg2File)) + return + + # 先读取leg2的数据到目录,以日期时间为key + leg2Ticks = self.__loadTicksFromFile2(filepath=leg2File,tickDate=testday,vtSymbol=leg2Symbol) + + leg1Ticks = self.__loadTicksFromFile2(filepath=leg1File, tickDate=testday, vtSymbol=leg1Symbol) + + for dtStr,leg1_tick in leg1Ticks.iteritems(): + + if dtStr in leg2Ticks: + arbTick = CtaTickData() + + leg2_tick = leg2Ticks[dtStr] + + arbTick.vtSymbol = self.symbol + arbTick.symbol = self.symbol + arbTick.date = leg1_tick.date + arbTick.time = leg1_tick.time + arbTick.datetime = leg1_tick.datetime + arbTick.tradingDay = leg1_tick.tradingDay + + arbTick.lastPrice = EMPTY_FLOAT + arbTick.volume = EMPTY_INT + + # 排除涨停/跌停的数据 + if ((leg1_tick.askPrice1 == float('1.79769E308') or leg1_tick.askPrice1 == 0) and leg1_tick.askVolume1 == 0) \ + or ((leg1_tick.bidPrice1 == float('1.79769E308') or leg1_tick.bidPrice1 == 0) and leg1_tick.bidVolume1 == 0): + continue + + if ((leg2_tick.askPrice1 == float('1.79769E308') or leg2_tick.askPrice1 == 0) and leg2_tick.askVolume1 == 0) \ + or ((leg2_tick.bidPrice1 == float('1.79769E308') or leg2_tick.bidPrice1 == 0) and leg2_tick.bidVolume1 == 0): + continue + + # 叫卖价差=leg1.askPrice1 - leg2.bidPrice1,volume为两者最小 + arbTick.askPrice1 = leg1_tick.askPrice1 - leg2_tick.bidPrice1 + arbTick.askVolume1 = min(leg1_tick.askVolume1, leg2_tick.bidVolume1) + + # 叫买价差=leg1.bidPrice1 - leg2.askPrice1,volume为两者最小 + arbTick.bidPrice1 = leg1_tick.bidPrice1 - leg2_tick.askPrice1 + arbTick.bidVolume1 = min(leg1_tick.bidVolume1, leg2_tick.askVolume1) + + arbTicks.append(arbTick) + + del leg2Ticks[dtStr] + + for t in arbTicks: + # 推送到策略中 + self.newTick(t) + def runBackTestingWithNonStrArbTickFile(self, leg1MainPath, leg2MainPath, leg1Symbol,leg2Symbol): - """运行套利回测(使用本地tickcsv数据) + """运行套利回测(使用本地tick txt数据) 参数: leg1MainPath: leg1合约所在的市场路径 leg2MainPath: leg2合约所在的市场路径 @@ -941,7 +1090,8 @@ class BacktestingEngine(object): dtStr = tick.date + ' ' + tick.time if dtStr in ticks: - self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) + pass + #self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) else: ticks[dtStr] = tick @@ -1077,7 +1227,7 @@ class BacktestingEngine(object): return ticks dt = None csvReadFile = file(filepath, 'rb') - df = pd.read_csv(filepath, encoding='gbk') + df = pd.read_csv(filepath, encoding='gbk',parse_dates=False) df.columns = ['date', 'time', 'lastPrice', 'lastVolume', 'totalInterest', 'position', 'bidPrice1', 'bidVolume1', 'bidPrice2', 'bidVolume2', 'bidPrice3', 'bidVolume3', 'askPrice1', 'askVolume1', 'askPrice2', 'askVolume2', 'askPrice3', 'askVolume3','BS'] @@ -1129,7 +1279,9 @@ class BacktestingEngine(object): dtStr = tick.date + ' ' + tick.time if dtStr in ticks: - self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) + pass + + #self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr)) else: ticks[dtStr] = tick @@ -1151,14 +1303,23 @@ class BacktestingEngine(object): # E:\Ticks\SQ\2014\201401\20140102\ag01_20140102.csv - leg1File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \ - .format(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg1_shortSymbol, leg1Symbol[-2:]) + #leg1File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \ + # .format(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg1_shortSymbol, leg1Symbol[-2:]) + + leg1File = os.path.abspath( + os.path.join(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), + '{0}{1}_{2}.csv'.format(leg1_shortSymbol,leg1Symbol[-2:],testday.strftime('%Y%m%d')))) + if not os.path.isfile(leg1File): self.writeCtaLog(u'{0}文件不存在'.format(leg1File)) return - leg2File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \ - .format(leg2MainPath,testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg2_shortSymbol, leg2Symbol[-2:]) + #leg2File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \ + # .format(leg2MainPath,testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg2_shortSymbol, leg2Symbol[-2:]) + leg2File = os.path.abspath( + os.path.join(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), + '{0}{1}_{2}.csv'.format(leg2_shortSymbol, leg2Symbol[-2:], testday.strftime('%Y%m%d')))) + if not os.path.isfile(leg2File): self.writeCtaLog(u'{0}文件不存在'.format(leg2File)) return @@ -1779,11 +1940,14 @@ class BacktestingEngine(object): while coverVolume > 0: if len(shortTrade)==0: self.writeCtaError(u'异常,没有开空仓的数据') - break + raise RuntimeError(u'realtimeCalculate() Exception,没有开空仓的数据') + + pop_indexs = [i for i, val in enumerate(shortTrade) if val.vtSymbol == trade.vtSymbol] if len(pop_indexs) < 1: self.writeCtaError(u'没有对应的symbol:{0}开空仓数据'.format(trade.vtSymbol)) - break + raise RuntimeError(u'realtimeCalculate() Exception,没有对应的symbol:{0}开空仓数据'.format(trade.vtSymbol)) + pop_index = pop_indexs[0] # 从未平仓的空头交易 entryTrade = shortTrade.pop(pop_index) @@ -1946,12 +2110,14 @@ class BacktestingEngine(object): while sellVolume > 0: if len(longTrade) == 0: self.writeCtaError(u'异常,没有开多单') - break + raise RuntimeError(u'realtimeCalculate() Exception,没有开多单') + return pop_indexs = [i for i, val in enumerate(longTrade) if val.vtSymbol == trade.vtSymbol] if len(pop_indexs) < 1: self.writeCtaError(u'没有对应的symbol{0}开多仓数据,'.format(trade.vtSymbol)) - break + raise RuntimeError(u'realimeCalculate() Exception,没有对应的symbol{0}开多仓数据,'.format(trade.vtSymbol)) + return pop_index = pop_indexs[0] @@ -2159,27 +2325,19 @@ class BacktestingEngine(object): """实时计算交易结果2 支持多空仓位并存""" - if len(self.tradeDict) <1: - return + if len(self.tradeDict) < 1: return tradeids = self.tradeDict.keys() - resultDict = OrderedDict() # 交易结果记录 - - longTrade = [] # 未平仓的多头交易 - shortTrade = [] # 未平仓的空头交易 longid = EMPTY_STRING shortid = EMPTY_STRING - no_match_shortTrade = False - no_match_longTrade = False - # 对交易记录逐一处理 for tradeid in tradeids: try: trade = self.tradeDict[tradeid] except: - self.output(u'没有{0}的成交单'.format(tradeid)) + self.writeCtaError(u'没有{0}的成交单'.format(tradeid)) continue # buy trade @@ -2191,7 +2349,6 @@ class BacktestingEngine(object): # cover trade, elif trade.direction == DIRECTION_LONG and trade.offset == OFFSET_CLOSE: - gId = trade.tradeID # 交易组(多个平仓数为一组) gr = None # 组合的交易结果 @@ -2200,11 +2357,13 @@ class BacktestingEngine(object): while coverVolume > 0: if len(self.shortPosition) == 0: self.writeCtaError(u'异常!没有开空仓的数据') - break + raise Exception(u'realtimeCalculate2() Exception,没有开空仓的数据') + return pop_indexs = [i for i, val in enumerate(self.shortPosition) if val.vtSymbol == trade.vtSymbol] if len(pop_indexs) < 1: self.writeCtaError(u'异常,没有对应symbol:{0}的空单持仓'.format(trade.vtSymbol)) - break + raise Exception(u'realtimeCalculate2() Exception,没有对应symbol:{0}的空单持仓'.format(trade.vtSymbol)) + return pop_index = pop_indexs[0] # 从未平仓的空头交易 @@ -2245,7 +2404,6 @@ class BacktestingEngine(object): trade.tradeTime, tradeid, trade.price, entryTrade.volume, result.pnl) self.output(msg) - self.writeCtaLog(msg) if type(gr) == type(None): @@ -2256,7 +2414,6 @@ class BacktestingEngine(object): else: # 不属于组合 resultDict[entryTrade.dt] = result - # 删除平空交易单, del self.tradeDict[trade.tradeID] @@ -2336,7 +2493,6 @@ class BacktestingEngine(object): # Short Trade elif trade.direction == DIRECTION_SHORT and trade.offset == OFFSET_OPEN: - self.output(u'{0}空开:{1},{2}'.format(trade.vtSymbol, trade.volume, trade.price)) self.writeCtaLog(u'{0}空开:{1},{2}'.format(trade.vtSymbol, trade.volume, trade.price)) self.shortPosition.append(trade) @@ -2353,17 +2509,17 @@ class BacktestingEngine(object): while sellVolume > 0: if len(self.longPosition) == 0: self.writeCtaError(u'异常,没有开多单') - break + raise RuntimeError(u'realtimeCalculate2() Exception,没有开多单') + return pop_indexs = [i for i, val in enumerate(self.longPosition) if val.vtSymbol == trade.vtSymbol] if len(pop_indexs) < 1: - self.writeCtaError(u'没有对应的symbol{0}开多仓数据,'.format(trade.vtSymbol)) - break + self.writeCtaError(u'没有对应的symbol{0}多单数据,'.format(trade.vtSymbol)) + raise RuntimeError(u'realtimeCalculate2() Exception,没有对应的symbol{0}多单数据,'.format(trade.vtSymbol)) + return pop_index = pop_indexs[0] - entryTrade = self.longPosition.pop(pop_index) - # 开多volume,不大于平仓volume if sellVolume >= entryTrade.volume: self.writeCtaLog(u'{0}Sell Volume:{1} >= Entry Volume:{2}'.format(entryTrade.vtSymbol, sellVolume, entryTrade.volume)) @@ -2429,7 +2585,8 @@ class BacktestingEngine(object): # 开多volume,大于平仓volume,需要更新减少tradeDict的数量。 else: longVolume = entryTrade.volume -sellVolume - self.writeCtaLog(u'Long Volume:{0} > sell Volume:{1}'.format(entryTrade.volume,sellVolume)) + self.writeCtaLog(u'Entry Long Volume:{0} > Sell Volume:{1},Remain:{2}' + .format(entryTrade.volume, sellVolume, longVolume)) result = TradingResult(entryPrice=entryTrade.price, entryDt=entryTrade.dt, @@ -2453,10 +2610,11 @@ class BacktestingEngine(object): t['Profit'] = result.pnl self.exportTradeList.append(t) - self.writeCtaLog(u'Gid:{0} {1}[{2}:开多tid={3}:{4}]-[{5}.平多tid={6},{7},vol:{8}],净盈亏:{9}' - .format(gId, entryTrade.vtSymbol,entryTrade.tradeTime, longid, entryTrade.price, - trade.tradeTime, tradeid, trade.price, - sellVolume, result.pnl)) + msg = u'Gid:{0} {1}[{2}:开多tid={3}:{4}]-[{5}.平多tid={6},{7},vol:{8}],净盈亏:{9}'\ + .format(gId, entryTrade.vtSymbol,entryTrade.tradeTime, longid, entryTrade.price, + trade.tradeTime, tradeid, trade.price, sellVolume, result.pnl) + self.output(msg) + self.writeCtaLog(msg) # 减少开多volume,重新推进多单持仓列表中 entryTrade.volume = longVolume @@ -2557,9 +2715,11 @@ class BacktestingEngine(object): self.totalCommission += result.commission self.totalSlippage += result.slippage - self.output(u'[{5}],{6} Vol:{0},盈亏:{1},回撤:{2}/{3},权益:{4}'. - format(abs(result.volume), result.pnl, drawdown, - drawdownRate, self.capital, result.groupId, time)) + msg =u'[{0}] {1} 盈亏:{2},回撤:{3}/{4},权益:{5}'\ + .format(result.groupId, time, result.pnl, drawdown, + drawdownRate, self.capital, ) + self.output(msg) + self.writeCtaLog(msg) # 重新计算一次avaliable self.avaliable = self.capital - occupyMoney @@ -2961,7 +3121,8 @@ class BacktestingEngine(object): datetime.now().strftime('%Y%m%d_%H%M')))) fig = plt.gcf() fig.savefig(fig_file_name) - plt.show() + print (u'图表保存至:{0}'.format(fig_file_name)) + #plt.show() #---------------------------------------------------------------------- def putStrategyEvent(self, name): diff --git a/vn.trader/ctaStrategy/ctaEngine.py b/vn.trader/ctaStrategy/ctaEngine.py index 664c005d..0eee6481 100644 --- a/vn.trader/ctaStrategy/ctaEngine.py +++ b/vn.trader/ctaStrategy/ctaEngine.py @@ -608,6 +608,14 @@ class CtaEngine(object): # modifid by Incenselee 支持多个Symbol的订阅 symbols = strategy.vtSymbol.split(';') + # 判断是否有Leg1Symbol,Leg2Symbol 两个合约属性 + if hasattr(strategy, 'Leg1Symbol'): + if strategy.Leg1Symbol not in symbols: + symbols.append(strategy.Leg1Symbol) + if hasattr(strategy, 'Leg2Symbol'): + if strategy.Leg2Symbol not in symbols: + symbols.append(strategy.Leg2Symbol) + for symbol in symbols: self.writeCtaLog(u'添加合约{0}与策略的匹配目录'.format(symbol)) if symbol in self.tickStrategyDict: @@ -829,14 +837,16 @@ class CtaEngine(object): # ---------------------------------------------------------------------- def loadPosition(self): """从数据库载入策略的持仓情况""" - for strategy in self.strategyDict.values(): - flt = {'name': strategy.name, - 'vtSymbol': strategy.vtSymbol} - posData = self.mainEngine.dbQuery(POSITION_DB_NAME, strategy.className, flt) - - for d in posData: - strategy.pos = d['pos'] + try: + for strategy in self.strategyDict.values(): + flt = {'name': strategy.name, + 'vtSymbol': strategy.vtSymbol} + posData = self.mainEngine.dbQuery(POSITION_DB_NAME, strategy.className, flt) + for d in posData: + strategy.pos = d['pos'] + except: + self.writeCtaLog(u'loadPosition Exception') # ---------------------------------------------------------------------- def roundToPriceTick(self, priceTick, price): """取整价格到合约最小价格变动""" diff --git a/vn.trader/ctaStrategy/ctaGridTrade.py b/vn.trader/ctaStrategy/ctaGridTrade.py index e563269b..b0293afe 100644 --- a/vn.trader/ctaStrategy/ctaGridTrade.py +++ b/vn.trader/ctaStrategy/ctaGridTrade.py @@ -41,6 +41,8 @@ class CtaGrid(object): self.openDatetime = None self.orderDatetime = None # 委托时间 + self.lockGrids = [] # 锁单的网格,[openPrice,openPrice] + def toJson(self): """输出JSON""" @@ -58,6 +60,7 @@ class CtaGrid(object): j['orderRef'] = self.orderRef # OrderId j['openStatus'] = self.openStatus # 开仓状态 j['closeStatus'] = self.closeStatus # 平仓状态 + j['lockGrids'] = self.lockGrids # 对锁的网格 if type(self.openDatetime) == type(None): j['openDatetime'] = EMPTY_STRING @@ -325,6 +328,26 @@ class CtaGridTrade(object): self.writeCtaLog(u'异常,找不到网格[{0},{1},{2},{3},{4}]'.format(direction, openPrice, closePrice, orderRef, t)) return None + def getLastOpenedGrid(self, direction): + """获取最后一个开仓的网格""" + if direction == DIRECTION_SHORT: + opened_short_grids = self.getGrids(direction=direction, opened=True) + if opened_short_grids is None or len(opened_short_grids) ==0 : + return None + if len(opened_short_grids) > 1: + sortedGrids = sorted(opened_short_grids, key=lambda g:g.openPrice) + opened_short_grids = sortedGrids[-1:] + return opened_short_grids[0] + + if direction == DIRECTION_LONG: + opened_long_grids = self.getGrids(direction=direction, opened=True) + if opened_long_grids is None or len(opened_long_grids) ==0: + return None + if len(opened_long_grids) > 1: + sortedGrids = sorted(opened_long_grids, key=lambda g: g.openPrice) + opened_long_grids = sortedGrids[0:1] + return opened_long_grids[0] + def closeGrid(self, direction, closePrice, closeVolume): """网格交易结束""" if direction == DIRECTION_LONG: @@ -495,7 +518,6 @@ class CtaGridTrade(object): # 更新开仓均价 self.recount_avg_open_price() - path = os.path.abspath(os.path.dirname(__file__)) # 保存上网格列表 @@ -507,7 +529,6 @@ class CtaGridTrade(object): l.append(grid.toJson()) with open(jsonFileName, 'w') as f: - jsonL = json.dumps(l, indent=4) f.write(jsonL) @@ -522,7 +543,6 @@ class CtaGridTrade(object): l.append(grid.toJson()) with open(jsonFileName, 'w') as f: - jsonL = json.dumps(l, indent=4) f.write(jsonL) @@ -552,7 +572,6 @@ class CtaGridTrade(object): # 解析json文件 l = json.load(f) - grids = [] if len(l) > 0: @@ -570,6 +589,7 @@ class CtaGridTrade(object): grid.orderRef = i['orderRef'] # OrderId grid.openStatus = i['openStatus'] # 开仓状态 grid.closeStatus = i['closeStatus'] # 平仓状态 + strTime = i['openDatetime'] if strTime == EMPTY_STRING or type(strTime)==type(None): grid.openDatetime = None @@ -580,6 +600,10 @@ class CtaGridTrade(object): grid.tradedVolume = i['tradedVolume'] # 已交易的合约数量 except KeyError: grid.tradedVolume = EMPTY_INT + try: + grid.lockGrids = i['lockGrids'] + except KeyError: + grid.lockGrids = [] self.writeCtaLog(grid.toStr()) diff --git a/vn.trader/ctaStrategy/ctaHistoryData.py b/vn.trader/ctaStrategy/ctaHistoryData.py index 18304301..8094d577 100644 --- a/vn.trader/ctaStrategy/ctaHistoryData.py +++ b/vn.trader/ctaStrategy/ctaHistoryData.py @@ -4,12 +4,16 @@ 本模块中主要包含: 1. 从通联数据下载历史行情的引擎 2. 用来把MultiCharts导出的历史数据载入到MongoDB中用的函数 +3、从淘宝购买的tick csv数据导入mongodb """ from datetime import datetime, timedelta +from time import time import pymongo from time import time from multiprocessing.pool import ThreadPool +from collections import OrderedDict +import pandas as pd from ctaBase import * from vtConstant import * @@ -349,6 +353,106 @@ def loadMcCsv(fileName, dbName, symbol): print u'插入完毕,耗时:%s' % (time()-start) +def load_ticks_from_file(file_name,symbol,trading_day): + """从csv tick文件中UnicodeDictReader读取tick + file_name,文件全路径 + symbol,合约代码,RB01, RBMI 等 + trading_day,交易日字符串 + """ + # 先读取数据到Dict,以日期时间为key + ticks = OrderedDict() + + if not os.path.isfile(file_name): + print u'{0}文件不存在'.format(file_name) + return ticks + dt = None + csvReadFile = file(file_name, 'rb') + + start_time = time.clock() + df = pd.read_csv(file_name, encoding='gbk', parse_dates=False) + df.columns = ['date', 'time', 'lastPrice', 'lastVolume', 'totalInterest', 'position', + 'bidPrice1', 'bidVolume1', 'bidPrice2', 'bidVolume2', 'bidPrice3', 'bidVolume3', + 'askPrice1', 'askVolume1', 'askPrice2', 'askVolume2', 'askPrice3', 'askVolume3', 'BS'] + readed_ticks = len(df) + + for i in range(0, len(df)): + # 日期, 时间, 成交价, 成交量, 总量, 属性(持仓增减), B1价, B1量, B2价, B2量, B3价, B3量, S1价, S1量, S2价, S2量, S3价, S3量, BS + # 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 + row = df.iloc[i].to_dict() + tick = CtaTickData() + + tick.vtSymbol = symbol + tick.symbol = symbol + + tick.date = row['date'] + tick.tradingDay = trading_day + tick.time = row['time'] + + try: + tick.datetime = datetime.strptime(tick.date + ' ' + tick.time, '%Y-%m-%d %H:%M:%S') + except Exception as ex: + print u'日期转换错误:{0},{1}:{2}'.format(tick.date + ' ' + tick.time, Exception, ex) + continue + + tick.date = tick.datetime.strftime('%Y%m%d') + # 修正毫秒 + if tick.datetime.replace(microsecond=0) == dt: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick.datetime = tick.datetime.replace(microsecond=500) + tick.time = tick.datetime.strftime('%H:%M:%S.%f') + + else: + tick.datetime = tick.datetime.replace(microsecond=0) + tick.time = tick.datetime.strftime('%H:%M:%S.%f') + + dt = tick.datetime + + tick.lastPrice = float(row['lastPrice']) + tick.volume = int(float(row['lastVolume'])) + tick.bidPrice1 = float(row['bidPrice1']) # 叫买价(价格低) + tick.bidVolume1 = int(float(row['bidVolume1'])) + tick.askPrice1 = float(row['askPrice1']) # 叫卖价(价格高) + tick.askVolume1 = int(float(row['askVolume1'])) + + # 排除涨停/跌停的数据 + if (tick.bidPrice1 == float('1.79769E308') and tick.bidVolume1 == 0) : + tick.bidPrice1 = 0 + + if (tick.askPrice1 == float('1.79769E308') and tick.askVolume1 == 0): + tick.askPrice1 = 0 + + dtStr = tick.date + ' ' + tick.time + if dtStr not in ticks: + ticks[dtStr] = tick + if len(ticks)!= readed_ticks: + print u'分析tick对象数量{0}与读取数据数量{1}不一致'.format(len(ticks),readed_ticks) + + print u'读取{0},共加载{1}条数据,耗时:{2}seconds}'.format(file_name, readed_ticks, str(time.clock()-start_time)) + + return ticks + +def impot_ticks_from_folder(folder_path): + + for dirpath, _, file_names in os.walk(folder_path): + for file_name in file_names: + file_path = os.path.join(dirpath, file_name) + + if file_name.lower().find('.csv') != -1: + s = file_name.replace('.csv', '').split('_') + if len(s)!=2: + print u'{0} not match format'.format(file_path) + continue + + symbol = s[0] + trading_day = s[1] + + if len(trading_day)!=8: + print u'{0} trading_day not match format'.format(file_path) + continue + + ticks = load_ticks_from_file(file_name=file_path,symbol=symbol,trading_day=trading_day) + + print ('finish.') if __name__ == '__main__': ## 简单的测试脚本可以写在这里 @@ -358,4 +462,7 @@ if __name__ == '__main__': #e.downloadEquityDailyBar('000001') # 这里将项目中包含的股指日内分钟线csv导入MongoDB,作者电脑耗时大约3分钟 - loadMcCsv('IF0000_1min.csv', MINUTE_DB_NAME, 'IF0000') + #loadMcCsv('IF0000_1min.csv', MINUTE_DB_NAME, 'IF0000') + + csv_ticks_folder_path = '/home/ubuntu/Ticks/SQ/2017' + impot_ticks_from_folder() \ No newline at end of file diff --git a/vn.trader/ctaStrategy/ctaLineBar.py b/vn.trader/ctaStrategy/ctaLineBar.py index 05c400c1..b27eb59a 100644 --- a/vn.trader/ctaStrategy/ctaLineBar.py +++ b/vn.trader/ctaStrategy/ctaLineBar.py @@ -317,21 +317,34 @@ class CtaLineBar(object): self.curTradingDay = bar.tradingDay - if (self.period == PERIOD_SECOND and (bar.datetime-lastBar.datetime).seconds >= self.barTimeInterval) \ - or (self.period == PERIOD_MINUTE and bar.datetime.minute % self.barTimeInterval == 0 - and bar.datetime.minute != lastBar.datetime.minute) \ - or (self.period == PERIOD_HOUR and self.barTimeInterval == 1 and bar.datetime - and bar.datetime.hour != lastBar.datetime.hour) \ - or (self.period == PERIOD_HOUR and self.barTimeInterval == 2 and bar.datetime - and bar.datetime.hour != lastBar.datetime.hour - and bar.datetime.hour in {1, 9, 11, 13, 21, 23}) \ - or (self.period == PERIOD_HOUR and self.barTimeInterval == 4 and bar.datetime - and bar.datetime.hour != lastBar.datetime.hour - and bar.datetime.hour in {1, 9, 13, 21}) \ - or (self.period == PERIOD_DAY and bar.datetime.date != lastBar.datetime.date ): + is_new_bar = False + if self.period == PERIOD_SECOND and (bar.datetime-lastBar.datetime).seconds >= self.barTimeInterval: + is_new_bar = True + + elif self.period == PERIOD_MINUTE and (bar.datetime - lastBar.datetime).seconds >= self.barTimeInterval*60: + is_new_bar = True + + elif self.period == PERIOD_HOUR: + if self.barTimeInterval == 1 and bar.datetime.hour != lastBar.datetime.hour : + is_new_bar = True + + elif self.barTimeInterval == 2 and bar.datetime.hour != lastBar.datetime.hour \ + and bar.datetime.hour in {1, 9, 11, 13, 15, 21, 23}: + is_new_bar = True + + elif self.barTimeInterval == 4 and bar.datetime.hour != lastBar.datetime.hour \ + and bar.datetime.hour in {1, 9, 13, 21}: + is_new_bar = True + + elif self.period == PERIOD_DAY and bar.datetime.date != lastBar.datetime.date : + is_new_bar = True + + if is_new_bar: + # 添加新的bar self.lineBar.append(bar) - self.onBar(bar) + # 将上一个Bar推送至OnBar事件 + self.onBar(lastBar) return # 更新最后一个bar @@ -535,6 +548,8 @@ class CtaLineBar(object): # 2,分钟、小时周期,取整=0 # 3、日周期,开盘时间 # 4、不是最后一个结束tick + is_new_bar = False + if ((self.period == PERIOD_SECOND and (tick.datetime-lastBar.datetime).seconds >= self.barTimeInterval) \ or (self.period == PERIOD_MINUTE and tick.datetime.minute % self.barTimeInterval == 0 @@ -1153,9 +1168,9 @@ class CtaLineBar(object): l = len(self.lineBar) - if l < min(7, self.inputBollLen)+1: + if l < min(14, self.inputBollLen)+1: self.debugCtaLog(u'数据未充分,当前Bar数据数量:{0},计算Boll需要:{1}'. - format(len(self.lineBar), min(7, self.inputBollLen)+1)) + format(len(self.lineBar), min(14, self.inputBollLen)+1)) return if l < self.inputBollLen+2: