From c567bfe8dbc48324defb3c1e6b71a1beb694c1b7 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Wed, 1 Jan 2020 22:32:39 +0800 Subject: [PATCH] =?UTF-8?q?[=E6=96=B0=E5=8A=9F=E8=83=BD]=20tdx=20=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E6=8C=87=E6=95=B0=E5=85=A8=E8=A1=8C=E6=83=85=3D>=20ra?= =?UTF-8?q?bbit=5Fmq=20=3D>ctp=5Fgateway=E5=A4=9A=E8=B4=A6=E5=8F=B7?= =?UTF-8?q?=E6=8E=A5=E6=94=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- prod/linux/index_publisher/run.py | 101 +++ prod/linux/tick_record/run.py | 2 +- vnpy/app/index_tick_publisher/__init__.py | 15 + vnpy/app/index_tick_publisher/engine.py | 479 ++++++++++++++ vnpy/gateway/ctp/ctp_gateway.py | 766 +++++++++++++++++++++- vnpy/trader/gateway.py | 50 +- vnpy/trader/object.py | 33 +- vnpy/trader/utility.py | 16 +- 8 files changed, 1405 insertions(+), 57 deletions(-) create mode 100644 prod/linux/index_publisher/run.py create mode 100644 vnpy/app/index_tick_publisher/__init__.py create mode 100644 vnpy/app/index_tick_publisher/engine.py diff --git a/prod/linux/index_publisher/run.py b/prod/linux/index_publisher/run.py new file mode 100644 index 00000000..807fb9c6 --- /dev/null +++ b/prod/linux/index_publisher/run.py @@ -0,0 +1,101 @@ +import os +import sys +import multiprocessing +from time import sleep +from datetime import datetime, time +from logging import INFO + + +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) +sys.path.append(ROOT_PATH) +print(f'append {ROOT_PATH} into sys.path') + +from vnpy.event import EventEngine +from vnpy.trader.setting import SETTINGS +from vnpy.trader.engine import MainEngine +from vnpy.gateway.ctp import CtpGateway +from vnpy.app.index_tick_publisher import IndexTickPublisherApp +from vnpy.app.cta_strategy.base import EVENT_CTA_LOG + + +SETTINGS["log.active"] = True +SETTINGS["log.level"] = INFO +SETTINGS["log.console"] = True + + +rebbit_setting = { + "host": "192.168.1.211" +} + + +def run_child(): + """ + Running in the child process. + """ + SETTINGS["log.file"] = True + + event_engine = EventEngine() + main_engine = MainEngine(event_engine) + main_engine.add_gateway(CtpGateway) + publisher_engine = main_engine.add_app(IndexTickPublisherApp) + main_engine.write_log("主引擎创建成功") + + log_engine = main_engine.get_engine("log") + event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event) + main_engine.write_log("注册日志事件监听") + + sleep(10) + main_engine.write_log("启动连接tdx & rabbit") + publisher_engine.connect(rebbit_setting) + while True: + sleep(1) + + +def run_parent(): + """ + Running in the parent process. + """ + print("启动CTA策略守护父进程") + + # Chinese futures market trading period (day/night) + DAY_START = time(8, 45) + DAY_END = time(15, 30) + + NIGHT_START = time(20, 45) + NIGHT_END = time(2, 45) + + child_process = None + + while True: + current_time = datetime.now().time() + trading = False + + # Check whether in trading period + if ( + (current_time >= DAY_START and current_time <= DAY_END) + or (current_time >= NIGHT_START) + or (current_time <= NIGHT_END) + ): + trading = True + + # Start child process in trading period + if trading and child_process is None: + print("启动子进程") + child_process = multiprocessing.Process(target=run_child) + child_process.start() + print("子进程启动成功") + + # 非记录时间则退出子进程 + if not trading and child_process is not None: + print("关闭子进程") + child_process.terminate() + child_process.join() + child_process = None + print("子进程关闭成功") + + sleep(5) + + +if __name__ == "__main__": + run_parent() diff --git a/prod/linux/tick_record/run.py b/prod/linux/tick_record/run.py index 723407e0..48688cc6 100644 --- a/prod/linux/tick_record/run.py +++ b/prod/linux/tick_record/run.py @@ -37,7 +37,7 @@ def run_child(): event_engine = EventEngine() main_engine = MainEngine(event_engine) main_engine.add_gateway(CtpGateway) - record_engine = main_engine.add_app(TickRecorderApp) + main_engine.add_app(TickRecorderApp) main_engine.write_log("主引擎创建成功") log_engine = main_engine.get_engine("log") diff --git a/vnpy/app/index_tick_publisher/__init__.py b/vnpy/app/index_tick_publisher/__init__.py new file mode 100644 index 00000000..50221327 --- /dev/null +++ b/vnpy/app/index_tick_publisher/__init__.py @@ -0,0 +1,15 @@ +# encoding: UTF-8 + +import os +from pathlib import Path +from vnpy.trader.app import BaseApp +from .engine import IndexTickPublisher, APP_NAME + + +class IndexTickPublisherApp(BaseApp): + """""" + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = u'指数行情推送' + engine_class = IndexTickPublisher diff --git a/vnpy/app/index_tick_publisher/engine.py b/vnpy/app/index_tick_publisher/engine.py new file mode 100644 index 00000000..054acc21 --- /dev/null +++ b/vnpy/app/index_tick_publisher/engine.py @@ -0,0 +1,479 @@ +# encoding: UTF-8 + +# 通达信指数行情发布器 +# 华富资产 + +import os +import sys +import copy +import json +import traceback +from threading import Thread +from datetime import datetime, timedelta +from time import sleep +from logging import ERROR +from pytdx.exhq import TdxExHq_API + +from vnpy.event import EventEngine +from vnpy.trader.constant import Exchange +from vnpy.trader.engine import BaseEngine, MainEngine +from vnpy.trader.event import EVENT_TIMER +from vnpy.trader.object import TickData +from vnpy.trader.utility import get_trading_date +from vnpy.data.tdx.tdx_common import TDX_FUTURE_HOSTS +from vnpy.app.cta_strategy_pro.base import ( + NIGHT_MARKET_23, + NIGHT_MARKET_SQ2, + MARKET_DAY_ONLY) + +from vnpy.amqp.producer import publisher + +APP_NAME = 'INDEXDATAPUBLISHER' + + +class IndexTickPublisher(BaseEngine): + # 指数tick发布服务 + # 通过通达信接口,获取指数行情tick,发布至rabbitMQ + + # ---------------------------------------------------------------------- + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super(IndexTickPublisher, self).__init__( + main_engine, event_engine, APP_NAME) + + self.main_engine = main_engine + self.event_engine = event_engine + self.create_logger(logger_name=APP_NAME) + + self.last_minute = None + + self.registerEvent() + + self.req_interval = 0.5 # 操作请求间隔500毫秒 + self.req_id = 0 # 操作请求编号 + self.connection_status = False # 连接状态 + + self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 + self.symbol_market_dict = {} # tdx合约与tdx市场的字典 + self.symbol_tick_dict = {} # tdx合约与最后一个Tick得字典 + + # self.queue = Queue() # 请求队列 + self.pool = None # 线程池 + self.req_thread = None # 定时器线程 + + self.ip_list = TDX_FUTURE_HOSTS + + # tdx api + self.fail_ip_dict = {} # 失效得API 的连接服务器配置: IP_port: 分钟倒数 + self.best_ip = None + self.best_port = None + self.best_name = None + self.api = None # API 的连接会话对象 + self.last_tick_dt = None # 记录该会话对象的最后一个tick时间 + + self.last_sort_speed_dt = None + self.instrument_count = 50000 + + self.has_qry_instrument = False + + # vt_setting.json内rabbitmq配置项 + self.conf = {} + self.pub = None + + def write_error(self, content: str): + self.write_log(msg=content, level=ERROR) + + def create_publisher(self, conf): + """创建rabbitmq 消息发布器""" + if self.pub: + return + try: + # 消息发布 + self.pub = publisher(host=conf.get('host', 'localhost'), + port=conf.get('port', 5672), + user=conf.get('user', 'admin'), + password=conf.get('password', 'admin'), + channel_number=conf.get('channel_number', 1), + queue_name=conf.get('queue_name', ''), + routing_key=conf.get('routing_key', 'default'), + exchange=conf.get('exchange', 'x_fanout_idx_tick')) + except Exception as ex: + self.write_log(u'创建tick发布器异常:{}'.format(str(ex))) + + # ---------------------------------------------------------------------- + def registerEvent(self): + """注册事件监听""" + self.event_engine.register(EVENT_TIMER, self.process_timer_event) + + def process_timer_event(self, event): + """定时执行""" + dt = datetime.now() + + if dt.minute == self.last_minute: + return + + # 更新失效IP地址得counter + for k in list(self.fail_ip_dict.keys()): + c = self.fail_ip_dict.get(k, 0) + if c <= 0: + self.fail_ip_dict.pop(k, None) + else: + c -= 1 + self.fail_ip_dict.update({k: c}) + + self.checkStatus() + + # ---------------------------------------------------------------------- + def ping(self, ip, port=7709): + """ + ping行情服务器 + :param ip: + :param port: + :param type_: + :return: + """ + apix = TdxExHq_API() + __time1 = datetime.now() + try: + with apix.connect(ip, port): + if apix.get_instrument_count() > 10000: + _timestamp = (datetime.now() - __time1).total_seconds() * 1000 + self.write_log('服务器{}:{},耗时:{}ms'.format(ip, port, _timestamp)) + return _timestamp + else: + self.write_log(u'该服务器IP {}无响应.'.format(ip)) + return timedelta(seconds=10).total_seconds() * 1000 + except Exception as ex: + self.write_error(u'tdx ping服务器{},异常的响应{}'.format(ip,str(ex))) + return timedelta(seconds=10).total_seconds() * 1000 + + def sort_ip_speed(self): + """ + 对所有服务器进行速度排序 + :return: + """ + speed_result = [] + for x in self.ip_list: + speed = self.ping(x['ip'], x['port']) + x.update({'speed': speed}) + speed_result.append(copy.copy(x)) + + # 更新服务器,按照速度排序 + self.ip_list = sorted(speed_result, key=lambda s: s['speed']) + self.write_log(u'服务器访问速度排序:{}'.format(self.ip_list)) + + # ---------------------------------------------------------------------- + def select_best_ip(self): + """ + 选择行情服务器 + :return: IP地址, 端口, 服务器名称 + """ + self.write_log(u'选择通达信行情服务器') + if self.last_sort_speed_dt is None or (datetime.now() - self.last_sort_speed_dt).total_seconds() > 60: + self.sort_ip_speed() + self.last_sort_speed_dt = datetime.now() + + valid_ip_list = [x for x in self.ip_list if x.get('speed', 10000) < 10000] + + if len(valid_ip_list) == 0: + self.write_error(u'未能找到合适速度得行情服务器') + return None, None, None + + for server in valid_ip_list: + ip = server.get('ip') + port = server.get('port') + name = server.get('name', '{}:{}'.format(ip, port)) + if '{}:{}'.format(ip, port) in self.fail_ip_dict: + self.write_log(u'{}:{}属于上次异常IP地址,忽略'.format(ip, port)) + continue + return ip, port, name + + return None, None, None + + def connect(self, rabbit_config: dict): + """ + 连接通达讯行情服务器 + :param n: + :return: + """ + if self.connection_status: + if self.api is not None or getattr(self.api, "client", None) is not None: + self.write_log(u'当前已经连接,不需要重新连接') + return + + self.write_log(u'开始通达信行情服务器') + + try: + self.api = TdxExHq_API(heartbeat=True, auto_retry=True, raise_exception=True) + # 选取最佳服务器 + self.best_ip, self.best_port, self.best_name = self.select_best_ip() + + if self.best_ip is None or self.best_port is None: + self.write_error(u'未能选择到服务器') + + self.write_log(u'api 选择 {}: {}:{}'.format(self.best_name, self.best_ip, self.best_port)) + self.api.connect(self.best_ip, self.best_port) + # 尝试获取市场合约统计 + c = self.api.get_instrument_count() + if c is None or c < 10: + err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip, self.best_port) + self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10}) + self.write_error(err_msg) + else: + self.write_log(u'创建tdx连接') + self.last_tick_dt = datetime.now() + self.connection_status = True + self.instrument_count = c + + except Exception as ex: + self.write_error(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc())) + self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10}) + return + + # 更新 symbol_exchange_dict , symbol_market_dict + self.write_log(u'查询合约') + self.qryInstrument() + + self.conf.update(rabbit_config) + self.create_publisher(self.conf) + + self.req_thread = Thread(target=self.run) + self.req_thread.start() + + def reconnect(self): + """ + 重连 + + :return: + """ + try: + self.best_ip, self.best_port, self.best_name = self.select_best_ip() + self.api = TdxExHq_API(heartbeat=True, auto_retry=True) + self.api.connect(self.best_ip, self.best_port) + # 尝试获取市场合约统计 + c = self.api.get_instrument_count() + if c is None or c < 10: + err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip, self.best_port) + self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10}) + self.write_error(err_msg) + else: + self.write_log(u'重新创建tdx连接') + sleep(1) + except Exception as ex: + self.write_error(u'重新连接服务器异常:{},{}'.format(str(ex), traceback.format_exc())) + self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10}) + return + + def close(self): + """退出API""" + self.write_log(u'退出tdx API') + self.connection_status = False + + if self.req_thread is not None: + self.write_log(u'退出请求线程') + self.req_thread.join() + + if self.pub: + self.write_log(u'退出rabbitMQ 发布器') + self.pub.exit() + + def checkStatus(self): + # self.write_log(u'检查tdx接口状态') + + # 若还没有启动连接,就启动连接 + over_time = self.last_tick_dt is None or (datetime.now() - self.last_tick_dt).total_seconds() > 60 + if not self.connection_status or self.api is None or over_time: + self.write_log(u'tdx还没有启动连接,就启动连接') + self.close() + self.api = None + self.reconnect() + + # self.write_log(u'tdx接口状态正常') + + def qryInstrument(self): + """ + 查询/更新合约信息 + :return: + """ + if not self.connection_status: + self.write_error(u'tdx连接状态为断开,不能查询和更新合约信息') + return + + if self.has_qry_instrument: + self.write_error(u'已经查询过一次合约信息,不再查询') + return + + # 取得所有的合约信息 + num = self.api.get_instrument_count() + if not isinstance(num, int): + return + + all_contacts = sum( + [self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], []) + # [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}] + + # 对所有合约处理,更新字典 指数合约-tdx市场,指数合约-交易所 + for tdx_contract in all_contacts: + tdx_symbol = tdx_contract.get('code', None) + if tdx_symbol is None or tdx_symbol[-2:] not in ['L9']: + continue + tdx_market_id = tdx_contract.get('market') + self.symbol_market_dict[tdx_symbol] = tdx_market_id + if tdx_market_id == 47: # 中金所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.CFFEX + elif tdx_market_id == 28: # 郑商所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.CZCE + elif tdx_market_id == 29: # 大商所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.DCE + elif tdx_market_id == 30: # 上期所+能源 + self.symbol_exchange_dict[tdx_symbol] = Exchange.SHFE + elif tdx_market_id == 60: # 主力合约 + self.write_log(u'主力合约:{}'.format(tdx_contract)) + self.has_qry_instrument = True + + def run(self): + # 版本3 :直接查询板块 + try: + last_dt = datetime.now() + self.write_log(u'开始运行tdx,{}'.format(last_dt)) + while self.connection_status: + try: + self.process_index_req() + except BrokenPipeError as bex: + self.write_error(u'BrokenPipeError{},重试重连tdx[{}]'.format(str(bex), 0)) + self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10}) + self.reconnect() + sleep(5) + break + except Exception as ex: + self.write_error(u'tdx exception:{},{}'.format(str(ex), traceback.format_exc())) + self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10}) + self.reconnect() + + sleep(self.req_interval) + dt = datetime.now() + if last_dt.minute != dt.minute: + self.write_log('tdxcheck point. {},last_tick_dt:{}'.format(dt, self.last_tick_dt)) + last_dt = dt + except Exception as ex: + self.write_error(u'tdx pool.run exception:{},{}'.format(str(ex), traceback.format_exc())) + + self.write_error(u'tdx 线程 {}退出'.format(datetime.now())) + + def process_index_req(self): + """处理板块获取指数行情tick""" + + # 获取通达信指数板块所有行情 + rt_list = self.api.get_instrument_quote_list(42, 3, 0, 100) + + if rt_list is None or len(rt_list) == 0: + self.write_log(u'tdx:get_instrument_quote_list() rt_list为空') + return + + # 记录该接口的行情最后更新时间 + self.last_tick_dt = datetime.now() + + for d in list(rt_list): + tdx_symbol = d.get('code', None) + if tdx_symbol.endswith('L9'): + vn_symbol = tdx_symbol.replace('L9', '99').upper() + else: + vn_symbol = tdx_symbol.upper() + + tick_datetime = datetime.now() + # 修正毫秒 + last_tick = self.symbol_tick_dict.get(vn_symbol, None) + if (last_tick is not None) and tick_datetime.replace(microsecond=0) == last_tick.datetime: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick_datetime = tick_datetime.replace(microsecond=500) + else: + tick_datetime = tick_datetime.replace(microsecond=0) + + tick = TickData( + gateway_name='tdx', + symbol=vn_symbol, + datetime=tick_datetime, + exchange=self.symbol_exchange_dict.get(tdx_symbol, Exchange.LOCAL) + ) + + tick.pre_close = float(d.get('ZuoJie', 0.0)) + tick.high_price = float(d.get('ZuiGao', 0.0)) + tick.open_price = float(d.get('JinKai', 0.0)) + tick.low_price = float(d.get('ZuiDi', 0.0)) + tick.last_price = float(d.get('MaiChu', 0.0)) + + tick.volume = int(d.get('XianLiang', 0)) + tick.open_interest = d.get('ChiCangLiang') + + tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12] + tick.date = tick.datetime.strftime('%Y-%m-%d') + + tick.trading_day = get_trading_date(tick_datetime) + + # 指数没有涨停和跌停,就用昨日收盘价正负10% + tick.limit_up = tick.pre_close * 1.1 + tick.limit_down = tick.pre_close * 0.9 + + # CTP只有一档行情 + tick.bid_price_1 = float(d.get('MaiRuJia', 0.0)) + tick.bid_volume_1 = int(d.get('MaiRuLiang', 0)) + tick.ask_price_1 = float(d.get('MaiChuJia', 0.0)) + tick.ask_volume_1 = int(d.get('MaiChuLiang', 0)) + + underlying_symbol = vn_symbol.replace('99', '').upper() + + # 排除非交易时间得tick + if tick.exchange is Exchange.CFFEX: + if tick.datetime.hour not in [9, 10, 11, 13, 14, 15]: + continue + if tick.datetime.hour == 9 and tick.datetime.minute < 15: + continue + # 排除早盘 11:30~12:00 + if tick.datetime.hour == 11 and tick.datetime.minute >= 30: + continue + if tick.datetime.hour == 15 and tick.datetime.minute >= 15 and underlying_symbol in ['T', 'TF', 'TS']: + continue + if tick.datetime.hour == 15 and underlying_symbol in ['IH', 'IF', 'IC']: + continue + + else: # 大商所/郑商所,上期所,上海能源 + # 排除非开盘小时 + if tick.datetime.hour in [3, 4, 5, 6, 7, 8, 12, 15, 16, 17, 18, 19, 20]: + continue + # 排除早盘 10:15~10:30 + if tick.datetime.hour == 10 and 15 <= tick.datetime.minute < 30: + continue + # 排除早盘 11:30~12:00 + if tick.datetime.hour == 11 and tick.datetime.minute >= 30: + continue + # 排除午盘 13:00 ~13:30 + if tick.datetime.hour == 13 and tick.datetime.minute < 30: + continue + # 排除凌晨2:30~3:00 + if tick.datetime.hour == 2 and tick.datetime.minute >= 30: + continue + + # 排除大商所/郑商所/上期所夜盘数据上期所夜盘数据 23:00 收盘 + if underlying_symbol in NIGHT_MARKET_23: + if tick.datetime.hour in [23, 0, 1, 2]: + continue + + # 排除上期所夜盘数据 1:00 收盘 + if underlying_symbol in NIGHT_MARKET_SQ2: + if tick.datetime.hour in [1, 2]: + continue + + # 排除日盘合约在夜盘得数据 + if underlying_symbol in MARKET_DAY_ONLY and (tick.datetime.hour < 9 or tick.datetime.hour > 16): + # self.write_log(u'排除日盘合约{}在夜盘得数据'.format(short_symbol)) + continue + + self.symbol_tick_dict[tick.symbol] = tick + + if self.pub: + d = copy.copy(tick.__dict__) + if isinstance(tick.datetime, datetime): + d.update({'datetime': tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')}) + d.update({'exchange': tick.exchange.value()}) + d = json.dumps(d) + self.pub.pub(d) diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 50ae6c08..79545b33 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -1,7 +1,9 @@ """ """ - -from datetime import datetime +import traceback +import json +from datetime import datetime, timedelta +from copy import copy from vnpy.api.ctp import ( MdApi, @@ -60,10 +62,26 @@ from vnpy.trader.object import ( ) from vnpy.trader.utility import ( get_folder_path, - get_trading_date + get_trading_date, + get_underlying_symbol, + round_to ) from vnpy.trader.event import EVENT_TIMER +# 增加通达信指数接口行情 +from time import sleep +from threading import Thread +from pytdx.exhq import TdxExHq_API +from vnpy.amqp.consumer import subscriber +from vnpy.data.tdx.tdx_common import ( + TDX_FUTURE_HOSTS, + get_future_contracts, + get_cache_json, + save_cache_json, + TDX_FUTURE_CONFIG) +from vnpy.app.cta_strategy_pro.base import ( + MARKET_DAY_ONLY, NIGHT_MARKET_23, NIGHT_MARKET_SQ2 +) STATUS_CTP2VT = { THOST_FTDC_OAS_Submitted: Status.SUBMITTING, @@ -116,7 +134,6 @@ OPTIONTYPE_CTP2VT = { THOST_FTDC_CP_PutOptions: OptionType.PUT } - symbol_exchange_map = {} symbol_name_map = {} symbol_size_map = {} @@ -137,7 +154,13 @@ class CtpGateway(BaseGateway): "授权编码": "", "产品信息": "" } - + # 注 + # 如果采用rabbit_mq拓展tdx指数行情,default_setting中,需要增加: + # "rabbit": + # { + # "host": "192.168.1.211", + # "exchange": "x_fanout_idx_tick" + # } exchanges = list(EXCHANGE_CTP2VT.values()) def __init__(self, event_engine): @@ -146,6 +169,13 @@ class CtpGateway(BaseGateway): self.td_api = CtpTdApi(self) self.md_api = CtpMdApi(self) + self.tdx_api = None + self.rabbit_api = None + + self.combiner_conf_dict = {} # 保存合成器配置 + # 自定义价差/加比的tick合成器 + self.combiners = {} + self.tick_combiner_map = {} def connect(self, setting: dict): """""" @@ -157,27 +187,42 @@ class CtpGateway(BaseGateway): appid = setting["产品名称"] auth_code = setting["授权编码"] product_info = setting["产品信息"] - + rabbit_dict = setting.get('rabbit', None) if ( - (not td_address.startswith("tcp://")) - and (not td_address.startswith("ssl://")) + (not td_address.startswith("tcp://")) + and (not td_address.startswith("ssl://")) ): td_address = "tcp://" + td_address if ( - (not md_address.startswith("tcp://")) - and (not md_address.startswith("ssl://")) + (not md_address.startswith("tcp://")) + and (not md_address.startswith("ssl://")) ): md_address = "tcp://" + md_address self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid, product_info) self.md_api.connect(md_address, userid, password, brokerid) + if rabbit_dict: + self.rabbit_api = SubMdApi(gateway=self) + self.rabbit_api.connect(rabbit_dict) + else: + self.tdx_api = TdxMdApi(gateway=self) + self.tdx_api.connect() + self.init_query() def subscribe(self, req: SubscribeRequest): """""" - self.md_api.subscribe(req) + # 指数合约,从tdx行情订阅 + if req.symbol[-2:] in ['99']: + req.symbol = req.symbol.upper() + if self.tdx_api: + self.tdx_api.subscribe(req) + elif self.rabbit_api: + self.rabbit_api.subscribe(req) + else: + self.md_api.subscribe(req) def send_order(self, req: OrderRequest): """""" @@ -200,13 +245,6 @@ class CtpGateway(BaseGateway): self.td_api.close() self.md_api.close() - def write_error(self, msg: str, error: dict): - """""" - error_id = error["ErrorID"] - error_msg = error["ErrorMsg"] - msg = f"{msg},代码:{error_id},信息:{error_msg}" - self.write_log(msg) - def process_timer_event(self, event): """""" self.count += 1 @@ -224,6 +262,14 @@ class CtpGateway(BaseGateway): self.query_functions = [self.query_account, self.query_position] self.event_engine.register(EVENT_TIMER, self.process_timer_event) + def on_custom_tick(self, tick): + """推送自定义合约行情""" + # 自定义合约行情 + + for combiner in self.tick_combiner_map.get(tick.symbol, []): + tick = copy(tick) + combiner.on_tick(tick) + class CtpMdApi(MdApi): """""" @@ -294,7 +340,7 @@ class CtpMdApi(MdApi): if not exchange: return - timestamp = f"{data['ActionDay']} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}" + timestamp = f"{data['ActionDay']} {data['UpdateTime']}.{int(data['UpdateMillisec'] / 100)}" dt = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f") # 不处理开盘前的tick数据 @@ -690,14 +736,14 @@ class CtpTdApi(TdApi): self.gateway.on_trade(trade) def connect( - self, - address: str, - userid: str, - password: str, - brokerid: int, - auth_code: str, - appid: str, - product_info + self, + address: str, + userid: str, + password: str, + brokerid: int, + auth_code: str, + appid: str, + product_info ): """ Start connection to server. @@ -855,3 +901,669 @@ class CtpTdApi(TdApi): """""" if self.connect_status: self.exit() + + +class TdxMdApi(): + """ + 通达信数据行情API实现 + 订阅的指数行情,更新合约的数据 + + """ + + def __init__(self, gateway): + self.gateway = gateway # gateway对象 + self.gateway_name = gateway.gateway_name # gateway对象名称 + + self.req_interval = 0.5 # 操作请求间隔500毫秒 + self.req_id = 0 # 操作请求编号 + self.connection_status = False # 连接状态 + + self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 + self.symbol_market_dict = {} # tdx合约与tdx市场的字典 + self.symbol_vn_dict = {} # tdx合约与vtSymbol的对应 + self.symbol_tick_dict = {} # tdx合约与最后一个Tick得字典 + + # tdx 期货配置本地缓存 + self.future_contracts = get_future_contracts() + + self.registered_symbol_set = set() + + self.thread = None # 查询线程 + + self.ip_list = TDX_FUTURE_HOSTS + + # 调出 + self.best_ip = {} # 最佳IP地址和端口 + self.api = None # API 的连接会话对象 + self.last_tick_dt = datetime.now() # 记录该会话对象的最后一个tick时间 + + self.instrument_count = 50000 + + self.has_qry_instrument = False + + # ---------------------------------------------------------------------- + def ping(self, ip, port=7709): + """ + ping行情服务器 + :param ip: + :param port: + :param type_: + :return: + """ + apix = TdxExHq_API() + __time1 = datetime.now() + try: + with apix.connect(ip, port): + if apix.get_instrument_count() > 10000: + _timestamp = (datetime.now() - __time1).total_seconds() * 1000 + self.gateway.write_log('服务器{}:{},耗时:{}ms'.format(ip, port, _timestamp)) + return _timestamp + else: + self.gateway.write_log(u'该服务器IP {}无响应.'.format(ip)) + return timedelta(seconds=10).total_seconds() * 1000 + except Exception as ex: + self.gateway.write_log(u'tdx ping服务器{},异常的响应{}'.format(ip, str(ex))) + return timedelta(seconds=10).total_seconds() * 1000 + + def sort_ip_speed(self): + """ + 对所有服务器进行速度排序 + :return: + """ + + speed_result = [] + for x in self.ip_list: + speed = self.ping(x['ip'], x['port']) + x.update({'speed': speed}) + speed_result.append(copy(x)) + + # 更新服务器,按照速度排序 + speed_result = sorted(speed_result, key=lambda s: s['speed']) + self.gateway.write_log(u'服务器访问速度排序:{}'.format(speed_result)) + return speed_result + + # ---------------------------------------------------------------------- + def select_best_ip(self, exclude_ip: str = None): + """ + 选择行情服务器 + :param: exclude_ip, 排除的ip地址 + :return: + """ + self.gateway.write_log(u'选择通达信行情服务器') + + ip_list = self.sort_ip_speed() + + valid_ip_list = [x for x in ip_list if x.get('speed', 10000) < 10000 and x.get('ip') != exclude_ip] + + if len(valid_ip_list) == 0: + self.gateway.write_error(u'未能找到合适速度得行情服务器') + return None + best_future_ip = valid_ip_list[0] + save_cache_json(best_future_ip, TDX_FUTURE_CONFIG) + return best_future_ip + + def connect(self, is_reconnect=False): + """ + 连接通达讯行情服务器 + :param is_reconnect:是否重连 + :return: + """ + # 创建api连接对象实例 + try: + if self.api is None or not self.connection_status: + self.gateway.write_log(u'开始连接通达信行情服务器') + self.api = TdxExHq_API(heartbeat=True, auto_retry=True, raise_exception=True) + + # 选取最佳服务器 + if is_reconnect or len(self.best_ip) == 0: + self.best_ip = get_cache_json(TDX_FUTURE_CONFIG) + + if len(self.best_ip) == 0: + self.best_ip = self.select_best_ip() + + self.api.connect(self.best_ip['ip'], self.best_ip['port']) + # 尝试获取市场合约统计 + c = self.api.get_instrument_count() + if c < 10: + err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'], self.best_ip['port']) + self.gateway.write_error(err_msg) + else: + self.gateway.write_log(u'创建tdx连接, IP: {}/{}'.format(self.best_ip['ip'], self.best_ip['port'])) + self.connection_status = True + + self.thread = Thread(target=self.run) + self.thread.start() + + except Exception as ex: + self.gateway.write_log(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc())) + return + + def close(self): + """退出API""" + self.gateway.write_log(u'退出tdx API') + self.connection_status = False + + if self.thread: + self.thread.join() + + # ---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅合约""" + # 这里的设计是,如果尚未登录就调用了订阅方法 + # 则先保存订阅请求,登录完成后会自动订阅 + vn_symbol = str(subscribeReq.symbol) + vn_symbol = vn_symbol.upper() + self.gateway.write_log(u'通达信行情订阅 {}'.format(str(vn_symbol))) + + if vn_symbol[-2:] != '99': + self.gateway.write_log(u'{}不是指数合约,不能订阅'.format(vn_symbol)) + return + + tdx_symbol = vn_symbol[0:-2] + 'L9' + tdx_symbol = tdx_symbol.upper() + self.gateway.write_log(u'{}=>{}'.format(vn_symbol, tdx_symbol)) + self.symbol_vn_dict[tdx_symbol] = vn_symbol + + if tdx_symbol not in self.registered_symbol_set: + self.registered_symbol_set.add(tdx_symbol) + + self.check_status() + + def check_status(self): + # self.write_log(u'检查tdx接口状态') + if len(self.registered_symbol_set) == 0: + return + + # 若还没有启动连接,就启动连接 + over_time = (datetime.now() - self.last_tick_dt).total_seconds() > 60 + if not self.connection_status or self.api is None or over_time: + self.gateway.write_log(u'tdx还没有启动连接,就启动连接') + self.close() + self.thread = None + self.connect(is_reconnect=True) + + def qry_instrument(self): + """ + 查询/更新合约信息 + :return: + """ + if not self.connection_status: + self.gateway.write_error(u'tdx连接状态为断开,不能查询和更新合约信息') + return + + if self.has_qry_instrument: + self.gateway.write_error(u'已经查询过一次合约信息,不再查询') + return + + # 取得所有的合约信息 + num = self.api.get_instrument_count() + if not isinstance(num, int): + return + + all_contacts = sum( + [self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], []) + # [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}] + + # 对所有合约处理,更新字典 指数合约-tdx市场,指数合约-交易所 + for tdx_contract in all_contacts: + tdx_symbol = tdx_contract.get('code', None) + if tdx_symbol is None or tdx_symbol[-2:] not in ['L9']: + continue + tdx_market_id = tdx_contract.get('market') + self.symbol_market_dict[tdx_symbol] = tdx_market_id + if tdx_market_id == 47: # 中金所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.CFFEX + elif tdx_market_id == 28: # 郑商所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.CZCE + elif tdx_market_id == 29: # 大商所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.DCE + elif tdx_market_id == 30: # 上期所+能源 + self.symbol_exchange_dict[tdx_symbol] = Exchange.SHFE + elif tdx_market_id == 60: # 主力合约 + self.gateway.write_log(u'主力合约:{}'.format(tdx_contract)) + self.has_qry_instrument = True + + def run(self): + # 直接查询板块 + try: + last_dt = datetime.now() + self.gateway.write_log(u'开始运行tdx查询指数行情线程,{}'.format(last_dt)) + while self.connection_status: + if len(self.registered_symbol_set) > 0: + try: + self.process_index_req() + except BrokenPipeError as bex: + self.gateway.write_error(u'BrokenPipeError{},重试重连tdx[{}]'.format(str(bex), 0)) + self.connect(is_reconnect=True) + sleep(5) + break + except Exception as ex: + self.gateway.write_error(u'tdx exception:{},{}'.format(str(ex), traceback.format_exc())) + self.gateway.write_error(u'重试重连tdx') + self.connect(is_reconnect=True) + + sleep(self.req_interval) + dt = datetime.now() + if last_dt.minute != dt.minute: + self.gateway.write_log( + 'tdx check point. {}, process symbols:{}'.format(dt, self.registered_symbol_set)) + last_dt = dt + except Exception as ex: + self.gateway.write_error(u'tdx thead.run exception:{},{}'.format(str(ex), traceback.format_exc())) + + self.gateway.write_error(u'tdx查询线程 {}退出'.format(datetime.now())) + + def process_index_req(self): + """处理板块获取指数行情tick""" + + # 获取通达信指数板块所有行情 + rt_list = self.api.get_instrument_quote_list(42, 3, 0, 100) + + if rt_list is None or len(rt_list) == 0: + self.gateway.write_log(u'tdx: rt_list为空') + return + + # 记录该接口的行情最后更新时间 + self.last_tick_dt = datetime.now() + + for d in list(rt_list): + tdx_symbol = d.get('code', None) + if tdx_symbol not in self.registered_symbol_set and tdx_symbol is not None: + continue + # tdx_symbol => vn_symbol + vn_symbol = self.symbol_vn_dict.get(tdx_symbol, None) + if vn_symbol is None: + self.gateway.write_error(u'self.symbol_vn_dict 取不到映射得:{}'.format(tdx_symbol)) + continue + # vn_symbol => exchange + exchange = self.symbol_exchange_dict.get(tdx_symbol, None) + underlying_symbol = get_underlying_symbol(vn_symbol) + + if exchange is None: + symbol_info = self.future_contracts.get(underlying_symbol, None) + if not symbol_info: + continue + exchange_value = symbol_info.get('exchange', None) + exchange = Exchange(exchange_value) + if exchange is None: + continue + self.symbol_exchange_dict.update({tdx_symbol: exchange}) + + tick_datetime = datetime.now() + # 修正毫秒 + last_tick = self.symbol_tick_dict.get(vn_symbol, None) + if (last_tick is not None) and tick_datetime.replace(microsecond=0) == last_tick.datetime: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick_datetime = tick_datetime.replace(microsecond=500) + else: + tick_datetime = tick_datetime.replace(microsecond=0) + + tick = TickData(gateway_name=self.gateway_name, + symbol=vn_symbol, + exchange=exchange, + datetime=tick_datetime) + + tick.pre_close = float(d.get('ZuoJie', 0.0)) + tick.high_price = float(d.get('ZuiGao', 0.0)) + tick.open_price = float(d.get('JinKai', 0.0)) + tick.low_price = float(d.get('ZuiDi', 0.0)) + tick.last_price = float(d.get('MaiChu', 0.0)) + tick.volume = int(d.get('XianLiang', 0)) + tick.open_interest = d.get('ChiCangLiang') + + tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12] + tick.date = tick.datetime.strftime('%Y-%m-%d') + + tick.trading_day = get_trading_date(tick.datetime) + + # 指数没有涨停和跌停,就用昨日收盘价正负10% + tick.limit_up = tick.pre_close * 1.1 + tick.limit_down = tick.pre_close * 0.9 + + # CTP只有一档行情 + tick.bid_price_1 = float(d.get('MaiRuJia', 0.0)) + tick.bid_volume_1 = int(d.get('MaiRuLiang', 0)) + tick.ask_price_1 = float(d.get('MaiChuJia', 0.0)) + tick.ask_volume_1 = int(d.get('MaiChuLiang', 0)) + + # 排除非交易时间得tick + if tick.exchange is Exchange.CFFEX: + if tick.datetime.hour not in [9, 10, 11, 13, 14, 15]: + continue + if tick.datetime.hour == 9 and tick.datetime.minute < 15: + continue + # 排除早盘 11:30~12:00 + if tick.datetime.hour == 11 and tick.datetime.minute >= 30: + continue + if tick.datetime.hour == 15 and tick.datetime.minute >= 15 and underlying_symbol in ['T', 'TF', 'TS']: + continue + if tick.datetime.hour == 15 and underlying_symbol in ['IH', 'IF', 'IC']: + continue + else: # 大商所/郑商所,上期所,上海能源 + # 排除非开盘小时 + if tick.datetime.hour in [3, 4, 5, 6, 7, 8, 12, 15, 16, 17, 18, 19, 20]: + continue + # 排除早盘 10:15~10:30 + if tick.datetime.hour == 10 and 15 <= tick.datetime.minute < 30: + continue + # 排除早盘 11:30~12:00 + if tick.datetime.hour == 11 and tick.datetime.minute >= 30: + continue + # 排除午盘 13:00 ~13:30 + if tick.datetime.hour == 13 and tick.datetime.minute < 30: + continue + # 排除凌晨2:30~3:00 + if tick.datetime.hour == 2 and tick.datetime.minute >= 30: + continue + + # 排除大商所/郑商所夜盘数据上期所夜盘数据 23:00 收盘 + if underlying_symbol in NIGHT_MARKET_23: + if tick.datetime.hour in [23, 0, 1, 2]: + continue + # 排除上期所夜盘数据 1:00 收盘 + if underlying_symbol in NIGHT_MARKET_SQ2: + if tick.datetime.hour in [1, 2]: + continue + + # 排除日盘合约在夜盘得数据 + if underlying_symbol in MARKET_DAY_ONLY and (tick.datetime.hour < 9 or tick.datetime.hour > 16): + # self.write_log(u'排除日盘合约{}在夜盘得数据'.format(short_symbol)) + continue + + # self.gateway.write_log(f'{tick.__dict__}') + self.symbol_tick_dict[tick.symbol] = tick + + self.gateway.on_tick(tick) + self.gateway.on_custom_tick(tick) + +class SubMdApi(): + """ + RabbitMQ Subscriber 数据行情接收API + """ + + def __init__(self, gateway): + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.symbol_tick_dict = {} # 合约与最后一个Tick得字典 + self.registed_symbol_set = set() # 订阅的合约记录集 + + self.sub = None + self.setting = {} + self.connect_status = False + self.thread = None + + def connect(self, setting={}): + """连接""" + self.setting = setting + try: + self.sub = subscriber( + host=self.setting.get('host', 'localhost'), + port=self.setting.get('port', 5672), + user=self.setting.get('user', 'admin'), + password=self.setting.get('password', 'admin'), + exchange=self.setting.get('exchange', 'x_fanout_idx_tick')) + + self.sub.set_callback(self.on_message) + self.thread = Thread(target=self.sub.start) + self.thread.start() + self.connect_status = True + + except Exception as ex: + self.gateway.write_error(u'连接RabbitMQ {} 异常:{}'.format(self.setting, str(ex))) + self.gateway.write_error(traceback.format_exc()) + self.connect_status = False + + def on_message(self, chan, method_frame, _header_frame, body, userdata=None): + # print(" [x] %r" % body) + try: + str_tick = body.decode('utf-8') + d = json.loads(str_tick) + symbol = d.pop('symbol', None) + str_datetime = d.pop('datetime', None) + if symbol not in self.registed_symbol_set or str_datetime is None: + return + if '.' in str_datetime: + dt = datetime.strptime(str_datetime, '%Y-%m-%d %H:%M:%S.%f') + else: + dt = datetime.strptime(str_datetime, '%Y-%m-%d %H:%M:%S') + + d.pop('rawData', None) + tick = TickData(gateway_name=self.gateway_name, + exchange=Exchange(d.get('exchange')), + symbol=d.get('symbol'), + datetime=dt) + d.pop('exchange', None) + d.pop('symbol', None) + d.pop() + tick.__dict__.update(d) + + self.symbol_tick_dict[symbol] = tick + self.gateway.on_tick(tick) + self.gateway.on_custom_tick(tick) + + except Exception as ex: + self.gateway.write_error(u'RabbitMQ on_message 异常:{}'.format(str(ex))) + self.gateway.write_error(traceback.format_exc()) + + def close(self): + """退出API""" + self.gateway.write_log(u'退出rabbit行情订阅API') + self.connection_status = False + + try: + if self.sub: + self.gateway.write_log(u'关闭订阅器') + self.sub.close() + + if self.thread is not None: + self.gateway.write_log(u'关闭订阅器接收线程') + self.thread.join() + except Exception as ex: + self.gateway.write_error(u'退出rabbitMQ行情api异常') + + # ---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅合约""" + # 这里的设计是,如果尚未登录就调用了订阅方法 + # 则先保存订阅请求,登录完成后会自动订阅 + vn_symbol = str(subscribeReq.symbol) + vn_symbol = vn_symbol.upper() + + if vn_symbol not in self.registed_symbol_set: + self.registed_symbol_set.add(vn_symbol) + self.gateway.write_log(u'RabbitMQ行情订阅 {}'.format(str(vn_symbol))) + + +class TickCombiner(object): + """ + Tick合成类 + """ + + def __init__(self, gateway, setting): + self.gateway = gateway + self.gateway_name = self.gateway.gateway_name + self.gateway.write_log(u'创建tick合成类:{}'.format(setting)) + + self.symbol = setting.get('symbol', None) + self.leg1_symbol = setting.get('leg1_symbol', None) + self.leg2_symbol = setting.get('leg2_symbol', None) + self.leg1_ratio = setting.get('leg1_ratio', 1) # 腿1的数量配比 + self.leg2_ratio = setting.get('leg2_ratio', 1) # 腿2的数量配比 + self.price_tick = setting.get('price_tick', 1) # 合成价差加比后的最小跳动 + # 价差 + self.is_spread = setting.get('is_spread', False) + # 价比 + self.is_ratio = setting.get('is_ratio', False) + + self.last_leg1_tick = None + self.last_leg2_tick = None + + # 价差日内最高/最低价 + self.spread_high = None + self.spread_low = None + + # 价比日内最高/最低价 + self.ratio_high = None + self.ratio_low = None + + # 当前交易日 + self.trading_day = None + + if self.is_ratio and self.is_spread: + self.gateway.write_error(u'{}参数有误,不能同时做价差/加比.setting:{}'.format(self.symbol, setting)) + return + + self.gateway.write_log(u'初始化{}合成器成功'.format(self.symbol)) + if self.is_spread: + self.gateway.write_log( + u'leg1:{} * {} - leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, + self.leg2_ratio)) + if self.is_ratio: + self.gateway.write_log( + u'leg1:{} * {} / leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, + self.leg2_ratio)) + + def on_tick(self, tick): + """OnTick处理""" + combinable = False + + if tick.symbol == self.leg1_symbol: + # leg1合约 + self.last_leg1_tick = tick + if self.last_leg2_tick is not None: + if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace( + microsecond=0): + combinable = True + + elif tick.symbol == self.leg2_symbol: + # leg2合约 + self.last_leg2_tick = tick + if self.last_leg1_tick is not None: + if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace( + microsecond=0): + combinable = True + + # 不能合并 + if not combinable: + return + + if not self.is_ratio and not self.is_spread: + return + + # 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick + if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.upperLimit) \ + and self.last_leg1_tick.askVolume1 == 0: + self.gateway.write_log( + u'leg1:{0}涨停{1},不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.bid_price_1)) + return + if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.lowerLimit) \ + and self.last_leg1_tick.bidVolume1 == 0: + self.gateway.write_log( + u'leg1:{0}跌停{1},不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.ask_price_1)) + return + if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.upperLimit) \ + and self.last_leg2_tick.askVolume1 == 0: + self.gateway.write_log( + u'leg2:{0}涨停{1},不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.bid_price_1)) + return + if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.lowerLimit) \ + and self.last_leg2_tick.bidVolume1 == 0: + self.gateway.write_log( + u'leg2:{0}跌停{1},不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.ask_price_1)) + return + + if self.trading_day != tick.trading_day: + self.trading_day = tick.trading_day + self.spread_high = None + self.spread_low = None + self.ratio_high = None + self.ratio_low = None + + if self.is_spread: + spread_tick = TickData(gateway_name=self.gateway_name, + symbol=self.symbol, + exchange=tick.exchange, + datetime=tick.datetime) + + spread_tick.trading_day = tick.trading_day + spread_tick.date = tick.date + spread_tick.time = tick.time + + # 叫卖价差=leg1.ask_price_1 * 配比 - leg2.bid_price_1 * 配比,volume为两者最小 + spread_tick.ask_price_1 = round_to(target=self.price_tick, + value=self.last_leg1_tick.ask_price_1 * self.leg1_ratio - self.last_leg2_tick.bid_price_1 * self.leg2_ratio) + spread_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) + + # 叫买价差=leg1.bid_price_1 * 配比 - leg2.ask_price_1 * 配比,volume为两者最小 + spread_tick.bid_price_1 = round_to(target=self.price_tick, + value=self.last_leg1_tick.bid_price_1 * self.leg1_ratio - self.last_leg2_tick.ask_price_1 * self.leg2_ratio) + spread_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) + + # 最新价 + spread_tick.last_price = round_to(target=self.price_tick, + value=(spread_tick.ask_price_1 + spread_tick.bid_price_1) / 2) + # 昨收盘价 + if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: + spread_tick.pre_close = round_to(target=self.price_tick, + value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio) + # 开盘价 + if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: + spread_tick.openPrice = round_to(target=self.price_tick, + value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio) + # 最高价 + self.spread_high = spread_tick.ask_price_1 if self.spread_high is None else max(self.spread_high, + spread_tick.ask_price_1) + spread_tick.high_price = self.spread_high + + # 最低价 + self.spread_low = spread_tick.bid_price_1 if self.spread_low is None else min(self.spread_low, + spread_tick.bid_price_1) + spread_tick.low_price = self.spread_low + + self.gateway.on_tick(spread_tick) + + if self.is_ratio: + ratio_tick = TickData(gatway_name=self.gateway_name, + symbol=self.symbol, + exchange=tick.exchange, + datetime=tick.datetime) + + ratio_tick.trading_day = tick.trading_day + ratio_tick.date = tick.date + ratio_tick.time = tick.time + + # 比率tick + ratio_tick.ask_price_1 = round_to(target=self.price_tick, + value=100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio / ( + self.last_leg2_tick.bid_price_1 * self.leg2_ratio)) + ratio_tick.askVolume1 = min(self.last_leg1_tick.askVolume1, self.last_leg2_tick.bidVolume1) + + ratio_tick.bid_price_1 = round_to(target=self.price_tick, + value=100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio / ( + self.last_leg2_tick.ask_price_1 * self.leg2_ratio)) + ratio_tick.bidVolume1 = min(self.last_leg1_tick.bidVolume1, self.last_leg2_tick.askVolume1) + ratio_tick.lastPrice = round_to(target=self.price_tick, + value=(ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2) + + # 昨收盘价 + if self.last_leg2_tick.preClosePrice > 0 and self.last_leg1_tick.preClosePrice > 0: + ratio_tick.preClosePrice = round_to(target=self.price_tick, + value=100 * self.last_leg1_tick.preClosePrice * self.leg1_ratio / ( + self.last_leg2_tick.preClosePrice * self.leg2_ratio)) + # 开盘价 + if self.last_leg2_tick.openPrice > 0 and self.last_leg1_tick.openPrice > 0: + ratio_tick.openPrice = round_to(target=self.price_tick, + value=100 * self.last_leg1_tick.openPrice * self.leg1_ratio / ( + self.last_leg2_tick.openPrice * self.leg2_ratio)) + # 最高价 + self.ratio_high = ratio_tick.ask_price_1 if self.ratio_high is None else max(self.ratio_high, + ratio_tick.ask_price_1) + ratio_tick.high_price = self.spread_high + + # 最低价 + self.ratio_low = ratio_tick.bid_price_1 if self.ratio_low is None else min(self.ratio_low, + ratio_tick.bid_price_1) + ratio_tick.low_price = self.spread_low + + self.gateway.on_tick(ratio_tick) diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index e46adf1d..12cc4db9 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -1,11 +1,12 @@ """ """ - +import os +import sys from abc import ABC, abstractmethod from typing import Any, Sequence from copy import copy -from logging import INFO +from logging import INFO, DEBUG, ERROR from vnpy.event import Event, EventEngine from .event import ( @@ -31,6 +32,9 @@ from .object import ( HistoryRequest ) +from vnpy.trader.utility import get_folder_path +from vnpy.trader.util_logger import setup_logger + class BaseGateway(ABC): """ @@ -81,6 +85,21 @@ class BaseGateway(ABC): """""" self.event_engine = event_engine self.gateway_name = gateway_name + self.logger = None + + self.create_logger() + + def create_logger(self): + """ + 创建engine独有的日志 + :return: + """ + log_path = get_folder_path("log") + log_filename = os.path.abspath(os.path.join(log_path, self.gateway_name)) + print(u'create logger:{}'.format(log_filename)) + from vnpy.trader.setting import SETTINGS + self.logger = setup_logger(file_name=log_filename, name=self.gateway_name, + log_level=SETTINGS.get('log.level', DEBUG)) def on_event(self, type: str, data: Any = None): """ @@ -141,12 +160,29 @@ class BaseGateway(ABC): """ self.on_event(EVENT_CONTRACT, contract) - def write_log(self, msg: str, level: int = INFO): + def write_log(self, msg: str, level: int = INFO, on_log: bool = False): """ Write a log event from gateway. """ - log = LogData(msg=msg, level=level, gateway_name=self.gateway_name) - self.on_log(log) + if self.logger: + self.logger.log(level, msg) + + if on_log: + log = LogData(msg=msg, level=level, gateway_name=self.gateway_name) + self.on_log(log) + + def write_error(self, msg: str, error: dict = {}): + """ + write error log + :param msg: + :return: + """ + if len(error) > 0: + error_id = error.get("ErrorID", '') + error_msg = error.get("ErrorMsg", '') + msg = f"{msg},代码:{error_id},信息:{error_msg}" + self.write_log(msg, level=ERROR, on_log=True) + print(msg, file=sys.stderr) @abstractmethod def connect(self, setting: dict): @@ -273,7 +309,7 @@ class LocalOrderManager: # For generating local orderid self.order_prefix = order_prefix self.order_count = 0 - self.orders = {} # local_orderid:order + self.orders = {} # local_orderid:order # Map between local and system orderid self.local_sys_orderid_map = {} @@ -286,7 +322,7 @@ class LocalOrderManager: self.push_data_callback = None # Cancel request buf - self.cancel_request_buf = {} # local_orderid:req + self.cancel_request_buf = {} # local_orderid:req # Hook cancel order function self._cancel_order = gateway.cancel_order diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index 5c1b2e32..51914588 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -33,8 +33,8 @@ class TickData(BaseData): symbol: str exchange: Exchange datetime: datetime - date: str = "" # '%Y-%m-%d' - time: str = "" # '%H:%M:%S.%f' + date: str = "" # '%Y-%m-%d' + time: str = "" # '%H:%M:%S.%f' trading_day: str = "" # '%Y-%m-%d' name: str = "" @@ -108,14 +108,15 @@ class RenkoBarData(BarData): """ Renko bar data of a certain trading period. """ - seconds: int = 0 # 当前Bar的秒数(针对RenkoBar) - high_seconds: int = -1 # 当前Bar的上限秒数 - low_seconds: int = -1 # 当前bar的下限秒数 - height: float = 3 # 当前Bar的高度限制(针对RenkoBar和RangeBar类) - up_band: float = 0 # 高位区域的基线 - down_band: float = 0 # 低位区域的基线 - low_time = None # 最后一次进入低位区域的时间 - high_time = None # 最后一次进入高位区域的时间 + seconds: int = 0 # 当前Bar的秒数(针对RenkoBar) + high_seconds: int = -1 # 当前Bar的上限秒数 + low_seconds: int = -1 # 当前bar的下限秒数 + height: float = 3 # 当前Bar的高度限制(针对RenkoBar和RangeBar类) + up_band: float = 0 # 高位区域的基线 + down_band: float = 0 # 低位区域的基线 + low_time = None # 最后一次进入低位区域的时间 + high_time = None # 最后一次进入高位区域的时间 + @dataclass class OrderData(BaseData): @@ -252,15 +253,15 @@ class ContractData(BaseData): product: Product size: int pricetick: float - margin_rate: float = 0.1 # 保证金比率 + margin_rate: float = 0.1 # 保证金比率 - min_volume: float = 1 # minimum trading volume of the contract - stop_supported: bool = False # whether server supports stop order - net_position: bool = False # whether gateway uses net position volume - history_data: bool = False # whether gateway provides bar history data + min_volume: float = 1 # minimum trading volume of the contract + stop_supported: bool = False # whether server supports stop order + net_position: bool = False # whether gateway uses net position volume + history_data: bool = False # whether gateway provides bar history data option_strike: float = 0 - option_underlying: str = "" # vt_symbol of underlying contract + option_underlying: str = "" # vt_symbol of underlying contract option_type: OptionType = None option_expiry: datetime = None diff --git a/vnpy/trader/utility.py b/vnpy/trader/utility.py index 1340238d..36cc5d23 100644 --- a/vnpy/trader/utility.py +++ b/vnpy/trader/utility.py @@ -28,6 +28,7 @@ def func_time(over_ms: int = 0): :param :over_ms 超过多少毫秒, 提示信息 :return: """ + def run(func): @wraps(func) def wrapper(*args, **kwargs): @@ -38,7 +39,9 @@ def func_time(over_ms: int = 0): if execute_ms > over_ms: print('{} took {} ms'.format(func.__qualname__, execute_ms)) return result + return wrapper + return run @@ -125,7 +128,7 @@ def get_real_symbol_by_exchange(full_symbol, vn_exchange): if vn_exchange == Exchange.CFFEX: return full_symbol.upper() - if vn_exchange in [Exchange.DCE, Exchange.SHFE, Exchange.INE]: + if vn_exchange in [Exchange.DCE, Exchange.SHFE, Exchange.INE]: return full_symbol.lower() if vn_exchange == Exchange.CZCE: @@ -135,6 +138,7 @@ def get_real_symbol_by_exchange(full_symbol, vn_exchange): return full_symbol + def get_trading_date(dt: datetime = None): """ 根据输入的时间,返回交易日的日期 @@ -304,11 +308,11 @@ class BarGenerator: """ def __init__( - self, - on_bar: Callable, - window: int = 0, - on_window_bar: Callable = None, - interval: Interval = Interval.MINUTE + self, + on_bar: Callable, + window: int = 0, + on_window_bar: Callable = None, + interval: Interval = Interval.MINUTE ): """Constructor""" self.bar = None