之前修改的CTP的上期所持仓依旧有所问题,基于持仓数据缓存的模式进行了修复,同时修复回测引擎中的一个tick回测相关的bug。

This commit is contained in:
chenxy123 2016-04-27 22:49:28 +08:00
parent e68afab909
commit 1fdddd2033
3 changed files with 73 additions and 46 deletions

View File

@ -1,5 +0,0 @@
<?xml version="1.0"?>
<SubscriptionDataContainer xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="urn:Microsoft.VisualStudio.WindowsAzure.CommonAzureTools.Authentication.CacheManagement">
<Items />
<TokenCache>AQAAANCMnd8BFdERjHoAwE/Cl+sBAAAAhE4p+D02F0WYPy8qCmMXxAAAAAACAAAAAAAQZgAAAAEAACAAAABqqZf7eO/8AqQFqurp2lQDI7ZnZtjWpYLxdEZZGWoeTgAAAAAOgAAAAAIAACAAAACO1demA+YL03Dkjrvv0fYv1k0mGB4XD5t7YYCfD7RdCBAAAACmf9hwg57+FQ2Y5W5BmkdtQAAAAO2Nz3wpQbXLHeGtVRYjAqDcPS8ZJZJVEQ17yhmhHxf12PU4Ru3qr5bEI9j6zaanFd0dF00D4aUwUNKJ5o5raII=</TokenCache>
</SubscriptionDataContainer>

View File

@ -174,6 +174,7 @@ class BacktestingEngine(object):
def newTick(self, tick): def newTick(self, tick):
"""新的Tick""" """新的Tick"""
self.tick = tick self.tick = tick
self.dt = tick.datetime
self.crossLimitOrder() self.crossLimitOrder()
self.crossStopOrder() self.crossStopOrder()
self.strategy.onTick(tick) self.strategy.onTick(tick)

View File

@ -17,6 +17,7 @@ from vnctptd import TdApi
from ctpDataType import * from ctpDataType import *
from vtGateway import * from vtGateway import *
# 以下为一些VT类型和CTP类型的映射字典 # 以下为一些VT类型和CTP类型的映射字典
# 价格类型映射 # 价格类型映射
priceTypeMap = {} priceTypeMap = {}
@ -437,7 +438,7 @@ class CtpTdApi(TdApi):
self.frontID = EMPTY_INT # 前置机编号 self.frontID = EMPTY_INT # 前置机编号
self.sessionID = EMPTY_INT # 会话编号 self.sessionID = EMPTY_INT # 会话编号
self.posDict = {} # 缓存持仓数据的字典 self.posBufferDict = {} # 缓存持仓数据的字典
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onFrontConnected(self): def onFrontConnected(self):
@ -627,44 +628,18 @@ class CtpTdApi(TdApi):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onRspQryInvestorPosition(self, data, error, n, last): def onRspQryInvestorPosition(self, data, error, n, last):
"""持仓查询回报""" """持仓查询回报"""
# 获取缓存字典中的持仓对象,若无则创建并初始化 # 获取缓存字典中的持仓缓存,若无则创建并初始化
positionName = '.'.join([data['InstrumentID'], data['PosiDirection']]) positionName = '.'.join([data['InstrumentID'], data['PosiDirection']])
if positionName in self.posDict: if positionName in self.posBufferDict:
pos = self.posDict[positionName] posBuffer = self.posBufferDict[positionName]
else: else:
pos = VtPositionData() posBuffer = PositionBuffer(data, self.gatewayName)
self.posDict[positionName] = pos self.posBufferDict[positionName] = posBuffer
pos.gatewayName = self.gatewayName # 更新持仓缓存并获取VT系统中持仓对象的返回值
pos = posBuffer.updateBuffer(data)
# 保存代码 self.gateway.onPosition(pos)
pos.symbol = data['InstrumentID']
pos.vtSymbol = pos.symbol # 这里因为data中没有ExchangeID这个字段
# 方向
pos.direction = posiDirectionMapReverse.get(data['PosiDirection'], '')
# VT系统持仓名
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
# 持仓冻结数量
if pos.direction == DIRECTION_NET or pos.direction == DIRECTION_LONG:
pos.frozen = data['LongFrozen']
elif pos.direction == DIRECTION_SHORT:
pos.frozen = data['ShortFrozen']
# 持仓量
pos.position = data['Position']
pos.ydPosition = data['Position'] - data['TodayPosition']
# 持仓均价
if pos.position:
pos.price = data['PositionCost'] / pos.position
# 推送
newpos = copy(pos)
self.gateway.onPosition(newpos)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def onRspQryTradingAccount(self, data, error, n, last): def onRspQryTradingAccount(self, data, error, n, last):
@ -1255,12 +1230,9 @@ class CtpTdApi(TdApi):
req['VolumeTotalOriginal'] = orderReq.volume req['VolumeTotalOriginal'] = orderReq.volume
# 下面如果由于传入的类型本接口不支持,则会返回空字符串 # 下面如果由于传入的类型本接口不支持,则会返回空字符串
try: req['OrderPriceType'] = priceTypeMap.get(orderReq.priceType, '')
req['OrderPriceType'] = priceTypeMap[orderReq.priceType] req['Direction'] = directionMap.get(orderReq.direction, '')
req['Direction'] = directionMap[orderReq.direction] req['CombOffsetFlag'] = offsetMap.get(orderReq.offset, '')
req['CombOffsetFlag'] = offsetMap[orderReq.offset]
except KeyError:
return ''
req['OrderRef'] = str(self.orderRef) req['OrderRef'] = str(self.orderRef)
req['InvestorID'] = self.userID req['InvestorID'] = self.userID
@ -1275,6 +1247,16 @@ class CtpTdApi(TdApi):
req['VolumeCondition'] = defineDict['THOST_FTDC_VC_AV'] # 任意成交量 req['VolumeCondition'] = defineDict['THOST_FTDC_VC_AV'] # 任意成交量
req['MinVolume'] = 1 # 最小成交量为1 req['MinVolume'] = 1 # 最小成交量为1
# 判断FAK和FOK
if orderReq.priceType == PRICETYPE_FAK:
req['OrderPriceType'] = defineDict["THOST_FTDC_OPT_LimitPrice"]
req['TimeCondition'] = defineDict['THOST_FTDC_TC_IOC']
req['VolumeCondition'] = defineDict['THOST_FTDC_VC_AV']
if orderReq.priceType == PRICETYPE_FOK:
req['OrderPriceType'] = defineDict["THOST_FTDC_OPT_LimitPrice"]
req['TimeCondition'] = defineDict['THOST_FTDC_TC_IOC']
req['VolumeCondition'] = defineDict['THOST_FTDC_VC_CV']
self.reqOrderInsert(req, self.reqID) self.reqOrderInsert(req, self.reqID)
# 返回订单号(字符串),便于某些算法进行动态管理 # 返回订单号(字符串),便于某些算法进行动态管理
@ -1306,6 +1288,55 @@ class CtpTdApi(TdApi):
self.exit() self.exit()
########################################################################
class PositionBuffer(object):
"""用来缓存持仓的数据,处理上期所的数据返回分今昨的问题"""
#----------------------------------------------------------------------
def __init__(self, data, gatewayName):
"""Constructor"""
self.symbol = data['InstrumentID']
self.direction = posiDirectionMapReverse.get(data['PosiDirection'], '')
self.todayPosition = EMPTY_INT
self.ydPosition = EMPTY_INT
self.todayPositionCost = EMPTY_FLOAT
self.ydPositionCost = EMPTY_FLOAT
# 通过提前创建持仓数据对象并重复使用的方式来降低开销
pos = VtPositionData()
pos.symbol = self.symbol
pos.vtSymbol = self.symbol
pos.gatewayName = gatewayName
pos.direction = self.direction
pos.vtPositionName = '.'.join([pos.vtSymbol, pos.direction])
self.pos = pos
#----------------------------------------------------------------------
def updateBuffer(self, data):
"""更新缓存,返回更新后的持仓数据"""
# 昨仓和今仓的数据更新是分在两条记录里的,因此需要判断检查该条记录对应仓位
if data['TodayPosition']:
self.todayPosition = data['Position']
self.todayPositionCost = data['PositionCost']
elif data['YdPosition']:
self.ydPosition = data['Position']
self.ydPositionCost = data['PositionCost']
# 持仓的昨仓和今仓相加后为总持仓
self.pos.position = self.todayPosition + self.ydPosition
self.pos.ydPosition = self.ydPosition
# 如果手头还有持仓,则通过加权平均方式计算持仓均价
if self.todayPosition or self.ydPosition:
self.pos.price = ((self.todayPositionCost + self.ydPositionCost)/
(self.todayPosition + self.ydPosition))
# 否则价格为0
else:
self.pos.price = 0
return copy(self.pos)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def test(): def test():
"""测试""" """测试"""