修复本地合约文件未更新的bug
This commit is contained in:
parent
8d9910e566
commit
19337fd3a1
@ -88,6 +88,9 @@ class CtaEngine(object):
|
|||||||
# tick缓存
|
# tick缓存
|
||||||
self.tickDict = {}
|
self.tickDict = {}
|
||||||
|
|
||||||
|
# 未能订阅的symbols
|
||||||
|
self.pendingSubcribeSymbols = {}
|
||||||
|
|
||||||
# 注册事件监听
|
# 注册事件监听
|
||||||
self.registerEvent()
|
self.registerEvent()
|
||||||
|
|
||||||
@ -343,6 +346,11 @@ class CtaEngine(object):
|
|||||||
# 1. 获取事件的Tick数据
|
# 1. 获取事件的Tick数据
|
||||||
tick = event.dict_['data']
|
tick = event.dict_['data']
|
||||||
|
|
||||||
|
# 移除待订阅的合约清单
|
||||||
|
if tick.vtSymbol in self.pendingSubcribeSymbols:
|
||||||
|
self.writeCtaLog(u'已成功订阅{0},从待订阅清单中移除'.format(tick.vtSymbol))
|
||||||
|
del self.pendingSubcribeSymbols[tick.vtSymbol]
|
||||||
|
|
||||||
# 缓存最新tick
|
# 缓存最新tick
|
||||||
self.tickDict[tick.vtSymbol] = tick
|
self.tickDict[tick.vtSymbol] = tick
|
||||||
|
|
||||||
@ -453,6 +461,7 @@ class CtaEngine(object):
|
|||||||
|
|
||||||
# 注册持仓更新事件
|
# 注册持仓更新事件
|
||||||
self.eventEngine.register(EVENT_POSITION, self.processPositionEvent)
|
self.eventEngine.register(EVENT_POSITION, self.processPositionEvent)
|
||||||
|
self.eventEngine.register(EVENT_POSITION, self.checkUnsubscribedSymbols)
|
||||||
|
|
||||||
# 注册定时器事件
|
# 注册定时器事件
|
||||||
self.eventEngine.register(EVENT_TIMER, self.processTimerEvent)
|
self.eventEngine.register(EVENT_TIMER, self.processTimerEvent)
|
||||||
@ -597,6 +606,7 @@ class CtaEngine(object):
|
|||||||
symbols = strategy.vtSymbol.split(';')
|
symbols = strategy.vtSymbol.split(';')
|
||||||
|
|
||||||
for symbol in symbols:
|
for symbol in symbols:
|
||||||
|
self.writeCtaLog(u'添加合约{0}与策略的匹配目录'.format(symbol))
|
||||||
if symbol in self.tickStrategyDict:
|
if symbol in self.tickStrategyDict:
|
||||||
l = self.tickStrategyDict[symbol]
|
l = self.tickStrategyDict[symbol]
|
||||||
else:
|
else:
|
||||||
@ -605,7 +615,13 @@ class CtaEngine(object):
|
|||||||
l.append(strategy)
|
l.append(strategy)
|
||||||
|
|
||||||
# 3.订阅合约
|
# 3.订阅合约
|
||||||
|
self.writeCtaLog(u'向gateway订阅合约{0}'.format(symbol))
|
||||||
|
self.subscribe(strategy=strategy, symbol=symbol)
|
||||||
|
|
||||||
|
def subscribe(self, strategy, symbol):
|
||||||
|
"""订阅合约,不成功时,加入到待订阅列表"""
|
||||||
contract = self.mainEngine.getContract(symbol)
|
contract = self.mainEngine.getContract(symbol)
|
||||||
|
|
||||||
if contract:
|
if contract:
|
||||||
# 4.构造订阅请求包
|
# 4.构造订阅请求包
|
||||||
req = VtSubscribeReq()
|
req = VtSubscribeReq()
|
||||||
@ -619,7 +635,18 @@ class CtaEngine(object):
|
|||||||
# 5.调用主引擎的订阅接口
|
# 5.调用主引擎的订阅接口
|
||||||
self.mainEngine.subscribe(req, contract.gatewayName)
|
self.mainEngine.subscribe(req, contract.gatewayName)
|
||||||
else:
|
else:
|
||||||
self.writeCtaLog(u'%s的交易合约%s无法找到' %(name, symbol))
|
print u'Warning, can not find {0} in contracts'.format(symbol)
|
||||||
|
self.writeCtaLog(u'交易合约{}无法找到,添加到待订阅列表'.format (symbol))
|
||||||
|
self.pendingSubcribeSymbols[symbol]=strategy
|
||||||
|
|
||||||
|
def checkUnsubscribedSymbols(self, event):
|
||||||
|
"""持仓更新信息时,检查未提交的合约"""
|
||||||
|
for symbol in self.pendingSubcribeSymbols.keys():
|
||||||
|
contract = self.mainEngine.getContract(symbol)
|
||||||
|
if contract:
|
||||||
|
self.writeCtaLog(u'重新提交合约{0}订阅请求'.format(symbol))
|
||||||
|
strategy = self.pendingSubcribeSymbols[symbol]
|
||||||
|
self.subscribe(strategy=strategy, symbol=symbol)
|
||||||
|
|
||||||
#----------------------------------------------------------------------
|
#----------------------------------------------------------------------
|
||||||
def initStrategy(self, name, force = False):
|
def initStrategy(self, name, force = False):
|
||||||
|
@ -315,6 +315,9 @@ class CtpMdApi(MdApi):
|
|||||||
self.writeLog(text.DATA_SERVER_LOGIN)
|
self.writeLog(text.DATA_SERVER_LOGIN)
|
||||||
|
|
||||||
# 重新订阅之前订阅的合约
|
# 重新订阅之前订阅的合约
|
||||||
|
if len(self.subscribedSymbols) > 0:
|
||||||
|
print u'Resubscribe'
|
||||||
|
|
||||||
for subscribeReq in self.subscribedSymbols:
|
for subscribeReq in self.subscribedSymbols:
|
||||||
self.subscribe(subscribeReq)
|
self.subscribe(subscribeReq)
|
||||||
|
|
||||||
@ -443,7 +446,12 @@ class CtpMdApi(MdApi):
|
|||||||
# 这里的设计是,如果尚未登录就调用了订阅方法
|
# 这里的设计是,如果尚未登录就调用了订阅方法
|
||||||
# 则先保存订阅请求,登录完成后会自动订阅
|
# 则先保存订阅请求,登录完成后会自动订阅
|
||||||
if self.loginStatus:
|
if self.loginStatus:
|
||||||
|
print u'subscribe {0}'.format(str(subscribeReq.symbol))
|
||||||
self.subscribeMarketData(str(subscribeReq.symbol))
|
self.subscribeMarketData(str(subscribeReq.symbol))
|
||||||
|
self.writeLog(u'订阅合约:{0}'.format(str(subscribeReq.symbol)))
|
||||||
|
else:
|
||||||
|
print u'not login, add {0} into subscribe list'.format(str(subscribeReq.symbol))
|
||||||
|
self.writeLog(u'未连接,增加合约{0}至待订阅列表'.format(str(subscribeReq.symbol)))
|
||||||
self.subscribedSymbols.add(subscribeReq)
|
self.subscribedSymbols.add(subscribeReq)
|
||||||
|
|
||||||
#----------------------------------------------------------------------
|
#----------------------------------------------------------------------
|
||||||
@ -560,6 +568,12 @@ class CtpTdApi(TdApi):
|
|||||||
self.reqID += 1
|
self.reqID += 1
|
||||||
self.reqSettlementInfoConfirm(req, self.reqID)
|
self.reqSettlementInfoConfirm(req, self.reqID)
|
||||||
|
|
||||||
|
# 提交合约更新请求
|
||||||
|
try:
|
||||||
|
self.resentReqQryInstrument()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
# 否则,推送错误信息
|
# 否则,推送错误信息
|
||||||
else:
|
else:
|
||||||
err = VtErrorData()
|
err = VtErrorData()
|
||||||
@ -568,6 +582,11 @@ class CtpTdApi(TdApi):
|
|||||||
err.errorMsg = error['ErrorMsg'].decode('gbk')
|
err.errorMsg = error['ErrorMsg'].decode('gbk')
|
||||||
self.gateway.onError(err)
|
self.gateway.onError(err)
|
||||||
|
|
||||||
|
def resentReqQryInstrument(self):
|
||||||
|
# 查询合约代码
|
||||||
|
self.reqID += 1
|
||||||
|
self.reqQryInstrument({}, self.reqID)
|
||||||
|
|
||||||
#----------------------------------------------------------------------
|
#----------------------------------------------------------------------
|
||||||
def onRspUserLogout(self, data, error, n, last):
|
def onRspUserLogout(self, data, error, n, last):
|
||||||
"""登出回报"""
|
"""登出回报"""
|
||||||
@ -658,9 +677,6 @@ class CtpTdApi(TdApi):
|
|||||||
"""确认结算信息回报"""
|
"""确认结算信息回报"""
|
||||||
self.writeLog(text.SETTLEMENT_INFO_CONFIRMED)
|
self.writeLog(text.SETTLEMENT_INFO_CONFIRMED)
|
||||||
|
|
||||||
# 查询合约代码
|
|
||||||
self.reqID += 1
|
|
||||||
self.reqQryInstrument({}, self.reqID)
|
|
||||||
|
|
||||||
#----------------------------------------------------------------------
|
#----------------------------------------------------------------------
|
||||||
def onRspRemoveParkedOrder(self, data, error, n, last):
|
def onRspRemoveParkedOrder(self, data, error, n, last):
|
||||||
|
@ -53,7 +53,7 @@ class MainEngine(object):
|
|||||||
# 用来保存接口对象的字典
|
# 用来保存接口对象的字典
|
||||||
self.gatewayDict = OrderedDict()
|
self.gatewayDict = OrderedDict()
|
||||||
|
|
||||||
# 初始化的接口模块,以及其指定的名称
|
# 初始化的接口模块,以及其指定的名称,CTP是模块,value,是该模块下的多个连接配置文件,如 CTP_JR_connect.json
|
||||||
init_gateway_names = {'CTP': ['CTP', 'CTP_Prod', 'CTP_Post', 'CTP_EBF', 'CTP_JR']}
|
init_gateway_names = {'CTP': ['CTP', 'CTP_Prod', 'CTP_Post', 'CTP_EBF', 'CTP_JR']}
|
||||||
|
|
||||||
# 遍历接口字典并自动创建所有的接口对象
|
# 遍历接口字典并自动创建所有的接口对象
|
||||||
|
Loading…
Reference in New Issue
Block a user