From 1d7fbcfe1b7c82ecba8a0fe64c4e34bc4de38fe8 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Wed, 18 Nov 2020 14:59:03 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=A2=9E=E5=BC=BA=E5=8A=9F=E8=83=BD]=20CTP?= =?UTF-8?q?=E6=94=AF=E6=8C=81=EF=BC=88=E4=B8=80=E6=A1=A3=E3=80=81=E4=BA=94?= =?UTF-8?q?=E6=A1=A3=EF=BC=89=E8=A1=8C=E6=83=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/gateway/ctp/ctp_gateway.py | 50 +++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index d909ebdd..4084aa4b 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -251,6 +251,7 @@ class CtpGateway(BaseGateway): self.td_api = None self.md_api = None + self.l2_md_api = None self.tdx_api = None self.rabbit_api = None self.tq_api = None @@ -269,6 +270,7 @@ class CtpGateway(BaseGateway): brokerid = setting["经纪商代码"] td_address = setting["交易服务器"] md_address = setting["行情服务器"] + md_address_level2 = setting.get("行情服务器_五档", None) appid = setting["产品名称"] auth_code = setting["授权编码"] product_info = setting["产品信息"] @@ -285,6 +287,12 @@ class CtpGateway(BaseGateway): and (not md_address.startswith("ssl://")) ): md_address = "tcp://" + md_address + if md_address_level2: + if ( + (not md_address_level2.startswith("tcp://")) + and (not md_address_level2.startswith("ssl://")) + ): + md_address_level2 = "tcp://" + md_address_level2 # 获取自定义价差/价比合约的配置 try: @@ -309,6 +317,11 @@ class CtpGateway(BaseGateway): self.md_api = CtpMdApi(self) self.md_api.connect(md_address, userid, password, brokerid) + if not self.l2_md_api and md_address_level2: + self.write_log(f'激活五档行情配置:{md_address_level2}') + self.l2_md_api = CtpMdApi(gateway=self, level2=True) + self.l2_md_api.connect(md_address_level2, userid, password, brokerid) + if rabbit_dict: self.write_log(f'激活RabbitMQ行情接口') self.rabbit_api = SubMdApi(gateway=self) @@ -440,6 +453,9 @@ class CtpGateway(BaseGateway): if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]: self.write_log(f'使用天勤接口订阅{req.symbol}') self.tq_api.subscribe(req) + if self.l2_md_api and req.exchange in [Exchange.SHFE, Exchange.INE]: + self.write_log(f'使用五档行情接口订阅:{req.symbol}') + self.l2_md_api.subscribe(req) else: self.write_log(f'使用CTP接口订阅{req.symbol}') self.md_api.subscribe(req) @@ -496,6 +512,12 @@ class CtpGateway(BaseGateway): self.md_api = None tmp1.close() + if self.l2_md_api: + self.write_log('断开五档行情API') + tmp1 = self.l2_md_api + self.l2_md_api = None + tmp1.close() + if self.td_api: self.write_log('断开交易API') tmp2 = self.td_api @@ -549,13 +571,17 @@ class CtpGateway(BaseGateway): class CtpMdApi(MdApi): """""" - def __init__(self, gateway): + def __init__(self, gateway, level2=False): """Constructor""" super(CtpMdApi, self).__init__() self.gateway = gateway self.gateway_name = gateway.gateway_name - + self.level2 = level2 # 5档行情 + if self.level2: + self.name = "L2_" + else: + self.name = "" self.reqid = 0 self.connect_status = False @@ -570,17 +596,19 @@ class CtpMdApi(MdApi): """ Callback when front server is connected. """ - self.gateway.write_log("行情服务器连接成功") + self.gateway.write_log(f"{self.name}行情服务器连接成功") self.login() - self.gateway.status.update({'md_con': True, 'md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) + self.gateway.status.update( + {f'{self.name}md_con': True, f'{self.name}md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onFrontDisconnected(self, reason: int): """ Callback when front server is disconnected. """ self.login_status = False - self.gateway.write_log(f"行情服务器连接断开,原因{reason}") - self.gateway.status.update({'md_con': False, 'md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) + self.gateway.write_log(f"{self.name}行情服务器连接断开,原因{reason}") + self.gateway.status.update( + {f'{self.name}md_con': False, f'{self.name}md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool): """ @@ -588,25 +616,25 @@ class CtpMdApi(MdApi): """ if not error["ErrorID"]: self.login_status = True - self.gateway.write_log("行情服务器登录成功") + self.gateway.write_log(f"{self.name}行情服务器登录成功") for symbol in self.subscribed: self.subscribeMarketData(symbol) else: - self.gateway.write_error("行情服务器登录失败", error) + self.gateway.write_error(f"{self.name}行情服务器登录失败", error) def onRspError(self, error: dict, reqid: int, last: bool): """ Callback when error occured. """ - self.gateway.write_error("行情接口报错", error) + self.gateway.write_error(f"{self.name}行情接口报错", error) def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool): """""" if not error or not error["ErrorID"]: return - self.gateway.write_error("行情订阅失败", error) + self.gateway.write_error(f"{self.name}行情订阅失败", error) def onRtnDepthMarketData(self, data: dict): """ @@ -719,7 +747,7 @@ class CtpMdApi(MdApi): Subscribe to tick data update. """ if self.login_status: - self.gateway.write_log(f'订阅:{req.exchange} {req.symbol}') + self.gateway.write_log(f'{self.name}订阅:{req.exchange} {req.symbol}') self.subscribeMarketData(req.symbol) self.subscribed.add(req.symbol)