增加通达信行情

This commit is contained in:
msincenselee 2019-03-15 11:00:03 +08:00
parent d5c1bcfcb4
commit 335aee7fc7

View File

@ -18,9 +18,20 @@ from vnpy.trader.vtConstant import *
from vnpy.trader.vtGateway import *
from vnpy.trader.gateway.ctpGateway.language import text
from vnpy.trader.gateway.ctpGateway.ctpDataType import *
from vnpy.trader.vtFunction import getJsonPath
from vnpy.trader.vtFunction import getJsonPath,getShortSymbol
from vnpy.trader.app.ctaStrategy.ctaBase import MARKET_DAY_ONLY,NIGHT_MARKET_SQ1,NIGHT_MARKET_SQ2,NIGHT_MARKET_SQ3,NIGHT_MARKET_ZZ,NIGHT_MARKET_DL
from datetime import datetime,timedelta
# 通达信行情相关
from threading import Thread
from time import sleep
from pytdx.exhq import TdxExHq_API
from queue import Queue, Empty
from multiprocessing.dummy import Pool
import traceback
import copy
# 以下为一些VT类型和CTP类型的映射字典
# 价格类型映射
priceTypeMap = {}
@ -96,15 +107,21 @@ class CtpGateway(VtGateway):
self.mdApi = None # 行情API
self.tdApi = None # 交易API
self.tdxApi = None # 通达信指数行情API
self.mdConnected = False # 行情API连接状态登录完成后为True
self.tdConnected = False # 交易API连接状态
self.tdxConnected = False # 通达信指数行情API得连接状态
self.redisConnected = False # redis行情API的连接状态
self.qryEnabled = False # 是否要启动循环查询
self.subscribedSymbols = set() # 已订阅合约代码
self.requireAuthentication = False
self.tdx_pool_count = 2 # 通达信连接池内连接数
#----------------------------------------------------------------------
def connect(self):
"""连接"""
@ -130,7 +147,7 @@ class CtpGateway(VtGateway):
# 解析json文件
setting = json.load(f)
except IOError:
self.writeLog('{} {}'.format(filePath,text.LOADING_ERROR))
self.writeError('{} {}'.format(filePath,text.LOADING_ERROR))
return
try:
@ -149,6 +166,25 @@ class CtpGateway(VtGateway):
authCode = None
userProductInfo = None
# 如果没有初始化tdxApi
if self.tdxApi is None:
self.writeLog(u'通达信接口未实例化,创建实例')
self.tdxApi = TdxMdApi(self) # 通达信行情API
# 获取tdx配置
tdx_conf = setting.get('tdx',None)
if tdx_conf is not None and isinstance(tdx_conf,dict):
if self.tdxApi is None:
self.writeLog(u'通达信接口未实例化,创建实例')
self.tdxApi = TdxMdApi(self) # 通达信行情API
ip_list = tdx_conf.get('ip_list',None)
if ip_list is not None and len(ip_list)>0:
self.writeLog(u'使用配置文件的tdx服务器清单:{}'.format(ip_list))
self.tdxApi.ip_list = copy.copy(ip_list)
# 获取通达信得缺省连接池数量
self.tdx_pool_count = tdx_conf.get('pool_count', self.tdx_pool_count)
except KeyError:
self.writeLog(text.CONFIG_KEY_MISSING)
return
@ -158,18 +194,34 @@ class CtpGateway(VtGateway):
self.mdApi.connect(userID, password, brokerID, mdAddress)
self.writeLog(u'连接交易服务器')
self.tdApi.connect(userID, password, brokerID, tdAddress, authCode, userProductInfo)
self.setQryEnabled(True)
# 初始化并启动查询
self.initQuery()
for req in self.subscribedSymbols:
self.mdApi.subscribe(req)
for req in list(self.subscribedSymbols):
# 指数合约从tdx行情订阅
if req.symbol[-2:] in ['99']:
req.symbol = req.symbol.upper()
if self.tdxApi is not None:
self.writeLog(u'有指数订阅,连接通达信行情服务器')
self.tdxApi.connect(self.tdx_pool_count)
self.tdxApi.subscribe(req)
else:
self.mdApi.subscribe(req)
#----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅行情"""
if self.mdApi is not None:
self.mdApi.subscribe(subscribeReq)
# 指数合约从tdx行情订阅
if subscribeReq.symbol[-2:] in ['99']:
subscribeReq.symbol = subscribeReq.symbol.upper()
if self.tdxApi:
self.tdxApi.subscribe(subscribeReq)
else:
self.mdApi.subscribe(subscribeReq)
# Allow the strategies to start before the connection
self.subscribedSymbols.add(subscribeReq)
@ -204,6 +256,9 @@ class CtpGateway(VtGateway):
def checkStatus(self):
"""查询md/td的状态"""
if self.tdxApi is not None:
self.tdxApi.checkStatus()
if self.tdApi is None or self.mdApi is None:
return False
@ -228,7 +283,15 @@ class CtpGateway(VtGateway):
tmp2.close()
self.tdConnected = False
self.writeLog(u'主动断开连接')
if self.tdxApi is not None:
self.writeLog(u'断开通达信行情API')
tmp1 = self.tdxApi
self.tdxApi.connection_status = False
self.tdxApi = None
tmp1.close()
self.tdxConnected = False
self.writeLog(u'CTP Gateway 主动断开连接')
#----------------------------------------------------------------------
def initQuery(self):
@ -1616,7 +1679,470 @@ class CtpTdApi(TdApi):
log.logContent = content
self.gateway.onLog(log)
print(u'fininsh load ctpGateway.py')
class TdxMdApi():
"""
通达信数据行情API实现
通过线程池仅仅查询订阅的行情更新合约的数据
"""
def __init__(self, gateway):
self.gateway = gateway # gateway对象
self.gatewayName = gateway.gatewayName # gateway对象名称
self.req_interval = 0.5 # 操作请求间隔500毫秒
self.req_id = EMPTY_INT # 操作请求编号
self.connection_status = False # 连接状态
self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典
self.symbol_market_dict = {} # tdx合约与tdx市场的字典
self.symbol_vn_dict = {} # tdx合约与vtSymbol的对应
self.symbol_tick_dict = {} # tdx合约与最后一个Tick得字典
self.registed_symbol_set = set()
#self.queue = Queue() # 请求队列
self.pool = None # 线程池
#self.req_thread = None # 定时器线程
self.ip_list = [{'ip': '112.74.214.43', 'port': 7727},
{'ip': '59.175.238.38', 'port': 7727},
{'ip': '124.74.236.94', 'port': 7721},
{'ip': '124.74.236.94', 'port': 7721},
{'ip': '58.246.109.27', 'port': 7721}
]
# 调出 {'ip': '218.80.248.229', 'port': 7721},
self.best_ip = {'ip': None, 'port': None}
self.api_dict = {} # API 的连接会话对象字典
self.last_tick_dt = {} # 记录该会话对象的最后一个tick时间
self.instrument_count = 50000
# ----------------------------------------------------------------------
def ping(self, ip, port=7709):
"""
ping行情服务器
:param ip:
:param port:
:param type_:
:return:
"""
apix = TdxExHq_API()
__time1 = datetime.now()
try:
with apix.connect(ip, port):
if apix.get_instrument_count() > 10000:
_timestamp = datetime.now() - __time1
self.writeLog('服务器{}:{},耗时:{}'.format(ip,port,_timestamp))
return _timestamp
else:
self.writeLog(u'该服务器IP {}无响应'.format(ip))
return timedelta(9, 9, 0)
except:
self.writeError(u'tdx ping服务器异常的响应{}'.format(ip))
return timedelta(9, 9, 0)
# ----------------------------------------------------------------------
def select_best_ip(self):
"""
选择行情服务器
:return:
"""
self.writeLog(u'选择通达信行情服务器')
data_future = [self.ping(x['ip'], x['port']) for x in self.ip_list]
best_future_ip = self.ip_list[data_future.index(min(data_future))]
self.writeLog(u'选取 {}:{}'.format(
best_future_ip['ip'], best_future_ip['port']))
return best_future_ip
def connect(self,n=3):
"""
连接通达讯行情服务器
:param n:
:return:
"""
if self.connection_status:
for api in self.api_dict:
if api is not None or getattr(api,"client",None) is not None:
self.writeLog(u'当前已经连接,不需要重新连接')
return
self.writeLog(u'开始通达信行情服务器')
# 选取最佳服务器
if self.best_ip['ip'] is None and self.best_ip['port'] is None:
self.best_ip = self.select_best_ip()
# 创建n个api连接对象实例
for i in range(n):
try:
api = TdxExHq_API( heartbeat=True, auto_retry=True,raise_exception=True)
api.connect(self.best_ip['ip'], self.best_ip['port'])
# 尝试获取市场合约统计
c = api.get_instrument_count()
if c is None or c < 10:
err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'],self.best_ip['port'])
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = -1
err.errorMsg = err_msg
self.gateway.onError(err)
else:
self.writeLog(u'创建第{}个tdx连接'.format(i+1))
self.api_dict[i] = api
self.last_tick_dt[i] = datetime.now()
self.connection_status = True
self.instrument_count = c
except Exception as ex:
self.writeError(u'连接服务器tdx[{}]异常:{},{}'.format(i,str(ex),traceback.format_exc()))
return
# 更新 symbol_exchange_dict , symbol_market_dict
self.qryInstrument()
#self.req_thread = Thread(target=self.addReq)
#self.req_thread.start()
# 创建连接池每个连接都调用run方法
self.pool = Pool(n)
self.pool.map_async(self.run,range(n))
def reconnect(self,i):
"""
重连
:param i:
:return:
"""
try:
self.best_ip = self.select_best_ip()
api = TdxExHq_API(heartbeat=True, auto_retry=True)
api.connect(self.best_ip['ip'], self.best_ip['port'])
# 尝试获取市场合约统计
c = api.get_instrument_count()
if c is None or c < 10:
err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'], self.best_ip['port'])
err = VtErrorData()
err.gatewayName = self.gatewayName
err.errorID = -1
err.errorMsg = err_msg
self.gateway.onError(err)
else:
self.writeLog(u'重新创建第{}个tdx连接'.format(i + 1))
self.api_dict[i] = api
sleep(1)
except Exception as ex:
self.writeError(u'重新连接服务器tdx[{}]异常:{},{}'.format(i, str(ex), traceback.format_exc()))
return
def close(self):
"""退出API"""
self.connection_status = False
if self.pool is not None:
self.pool.close()
self.pool.join()
# ----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅合约"""
# 这里的设计是,如果尚未登录就调用了订阅方法
# 则先保存订阅请求,登录完成后会自动订阅
vn_symbol = str(subscribeReq.symbol)
vn_symbol = vn_symbol.upper()
self.writeLog(u'通达信行情订阅 {}'.format(str(vn_symbol)))
if vn_symbol[-2:] != '99':
self.writeLog(u'{}不是指数合约,不能订阅'.format(vn_symbol))
return
tdx_symbol = vn_symbol[0:-2] + 'L9'
tdx_symbol = tdx_symbol.upper()
self.writeLog(u'{}=>{}'.format(vn_symbol,tdx_symbol))
self.symbol_vn_dict[tdx_symbol] = vn_symbol
if tdx_symbol not in self.registed_symbol_set:
self.registed_symbol_set.add(tdx_symbol)
self.checkStatus()
def checkStatus(self):
#self.writeLog(u'检查tdx接口状态')
if len(self.registed_symbol_set) ==0:
return
# 若还没有启动连接,就启动连接
over_time = [((datetime.now()-dt).total_seconds() > 60) for dt in self.last_tick_dt.values()]
if not self.connection_status or len(self.api_dict) == 0 or any(over_time):
self.writeLog(u'tdx还没有启动连接就启动连接')
self.close()
self.pool = None
self.api_dict = {}
pool_cout = getattr(self.gateway,'tdx_pool_count',3)
self.connect(pool_cout)
#self.writeLog(u'tdx接口状态正常')
def qryInstrument(self):
"""
查询/更新合约信息
:return:
"""
if not self.connection_status:
return
api = self.api_dict.get(0)
if api is None:
self.writeLog(u'取不到api连接更新合约信息失败')
return
# 取得所有的合约信息
num = api.get_instrument_count()
if not isinstance(num,int):
return
all_contacts = sum([api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)],[])
#[{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}]
# 对所有合约处理,更新字典 指数合约-tdx市场指数合约-交易所
for tdx_contract in all_contacts:
tdx_symbol = tdx_contract.get('code', None)
if tdx_symbol is None or tdx_symbol[-2:] not in ['L9']:
continue
tdx_market_id = tdx_contract.get('market')
self.symbol_market_dict[tdx_symbol] = tdx_market_id
if tdx_market_id == 47: # 中金所
self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_CFFEX
elif tdx_market_id == 28: # 郑商所
self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_CZCE
elif tdx_market_id == 29: # 大商所
self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_DCE
elif tdx_market_id == 30: # 上期所+能源
self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_SHFE
# 如果有预定的订阅合约,提前订阅
def run(self, i):
"""
版本1Pool内得线程持续运行,每个线程从queue中获取一个请求并处理
版本2Pool内线程从订阅合约集合中取出符合自己下标 mode n = 0的合约并发送请求
:param i:
:return:
"""
"""
# 版本1
while self.connection_status:
try:
req = self.queue.get(timeout=self.req_interval)
self.processReq(req,i)
except Exception as ex:
self.writeLog(u'tdx[{}] exception:{},{}'.format(i,str(ex),traceback.format_exc()))
"""
# 版本2
try:
api_count = len(self.api_dict)
last_dt = datetime.now()
self.writeLog(u'开始运行tdx[{}],{}'.format(i,last_dt))
while self.connection_status:
symbols = set()
for idx,tdx_symbol in enumerate(list(self.registed_symbol_set)):
#self.writeLog(u'tdx[{}], api_count:{}, idx:{}, tdx_symbol:{}'.format(i, api_count, idx, tdx_symbol))
if idx % api_count == i:
try:
symbols.add(tdx_symbol)
self.processReq(tdx_symbol, i)
except BrokenPipeError as bex:
self.writeError(u'BrokenPipeError{},重试重连tdx[{}]'.format(str(bex),i))
self.reconnect(i)
sleep(5)
break
except Exception as ex:
self.writeError(u'tdx[{}] exception:{},{}'.format(i, str(ex), traceback.format_exc()))
#api = self.api_dict.get(i,None)
#if api is None or getattr(api,'client') is None:
self.writeError(u'重试重连tdx[{}]'.format(i))
print(u'重试重连tdx[{}]'.format(i),file=sys.stderr)
self.reconnect(i)
#self.writeLog(u'tdx[{}] sleep'.format(i))
sleep(self.req_interval)
dt = datetime.now()
if last_dt.minute != dt.minute:
self.writeLog('tdx[{}] check point. {}, process symbols:{}'.format(i,dt,symbols))
last_dt = dt
except Exception as ex:
self.writeError(u'tdx[{}] pool.run exception:{},{}'.format(i, str(ex), traceback.format_exc()))
self.writeError(u'tdx[{}] {}退出'.format(i,datetime.now()))
def processReq(self, req, i):
"""
处理行情信息ticker请求
:param req:
:param i:
:return:
"""
symbol = req
api = self.api_dict.get(i, None)
if api is None:
self.writeLog(u'tdx[{}] Api is None'.format(i))
raise Exception(u'tdx[{}] Api is None'.format(i))
#self.writeLog(u'tdx[{}] get_instrument_quote:({},{})'.format(i,self.symbol_market_dict.get(symbol),symbol))
rt_list = api.get_instrument_quote(self.symbol_market_dict.get(symbol),symbol)
if len(rt_list) == 0:
self.writeLog(u'tdx[{}]: rt_list为空'.format(i))
return
#else:
# self.writeLog(u'tdx[{}]: rt_list数据:{}'.format(i, rt_list))
if i in self.last_tick_dt:
self.last_tick_dt[i] = datetime.now()
for d in list(rt_list):
# 忽略成交量为0的无效单合约tick数据
if d.get('xianliang', 0) <= 0:
self.writeLog(u'忽略成交量为0的无效单合约tick数据:')
continue
code = d.get('code',None)
if symbol != code and code is not None:
#self.writeLog(u'忽略合约{} {} 不一致的tick数据:{}'.format(symbol,d.get('code'),rt_list))
#continue
symbol = code
tick = VtTickData()
tick.gatewayName = self.gatewayName
tick.symbol = self.symbol_vn_dict.get(symbol,None)
if tick.symbol is None:
self.writeLog(u'self.symbol_vn_dict 取不到映射得:{}'.format(symbol))
return
tick.symbol = tick.symbol.upper()
tick.exchange = self.symbol_exchange_dict.get(symbol)
tick.vtSymbol = tick.symbol
tick.preClosePrice = d.get('pre_close')
tick.highPrice = d.get('high')
tick.openPrice = d.get('open')
tick.lowPrice = d.get('low')
tick.lastPrice = d.get('price')
tick.volume = d.get('zongliang',0)
tick.openInterest = d.get('chicang')
tick.datetime = datetime.now()
# 修正毫秒
last_tick = self.symbol_tick_dict.get(symbol,None)
if (last_tick is not None) and tick.datetime.replace(microsecond=0) == last_tick.datetime:
# 与上一个tick的时间去除毫秒后相同,修改为500毫秒
tick.datetime = tick.datetime.replace(microsecond=500)
tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12]
else:
tick.datetime = tick.datetime.replace(microsecond=0)
tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12]
tick.date = tick.datetime.strftime('%Y-%m-%d')
# 修正时间
if tick.datetime.hour >= 20:
if tick.datetime.isoweekday() == 5:
# 交易日是星期下周一
tick.tradingDay = tick.datetime + timedelta(days=3)
else:
# 第二天
tick.tradingDay = tick.datetime + timedelta(days=1)
elif tick.datetime.hour < 8 and tick.datetime.isoweekday() == 6:
# 交易日是星期一
tick.tradingDay = tick.datetime + timedelta(days=2)
else:
tick.tradingDay = tick.datetime
tick.tradingDay = tick.tradingDay.strftime('%Y-%m-%d')
# 指数没有涨停和跌停就用昨日收盘价正负10%
tick.upperLimit = tick.preClosePrice * 1.1
tick.lowerLimit = tick.preClosePrice * 0.9
# CTP只有一档行情
tick.bidPrice1 = d.get('bid1')
tick.bidVolume1 = d.get('bid_vol1')
tick.askPrice1 = d.get('ask1')
tick.askVolume1 = d.get('ask_vol1')
short_symbol = tick.vtSymbol
short_symbol = short_symbol.replace('99', '').upper()
# 排除非交易时间得tick
if tick.exchange is EXCHANGE_CFFEX:
if tick.datetime.hour not in [9,10,11,13,14,15]:
return
if tick.datetime.hour == 9 and tick.datetime.minute < 15:
return
if tick.datetime.hour == 15 and tick.datetime.minute >= 15:
return
else: # 大商所/郑商所,上期所,上海能源
# 排除非开盘小时
if tick.datetime.hour in [3,4,5,6,7,8,12,15,16,17,18,19,20]:
return
# 排除早盘 10:15~10:30
if tick.datetime.hour == 10 and 15 <= tick.datetime.minute < 30:
return
# 排除早盘 11:30~12:00
if tick.datetime.hour == 11 and tick.datetime.minute >= 30:
return
# 排除午盘 13:00 ~13:30
if tick.datetime.hour == 13 and tick.datetime.minute < 30:
return
# 排除凌晨2:30~3:00
if tick.datetime.hour == 2 and tick.datetime.minute >= 30:
return
# 排除大商所/郑商所夜盘数据
if short_symbol in NIGHT_MARKET_DL or short_symbol in NIGHT_MARKET_ZZ:
if tick.datetime.hour == 23 and tick.datetime.minute>=30:
return
if tick.datetime.hour in [0,1,2]:
return
# 排除上期所夜盘数据 23:00 收盘
if short_symbol in NIGHT_MARKET_SQ3:
if tick.datetime.hour in [23,0,1,2]:
return
# 排除上期所夜盘数据 1:00 收盘
if short_symbol in NIGHT_MARKET_SQ2:
if tick.datetime.hour in [1,2]:
return
# 排除日盘合约在夜盘得数据
if short_symbol in MARKET_DAY_ONLY and (tick.datetime.hour < 9 or tick.datetime.hour > 16):
#self.writeLog(u'排除日盘合约{}在夜盘得数据'.format(short_symbol))
return
"""
self.writeLog('{},{},{},{},{},{},{},{},{},{},{},{},{},{}'.format(tick.gatewayName, tick.symbol,
tick.exchange, tick.vtSymbol,
tick.datetime, tick.tradingDay,
tick.openPrice, tick.highPrice,
tick.lowPrice, tick.preClosePrice,
tick.bidPrice1,
tick.bidVolume1, tick.askPrice1,
tick.askVolume1))
"""
self.symbol_tick_dict[symbol] = tick
self.gateway.onTick(tick)
# ----------------------------------------------------------------------
def writeLog(self, content):
"""发出日志"""
log = VtLogData()
log.gatewayName = self.gatewayName
log.logContent = content
self.gateway.onLog(log)
def writeError(self,content):
self.gateway.writeError(content)
#----------------------------------------------------------------------
def test():
@ -1636,9 +2162,21 @@ def test():
gateway = CtpGateway(eventEngine)
gateway.connect()
# gateway.connect()
auto_subscribe_symbols = ['M99', 'RB99', 'TA99', 'MA99', 'NI99', 'SR99']
for symbol in auto_subscribe_symbols:
print(u'自动订阅合约:{}'.format(symbol))
sub = VtSubscribeReq()
sub.symbol = symbol
gateway.subscribe(sub)
gateway.connect()
sys.exit(app.exec_())
if __name__ == '__main__':
test()
try:
test()
except Exception as ex:
print(u'异常:{},{}'.format(str(ex), traceback.format_exc()))
print('Finished')