diff --git a/vnpy/trader/gateway/ctpGateway/ctpGateway.py b/vnpy/trader/gateway/ctpGateway/ctpGateway.py index f5a74651..ca767ae0 100644 --- a/vnpy/trader/gateway/ctpGateway/ctpGateway.py +++ b/vnpy/trader/gateway/ctpGateway/ctpGateway.py @@ -18,9 +18,20 @@ from vnpy.trader.vtConstant import * from vnpy.trader.vtGateway import * from vnpy.trader.gateway.ctpGateway.language import text from vnpy.trader.gateway.ctpGateway.ctpDataType import * -from vnpy.trader.vtFunction import getJsonPath +from vnpy.trader.vtFunction import getJsonPath,getShortSymbol +from vnpy.trader.app.ctaStrategy.ctaBase import MARKET_DAY_ONLY,NIGHT_MARKET_SQ1,NIGHT_MARKET_SQ2,NIGHT_MARKET_SQ3,NIGHT_MARKET_ZZ,NIGHT_MARKET_DL + from datetime import datetime,timedelta +# 通达信行情相关 +from threading import Thread +from time import sleep +from pytdx.exhq import TdxExHq_API +from queue import Queue, Empty +from multiprocessing.dummy import Pool +import traceback +import copy + # 以下为一些VT类型和CTP类型的映射字典 # 价格类型映射 priceTypeMap = {} @@ -96,15 +107,21 @@ class CtpGateway(VtGateway): self.mdApi = None # 行情API self.tdApi = None # 交易API + self.tdxApi = None # 通达信指数行情API + self.mdConnected = False # 行情API连接状态,登录完成后为True self.tdConnected = False # 交易API连接状态 - + self.tdxConnected = False # 通达信指数行情API得连接状态 + self.redisConnected = False # redis行情API的连接状态 + self.qryEnabled = False # 是否要启动循环查询 self.subscribedSymbols = set() # 已订阅合约代码 self.requireAuthentication = False - + + self.tdx_pool_count = 2 # 通达信连接池内连接数 + #---------------------------------------------------------------------- def connect(self): """连接""" @@ -130,7 +147,7 @@ class CtpGateway(VtGateway): # 解析json文件 setting = json.load(f) except IOError: - self.writeLog('{} {}'.format(filePath,text.LOADING_ERROR)) + self.writeError('{} {}'.format(filePath,text.LOADING_ERROR)) return try: @@ -149,6 +166,25 @@ class CtpGateway(VtGateway): authCode = None userProductInfo = None + # 如果没有初始化tdxApi + if self.tdxApi is None: + self.writeLog(u'通达信接口未实例化,创建实例') + self.tdxApi = TdxMdApi(self) # 通达信行情API + + # 获取tdx配置 + tdx_conf = setting.get('tdx',None) + if tdx_conf is not None and isinstance(tdx_conf,dict): + if self.tdxApi is None: + self.writeLog(u'通达信接口未实例化,创建实例') + self.tdxApi = TdxMdApi(self) # 通达信行情API + ip_list = tdx_conf.get('ip_list',None) + if ip_list is not None and len(ip_list)>0: + self.writeLog(u'使用配置文件的tdx服务器清单:{}'.format(ip_list)) + self.tdxApi.ip_list = copy.copy(ip_list) + + # 获取通达信得缺省连接池数量 + self.tdx_pool_count = tdx_conf.get('pool_count', self.tdx_pool_count) + except KeyError: self.writeLog(text.CONFIG_KEY_MISSING) return @@ -158,18 +194,34 @@ class CtpGateway(VtGateway): self.mdApi.connect(userID, password, brokerID, mdAddress) self.writeLog(u'连接交易服务器') self.tdApi.connect(userID, password, brokerID, tdAddress, authCode, userProductInfo) + self.setQryEnabled(True) # 初始化并启动查询 self.initQuery() - for req in self.subscribedSymbols: - self.mdApi.subscribe(req) + for req in list(self.subscribedSymbols): + # 指数合约,从tdx行情订阅 + if req.symbol[-2:] in ['99']: + req.symbol = req.symbol.upper() + if self.tdxApi is not None: + self.writeLog(u'有指数订阅,连接通达信行情服务器') + self.tdxApi.connect(self.tdx_pool_count) + self.tdxApi.subscribe(req) + else: + self.mdApi.subscribe(req) #---------------------------------------------------------------------- def subscribe(self, subscribeReq): """订阅行情""" if self.mdApi is not None: - self.mdApi.subscribe(subscribeReq) + # 指数合约,从tdx行情订阅 + if subscribeReq.symbol[-2:] in ['99']: + subscribeReq.symbol = subscribeReq.symbol.upper() + if self.tdxApi: + self.tdxApi.subscribe(subscribeReq) + + else: + self.mdApi.subscribe(subscribeReq) # Allow the strategies to start before the connection self.subscribedSymbols.add(subscribeReq) @@ -204,6 +256,9 @@ class CtpGateway(VtGateway): def checkStatus(self): """查询md/td的状态""" + if self.tdxApi is not None: + self.tdxApi.checkStatus() + if self.tdApi is None or self.mdApi is None: return False @@ -228,7 +283,15 @@ class CtpGateway(VtGateway): tmp2.close() self.tdConnected = False - self.writeLog(u'主动断开连接') + if self.tdxApi is not None: + self.writeLog(u'断开通达信行情API') + tmp1 = self.tdxApi + self.tdxApi.connection_status = False + self.tdxApi = None + tmp1.close() + self.tdxConnected = False + + self.writeLog(u'CTP Gateway 主动断开连接') #---------------------------------------------------------------------- def initQuery(self): @@ -1616,7 +1679,470 @@ class CtpTdApi(TdApi): log.logContent = content self.gateway.onLog(log) -print(u'fininsh load ctpGateway.py') +class TdxMdApi(): + """ + 通达信数据行情API实现 + 通过线程池,仅仅查询订阅的行情,更新合约的数据 + + """ + + def __init__(self, gateway): + self.gateway = gateway # gateway对象 + self.gatewayName = gateway.gatewayName # gateway对象名称 + + self.req_interval = 0.5 # 操作请求间隔500毫秒 + self.req_id = EMPTY_INT # 操作请求编号 + self.connection_status = False # 连接状态 + + self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 + self.symbol_market_dict = {} # tdx合约与tdx市场的字典 + self.symbol_vn_dict = {} # tdx合约与vtSymbol的对应 + self.symbol_tick_dict = {} # tdx合约与最后一个Tick得字典 + self.registed_symbol_set = set() + + #self.queue = Queue() # 请求队列 + self.pool = None # 线程池 + #self.req_thread = None # 定时器线程 + + self.ip_list = [{'ip': '112.74.214.43', 'port': 7727}, + {'ip': '59.175.238.38', 'port': 7727}, + {'ip': '124.74.236.94', 'port': 7721}, + {'ip': '124.74.236.94', 'port': 7721}, + {'ip': '58.246.109.27', 'port': 7721} + ] + # 调出 {'ip': '218.80.248.229', 'port': 7721}, + + self.best_ip = {'ip': None, 'port': None} + self.api_dict = {} # API 的连接会话对象字典 + self.last_tick_dt = {} # 记录该会话对象的最后一个tick时间 + + self.instrument_count = 50000 + # ---------------------------------------------------------------------- + def ping(self, ip, port=7709): + """ + ping行情服务器 + :param ip: + :param port: + :param type_: + :return: + """ + apix = TdxExHq_API() + __time1 = datetime.now() + try: + with apix.connect(ip, port): + if apix.get_instrument_count() > 10000: + _timestamp = datetime.now() - __time1 + self.writeLog('服务器{}:{},耗时:{}'.format(ip,port,_timestamp)) + return _timestamp + else: + self.writeLog(u'该服务器IP {}无响应'.format(ip)) + return timedelta(9, 9, 0) + except: + self.writeError(u'tdx ping服务器,异常的响应{}'.format(ip)) + return timedelta(9, 9, 0) + + # ---------------------------------------------------------------------- + def select_best_ip(self): + """ + 选择行情服务器 + :return: + """ + self.writeLog(u'选择通达信行情服务器') + + data_future = [self.ping(x['ip'], x['port']) for x in self.ip_list] + + best_future_ip = self.ip_list[data_future.index(min(data_future))] + + self.writeLog(u'选取 {}:{}'.format( + best_future_ip['ip'], best_future_ip['port'])) + return best_future_ip + + def connect(self,n=3): + """ + 连接通达讯行情服务器 + :param n: + :return: + """ + if self.connection_status: + for api in self.api_dict: + if api is not None or getattr(api,"client",None) is not None: + self.writeLog(u'当前已经连接,不需要重新连接') + return + + self.writeLog(u'开始通达信行情服务器') + + # 选取最佳服务器 + if self.best_ip['ip'] is None and self.best_ip['port'] is None: + self.best_ip = self.select_best_ip() + + # 创建n个api连接对象实例 + for i in range(n): + try: + api = TdxExHq_API( heartbeat=True, auto_retry=True,raise_exception=True) + api.connect(self.best_ip['ip'], self.best_ip['port']) + # 尝试获取市场合约统计 + c = api.get_instrument_count() + if c is None or c < 10: + err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'],self.best_ip['port']) + err = VtErrorData() + err.gatewayName = self.gatewayName + err.errorID = -1 + err.errorMsg = err_msg + self.gateway.onError(err) + else: + self.writeLog(u'创建第{}个tdx连接'.format(i+1)) + self.api_dict[i] = api + self.last_tick_dt[i] = datetime.now() + self.connection_status = True + self.instrument_count = c + + except Exception as ex: + self.writeError(u'连接服务器tdx[{}]异常:{},{}'.format(i,str(ex),traceback.format_exc())) + return + + # 更新 symbol_exchange_dict , symbol_market_dict + self.qryInstrument() + + #self.req_thread = Thread(target=self.addReq) + #self.req_thread.start() + + # 创建连接池,每个连接都调用run方法 + self.pool = Pool(n) + self.pool.map_async(self.run,range(n)) + + def reconnect(self,i): + """ + 重连 + :param i: + :return: + """ + try: + self.best_ip = self.select_best_ip() + api = TdxExHq_API(heartbeat=True, auto_retry=True) + api.connect(self.best_ip['ip'], self.best_ip['port']) + # 尝试获取市场合约统计 + c = api.get_instrument_count() + if c is None or c < 10: + err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'], self.best_ip['port']) + err = VtErrorData() + err.gatewayName = self.gatewayName + err.errorID = -1 + err.errorMsg = err_msg + self.gateway.onError(err) + else: + self.writeLog(u'重新创建第{}个tdx连接'.format(i + 1)) + self.api_dict[i] = api + sleep(1) + except Exception as ex: + self.writeError(u'重新连接服务器tdx[{}]异常:{},{}'.format(i, str(ex), traceback.format_exc())) + return + + def close(self): + """退出API""" + self.connection_status = False + + if self.pool is not None: + self.pool.close() + self.pool.join() + # ---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅合约""" + # 这里的设计是,如果尚未登录就调用了订阅方法 + # 则先保存订阅请求,登录完成后会自动订阅 + vn_symbol = str(subscribeReq.symbol) + vn_symbol = vn_symbol.upper() + self.writeLog(u'通达信行情订阅 {}'.format(str(vn_symbol))) + + if vn_symbol[-2:] != '99': + self.writeLog(u'{}不是指数合约,不能订阅'.format(vn_symbol)) + return + + tdx_symbol = vn_symbol[0:-2] + 'L9' + tdx_symbol = tdx_symbol.upper() + self.writeLog(u'{}=>{}'.format(vn_symbol,tdx_symbol)) + self.symbol_vn_dict[tdx_symbol] = vn_symbol + + if tdx_symbol not in self.registed_symbol_set: + self.registed_symbol_set.add(tdx_symbol) + + self.checkStatus() + + def checkStatus(self): + #self.writeLog(u'检查tdx接口状态') + if len(self.registed_symbol_set) ==0: + return + + # 若还没有启动连接,就启动连接 + over_time = [((datetime.now()-dt).total_seconds() > 60) for dt in self.last_tick_dt.values()] + if not self.connection_status or len(self.api_dict) == 0 or any(over_time): + self.writeLog(u'tdx还没有启动连接,就启动连接') + self.close() + self.pool = None + self.api_dict = {} + pool_cout = getattr(self.gateway,'tdx_pool_count',3) + self.connect(pool_cout) + + #self.writeLog(u'tdx接口状态正常') + + def qryInstrument(self): + """ + 查询/更新合约信息 + :return: + """ + if not self.connection_status: + return + + api = self.api_dict.get(0) + if api is None: + self.writeLog(u'取不到api连接,更新合约信息失败') + return + + # 取得所有的合约信息 + num = api.get_instrument_count() + if not isinstance(num,int): + return + + all_contacts = sum([api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)],[]) + #[{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}] + + # 对所有合约处理,更新字典 指数合约-tdx市场,指数合约-交易所 + for tdx_contract in all_contacts: + tdx_symbol = tdx_contract.get('code', None) + if tdx_symbol is None or tdx_symbol[-2:] not in ['L9']: + continue + tdx_market_id = tdx_contract.get('market') + self.symbol_market_dict[tdx_symbol] = tdx_market_id + if tdx_market_id == 47: # 中金所 + self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_CFFEX + elif tdx_market_id == 28: # 郑商所 + self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_CZCE + elif tdx_market_id == 29: # 大商所 + self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_DCE + elif tdx_market_id == 30: # 上期所+能源 + self.symbol_exchange_dict[tdx_symbol] = EXCHANGE_SHFE + + # 如果有预定的订阅合约,提前订阅 + + def run(self, i): + """ + 版本1:Pool内得线程,持续运行,每个线程从queue中获取一个请求并处理 + 版本2:Pool内线程,从订阅合约集合中,取出符合自己下标 mode n = 0的合约,并发送请求 + :param i: + :return: + """ + """ + # 版本1 + while self.connection_status: + try: + req = self.queue.get(timeout=self.req_interval) + self.processReq(req,i) + except Exception as ex: + self.writeLog(u'tdx[{}] exception:{},{}'.format(i,str(ex),traceback.format_exc())) + """ + # 版本2: + try: + api_count = len(self.api_dict) + last_dt = datetime.now() + self.writeLog(u'开始运行tdx[{}],{}'.format(i,last_dt)) + while self.connection_status: + symbols = set() + for idx,tdx_symbol in enumerate(list(self.registed_symbol_set)): + #self.writeLog(u'tdx[{}], api_count:{}, idx:{}, tdx_symbol:{}'.format(i, api_count, idx, tdx_symbol)) + if idx % api_count == i: + try: + symbols.add(tdx_symbol) + self.processReq(tdx_symbol, i) + except BrokenPipeError as bex: + self.writeError(u'BrokenPipeError{},重试重连tdx[{}]'.format(str(bex),i)) + self.reconnect(i) + sleep(5) + break + except Exception as ex: + self.writeError(u'tdx[{}] exception:{},{}'.format(i, str(ex), traceback.format_exc())) + + #api = self.api_dict.get(i,None) + #if api is None or getattr(api,'client') is None: + self.writeError(u'重试重连tdx[{}]'.format(i)) + print(u'重试重连tdx[{}]'.format(i),file=sys.stderr) + self.reconnect(i) + + #self.writeLog(u'tdx[{}] sleep'.format(i)) + sleep(self.req_interval) + dt = datetime.now() + if last_dt.minute != dt.minute: + self.writeLog('tdx[{}] check point. {}, process symbols:{}'.format(i,dt,symbols)) + last_dt = dt + except Exception as ex: + self.writeError(u'tdx[{}] pool.run exception:{},{}'.format(i, str(ex), traceback.format_exc())) + + self.writeError(u'tdx[{}] {}退出'.format(i,datetime.now())) + + def processReq(self, req, i): + """ + 处理行情信息ticker请求 + :param req: + :param i: + :return: + """ + symbol = req + api = self.api_dict.get(i, None) + if api is None: + self.writeLog(u'tdx[{}] Api is None'.format(i)) + raise Exception(u'tdx[{}] Api is None'.format(i)) + + #self.writeLog(u'tdx[{}] get_instrument_quote:({},{})'.format(i,self.symbol_market_dict.get(symbol),symbol)) + rt_list = api.get_instrument_quote(self.symbol_market_dict.get(symbol),symbol) + if len(rt_list) == 0: + self.writeLog(u'tdx[{}]: rt_list为空'.format(i)) + return + #else: + # self.writeLog(u'tdx[{}]: rt_list数据:{}'.format(i, rt_list)) + if i in self.last_tick_dt: + self.last_tick_dt[i] = datetime.now() + + for d in list(rt_list): + # 忽略成交量为0的无效单合约tick数据 + if d.get('xianliang', 0) <= 0: + self.writeLog(u'忽略成交量为0的无效单合约tick数据:') + continue + + code = d.get('code',None) + if symbol != code and code is not None: + #self.writeLog(u'忽略合约{} {} 不一致的tick数据:{}'.format(symbol,d.get('code'),rt_list)) + #continue + symbol = code + + tick = VtTickData() + tick.gatewayName = self.gatewayName + + tick.symbol = self.symbol_vn_dict.get(symbol,None) + if tick.symbol is None: + self.writeLog(u'self.symbol_vn_dict 取不到映射得:{}'.format(symbol)) + return + tick.symbol = tick.symbol.upper() + tick.exchange = self.symbol_exchange_dict.get(symbol) + tick.vtSymbol = tick.symbol + + tick.preClosePrice = d.get('pre_close') + tick.highPrice = d.get('high') + tick.openPrice = d.get('open') + tick.lowPrice = d.get('low') + tick.lastPrice = d.get('price') + + tick.volume = d.get('zongliang',0) + tick.openInterest = d.get('chicang') + + tick.datetime = datetime.now() + # 修正毫秒 + last_tick = self.symbol_tick_dict.get(symbol,None) + if (last_tick is not None) and tick.datetime.replace(microsecond=0) == last_tick.datetime: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick.datetime = tick.datetime.replace(microsecond=500) + tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12] + else: + tick.datetime = tick.datetime.replace(microsecond=0) + tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12] + + tick.date = tick.datetime.strftime('%Y-%m-%d') + + # 修正时间 + if tick.datetime.hour >= 20: + if tick.datetime.isoweekday() == 5: + # 交易日是星期下周一 + tick.tradingDay = tick.datetime + timedelta(days=3) + else: + # 第二天 + tick.tradingDay = tick.datetime + timedelta(days=1) + elif tick.datetime.hour < 8 and tick.datetime.isoweekday() == 6: + # 交易日是星期一 + tick.tradingDay = tick.datetime + timedelta(days=2) + else: + tick.tradingDay = tick.datetime + tick.tradingDay = tick.tradingDay.strftime('%Y-%m-%d') + + # 指数没有涨停和跌停,就用昨日收盘价正负10% + tick.upperLimit = tick.preClosePrice * 1.1 + tick.lowerLimit = tick.preClosePrice * 0.9 + + # CTP只有一档行情 + tick.bidPrice1 = d.get('bid1') + tick.bidVolume1 = d.get('bid_vol1') + tick.askPrice1 = d.get('ask1') + tick.askVolume1 = d.get('ask_vol1') + + short_symbol = tick.vtSymbol + short_symbol = short_symbol.replace('99', '').upper() + + # 排除非交易时间得tick + if tick.exchange is EXCHANGE_CFFEX: + if tick.datetime.hour not in [9,10,11,13,14,15]: + return + if tick.datetime.hour == 9 and tick.datetime.minute < 15: + return + if tick.datetime.hour == 15 and tick.datetime.minute >= 15: + return + else: # 大商所/郑商所,上期所,上海能源 + # 排除非开盘小时 + if tick.datetime.hour in [3,4,5,6,7,8,12,15,16,17,18,19,20]: + return + # 排除早盘 10:15~10:30 + if tick.datetime.hour == 10 and 15 <= tick.datetime.minute < 30: + return + # 排除早盘 11:30~12:00 + if tick.datetime.hour == 11 and tick.datetime.minute >= 30: + return + # 排除午盘 13:00 ~13:30 + if tick.datetime.hour == 13 and tick.datetime.minute < 30: + return + # 排除凌晨2:30~3:00 + if tick.datetime.hour == 2 and tick.datetime.minute >= 30: + return + + # 排除大商所/郑商所夜盘数据 + if short_symbol in NIGHT_MARKET_DL or short_symbol in NIGHT_MARKET_ZZ: + if tick.datetime.hour == 23 and tick.datetime.minute>=30: + return + if tick.datetime.hour in [0,1,2]: + return + + # 排除上期所夜盘数据 23:00 收盘 + if short_symbol in NIGHT_MARKET_SQ3: + if tick.datetime.hour in [23,0,1,2]: + return + # 排除上期所夜盘数据 1:00 收盘 + if short_symbol in NIGHT_MARKET_SQ2: + if tick.datetime.hour in [1,2]: + return + + # 排除日盘合约在夜盘得数据 + if short_symbol in MARKET_DAY_ONLY and (tick.datetime.hour < 9 or tick.datetime.hour > 16): + #self.writeLog(u'排除日盘合约{}在夜盘得数据'.format(short_symbol)) + return + """ + self.writeLog('{},{},{},{},{},{},{},{},{},{},{},{},{},{}'.format(tick.gatewayName, tick.symbol, + tick.exchange, tick.vtSymbol, + tick.datetime, tick.tradingDay, + tick.openPrice, tick.highPrice, + tick.lowPrice, tick.preClosePrice, + tick.bidPrice1, + tick.bidVolume1, tick.askPrice1, + tick.askVolume1)) + """ + + self.symbol_tick_dict[symbol] = tick + + self.gateway.onTick(tick) + + # ---------------------------------------------------------------------- + def writeLog(self, content): + """发出日志""" + log = VtLogData() + log.gatewayName = self.gatewayName + log.logContent = content + self.gateway.onLog(log) + + def writeError(self,content): + self.gateway.writeError(content) #---------------------------------------------------------------------- def test(): @@ -1636,9 +2162,21 @@ def test(): gateway = CtpGateway(eventEngine) gateway.connect() - + + # gateway.connect() + auto_subscribe_symbols = ['M99', 'RB99', 'TA99', 'MA99', 'NI99', 'SR99'] + for symbol in auto_subscribe_symbols: + print(u'自动订阅合约:{}'.format(symbol)) + sub = VtSubscribeReq() + sub.symbol = symbol + gateway.subscribe(sub) + gateway.connect() + sys.exit(app.exec_()) - if __name__ == '__main__': - test() \ No newline at end of file + try: + test() + except Exception as ex: + print(u'异常:{},{}'.format(str(ex), traceback.format_exc())) + print('Finished') \ No newline at end of file