diff --git a/vn.trader/ctaStrategy/ctaEngine.py b/vn.trader/ctaStrategy/ctaEngine.py index f9fecee4..28a9fc6e 100644 --- a/vn.trader/ctaStrategy/ctaEngine.py +++ b/vn.trader/ctaStrategy/ctaEngine.py @@ -88,6 +88,9 @@ class CtaEngine(object): # tick缓存 self.tickDict = {} + # 未能订阅的symbols + self.pendingSubcribeSymbols = {} + # 注册事件监听 self.registerEvent() @@ -343,6 +346,11 @@ class CtaEngine(object): # 1. 获取事件的Tick数据 tick = event.dict_['data'] + # 移除待订阅的合约清单 + if tick.vtSymbol in self.pendingSubcribeSymbols: + self.writeCtaLog(u'已成功订阅{0},从待订阅清单中移除'.format(tick.vtSymbol)) + del self.pendingSubcribeSymbols[tick.vtSymbol] + # 缓存最新tick self.tickDict[tick.vtSymbol] = tick @@ -453,6 +461,7 @@ class CtaEngine(object): # 注册持仓更新事件 self.eventEngine.register(EVENT_POSITION, self.processPositionEvent) + self.eventEngine.register(EVENT_POSITION, self.checkUnsubscribedSymbols) # 注册定时器事件 self.eventEngine.register(EVENT_TIMER, self.processTimerEvent) @@ -597,6 +606,7 @@ class CtaEngine(object): symbols = strategy.vtSymbol.split(';') for symbol in symbols: + self.writeCtaLog(u'添加合约{0}与策略的匹配目录'.format(symbol)) if symbol in self.tickStrategyDict: l = self.tickStrategyDict[symbol] else: @@ -605,21 +615,38 @@ class CtaEngine(object): l.append(strategy) # 3.订阅合约 - contract = self.mainEngine.getContract(symbol) - if contract: - # 4.构造订阅请求包 - req = VtSubscribeReq() - req.symbol = contract.symbol - req.exchange = contract.exchange + self.writeCtaLog(u'向gateway订阅合约{0}'.format(symbol)) + self.subscribe(strategy=strategy, symbol=symbol) - # 对于IB接口订阅行情时所需的货币和产品类型,从策略属性中获取 - req.currency = strategy.currency - req.productClass = strategy.productClass + def subscribe(self, strategy, symbol): + """订阅合约,不成功时,加入到待订阅列表""" + contract = self.mainEngine.getContract(symbol) - # 5.调用主引擎的订阅接口 - self.mainEngine.subscribe(req, contract.gatewayName) - else: - self.writeCtaLog(u'%s的交易合约%s无法找到' %(name, symbol)) + if contract: + # 4.构造订阅请求包 + req = VtSubscribeReq() + req.symbol = contract.symbol + req.exchange = contract.exchange + + # 对于IB接口订阅行情时所需的货币和产品类型,从策略属性中获取 + req.currency = strategy.currency + req.productClass = strategy.productClass + + # 5.调用主引擎的订阅接口 + self.mainEngine.subscribe(req, contract.gatewayName) + else: + print u'Warning, can not find {0} in contracts'.format(symbol) + self.writeCtaLog(u'交易合约{}无法找到,添加到待订阅列表'.format (symbol)) + self.pendingSubcribeSymbols[symbol]=strategy + + def checkUnsubscribedSymbols(self, event): + """持仓更新信息时,检查未提交的合约""" + for symbol in self.pendingSubcribeSymbols.keys(): + contract = self.mainEngine.getContract(symbol) + if contract: + self.writeCtaLog(u'重新提交合约{0}订阅请求'.format(symbol)) + strategy = self.pendingSubcribeSymbols[symbol] + self.subscribe(strategy=strategy, symbol=symbol) #---------------------------------------------------------------------- def initStrategy(self, name, force = False): @@ -806,7 +833,7 @@ class CtaEngine(object): newPrice = round(price / priceTick, 0) * priceTick return newPrice - # ---------------------------------------------------------------------- + # ---------------------------------------------------------------------- def getAccountInfo(self): """获取账号的实时权益、可用资金、仓位比例 Added by Incenselee diff --git a/vn.trader/gateway/ctpGateway/ctpGateway.py b/vn.trader/gateway/ctpGateway/ctpGateway.py index 0b3f74be..542d13bc 100644 --- a/vn.trader/gateway/ctpGateway/ctpGateway.py +++ b/vn.trader/gateway/ctpGateway/ctpGateway.py @@ -315,6 +315,9 @@ class CtpMdApi(MdApi): self.writeLog(text.DATA_SERVER_LOGIN) # 重新订阅之前订阅的合约 + if len(self.subscribedSymbols) > 0: + print u'Resubscribe' + for subscribeReq in self.subscribedSymbols: self.subscribe(subscribeReq) @@ -443,7 +446,12 @@ class CtpMdApi(MdApi): # 这里的设计是,如果尚未登录就调用了订阅方法 # 则先保存订阅请求,登录完成后会自动订阅 if self.loginStatus: + print u'subscribe {0}'.format(str(subscribeReq.symbol)) self.subscribeMarketData(str(subscribeReq.symbol)) + self.writeLog(u'订阅合约:{0}'.format(str(subscribeReq.symbol))) + else: + print u'not login, add {0} into subscribe list'.format(str(subscribeReq.symbol)) + self.writeLog(u'未连接,增加合约{0}至待订阅列表'.format(str(subscribeReq.symbol))) self.subscribedSymbols.add(subscribeReq) #---------------------------------------------------------------------- @@ -558,7 +566,13 @@ class CtpTdApi(TdApi): req['BrokerID'] = self.brokerID req['InvestorID'] = self.userID self.reqID += 1 - self.reqSettlementInfoConfirm(req, self.reqID) + self.reqSettlementInfoConfirm(req, self.reqID) + + # 提交合约更新请求 + try: + self.resentReqQryInstrument() + except: + pass # 否则,推送错误信息 else: @@ -567,7 +581,12 @@ class CtpTdApi(TdApi): err.errorID = error['ErrorID'] err.errorMsg = error['ErrorMsg'].decode('gbk') self.gateway.onError(err) - + + def resentReqQryInstrument(self): + # 查询合约代码 + self.reqID += 1 + self.reqQryInstrument({}, self.reqID) + #---------------------------------------------------------------------- def onRspUserLogout(self, data, error, n, last): """登出回报""" @@ -657,10 +676,7 @@ class CtpTdApi(TdApi): def onRspSettlementInfoConfirm(self, data, error, n, last): """确认结算信息回报""" self.writeLog(text.SETTLEMENT_INFO_CONFIRMED) - - # 查询合约代码 - self.reqID += 1 - self.reqQryInstrument({}, self.reqID) + #---------------------------------------------------------------------- def onRspRemoveParkedOrder(self, data, error, n, last): diff --git a/vn.trader/vtEngine.py b/vn.trader/vtEngine.py index e2eac4de..c81103dc 100644 --- a/vn.trader/vtEngine.py +++ b/vn.trader/vtEngine.py @@ -53,7 +53,7 @@ class MainEngine(object): # 用来保存接口对象的字典 self.gatewayDict = OrderedDict() - # 初始化的接口模块,以及其指定的名称 + # 初始化的接口模块,以及其指定的名称,CTP是模块,value,是该模块下的多个连接配置文件,如 CTP_JR_connect.json init_gateway_names = {'CTP': ['CTP', 'CTP_Prod', 'CTP_Post', 'CTP_EBF', 'CTP_JR']} # 遍历接口字典并自动创建所有的接口对象