[增强功能] CTP支持(一档、五档)行情

This commit is contained in:
msincenselee 2020-11-18 14:59:03 +08:00
parent aafedafca3
commit 1d7fbcfe1b

View File

@ -251,6 +251,7 @@ class CtpGateway(BaseGateway):
self.td_api = None
self.md_api = None
self.l2_md_api = None
self.tdx_api = None
self.rabbit_api = None
self.tq_api = None
@ -269,6 +270,7 @@ class CtpGateway(BaseGateway):
brokerid = setting["经纪商代码"]
td_address = setting["交易服务器"]
md_address = setting["行情服务器"]
md_address_level2 = setting.get("行情服务器_五档", None)
appid = setting["产品名称"]
auth_code = setting["授权编码"]
product_info = setting["产品信息"]
@ -285,6 +287,12 @@ class CtpGateway(BaseGateway):
and (not md_address.startswith("ssl://"))
):
md_address = "tcp://" + md_address
if md_address_level2:
if (
(not md_address_level2.startswith("tcp://"))
and (not md_address_level2.startswith("ssl://"))
):
md_address_level2 = "tcp://" + md_address_level2
# 获取自定义价差/价比合约的配置
try:
@ -309,6 +317,11 @@ class CtpGateway(BaseGateway):
self.md_api = CtpMdApi(self)
self.md_api.connect(md_address, userid, password, brokerid)
if not self.l2_md_api and md_address_level2:
self.write_log(f'激活五档行情配置:{md_address_level2}')
self.l2_md_api = CtpMdApi(gateway=self, level2=True)
self.l2_md_api.connect(md_address_level2, userid, password, brokerid)
if rabbit_dict:
self.write_log(f'激活RabbitMQ行情接口')
self.rabbit_api = SubMdApi(gateway=self)
@ -440,6 +453,9 @@ class CtpGateway(BaseGateway):
if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
self.write_log(f'使用天勤接口订阅{req.symbol}')
self.tq_api.subscribe(req)
if self.l2_md_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
self.write_log(f'使用五档行情接口订阅:{req.symbol}')
self.l2_md_api.subscribe(req)
else:
self.write_log(f'使用CTP接口订阅{req.symbol}')
self.md_api.subscribe(req)
@ -496,6 +512,12 @@ class CtpGateway(BaseGateway):
self.md_api = None
tmp1.close()
if self.l2_md_api:
self.write_log('断开五档行情API')
tmp1 = self.l2_md_api
self.l2_md_api = None
tmp1.close()
if self.td_api:
self.write_log('断开交易API')
tmp2 = self.td_api
@ -549,13 +571,17 @@ class CtpGateway(BaseGateway):
class CtpMdApi(MdApi):
""""""
def __init__(self, gateway):
def __init__(self, gateway, level2=False):
"""Constructor"""
super(CtpMdApi, self).__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.level2 = level2 # 5档行情
if self.level2:
self.name = "L2_"
else:
self.name = ""
self.reqid = 0
self.connect_status = False
@ -570,17 +596,19 @@ class CtpMdApi(MdApi):
"""
Callback when front server is connected.
"""
self.gateway.write_log("行情服务器连接成功")
self.gateway.write_log(f"{self.name}行情服务器连接成功")
self.login()
self.gateway.status.update({'md_con': True, 'md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
self.gateway.status.update(
{f'{self.name}md_con': True, f'{self.name}md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def onFrontDisconnected(self, reason: int):
"""
Callback when front server is disconnected.
"""
self.login_status = False
self.gateway.write_log(f"行情服务器连接断开,原因{reason}")
self.gateway.status.update({'md_con': False, 'md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
self.gateway.write_log(f"{self.name}行情服务器连接断开,原因{reason}")
self.gateway.status.update(
{f'{self.name}md_con': False, f'{self.name}md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool):
"""
@ -588,25 +616,25 @@ class CtpMdApi(MdApi):
"""
if not error["ErrorID"]:
self.login_status = True
self.gateway.write_log("行情服务器登录成功")
self.gateway.write_log(f"{self.name}行情服务器登录成功")
for symbol in self.subscribed:
self.subscribeMarketData(symbol)
else:
self.gateway.write_error("行情服务器登录失败", error)
self.gateway.write_error(f"{self.name}行情服务器登录失败", error)
def onRspError(self, error: dict, reqid: int, last: bool):
"""
Callback when error occured.
"""
self.gateway.write_error("行情接口报错", error)
self.gateway.write_error(f"{self.name}行情接口报错", error)
def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool):
""""""
if not error or not error["ErrorID"]:
return
self.gateway.write_error("行情订阅失败", error)
self.gateway.write_error(f"{self.name}行情订阅失败", error)
def onRtnDepthMarketData(self, data: dict):
"""
@ -719,7 +747,7 @@ class CtpMdApi(MdApi):
Subscribe to tick data update.
"""
if self.login_status:
self.gateway.write_log(f'订阅:{req.exchange} {req.symbol}')
self.gateway.write_log(f'{self.name}订阅:{req.exchange} {req.symbol}')
self.subscribeMarketData(req.symbol)
self.subscribed.add(req.symbol)