This commit is contained in:
msincenselee 2017-06-03 20:12:54 +08:00
parent 973a627dd1
commit 5143e952db
5 changed files with 388 additions and 71 deletions

View File

@ -6,6 +6,10 @@
'''
from __future__ import division
import sys
import os
cta_engine_path = os.path.abspath(os.path.dirname(__file__))
from datetime import datetime, timedelta
from collections import OrderedDict
from itertools import product
@ -19,7 +23,7 @@ from vtFunction import loadMongoSetting
from eventEngine import *
import MySQLdb
#import MySQLdb
import json
import os
import sys
@ -706,7 +710,8 @@ class BacktestingEngine(object):
dtStr = tick.date + ' ' + tick.time
if dtStr in leg2Ticks:
self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
pass
#self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
else:
leg2Ticks[dtStr] = tick
@ -823,8 +828,152 @@ class BacktestingEngine(object):
cache.close()
return True
# ----------------------------------------------------------------------
def runBackTestingWithArbTickFile2(self, leg1MainPath, leg2MainPath, arbSymbol):
    """Run a spread (arbitrage) back-test from local tick CSV files.

    added by IncenseLee

    The raw ticks live in per-day directories, one CSV per contract; the
    actual file names are resolved by ``__loadArbTicks2``.  Every calendar
    day from ``self.dataStartDate`` up to, but not including,
    ``self.dataEndDate`` is visited; for each day both legs are loaded,
    merged into spread ticks and pushed to the strategy.

    Parameters:
        leg1MainPath: root directory of the leg1 tick files (passed through
            to ``__loadArbTicks2``).
        leg2MainPath: root directory of the leg2 tick files (passed through
            to ``__loadArbTicks2``).
        arbSymbol: spread symbol of the form ``"SP rb1610&rb1701"``:
            the text between the first space and the following ``&`` is
            leg1, everything after that ``&`` is leg2.

    Returns:
        None.  Every validation failure is written to the CTA log and the
        method returns early without raising.
    """
    # Reset the running capital to the configured initial capital.
    self.capital = self.initCapital

    if not arbSymbol:
        self.writeCtaLog(u'套利合约为空')
        return

    # str.index() raises ValueError when the token is missing, so the format
    # is validated with startswith()/partition(), which never raise.
    _, space, legs = arbSymbol.partition(" ")
    leg1, ampersand, leg2 = legs.partition("&")
    if not (arbSymbol.upper().startswith("SP") and space and ampersand):
        self.writeCtaLog(u'套利合约格式不符合')
        return

    self.writeCtaLog(u'Leg1:{0},Leg2:{1}'.format(leg1, leg2))

    if not self.dataStartDate:
        self.writeCtaLog(u'回测开始日期未设置。')
        return

    # e.g. RB
    if not self.symbol:
        self.writeCtaLog(u'回测对象未设置。')
        return

    if not self.dataEndDate:
        self.dataEndDate = datetime.today()

    # Only tick mode is supported by this back-test.
    if self.mode == self.BAR_MODE:
        self.writeCtaLog(u'本回测仅支持tick模式')
        return

    testdays = (self.dataEndDate - self.dataStartDate).days
    if testdays < 1:
        self.writeCtaLog(u'回测时间不足')
        return

    for offset in range(testdays):
        testday = self.dataStartDate + timedelta(days=offset)
        self.output(u'回测日期:{0}'.format(testday))
        # Load and replay the spread ticks of this day.
        self.__loadArbTicks2(leg1MainPath, leg2MainPath, testday, leg1, leg2)
def __loadArbTicks2(self, leg1MainPath, leg2MainPath, testday, leg1Symbol, leg2Symbol):
    """Load one day of CSV ticks for both legs and replay the resulting spread ticks.

    The tick file of a leg is looked up at
    ``<mainPath>/<YYYY>/<YYYYMM>/<YYYYMMDD>/<letters><last 2 chars of symbol>_<YYYYMMDD>.csv``
    where ``<letters>`` is the leading alphabetic part of the leg symbol.
    Ticks of both legs are joined on their date-time key; for each match a
    spread tick is built and finally pushed to ``self.newTick``.

    Parameters:
        leg1MainPath: root directory of the leg1 tick files.
        leg2MainPath: root directory of the leg2 tick files.
        testday: datetime of the day to load (formatted with ``strftime``).
        leg1Symbol: leg1 contract symbol, letters followed by digits.
        leg2Symbol: leg2 contract symbol, letters followed by digits.

    Returns:
        None.  Unparsable symbols and missing files are logged and end the
        call early.
    """
    # The backslash is doubled so the literal is a valid escape sequence;
    # the logged text is unchanged.
    self.writeCtaLog(u'加载回测日期:{0}\\{1}的价差tick'.format(leg1MainPath, testday))

    pattern = re.compile(r"([A-Z]+)[0-9]+", re.I)
    leg1_match = pattern.match(leg1Symbol)
    leg2_match = pattern.match(leg2Symbol)
    if leg1_match is None or leg2_match is None:
        self.writeCtaLog(u'{0},{1}不能正则分解'.format(leg1Symbol, leg2Symbol))
        return
    leg1_shortSymbol = leg1_match.group(1)
    leg2_shortSymbol = leg2_match.group(1)

    day_text = testday.strftime('%Y%m%d')

    def tick_file(main_path, short_symbol, symbol):
        # Absolute path of one leg's tick file for `testday`.
        return os.path.abspath(os.path.join(
            main_path, testday.strftime('%Y'), testday.strftime('%Y%m'), day_text,
            '{0}{1}_{2}.csv'.format(short_symbol, symbol[-2:], day_text)))

    leg1File = tick_file(leg1MainPath, leg1_shortSymbol, leg1Symbol)
    if not os.path.isfile(leg1File):
        self.writeCtaLog(u'{0}文件不存在'.format(leg1File))
        return

    leg2File = tick_file(leg2MainPath, leg2_shortSymbol, leg2Symbol)
    if not os.path.isfile(leg2File):
        self.writeCtaLog(u'{0}文件不存在'.format(leg2File))
        return

    # Load leg2 first, then leg1; both are keyed by their date-time string.
    leg2Ticks = self.__loadTicksFromFile2(filepath=leg2File, tickDate=testday, vtSymbol=leg2Symbol)
    leg1Ticks = self.__loadTicksFromFile2(filepath=leg1File, tickDate=testday, vtSymbol=leg1Symbol)

    # Price value the data uses on a side that has no quote.
    limit_price = float('1.79769E308')

    def side_missing(price, volume):
        # A side counts as missing when it has no volume and a sentinel/zero price.
        return (price == limit_price or price == 0) and volume == 0

    def one_sided(tick):
        # True for limit-up / limit-down ticks, which cannot form a spread quote.
        return (side_missing(tick.askPrice1, tick.askVolume1)
                or side_missing(tick.bidPrice1, tick.bidVolume1))

    arbTicks = []
    # items() works on both Python 2 and 3; leg1Ticks is not mutated here.
    for dtStr, leg1_tick in leg1Ticks.items():
        if dtStr not in leg2Ticks:
            continue
        leg2_tick = leg2Ticks[dtStr]

        # Skip limit-up / limit-down data before allocating the spread tick.
        if one_sided(leg1_tick) or one_sided(leg2_tick):
            continue

        arbTick = CtaTickData()
        arbTick.vtSymbol = self.symbol
        arbTick.symbol = self.symbol
        arbTick.date = leg1_tick.date
        arbTick.time = leg1_tick.time
        arbTick.datetime = leg1_tick.datetime
        arbTick.tradingDay = leg1_tick.tradingDay
        arbTick.lastPrice = EMPTY_FLOAT
        arbTick.volume = EMPTY_INT

        # Spread ask = leg1 ask - leg2 bid; volume is the smaller of the two.
        arbTick.askPrice1 = leg1_tick.askPrice1 - leg2_tick.bidPrice1
        arbTick.askVolume1 = min(leg1_tick.askVolume1, leg2_tick.bidVolume1)

        # Spread bid = leg1 bid - leg2 ask; volume is the smaller of the two.
        arbTick.bidPrice1 = leg1_tick.bidPrice1 - leg2_tick.askPrice1
        arbTick.bidVolume1 = min(leg1_tick.bidVolume1, leg2_tick.askVolume1)

        arbTicks.append(arbTick)
        # The matched leg2 tick is consumed.
        del leg2Ticks[dtStr]

    # Push the spread ticks to the strategy.
    for arbTick in arbTicks:
        self.newTick(arbTick)
def runBackTestingWithNonStrArbTickFile(self, leg1MainPath, leg2MainPath, leg1Symbol,leg2Symbol):
"""运行套利回测使用本地tickcsv数据)
"""运行套利回测使用本地tick txt数据)
参数
leg1MainPath leg1合约所在的市场路径
leg2MainPath leg2合约所在的市场路径
@ -941,7 +1090,8 @@ class BacktestingEngine(object):
dtStr = tick.date + ' ' + tick.time
if dtStr in ticks:
self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
pass
#self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
else:
ticks[dtStr] = tick
@ -1077,7 +1227,7 @@ class BacktestingEngine(object):
return ticks
dt = None
csvReadFile = file(filepath, 'rb')
df = pd.read_csv(filepath, encoding='gbk')
df = pd.read_csv(filepath, encoding='gbk',parse_dates=False)
df.columns = ['date', 'time', 'lastPrice', 'lastVolume', 'totalInterest', 'position',
'bidPrice1', 'bidVolume1', 'bidPrice2', 'bidVolume2', 'bidPrice3', 'bidVolume3',
'askPrice1', 'askVolume1', 'askPrice2', 'askVolume2', 'askPrice3', 'askVolume3','BS']
@ -1129,7 +1279,9 @@ class BacktestingEngine(object):
dtStr = tick.date + ' ' + tick.time
if dtStr in ticks:
self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
pass
#self.writeCtaError(u'日内数据重复,异常,数据时间为:{0}'.format(dtStr))
else:
ticks[dtStr] = tick
@ -1151,14 +1303,23 @@ class BacktestingEngine(object):
# E:\Ticks\SQ\2014\201401\20140102\ag01_20140102.csv
leg1File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \
.format(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg1_shortSymbol, leg1Symbol[-2:])
#leg1File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \
# .format(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg1_shortSymbol, leg1Symbol[-2:])
leg1File = os.path.abspath(
os.path.join(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'),
'{0}{1}_{2}.csv'.format(leg1_shortSymbol,leg1Symbol[-2:],testday.strftime('%Y%m%d'))))
if not os.path.isfile(leg1File):
self.writeCtaLog(u'{0}文件不存在'.format(leg1File))
return
leg2File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \
.format(leg2MainPath,testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg2_shortSymbol, leg2Symbol[-2:])
#leg2File = u'e:\\ticks\\{0}\\{1}\\{2}\\{3}\\{4}{5}_{3}.csv' \
# .format(leg2MainPath,testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'), leg2_shortSymbol, leg2Symbol[-2:])
leg2File = os.path.abspath(
os.path.join(leg1MainPath, testday.strftime('%Y'), testday.strftime('%Y%m'), testday.strftime('%Y%m%d'),
'{0}{1}_{2}.csv'.format(leg2_shortSymbol, leg2Symbol[-2:], testday.strftime('%Y%m%d'))))
if not os.path.isfile(leg2File):
self.writeCtaLog(u'{0}文件不存在'.format(leg2File))
return
@ -1779,11 +1940,14 @@ class BacktestingEngine(object):
while coverVolume > 0:
if len(shortTrade)==0:
self.writeCtaError(u'异常,没有开空仓的数据')
break
raise RuntimeError(u'realtimeCalculate() Exception,没有开空仓的数据')
pop_indexs = [i for i, val in enumerate(shortTrade) if val.vtSymbol == trade.vtSymbol]
if len(pop_indexs) < 1:
self.writeCtaError(u'没有对应的symbol:{0}开空仓数据'.format(trade.vtSymbol))
break
raise RuntimeError(u'realtimeCalculate() Exception,没有对应的symbol:{0}开空仓数据'.format(trade.vtSymbol))
pop_index = pop_indexs[0]
# 从未平仓的空头交易
entryTrade = shortTrade.pop(pop_index)
@ -1946,12 +2110,14 @@ class BacktestingEngine(object):
while sellVolume > 0:
if len(longTrade) == 0:
self.writeCtaError(u'异常,没有开多单')
break
raise RuntimeError(u'realtimeCalculate() Exception,没有开多单')
return
pop_indexs = [i for i, val in enumerate(longTrade) if val.vtSymbol == trade.vtSymbol]
if len(pop_indexs) < 1:
self.writeCtaError(u'没有对应的symbol{0}开多仓数据,'.format(trade.vtSymbol))
break
raise RuntimeError(u'realimeCalculate() Exception,没有对应的symbol{0}开多仓数据,'.format(trade.vtSymbol))
return
pop_index = pop_indexs[0]
@ -2159,27 +2325,19 @@ class BacktestingEngine(object):
"""实时计算交易结果2
支持多空仓位并存"""
if len(self.tradeDict) <1:
return
if len(self.tradeDict) < 1: return
tradeids = self.tradeDict.keys()
resultDict = OrderedDict() # 交易结果记录
longTrade = [] # 未平仓的多头交易
shortTrade = [] # 未平仓的空头交易
longid = EMPTY_STRING
shortid = EMPTY_STRING
no_match_shortTrade = False
no_match_longTrade = False
# 对交易记录逐一处理
for tradeid in tradeids:
try:
trade = self.tradeDict[tradeid]
except:
self.output(u'没有{0}的成交单'.format(tradeid))
self.writeCtaError(u'没有{0}的成交单'.format(tradeid))
continue
# buy trade
@ -2191,7 +2349,6 @@ class BacktestingEngine(object):
# cover trade
elif trade.direction == DIRECTION_LONG and trade.offset == OFFSET_CLOSE:
gId = trade.tradeID # 交易组(多个平仓数为一组)
gr = None # 组合的交易结果
@ -2200,11 +2357,13 @@ class BacktestingEngine(object):
while coverVolume > 0:
if len(self.shortPosition) == 0:
self.writeCtaError(u'异常!没有开空仓的数据')
break
raise Exception(u'realtimeCalculate2() Exception,没有开空仓的数据')
return
pop_indexs = [i for i, val in enumerate(self.shortPosition) if val.vtSymbol == trade.vtSymbol]
if len(pop_indexs) < 1:
self.writeCtaError(u'异常没有对应symbol:{0}的空单持仓'.format(trade.vtSymbol))
break
raise Exception(u'realtimeCalculate2() Exception,没有对应symbol:{0}的空单持仓'.format(trade.vtSymbol))
return
pop_index = pop_indexs[0]
# 从未平仓的空头交易
@ -2245,7 +2404,6 @@ class BacktestingEngine(object):
trade.tradeTime, tradeid, trade.price,
entryTrade.volume, result.pnl)
self.output(msg)
self.writeCtaLog(msg)
if type(gr) == type(None):
@ -2256,7 +2414,6 @@ class BacktestingEngine(object):
else:
# 不属于组合
resultDict[entryTrade.dt] = result
# 删除平空交易单,
del self.tradeDict[trade.tradeID]
@ -2336,7 +2493,6 @@ class BacktestingEngine(object):
# Short Trade
elif trade.direction == DIRECTION_SHORT and trade.offset == OFFSET_OPEN:
self.output(u'{0}空开:{1},{2}'.format(trade.vtSymbol, trade.volume, trade.price))
self.writeCtaLog(u'{0}空开:{1},{2}'.format(trade.vtSymbol, trade.volume, trade.price))
self.shortPosition.append(trade)
@ -2353,17 +2509,17 @@ class BacktestingEngine(object):
while sellVolume > 0:
if len(self.longPosition) == 0:
self.writeCtaError(u'异常,没有开多单')
break
raise RuntimeError(u'realtimeCalculate2() Exception,没有开多单')
return
pop_indexs = [i for i, val in enumerate(self.longPosition) if val.vtSymbol == trade.vtSymbol]
if len(pop_indexs) < 1:
self.writeCtaError(u'没有对应的symbol{0}开多仓数据,'.format(trade.vtSymbol))
break
self.writeCtaError(u'没有对应的symbol{0}多单数据,'.format(trade.vtSymbol))
raise RuntimeError(u'realtimeCalculate2() Exception,没有对应的symbol{0}多单数据,'.format(trade.vtSymbol))
return
pop_index = pop_indexs[0]
entryTrade = self.longPosition.pop(pop_index)
# 开多volume不大于平仓volume
if sellVolume >= entryTrade.volume:
self.writeCtaLog(u'{0}Sell Volume:{1} >= Entry Volume:{2}'.format(entryTrade.vtSymbol, sellVolume, entryTrade.volume))
@ -2429,7 +2585,8 @@ class BacktestingEngine(object):
# 开多volume,大于平仓volume需要更新减少tradeDict的数量。
else:
longVolume = entryTrade.volume -sellVolume
self.writeCtaLog(u'Long Volume:{0} > sell Volume:{1}'.format(entryTrade.volume,sellVolume))
self.writeCtaLog(u'Entry Long Volume:{0} > Sell Volume:{1},Remain:{2}'
.format(entryTrade.volume, sellVolume, longVolume))
result = TradingResult(entryPrice=entryTrade.price,
entryDt=entryTrade.dt,
@ -2453,10 +2610,11 @@ class BacktestingEngine(object):
t['Profit'] = result.pnl
self.exportTradeList.append(t)
self.writeCtaLog(u'Gid:{0} {1}[{2}:开多tid={3}:{4}]-[{5}.平多tid={6},{7},vol:{8}],净盈亏:{9}'
.format(gId, entryTrade.vtSymbol,entryTrade.tradeTime, longid, entryTrade.price,
trade.tradeTime, tradeid, trade.price,
sellVolume, result.pnl))
msg = u'Gid:{0} {1}[{2}:开多tid={3}:{4}]-[{5}.平多tid={6},{7},vol:{8}],净盈亏:{9}'\
.format(gId, entryTrade.vtSymbol,entryTrade.tradeTime, longid, entryTrade.price,
trade.tradeTime, tradeid, trade.price, sellVolume, result.pnl)
self.output(msg)
self.writeCtaLog(msg)
# 减少开多volume,重新推进多单持仓列表中
entryTrade.volume = longVolume
@ -2557,9 +2715,11 @@ class BacktestingEngine(object):
self.totalCommission += result.commission
self.totalSlippage += result.slippage
self.output(u'[{5}],{6} Vol:{0},盈亏:{1},回撤:{2}/{3},权益:{4}'.
format(abs(result.volume), result.pnl, drawdown,
drawdownRate, self.capital, result.groupId, time))
msg =u'[{0}] {1} 盈亏:{2},回撤:{3}/{4},权益:{5}'\
.format(result.groupId, time, result.pnl, drawdown,
drawdownRate, self.capital, )
self.output(msg)
self.writeCtaLog(msg)
# 重新计算一次avaliable
self.avaliable = self.capital - occupyMoney
@ -2961,7 +3121,8 @@ class BacktestingEngine(object):
datetime.now().strftime('%Y%m%d_%H%M'))))
fig = plt.gcf()
fig.savefig(fig_file_name)
plt.show()
print (u'图表保存至:{0}'.format(fig_file_name))
#plt.show()
#----------------------------------------------------------------------
def putStrategyEvent(self, name):

View File

@ -608,6 +608,14 @@ class CtaEngine(object):
# modifid by Incenselee 支持多个Symbol的订阅
symbols = strategy.vtSymbol.split(';')
# 判断是否有Leg1Symbol,Leg2Symbol 两个合约属性
if hasattr(strategy, 'Leg1Symbol'):
if strategy.Leg1Symbol not in symbols:
symbols.append(strategy.Leg1Symbol)
if hasattr(strategy, 'Leg2Symbol'):
if strategy.Leg2Symbol not in symbols:
symbols.append(strategy.Leg2Symbol)
for symbol in symbols:
self.writeCtaLog(u'添加合约{0}与策略的匹配目录'.format(symbol))
if symbol in self.tickStrategyDict:
@ -829,14 +837,16 @@ class CtaEngine(object):
# ----------------------------------------------------------------------
def loadPosition(self):
"""从数据库载入策略的持仓情况"""
for strategy in self.strategyDict.values():
flt = {'name': strategy.name,
'vtSymbol': strategy.vtSymbol}
posData = self.mainEngine.dbQuery(POSITION_DB_NAME, strategy.className, flt)
for d in posData:
strategy.pos = d['pos']
try:
for strategy in self.strategyDict.values():
flt = {'name': strategy.name,
'vtSymbol': strategy.vtSymbol}
posData = self.mainEngine.dbQuery(POSITION_DB_NAME, strategy.className, flt)
for d in posData:
strategy.pos = d['pos']
except:
self.writeCtaLog(u'loadPosition Exception')
# ----------------------------------------------------------------------
def roundToPriceTick(self, priceTick, price):
"""取整价格到合约最小价格变动"""

View File

@ -41,6 +41,8 @@ class CtaGrid(object):
self.openDatetime = None
self.orderDatetime = None # 委托时间
self.lockGrids = [] # 锁单的网格,[openPrice,openPrice]
def toJson(self):
"""输出JSON"""
@ -58,6 +60,7 @@ class CtaGrid(object):
j['orderRef'] = self.orderRef # OrderId
j['openStatus'] = self.openStatus # 开仓状态
j['closeStatus'] = self.closeStatus # 平仓状态
j['lockGrids'] = self.lockGrids # 对锁的网格
if type(self.openDatetime) == type(None):
j['openDatetime'] = EMPTY_STRING
@ -325,6 +328,26 @@ class CtaGridTrade(object):
self.writeCtaLog(u'异常,找不到网格[{0},{1},{2},{3},{4}]'.format(direction, openPrice, closePrice, orderRef, t))
return None
def getLastOpenedGrid(self, direction):
    """Return the outermost opened grid for `direction`.

    For DIRECTION_SHORT this is the opened grid with the highest
    openPrice; for DIRECTION_LONG it is the one with the lowest openPrice.
    Returns None when no grid of that direction is opened, or when
    `direction` is neither of the two constants.
    """
    if direction == DIRECTION_SHORT:
        take_highest = True
    elif direction == DIRECTION_LONG:
        take_highest = False
    else:
        return None

    opened_grids = self.getGrids(direction=direction, opened=True)
    if not opened_grids:
        return None

    # Stable ascending sort by open price: the last entry is the highest
    # short grid, the first entry is the lowest long grid.
    by_open_price = sorted(opened_grids, key=lambda grid: grid.openPrice)
    if take_highest:
        return by_open_price[-1]
    return by_open_price[0]
def closeGrid(self, direction, closePrice, closeVolume):
"""网格交易结束"""
if direction == DIRECTION_LONG:
@ -495,7 +518,6 @@ class CtaGridTrade(object):
# 更新开仓均价
self.recount_avg_open_price()
path = os.path.abspath(os.path.dirname(__file__))
# 保存上网格列表
@ -507,7 +529,6 @@ class CtaGridTrade(object):
l.append(grid.toJson())
with open(jsonFileName, 'w') as f:
jsonL = json.dumps(l, indent=4)
f.write(jsonL)
@ -522,7 +543,6 @@ class CtaGridTrade(object):
l.append(grid.toJson())
with open(jsonFileName, 'w') as f:
jsonL = json.dumps(l, indent=4)
f.write(jsonL)
@ -552,7 +572,6 @@ class CtaGridTrade(object):
# 解析json文件
l = json.load(f)
grids = []
if len(l) > 0:
@ -570,6 +589,7 @@ class CtaGridTrade(object):
grid.orderRef = i['orderRef'] # OrderId
grid.openStatus = i['openStatus'] # 开仓状态
grid.closeStatus = i['closeStatus'] # 平仓状态
strTime = i['openDatetime']
if strTime == EMPTY_STRING or type(strTime)==type(None):
grid.openDatetime = None
@ -580,6 +600,10 @@ class CtaGridTrade(object):
grid.tradedVolume = i['tradedVolume'] # 已交易的合约数量
except KeyError:
grid.tradedVolume = EMPTY_INT
try:
grid.lockGrids = i['lockGrids']
except KeyError:
grid.lockGrids = []
self.writeCtaLog(grid.toStr())

View File

@ -4,12 +4,16 @@
本模块中主要包含
1. 从通联数据下载历史行情的引擎
2. 用来把MultiCharts导出的历史数据载入到MongoDB中用的函数
3. 从淘宝购买的tick csv数据导入mongodb
"""
from datetime import datetime, timedelta
from time import time
import pymongo
from time import time
from multiprocessing.pool import ThreadPool
from collections import OrderedDict
import pandas as pd
from ctaBase import *
from vtConstant import *
@ -349,6 +353,106 @@ def loadMcCsv(fileName, dbName, symbol):
print u'插入完毕,耗时:%s' % (time()-start)
def load_ticks_from_file(file_name, symbol, trading_day):
    """Read a tick CSV file into an OrderedDict of CtaTickData.

    The file is read with pandas (gbk encoding, first row treated as the
    header) and must have exactly 19 columns, which are renamed to:
    date, time, lastPrice, lastVolume, totalInterest, position,
    bid price/volume 1-3, ask price/volume 1-3, BS.

    Parameters:
        file_name: full path of the CSV file.
        symbol: contract code stored on every tick, e.g. RB01, RBMI.
        trading_day: trading-day string stored on every tick.

    Returns:
        OrderedDict mapping 'YYYYMMDD HH:MM:SS.ffffff' to the tick, in file
        order.  Empty when the file does not exist.  Rows whose date/time
        cannot be parsed and rows whose key already exists are skipped.
    """
    # Local import: `os` is not among the module imports visible next to
    # this function.
    import os

    # Ticks are keyed by their date-time string.
    ticks = OrderedDict()

    if not os.path.isfile(file_name):
        print(u'{0}文件不存在'.format(file_name))
        return ticks

    # `time` is the function bound by `from time import time`; it has no
    # `.clock` attribute, so wall-clock seconds are used for the timing.
    start_time = time()

    df = pd.read_csv(file_name, encoding='gbk', parse_dates=False)
    df.columns = ['date', 'time', 'lastPrice', 'lastVolume', 'totalInterest', 'position',
                  'bidPrice1', 'bidVolume1', 'bidPrice2', 'bidVolume2', 'bidPrice3', 'bidVolume3',
                  'askPrice1', 'askVolume1', 'askPrice2', 'askVolume2', 'askPrice3', 'askVolume3', 'BS']
    readed_ticks = len(df)

    # Price value the data uses on a side that has no quote (limit up/down).
    limit_price = float('1.79769E308')

    # Date-time of the previously accepted row, used to spot a second tick
    # within the same second.
    dt = None

    # One conversion for the whole frame instead of an iloc lookup per row.
    for row in df.to_dict('records'):
        # Build the text once so the error message below cannot fail on the
        # same expression that made the parse fail.
        date_text = u'{0} {1}'.format(row['date'], row['time'])
        try:
            parsed = datetime.strptime(date_text, '%Y-%m-%d %H:%M:%S')
        except Exception as ex:
            print(u'日期转换错误:{0},{1}:{2}'.format(date_text, Exception, ex))
            continue

        tick = CtaTickData()
        tick.vtSymbol = symbol
        tick.symbol = symbol
        tick.tradingDay = trading_day

        tick.date = parsed.strftime('%Y%m%d')

        # Disambiguate two ticks that share the same second.
        this_second = parsed.replace(microsecond=0)
        if this_second == dt:
            # NOTE(review): the original comment says "500 milliseconds" but
            # microsecond=500 is 0.5 ms; kept as-is because the keys produced
            # here may be matched elsewhere - confirm before changing.
            tick.datetime = parsed.replace(microsecond=500)
        else:
            tick.datetime = this_second
        tick.time = tick.datetime.strftime('%H:%M:%S.%f')
        dt = tick.datetime

        tick.lastPrice = float(row['lastPrice'])
        tick.volume = int(float(row['lastVolume']))
        tick.bidPrice1 = float(row['bidPrice1'])  # best bid (lower price)
        tick.bidVolume1 = int(float(row['bidVolume1']))
        tick.askPrice1 = float(row['askPrice1'])  # best ask (higher price)
        tick.askVolume1 = int(float(row['askVolume1']))

        # Limit-up / limit-down: replace the sentinel price of an empty side by 0.
        if tick.bidPrice1 == limit_price and tick.bidVolume1 == 0:
            tick.bidPrice1 = 0
        if tick.askPrice1 == limit_price and tick.askVolume1 == 0:
            tick.askPrice1 = 0

        dtStr = tick.date + ' ' + tick.time
        if dtStr not in ticks:
            ticks[dtStr] = tick

    if len(ticks) != readed_ticks:
        print(u'分析tick对象数量{0}与读取数据数量{1}不一致'.format(len(ticks), readed_ticks))

    # The original format string ended with a stray '}', which makes
    # str.format raise ValueError.
    print(u'读取{0},共加载{1}条数据,耗时:{2}seconds'.format(file_name, readed_ticks, str(time() - start_time)))

    return ticks
def impot_ticks_from_folder(folder_path):
    """Walk `folder_path` and load every '<symbol>_<YYYYMMDD>.csv' tick file.

    Files are matched by their real extension ('.csv', case-insensitive).
    A file whose stem does not split into exactly two '_'-separated parts,
    or whose second part is not 8 characters long, is reported and skipped.

    Parameters:
        folder_path: root directory to search recursively.

    Returns:
        None.
    """
    # Local import: `os` is not among the module imports visible next to
    # this function.
    import os

    for dirpath, _, file_names in os.walk(folder_path):
        for file_name in file_names:
            # splitext avoids matching '.csv' in the middle of a name and
            # strips the extension regardless of its case.
            stem, extension = os.path.splitext(file_name)
            if extension.lower() != '.csv':
                continue

            file_path = os.path.join(dirpath, file_name)

            parts = stem.split('_')
            if len(parts) != 2:
                print(u'{0} not match format'.format(file_path))
                continue

            symbol, trading_day = parts
            if len(trading_day) != 8:
                print(u'{0} trading_day not match format'.format(file_path))
                continue

            # NOTE(review): the loaded ticks are not stored anywhere here;
            # no database insert is visible in this function.
            load_ticks_from_file(file_name=file_path, symbol=symbol, trading_day=trading_day)

    print('finish.')
if __name__ == '__main__':
## 简单的测试脚本可以写在这里
@ -358,4 +462,7 @@ if __name__ == '__main__':
#e.downloadEquityDailyBar('000001')
# 这里将项目中包含的股指日内分钟线csv导入MongoDB作者电脑耗时大约3分钟
loadMcCsv('IF0000_1min.csv', MINUTE_DB_NAME, 'IF0000')
#loadMcCsv('IF0000_1min.csv', MINUTE_DB_NAME, 'IF0000')
csv_ticks_folder_path = '/home/ubuntu/Ticks/SQ/2017'
impot_ticks_from_folder()

View File

@ -317,21 +317,34 @@ class CtaLineBar(object):
self.curTradingDay = bar.tradingDay
if (self.period == PERIOD_SECOND and (bar.datetime-lastBar.datetime).seconds >= self.barTimeInterval) \
or (self.period == PERIOD_MINUTE and bar.datetime.minute % self.barTimeInterval == 0
and bar.datetime.minute != lastBar.datetime.minute) \
or (self.period == PERIOD_HOUR and self.barTimeInterval == 1 and bar.datetime
and bar.datetime.hour != lastBar.datetime.hour) \
or (self.period == PERIOD_HOUR and self.barTimeInterval == 2 and bar.datetime
and bar.datetime.hour != lastBar.datetime.hour
and bar.datetime.hour in {1, 9, 11, 13, 21, 23}) \
or (self.period == PERIOD_HOUR and self.barTimeInterval == 4 and bar.datetime
and bar.datetime.hour != lastBar.datetime.hour
and bar.datetime.hour in {1, 9, 13, 21}) \
or (self.period == PERIOD_DAY and bar.datetime.date != lastBar.datetime.date ):
is_new_bar = False
if self.period == PERIOD_SECOND and (bar.datetime-lastBar.datetime).seconds >= self.barTimeInterval:
is_new_bar = True
elif self.period == PERIOD_MINUTE and (bar.datetime - lastBar.datetime).seconds >= self.barTimeInterval*60:
is_new_bar = True
elif self.period == PERIOD_HOUR:
if self.barTimeInterval == 1 and bar.datetime.hour != lastBar.datetime.hour :
is_new_bar = True
elif self.barTimeInterval == 2 and bar.datetime.hour != lastBar.datetime.hour \
and bar.datetime.hour in {1, 9, 11, 13, 15, 21, 23}:
is_new_bar = True
elif self.barTimeInterval == 4 and bar.datetime.hour != lastBar.datetime.hour \
and bar.datetime.hour in {1, 9, 13, 21}:
is_new_bar = True
elif self.period == PERIOD_DAY and bar.datetime.date != lastBar.datetime.date :
is_new_bar = True
if is_new_bar:
# 添加新的bar
self.lineBar.append(bar)
self.onBar(bar)
# 将上一个Bar推送至OnBar事件
self.onBar(lastBar)
return
# 更新最后一个bar
@ -535,6 +548,8 @@ class CtaLineBar(object):
# 2分钟、小时周期取整=0
# 3、日周期开盘时间
# 4、不是最后一个结束tick
is_new_bar = False
if ((self.period == PERIOD_SECOND and (tick.datetime-lastBar.datetime).seconds >= self.barTimeInterval) \
or
(self.period == PERIOD_MINUTE and tick.datetime.minute % self.barTimeInterval == 0
@ -1153,9 +1168,9 @@ class CtaLineBar(object):
l = len(self.lineBar)
if l < min(7, self.inputBollLen)+1:
if l < min(14, self.inputBollLen)+1:
self.debugCtaLog(u'数据未充分,当前Bar数据数量{0}计算Boll需要{1}'.
format(len(self.lineBar), min(7, self.inputBollLen)+1))
format(len(self.lineBar), min(14, self.inputBollLen)+1))
return
if l < self.inputBollLen+2: