[新功能] tdx 订阅指数全行情=> rabbit_mq =>ctp_gateway多账号接收

This commit is contained in:
msincenselee 2020-01-01 22:32:39 +08:00
parent c030a5daf6
commit c567bfe8db
8 changed files with 1405 additions and 57 deletions

View File

@ -0,0 +1,101 @@
import os
import sys
import multiprocessing
from time import sleep
from datetime import datetime, time
from logging import INFO
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
sys.path.append(ROOT_PATH)
print(f'append {ROOT_PATH} into sys.path')
from vnpy.event import EventEngine
from vnpy.trader.setting import SETTINGS
from vnpy.trader.engine import MainEngine
from vnpy.gateway.ctp import CtpGateway
from vnpy.app.index_tick_publisher import IndexTickPublisherApp
from vnpy.app.cta_strategy.base import EVENT_CTA_LOG
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
rebbit_setting = {
"host": "192.168.1.211"
}
def run_child():
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
publisher_engine = main_engine.add_app(IndexTickPublisherApp)
main_engine.write_log("主引擎创建成功")
log_engine = main_engine.get_engine("log")
event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
main_engine.write_log("注册日志事件监听")
sleep(10)
main_engine.write_log("启动连接tdx & rabbit")
publisher_engine.connect(rebbit_setting)
while True:
sleep(1)
def run_parent():
"""
Running in the parent process.
"""
print("启动CTA策略守护父进程")
# Chinese futures market trading period (day/night)
DAY_START = time(8, 45)
DAY_END = time(15, 30)
NIGHT_START = time(20, 45)
NIGHT_END = time(2, 45)
child_process = None
while True:
current_time = datetime.now().time()
trading = False
# Check whether in trading period
if (
(current_time >= DAY_START and current_time <= DAY_END)
or (current_time >= NIGHT_START)
or (current_time <= NIGHT_END)
):
trading = True
# Start child process in trading period
if trading and child_process is None:
print("启动子进程")
child_process = multiprocessing.Process(target=run_child)
child_process.start()
print("子进程启动成功")
# 非记录时间则退出子进程
if not trading and child_process is not None:
print("关闭子进程")
child_process.terminate()
child_process.join()
child_process = None
print("子进程关闭成功")
sleep(5)
if __name__ == "__main__":
run_parent()

View File

@ -37,7 +37,7 @@ def run_child():
event_engine = EventEngine() event_engine = EventEngine()
main_engine = MainEngine(event_engine) main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway) main_engine.add_gateway(CtpGateway)
record_engine = main_engine.add_app(TickRecorderApp) main_engine.add_app(TickRecorderApp)
main_engine.write_log("主引擎创建成功") main_engine.write_log("主引擎创建成功")
log_engine = main_engine.get_engine("log") log_engine = main_engine.get_engine("log")

View File

@ -0,0 +1,15 @@
# encoding: UTF-8
import os
from pathlib import Path
from vnpy.trader.app import BaseApp
from .engine import IndexTickPublisher, APP_NAME
class IndexTickPublisherApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = u'指数行情推送'
engine_class = IndexTickPublisher

View File

@ -0,0 +1,479 @@
# encoding: UTF-8
# 通达信指数行情发布器
# 华富资产
import os
import sys
import copy
import json
import traceback
from threading import Thread
from datetime import datetime, timedelta
from time import sleep
from logging import ERROR
from pytdx.exhq import TdxExHq_API
from vnpy.event import EventEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.event import EVENT_TIMER
from vnpy.trader.object import TickData
from vnpy.trader.utility import get_trading_date
from vnpy.data.tdx.tdx_common import TDX_FUTURE_HOSTS
from vnpy.app.cta_strategy_pro.base import (
NIGHT_MARKET_23,
NIGHT_MARKET_SQ2,
MARKET_DAY_ONLY)
from vnpy.amqp.producer import publisher
APP_NAME = 'INDEXDATAPUBLISHER'
class IndexTickPublisher(BaseEngine):
# 指数tick发布服务
# 通过通达信接口获取指数行情tick发布至rabbitMQ
# ----------------------------------------------------------------------
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(IndexTickPublisher, self).__init__(
main_engine, event_engine, APP_NAME)
self.main_engine = main_engine
self.event_engine = event_engine
self.create_logger(logger_name=APP_NAME)
self.last_minute = None
self.registerEvent()
self.req_interval = 0.5 # 操作请求间隔500毫秒
self.req_id = 0 # 操作请求编号
self.connection_status = False # 连接状态
self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典
self.symbol_market_dict = {} # tdx合约与tdx市场的字典
self.symbol_tick_dict = {} # tdx合约与最后一个Tick得字典
# self.queue = Queue() # 请求队列
self.pool = None # 线程池
self.req_thread = None # 定时器线程
self.ip_list = TDX_FUTURE_HOSTS
# tdx api
self.fail_ip_dict = {} # 失效得API 的连接服务器配置: IP_port: 分钟倒数
self.best_ip = None
self.best_port = None
self.best_name = None
self.api = None # API 的连接会话对象
self.last_tick_dt = None # 记录该会话对象的最后一个tick时间
self.last_sort_speed_dt = None
self.instrument_count = 50000
self.has_qry_instrument = False
# vt_setting.json内rabbitmq配置项
self.conf = {}
self.pub = None
def write_error(self, content: str):
self.write_log(msg=content, level=ERROR)
def create_publisher(self, conf):
"""创建rabbitmq 消息发布器"""
if self.pub:
return
try:
# 消息发布
self.pub = publisher(host=conf.get('host', 'localhost'),
port=conf.get('port', 5672),
user=conf.get('user', 'admin'),
password=conf.get('password', 'admin'),
channel_number=conf.get('channel_number', 1),
queue_name=conf.get('queue_name', ''),
routing_key=conf.get('routing_key', 'default'),
exchange=conf.get('exchange', 'x_fanout_idx_tick'))
except Exception as ex:
self.write_log(u'创建tick发布器异常:{}'.format(str(ex)))
# ----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_timer_event(self, event):
"""定时执行"""
dt = datetime.now()
if dt.minute == self.last_minute:
return
# 更新失效IP地址得counter
for k in list(self.fail_ip_dict.keys()):
c = self.fail_ip_dict.get(k, 0)
if c <= 0:
self.fail_ip_dict.pop(k, None)
else:
c -= 1
self.fail_ip_dict.update({k: c})
self.checkStatus()
# ----------------------------------------------------------------------
def ping(self, ip, port=7709):
"""
ping行情服务器
:param ip:
:param port:
:param type_:
:return:
"""
apix = TdxExHq_API()
__time1 = datetime.now()
try:
with apix.connect(ip, port):
if apix.get_instrument_count() > 10000:
_timestamp = (datetime.now() - __time1).total_seconds() * 1000
self.write_log('服务器{}:{},耗时:{}ms'.format(ip, port, _timestamp))
return _timestamp
else:
self.write_log(u'该服务器IP {}无响应.'.format(ip))
return timedelta(seconds=10).total_seconds() * 1000
except Exception as ex:
self.write_error(u'tdx ping服务器{},异常的响应{}'.format(ip,str(ex)))
return timedelta(seconds=10).total_seconds() * 1000
def sort_ip_speed(self):
"""
对所有服务器进行速度排序
:return:
"""
speed_result = []
for x in self.ip_list:
speed = self.ping(x['ip'], x['port'])
x.update({'speed': speed})
speed_result.append(copy.copy(x))
# 更新服务器,按照速度排序
self.ip_list = sorted(speed_result, key=lambda s: s['speed'])
self.write_log(u'服务器访问速度排序:{}'.format(self.ip_list))
# ----------------------------------------------------------------------
def select_best_ip(self):
"""
选择行情服务器
:return: IP地址 端口 服务器名称
"""
self.write_log(u'选择通达信行情服务器')
if self.last_sort_speed_dt is None or (datetime.now() - self.last_sort_speed_dt).total_seconds() > 60:
self.sort_ip_speed()
self.last_sort_speed_dt = datetime.now()
valid_ip_list = [x for x in self.ip_list if x.get('speed', 10000) < 10000]
if len(valid_ip_list) == 0:
self.write_error(u'未能找到合适速度得行情服务器')
return None, None, None
for server in valid_ip_list:
ip = server.get('ip')
port = server.get('port')
name = server.get('name', '{}:{}'.format(ip, port))
if '{}:{}'.format(ip, port) in self.fail_ip_dict:
self.write_log(u'{}:{}属于上次异常IP地址忽略'.format(ip, port))
continue
return ip, port, name
return None, None, None
def connect(self, rabbit_config: dict):
"""
连接通达讯行情服务器
:param n:
:return:
"""
if self.connection_status:
if self.api is not None or getattr(self.api, "client", None) is not None:
self.write_log(u'当前已经连接,不需要重新连接')
return
self.write_log(u'开始通达信行情服务器')
try:
self.api = TdxExHq_API(heartbeat=True, auto_retry=True, raise_exception=True)
# 选取最佳服务器
self.best_ip, self.best_port, self.best_name = self.select_best_ip()
if self.best_ip is None or self.best_port is None:
self.write_error(u'未能选择到服务器')
self.write_log(u'api 选择 {}: {}:{}'.format(self.best_name, self.best_ip, self.best_port))
self.api.connect(self.best_ip, self.best_port)
# 尝试获取市场合约统计
c = self.api.get_instrument_count()
if c is None or c < 10:
err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip, self.best_port)
self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10})
self.write_error(err_msg)
else:
self.write_log(u'创建tdx连接')
self.last_tick_dt = datetime.now()
self.connection_status = True
self.instrument_count = c
except Exception as ex:
self.write_error(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc()))
self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10})
return
# 更新 symbol_exchange_dict , symbol_market_dict
self.write_log(u'查询合约')
self.qryInstrument()
self.conf.update(rabbit_config)
self.create_publisher(self.conf)
self.req_thread = Thread(target=self.run)
self.req_thread.start()
def reconnect(self):
"""
重连
:return:
"""
try:
self.best_ip, self.best_port, self.best_name = self.select_best_ip()
self.api = TdxExHq_API(heartbeat=True, auto_retry=True)
self.api.connect(self.best_ip, self.best_port)
# 尝试获取市场合约统计
c = self.api.get_instrument_count()
if c is None or c < 10:
err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip, self.best_port)
self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10})
self.write_error(err_msg)
else:
self.write_log(u'重新创建tdx连接')
sleep(1)
except Exception as ex:
self.write_error(u'重新连接服务器异常:{},{}'.format(str(ex), traceback.format_exc()))
self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10})
return
def close(self):
"""退出API"""
self.write_log(u'退出tdx API')
self.connection_status = False
if self.req_thread is not None:
self.write_log(u'退出请求线程')
self.req_thread.join()
if self.pub:
self.write_log(u'退出rabbitMQ 发布器')
self.pub.exit()
def checkStatus(self):
# self.write_log(u'检查tdx接口状态')
# 若还没有启动连接,就启动连接
over_time = self.last_tick_dt is None or (datetime.now() - self.last_tick_dt).total_seconds() > 60
if not self.connection_status or self.api is None or over_time:
self.write_log(u'tdx还没有启动连接就启动连接')
self.close()
self.api = None
self.reconnect()
# self.write_log(u'tdx接口状态正常')
def qryInstrument(self):
"""
查询/更新合约信息
:return:
"""
if not self.connection_status:
self.write_error(u'tdx连接状态为断开不能查询和更新合约信息')
return
if self.has_qry_instrument:
self.write_error(u'已经查询过一次合约信息,不再查询')
return
# 取得所有的合约信息
num = self.api.get_instrument_count()
if not isinstance(num, int):
return
all_contacts = sum(
[self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], [])
# [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}]
# 对所有合约处理,更新字典 指数合约-tdx市场指数合约-交易所
for tdx_contract in all_contacts:
tdx_symbol = tdx_contract.get('code', None)
if tdx_symbol is None or tdx_symbol[-2:] not in ['L9']:
continue
tdx_market_id = tdx_contract.get('market')
self.symbol_market_dict[tdx_symbol] = tdx_market_id
if tdx_market_id == 47: # 中金所
self.symbol_exchange_dict[tdx_symbol] = Exchange.CFFEX
elif tdx_market_id == 28: # 郑商所
self.symbol_exchange_dict[tdx_symbol] = Exchange.CZCE
elif tdx_market_id == 29: # 大商所
self.symbol_exchange_dict[tdx_symbol] = Exchange.DCE
elif tdx_market_id == 30: # 上期所+能源
self.symbol_exchange_dict[tdx_symbol] = Exchange.SHFE
elif tdx_market_id == 60: # 主力合约
self.write_log(u'主力合约:{}'.format(tdx_contract))
self.has_qry_instrument = True
def run(self):
# 版本3 :直接查询板块
try:
last_dt = datetime.now()
self.write_log(u'开始运行tdx,{}'.format(last_dt))
while self.connection_status:
try:
self.process_index_req()
except BrokenPipeError as bex:
self.write_error(u'BrokenPipeError{},重试重连tdx[{}]'.format(str(bex), 0))
self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10})
self.reconnect()
sleep(5)
break
except Exception as ex:
self.write_error(u'tdx exception:{},{}'.format(str(ex), traceback.format_exc()))
self.fail_ip_dict.update({'{}:{}'.format(self.best_ip, self.best_port): 10})
self.reconnect()
sleep(self.req_interval)
dt = datetime.now()
if last_dt.minute != dt.minute:
self.write_log('tdxcheck point. {},last_tick_dt:{}'.format(dt, self.last_tick_dt))
last_dt = dt
except Exception as ex:
self.write_error(u'tdx pool.run exception:{},{}'.format(str(ex), traceback.format_exc()))
self.write_error(u'tdx 线程 {}退出'.format(datetime.now()))
def process_index_req(self):
"""处理板块获取指数行情tick"""
# 获取通达信指数板块所有行情
rt_list = self.api.get_instrument_quote_list(42, 3, 0, 100)
if rt_list is None or len(rt_list) == 0:
self.write_log(u'tdx:get_instrument_quote_list() rt_list为空')
return
# 记录该接口的行情最后更新时间
self.last_tick_dt = datetime.now()
for d in list(rt_list):
tdx_symbol = d.get('code', None)
if tdx_symbol.endswith('L9'):
vn_symbol = tdx_symbol.replace('L9', '99').upper()
else:
vn_symbol = tdx_symbol.upper()
tick_datetime = datetime.now()
# 修正毫秒
last_tick = self.symbol_tick_dict.get(vn_symbol, None)
if (last_tick is not None) and tick_datetime.replace(microsecond=0) == last_tick.datetime:
# 与上一个tick的时间去除毫秒后相同,修改为500毫秒
tick_datetime = tick_datetime.replace(microsecond=500)
else:
tick_datetime = tick_datetime.replace(microsecond=0)
tick = TickData(
gateway_name='tdx',
symbol=vn_symbol,
datetime=tick_datetime,
exchange=self.symbol_exchange_dict.get(tdx_symbol, Exchange.LOCAL)
)
tick.pre_close = float(d.get('ZuoJie', 0.0))
tick.high_price = float(d.get('ZuiGao', 0.0))
tick.open_price = float(d.get('JinKai', 0.0))
tick.low_price = float(d.get('ZuiDi', 0.0))
tick.last_price = float(d.get('MaiChu', 0.0))
tick.volume = int(d.get('XianLiang', 0))
tick.open_interest = d.get('ChiCangLiang')
tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12]
tick.date = tick.datetime.strftime('%Y-%m-%d')
tick.trading_day = get_trading_date(tick_datetime)
# 指数没有涨停和跌停就用昨日收盘价正负10%
tick.limit_up = tick.pre_close * 1.1
tick.limit_down = tick.pre_close * 0.9
# CTP只有一档行情
tick.bid_price_1 = float(d.get('MaiRuJia', 0.0))
tick.bid_volume_1 = int(d.get('MaiRuLiang', 0))
tick.ask_price_1 = float(d.get('MaiChuJia', 0.0))
tick.ask_volume_1 = int(d.get('MaiChuLiang', 0))
underlying_symbol = vn_symbol.replace('99', '').upper()
# 排除非交易时间得tick
if tick.exchange is Exchange.CFFEX:
if tick.datetime.hour not in [9, 10, 11, 13, 14, 15]:
continue
if tick.datetime.hour == 9 and tick.datetime.minute < 15:
continue
# 排除早盘 11:30~12:00
if tick.datetime.hour == 11 and tick.datetime.minute >= 30:
continue
if tick.datetime.hour == 15 and tick.datetime.minute >= 15 and underlying_symbol in ['T', 'TF', 'TS']:
continue
if tick.datetime.hour == 15 and underlying_symbol in ['IH', 'IF', 'IC']:
continue
else: # 大商所/郑商所,上期所,上海能源
# 排除非开盘小时
if tick.datetime.hour in [3, 4, 5, 6, 7, 8, 12, 15, 16, 17, 18, 19, 20]:
continue
# 排除早盘 10:15~10:30
if tick.datetime.hour == 10 and 15 <= tick.datetime.minute < 30:
continue
# 排除早盘 11:30~12:00
if tick.datetime.hour == 11 and tick.datetime.minute >= 30:
continue
# 排除午盘 13:00 ~13:30
if tick.datetime.hour == 13 and tick.datetime.minute < 30:
continue
# 排除凌晨2:30~3:00
if tick.datetime.hour == 2 and tick.datetime.minute >= 30:
continue
# 排除大商所/郑商所/上期所夜盘数据上期所夜盘数据 23:00 收盘
if underlying_symbol in NIGHT_MARKET_23:
if tick.datetime.hour in [23, 0, 1, 2]:
continue
# 排除上期所夜盘数据 1:00 收盘
if underlying_symbol in NIGHT_MARKET_SQ2:
if tick.datetime.hour in [1, 2]:
continue
# 排除日盘合约在夜盘得数据
if underlying_symbol in MARKET_DAY_ONLY and (tick.datetime.hour < 9 or tick.datetime.hour > 16):
# self.write_log(u'排除日盘合约{}在夜盘得数据'.format(short_symbol))
continue
self.symbol_tick_dict[tick.symbol] = tick
if self.pub:
d = copy.copy(tick.__dict__)
if isinstance(tick.datetime, datetime):
d.update({'datetime': tick.datetime.strftime('%Y-%m-%d %H:%M:%S.%f')})
d.update({'exchange': tick.exchange.value()})
d = json.dumps(d)
self.pub.pub(d)

View File

@ -1,7 +1,9 @@
""" """
""" """
import traceback
from datetime import datetime import json
from datetime import datetime, timedelta
from copy import copy
from vnpy.api.ctp import ( from vnpy.api.ctp import (
MdApi, MdApi,
@ -60,10 +62,26 @@ from vnpy.trader.object import (
) )
from vnpy.trader.utility import ( from vnpy.trader.utility import (
get_folder_path, get_folder_path,
get_trading_date get_trading_date,
get_underlying_symbol,
round_to
) )
from vnpy.trader.event import EVENT_TIMER from vnpy.trader.event import EVENT_TIMER
# 增加通达信指数接口行情
from time import sleep
from threading import Thread
from pytdx.exhq import TdxExHq_API
from vnpy.amqp.consumer import subscriber
from vnpy.data.tdx.tdx_common import (
TDX_FUTURE_HOSTS,
get_future_contracts,
get_cache_json,
save_cache_json,
TDX_FUTURE_CONFIG)
from vnpy.app.cta_strategy_pro.base import (
MARKET_DAY_ONLY, NIGHT_MARKET_23, NIGHT_MARKET_SQ2
)
STATUS_CTP2VT = { STATUS_CTP2VT = {
THOST_FTDC_OAS_Submitted: Status.SUBMITTING, THOST_FTDC_OAS_Submitted: Status.SUBMITTING,
@ -116,7 +134,6 @@ OPTIONTYPE_CTP2VT = {
THOST_FTDC_CP_PutOptions: OptionType.PUT THOST_FTDC_CP_PutOptions: OptionType.PUT
} }
symbol_exchange_map = {} symbol_exchange_map = {}
symbol_name_map = {} symbol_name_map = {}
symbol_size_map = {} symbol_size_map = {}
@ -137,7 +154,13 @@ class CtpGateway(BaseGateway):
"授权编码": "", "授权编码": "",
"产品信息": "" "产品信息": ""
} }
# 注
# 如果采用rabbit_mq拓展tdx指数行情default_setting中需要增加:
# "rabbit":
# {
# "host": "192.168.1.211",
# "exchange": "x_fanout_idx_tick"
# }
exchanges = list(EXCHANGE_CTP2VT.values()) exchanges = list(EXCHANGE_CTP2VT.values())
def __init__(self, event_engine): def __init__(self, event_engine):
@ -146,6 +169,13 @@ class CtpGateway(BaseGateway):
self.td_api = CtpTdApi(self) self.td_api = CtpTdApi(self)
self.md_api = CtpMdApi(self) self.md_api = CtpMdApi(self)
self.tdx_api = None
self.rabbit_api = None
self.combiner_conf_dict = {} # 保存合成器配置
# 自定义价差/加比的tick合成器
self.combiners = {}
self.tick_combiner_map = {}
def connect(self, setting: dict): def connect(self, setting: dict):
"""""" """"""
@ -157,27 +187,42 @@ class CtpGateway(BaseGateway):
appid = setting["产品名称"] appid = setting["产品名称"]
auth_code = setting["授权编码"] auth_code = setting["授权编码"]
product_info = setting["产品信息"] product_info = setting["产品信息"]
rabbit_dict = setting.get('rabbit', None)
if ( if (
(not td_address.startswith("tcp://")) (not td_address.startswith("tcp://"))
and (not td_address.startswith("ssl://")) and (not td_address.startswith("ssl://"))
): ):
td_address = "tcp://" + td_address td_address = "tcp://" + td_address
if ( if (
(not md_address.startswith("tcp://")) (not md_address.startswith("tcp://"))
and (not md_address.startswith("ssl://")) and (not md_address.startswith("ssl://"))
): ):
md_address = "tcp://" + md_address md_address = "tcp://" + md_address
self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid, product_info) self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid, product_info)
self.md_api.connect(md_address, userid, password, brokerid) self.md_api.connect(md_address, userid, password, brokerid)
if rabbit_dict:
self.rabbit_api = SubMdApi(gateway=self)
self.rabbit_api.connect(rabbit_dict)
else:
self.tdx_api = TdxMdApi(gateway=self)
self.tdx_api.connect()
self.init_query() self.init_query()
def subscribe(self, req: SubscribeRequest): def subscribe(self, req: SubscribeRequest):
"""""" """"""
self.md_api.subscribe(req) # 指数合约从tdx行情订阅
if req.symbol[-2:] in ['99']:
req.symbol = req.symbol.upper()
if self.tdx_api:
self.tdx_api.subscribe(req)
elif self.rabbit_api:
self.rabbit_api.subscribe(req)
else:
self.md_api.subscribe(req)
def send_order(self, req: OrderRequest): def send_order(self, req: OrderRequest):
"""""" """"""
@ -200,13 +245,6 @@ class CtpGateway(BaseGateway):
self.td_api.close() self.td_api.close()
self.md_api.close() self.md_api.close()
def write_error(self, msg: str, error: dict):
""""""
error_id = error["ErrorID"]
error_msg = error["ErrorMsg"]
msg = f"{msg},代码:{error_id},信息:{error_msg}"
self.write_log(msg)
def process_timer_event(self, event): def process_timer_event(self, event):
"""""" """"""
self.count += 1 self.count += 1
@ -224,6 +262,14 @@ class CtpGateway(BaseGateway):
self.query_functions = [self.query_account, self.query_position] self.query_functions = [self.query_account, self.query_position]
self.event_engine.register(EVENT_TIMER, self.process_timer_event) self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def on_custom_tick(self, tick):
"""推送自定义合约行情"""
# 自定义合约行情
for combiner in self.tick_combiner_map.get(tick.symbol, []):
tick = copy(tick)
combiner.on_tick(tick)
class CtpMdApi(MdApi): class CtpMdApi(MdApi):
"""""" """"""
@ -294,7 +340,7 @@ class CtpMdApi(MdApi):
if not exchange: if not exchange:
return return
timestamp = f"{data['ActionDay']} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}" timestamp = f"{data['ActionDay']} {data['UpdateTime']}.{int(data['UpdateMillisec'] / 100)}"
dt = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f") dt = datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f")
# 不处理开盘前的tick数据 # 不处理开盘前的tick数据
@ -690,14 +736,14 @@ class CtpTdApi(TdApi):
self.gateway.on_trade(trade) self.gateway.on_trade(trade)
def connect( def connect(
self, self,
address: str, address: str,
userid: str, userid: str,
password: str, password: str,
brokerid: int, brokerid: int,
auth_code: str, auth_code: str,
appid: str, appid: str,
product_info product_info
): ):
""" """
Start connection to server. Start connection to server.
@ -855,3 +901,669 @@ class CtpTdApi(TdApi):
"""""" """"""
if self.connect_status: if self.connect_status:
self.exit() self.exit()
class TdxMdApi():
"""
通达信数据行情API实现
订阅的指数行情更新合约的数据
"""
def __init__(self, gateway):
self.gateway = gateway # gateway对象
self.gateway_name = gateway.gateway_name # gateway对象名称
self.req_interval = 0.5 # 操作请求间隔500毫秒
self.req_id = 0 # 操作请求编号
self.connection_status = False # 连接状态
self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典
self.symbol_market_dict = {} # tdx合约与tdx市场的字典
self.symbol_vn_dict = {} # tdx合约与vtSymbol的对应
self.symbol_tick_dict = {} # tdx合约与最后一个Tick得字典
# tdx 期货配置本地缓存
self.future_contracts = get_future_contracts()
self.registered_symbol_set = set()
self.thread = None # 查询线程
self.ip_list = TDX_FUTURE_HOSTS
# 调出
self.best_ip = {} # 最佳IP地址和端口
self.api = None # API 的连接会话对象
self.last_tick_dt = datetime.now() # 记录该会话对象的最后一个tick时间
self.instrument_count = 50000
self.has_qry_instrument = False
# ----------------------------------------------------------------------
def ping(self, ip, port=7709):
"""
ping行情服务器
:param ip:
:param port:
:param type_:
:return:
"""
apix = TdxExHq_API()
__time1 = datetime.now()
try:
with apix.connect(ip, port):
if apix.get_instrument_count() > 10000:
_timestamp = (datetime.now() - __time1).total_seconds() * 1000
self.gateway.write_log('服务器{}:{},耗时:{}ms'.format(ip, port, _timestamp))
return _timestamp
else:
self.gateway.write_log(u'该服务器IP {}无响应.'.format(ip))
return timedelta(seconds=10).total_seconds() * 1000
except Exception as ex:
self.gateway.write_log(u'tdx ping服务器{},异常的响应{}'.format(ip, str(ex)))
return timedelta(seconds=10).total_seconds() * 1000
def sort_ip_speed(self):
"""
对所有服务器进行速度排序
:return:
"""
speed_result = []
for x in self.ip_list:
speed = self.ping(x['ip'], x['port'])
x.update({'speed': speed})
speed_result.append(copy(x))
# 更新服务器,按照速度排序
speed_result = sorted(speed_result, key=lambda s: s['speed'])
self.gateway.write_log(u'服务器访问速度排序:{}'.format(speed_result))
return speed_result
# ----------------------------------------------------------------------
def select_best_ip(self, exclude_ip: str = None):
"""
选择行情服务器
:param: exclude_ip, 排除的ip地址
:return:
"""
self.gateway.write_log(u'选择通达信行情服务器')
ip_list = self.sort_ip_speed()
valid_ip_list = [x for x in ip_list if x.get('speed', 10000) < 10000 and x.get('ip') != exclude_ip]
if len(valid_ip_list) == 0:
self.gateway.write_error(u'未能找到合适速度得行情服务器')
return None
best_future_ip = valid_ip_list[0]
save_cache_json(best_future_ip, TDX_FUTURE_CONFIG)
return best_future_ip
def connect(self, is_reconnect=False):
"""
连接通达讯行情服务器
:param is_reconnect:是否重连
:return:
"""
# 创建api连接对象实例
try:
if self.api is None or not self.connection_status:
self.gateway.write_log(u'开始连接通达信行情服务器')
self.api = TdxExHq_API(heartbeat=True, auto_retry=True, raise_exception=True)
# 选取最佳服务器
if is_reconnect or len(self.best_ip) == 0:
self.best_ip = get_cache_json(TDX_FUTURE_CONFIG)
if len(self.best_ip) == 0:
self.best_ip = self.select_best_ip()
self.api.connect(self.best_ip['ip'], self.best_ip['port'])
# 尝试获取市场合约统计
c = self.api.get_instrument_count()
if c < 10:
err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'], self.best_ip['port'])
self.gateway.write_error(err_msg)
else:
self.gateway.write_log(u'创建tdx连接, IP: {}/{}'.format(self.best_ip['ip'], self.best_ip['port']))
self.connection_status = True
self.thread = Thread(target=self.run)
self.thread.start()
except Exception as ex:
self.gateway.write_log(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc()))
return
def close(self):
"""退出API"""
self.gateway.write_log(u'退出tdx API')
self.connection_status = False
if self.thread:
self.thread.join()
# ----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅合约"""
# 这里的设计是,如果尚未登录就调用了订阅方法
# 则先保存订阅请求,登录完成后会自动订阅
vn_symbol = str(subscribeReq.symbol)
vn_symbol = vn_symbol.upper()
self.gateway.write_log(u'通达信行情订阅 {}'.format(str(vn_symbol)))
if vn_symbol[-2:] != '99':
self.gateway.write_log(u'{}不是指数合约,不能订阅'.format(vn_symbol))
return
tdx_symbol = vn_symbol[0:-2] + 'L9'
tdx_symbol = tdx_symbol.upper()
self.gateway.write_log(u'{}=>{}'.format(vn_symbol, tdx_symbol))
self.symbol_vn_dict[tdx_symbol] = vn_symbol
if tdx_symbol not in self.registered_symbol_set:
self.registered_symbol_set.add(tdx_symbol)
self.check_status()
def check_status(self):
# self.write_log(u'检查tdx接口状态')
if len(self.registered_symbol_set) == 0:
return
# 若还没有启动连接,就启动连接
over_time = (datetime.now() - self.last_tick_dt).total_seconds() > 60
if not self.connection_status or self.api is None or over_time:
self.gateway.write_log(u'tdx还没有启动连接就启动连接')
self.close()
self.thread = None
self.connect(is_reconnect=True)
def qry_instrument(self):
"""
查询/更新合约信息
:return:
"""
if not self.connection_status:
self.gateway.write_error(u'tdx连接状态为断开不能查询和更新合约信息')
return
if self.has_qry_instrument:
self.gateway.write_error(u'已经查询过一次合约信息,不再查询')
return
# 取得所有的合约信息
num = self.api.get_instrument_count()
if not isinstance(num, int):
return
all_contacts = sum(
[self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], [])
# [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}]
# 对所有合约处理,更新字典 指数合约-tdx市场指数合约-交易所
for tdx_contract in all_contacts:
tdx_symbol = tdx_contract.get('code', None)
if tdx_symbol is None or tdx_symbol[-2:] not in ['L9']:
continue
tdx_market_id = tdx_contract.get('market')
self.symbol_market_dict[tdx_symbol] = tdx_market_id
if tdx_market_id == 47: # 中金所
self.symbol_exchange_dict[tdx_symbol] = Exchange.CFFEX
elif tdx_market_id == 28: # 郑商所
self.symbol_exchange_dict[tdx_symbol] = Exchange.CZCE
elif tdx_market_id == 29: # 大商所
self.symbol_exchange_dict[tdx_symbol] = Exchange.DCE
elif tdx_market_id == 30: # 上期所+能源
self.symbol_exchange_dict[tdx_symbol] = Exchange.SHFE
elif tdx_market_id == 60: # 主力合约
self.gateway.write_log(u'主力合约:{}'.format(tdx_contract))
self.has_qry_instrument = True
def run(self):
# 直接查询板块
try:
last_dt = datetime.now()
self.gateway.write_log(u'开始运行tdx查询指数行情线程,{}'.format(last_dt))
while self.connection_status:
if len(self.registered_symbol_set) > 0:
try:
self.process_index_req()
except BrokenPipeError as bex:
self.gateway.write_error(u'BrokenPipeError{},重试重连tdx[{}]'.format(str(bex), 0))
self.connect(is_reconnect=True)
sleep(5)
break
except Exception as ex:
self.gateway.write_error(u'tdx exception:{},{}'.format(str(ex), traceback.format_exc()))
self.gateway.write_error(u'重试重连tdx')
self.connect(is_reconnect=True)
sleep(self.req_interval)
dt = datetime.now()
if last_dt.minute != dt.minute:
self.gateway.write_log(
'tdx check point. {}, process symbols:{}'.format(dt, self.registered_symbol_set))
last_dt = dt
except Exception as ex:
self.gateway.write_error(u'tdx thead.run exception:{},{}'.format(str(ex), traceback.format_exc()))
self.gateway.write_error(u'tdx查询线程 {}退出'.format(datetime.now()))
def process_index_req(self):
"""处理板块获取指数行情tick"""
# 获取通达信指数板块所有行情
rt_list = self.api.get_instrument_quote_list(42, 3, 0, 100)
if rt_list is None or len(rt_list) == 0:
self.gateway.write_log(u'tdx: rt_list为空')
return
# 记录该接口的行情最后更新时间
self.last_tick_dt = datetime.now()
for d in list(rt_list):
tdx_symbol = d.get('code', None)
if tdx_symbol not in self.registered_symbol_set and tdx_symbol is not None:
continue
# tdx_symbol => vn_symbol
vn_symbol = self.symbol_vn_dict.get(tdx_symbol, None)
if vn_symbol is None:
self.gateway.write_error(u'self.symbol_vn_dict 取不到映射得:{}'.format(tdx_symbol))
continue
# vn_symbol => exchange
exchange = self.symbol_exchange_dict.get(tdx_symbol, None)
underlying_symbol = get_underlying_symbol(vn_symbol)
if exchange is None:
symbol_info = self.future_contracts.get(underlying_symbol, None)
if not symbol_info:
continue
exchange_value = symbol_info.get('exchange', None)
exchange = Exchange(exchange_value)
if exchange is None:
continue
self.symbol_exchange_dict.update({tdx_symbol: exchange})
tick_datetime = datetime.now()
# 修正毫秒
last_tick = self.symbol_tick_dict.get(vn_symbol, None)
if (last_tick is not None) and tick_datetime.replace(microsecond=0) == last_tick.datetime:
# 与上一个tick的时间去除毫秒后相同,修改为500毫秒
tick_datetime = tick_datetime.replace(microsecond=500)
else:
tick_datetime = tick_datetime.replace(microsecond=0)
tick = TickData(gateway_name=self.gateway_name,
symbol=vn_symbol,
exchange=exchange,
datetime=tick_datetime)
tick.pre_close = float(d.get('ZuoJie', 0.0))
tick.high_price = float(d.get('ZuiGao', 0.0))
tick.open_price = float(d.get('JinKai', 0.0))
tick.low_price = float(d.get('ZuiDi', 0.0))
tick.last_price = float(d.get('MaiChu', 0.0))
tick.volume = int(d.get('XianLiang', 0))
tick.open_interest = d.get('ChiCangLiang')
tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12]
tick.date = tick.datetime.strftime('%Y-%m-%d')
tick.trading_day = get_trading_date(tick.datetime)
# 指数没有涨停和跌停就用昨日收盘价正负10%
tick.limit_up = tick.pre_close * 1.1
tick.limit_down = tick.pre_close * 0.9
# CTP只有一档行情
tick.bid_price_1 = float(d.get('MaiRuJia', 0.0))
tick.bid_volume_1 = int(d.get('MaiRuLiang', 0))
tick.ask_price_1 = float(d.get('MaiChuJia', 0.0))
tick.ask_volume_1 = int(d.get('MaiChuLiang', 0))
# 排除非交易时间得tick
if tick.exchange is Exchange.CFFEX:
if tick.datetime.hour not in [9, 10, 11, 13, 14, 15]:
continue
if tick.datetime.hour == 9 and tick.datetime.minute < 15:
continue
# 排除早盘 11:30~12:00
if tick.datetime.hour == 11 and tick.datetime.minute >= 30:
continue
if tick.datetime.hour == 15 and tick.datetime.minute >= 15 and underlying_symbol in ['T', 'TF', 'TS']:
continue
if tick.datetime.hour == 15 and underlying_symbol in ['IH', 'IF', 'IC']:
continue
else: # 大商所/郑商所,上期所,上海能源
# 排除非开盘小时
if tick.datetime.hour in [3, 4, 5, 6, 7, 8, 12, 15, 16, 17, 18, 19, 20]:
continue
# 排除早盘 10:15~10:30
if tick.datetime.hour == 10 and 15 <= tick.datetime.minute < 30:
continue
# 排除早盘 11:30~12:00
if tick.datetime.hour == 11 and tick.datetime.minute >= 30:
continue
# 排除午盘 13:00 ~13:30
if tick.datetime.hour == 13 and tick.datetime.minute < 30:
continue
# 排除凌晨2:30~3:00
if tick.datetime.hour == 2 and tick.datetime.minute >= 30:
continue
# 排除大商所/郑商所夜盘数据上期所夜盘数据 23:00 收盘
if underlying_symbol in NIGHT_MARKET_23:
if tick.datetime.hour in [23, 0, 1, 2]:
continue
# 排除上期所夜盘数据 1:00 收盘
if underlying_symbol in NIGHT_MARKET_SQ2:
if tick.datetime.hour in [1, 2]:
continue
# 排除日盘合约在夜盘得数据
if underlying_symbol in MARKET_DAY_ONLY and (tick.datetime.hour < 9 or tick.datetime.hour > 16):
# self.write_log(u'排除日盘合约{}在夜盘得数据'.format(short_symbol))
continue
# self.gateway.write_log(f'{tick.__dict__}')
self.symbol_tick_dict[tick.symbol] = tick
self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick)
class SubMdApi():
"""
RabbitMQ Subscriber 数据行情接收API
"""
def __init__(self, gateway):
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.symbol_tick_dict = {} # 合约与最后一个Tick得字典
self.registed_symbol_set = set() # 订阅的合约记录集
self.sub = None
self.setting = {}
self.connect_status = False
self.thread = None
def connect(self, setting={}):
"""连接"""
self.setting = setting
try:
self.sub = subscriber(
host=self.setting.get('host', 'localhost'),
port=self.setting.get('port', 5672),
user=self.setting.get('user', 'admin'),
password=self.setting.get('password', 'admin'),
exchange=self.setting.get('exchange', 'x_fanout_idx_tick'))
self.sub.set_callback(self.on_message)
self.thread = Thread(target=self.sub.start)
self.thread.start()
self.connect_status = True
except Exception as ex:
self.gateway.write_error(u'连接RabbitMQ {} 异常:{}'.format(self.setting, str(ex)))
self.gateway.write_error(traceback.format_exc())
self.connect_status = False
def on_message(self, chan, method_frame, _header_frame, body, userdata=None):
# print(" [x] %r" % body)
try:
str_tick = body.decode('utf-8')
d = json.loads(str_tick)
symbol = d.pop('symbol', None)
str_datetime = d.pop('datetime', None)
if symbol not in self.registed_symbol_set or str_datetime is None:
return
if '.' in str_datetime:
dt = datetime.strptime(str_datetime, '%Y-%m-%d %H:%M:%S.%f')
else:
dt = datetime.strptime(str_datetime, '%Y-%m-%d %H:%M:%S')
d.pop('rawData', None)
tick = TickData(gateway_name=self.gateway_name,
exchange=Exchange(d.get('exchange')),
symbol=d.get('symbol'),
datetime=dt)
d.pop('exchange', None)
d.pop('symbol', None)
d.pop()
tick.__dict__.update(d)
self.symbol_tick_dict[symbol] = tick
self.gateway.on_tick(tick)
self.gateway.on_custom_tick(tick)
except Exception as ex:
self.gateway.write_error(u'RabbitMQ on_message 异常:{}'.format(str(ex)))
self.gateway.write_error(traceback.format_exc())
def close(self):
"""退出API"""
self.gateway.write_log(u'退出rabbit行情订阅API')
self.connection_status = False
try:
if self.sub:
self.gateway.write_log(u'关闭订阅器')
self.sub.close()
if self.thread is not None:
self.gateway.write_log(u'关闭订阅器接收线程')
self.thread.join()
except Exception as ex:
self.gateway.write_error(u'退出rabbitMQ行情api异常')
# ----------------------------------------------------------------------
def subscribe(self, subscribeReq):
"""订阅合约"""
# 这里的设计是,如果尚未登录就调用了订阅方法
# 则先保存订阅请求,登录完成后会自动订阅
vn_symbol = str(subscribeReq.symbol)
vn_symbol = vn_symbol.upper()
if vn_symbol not in self.registed_symbol_set:
self.registed_symbol_set.add(vn_symbol)
self.gateway.write_log(u'RabbitMQ行情订阅 {}'.format(str(vn_symbol)))
class TickCombiner(object):
"""
Tick合成类
"""
def __init__(self, gateway, setting):
self.gateway = gateway
self.gateway_name = self.gateway.gateway_name
self.gateway.write_log(u'创建tick合成类:{}'.format(setting))
self.symbol = setting.get('symbol', None)
self.leg1_symbol = setting.get('leg1_symbol', None)
self.leg2_symbol = setting.get('leg2_symbol', None)
self.leg1_ratio = setting.get('leg1_ratio', 1) # 腿1的数量配比
self.leg2_ratio = setting.get('leg2_ratio', 1) # 腿2的数量配比
self.price_tick = setting.get('price_tick', 1) # 合成价差加比后的最小跳动
# 价差
self.is_spread = setting.get('is_spread', False)
# 价比
self.is_ratio = setting.get('is_ratio', False)
self.last_leg1_tick = None
self.last_leg2_tick = None
# 价差日内最高/最低价
self.spread_high = None
self.spread_low = None
# 价比日内最高/最低价
self.ratio_high = None
self.ratio_low = None
# 当前交易日
self.trading_day = None
if self.is_ratio and self.is_spread:
self.gateway.write_error(u'{}参数有误,不能同时做价差/加比.setting:{}'.format(self.symbol, setting))
return
self.gateway.write_log(u'初始化{}合成器成功'.format(self.symbol))
if self.is_spread:
self.gateway.write_log(
u'leg1:{} * {} - leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol,
self.leg2_ratio))
if self.is_ratio:
self.gateway.write_log(
u'leg1:{} * {} / leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol,
self.leg2_ratio))
def on_tick(self, tick):
"""OnTick处理"""
combinable = False
if tick.symbol == self.leg1_symbol:
# leg1合约
self.last_leg1_tick = tick
if self.last_leg2_tick is not None:
if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace(
microsecond=0):
combinable = True
elif tick.symbol == self.leg2_symbol:
# leg2合约
self.last_leg2_tick = tick
if self.last_leg1_tick is not None:
if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace(
microsecond=0):
combinable = True
# 不能合并
if not combinable:
return
if not self.is_ratio and not self.is_spread:
return
# 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick
if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.upperLimit) \
and self.last_leg1_tick.askVolume1 == 0:
self.gateway.write_log(
u'leg1:{0}涨停{1}不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.bid_price_1))
return
if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.lowerLimit) \
and self.last_leg1_tick.bidVolume1 == 0:
self.gateway.write_log(
u'leg1:{0}跌停{1}不合成价差Tick'.format(self.last_leg1_tick.vtSymbol, self.last_leg1_tick.ask_price_1))
return
if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.upperLimit) \
and self.last_leg2_tick.askVolume1 == 0:
self.gateway.write_log(
u'leg2:{0}涨停{1}不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.bid_price_1))
return
if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.lowerLimit) \
and self.last_leg2_tick.bidVolume1 == 0:
self.gateway.write_log(
u'leg2:{0}跌停{1}不合成价差Tick'.format(self.last_leg2_tick.vtSymbol, self.last_leg2_tick.ask_price_1))
return
if self.trading_day != tick.trading_day:
self.trading_day = tick.trading_day
self.spread_high = None
self.spread_low = None
self.ratio_high = None
self.ratio_low = None
if self.is_spread:
spread_tick = TickData(gateway_name=self.gateway_name,
symbol=self.symbol,
exchange=tick.exchange,
datetime=tick.datetime)
spread_tick.trading_day = tick.trading_day
spread_tick.date = tick.date
spread_tick.time = tick.time
# 叫卖价差=leg1.ask_price_1 * 配比 - leg2.bid_price_1 * 配比volume为两者最小
spread_tick.ask_price_1 = round_to(target=self.price_tick,
value=self.last_leg1_tick.ask_price_1 * self.leg1_ratio - self.last_leg2_tick.bid_price_1 * self.leg2_ratio)
spread_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1)
# 叫买价差=leg1.bid_price_1 * 配比 - leg2.ask_price_1 * 配比volume为两者最小
spread_tick.bid_price_1 = round_to(target=self.price_tick,
value=self.last_leg1_tick.bid_price_1 * self.leg1_ratio - self.last_leg2_tick.ask_price_1 * self.leg2_ratio)
spread_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1)
# 最新价
spread_tick.last_price = round_to(target=self.price_tick,
value=(spread_tick.ask_price_1 + spread_tick.bid_price_1) / 2)
# 昨收盘价
if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0:
spread_tick.pre_close = round_to(target=self.price_tick,
value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio)
# 开盘价
if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0:
spread_tick.openPrice = round_to(target=self.price_tick,
value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio)
# 最高价
self.spread_high = spread_tick.ask_price_1 if self.spread_high is None else max(self.spread_high,
spread_tick.ask_price_1)
spread_tick.high_price = self.spread_high
# 最低价
self.spread_low = spread_tick.bid_price_1 if self.spread_low is None else min(self.spread_low,
spread_tick.bid_price_1)
spread_tick.low_price = self.spread_low
self.gateway.on_tick(spread_tick)
if self.is_ratio:
ratio_tick = TickData(gatway_name=self.gateway_name,
symbol=self.symbol,
exchange=tick.exchange,
datetime=tick.datetime)
ratio_tick.trading_day = tick.trading_day
ratio_tick.date = tick.date
ratio_tick.time = tick.time
# 比率tick
ratio_tick.ask_price_1 = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio / (
self.last_leg2_tick.bid_price_1 * self.leg2_ratio))
ratio_tick.askVolume1 = min(self.last_leg1_tick.askVolume1, self.last_leg2_tick.bidVolume1)
ratio_tick.bid_price_1 = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio / (
self.last_leg2_tick.ask_price_1 * self.leg2_ratio))
ratio_tick.bidVolume1 = min(self.last_leg1_tick.bidVolume1, self.last_leg2_tick.askVolume1)
ratio_tick.lastPrice = round_to(target=self.price_tick,
value=(ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2)
# 昨收盘价
if self.last_leg2_tick.preClosePrice > 0 and self.last_leg1_tick.preClosePrice > 0:
ratio_tick.preClosePrice = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.preClosePrice * self.leg1_ratio / (
self.last_leg2_tick.preClosePrice * self.leg2_ratio))
# 开盘价
if self.last_leg2_tick.openPrice > 0 and self.last_leg1_tick.openPrice > 0:
ratio_tick.openPrice = round_to(target=self.price_tick,
value=100 * self.last_leg1_tick.openPrice * self.leg1_ratio / (
self.last_leg2_tick.openPrice * self.leg2_ratio))
# 最高价
self.ratio_high = ratio_tick.ask_price_1 if self.ratio_high is None else max(self.ratio_high,
ratio_tick.ask_price_1)
ratio_tick.high_price = self.spread_high
# 最低价
self.ratio_low = ratio_tick.bid_price_1 if self.ratio_low is None else min(self.ratio_low,
ratio_tick.bid_price_1)
ratio_tick.low_price = self.spread_low
self.gateway.on_tick(ratio_tick)

View File

@ -1,11 +1,12 @@
""" """
""" """
import os
import sys
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Sequence from typing import Any, Sequence
from copy import copy from copy import copy
from logging import INFO from logging import INFO, DEBUG, ERROR
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
from .event import ( from .event import (
@ -31,6 +32,9 @@ from .object import (
HistoryRequest HistoryRequest
) )
from vnpy.trader.utility import get_folder_path
from vnpy.trader.util_logger import setup_logger
class BaseGateway(ABC): class BaseGateway(ABC):
""" """
@ -81,6 +85,21 @@ class BaseGateway(ABC):
"""""" """"""
self.event_engine = event_engine self.event_engine = event_engine
self.gateway_name = gateway_name self.gateway_name = gateway_name
self.logger = None
self.create_logger()
def create_logger(self):
"""
创建engine独有的日志
:return:
"""
log_path = get_folder_path("log")
log_filename = os.path.abspath(os.path.join(log_path, self.gateway_name))
print(u'create logger:{}'.format(log_filename))
from vnpy.trader.setting import SETTINGS
self.logger = setup_logger(file_name=log_filename, name=self.gateway_name,
log_level=SETTINGS.get('log.level', DEBUG))
def on_event(self, type: str, data: Any = None): def on_event(self, type: str, data: Any = None):
""" """
@ -141,12 +160,29 @@ class BaseGateway(ABC):
""" """
self.on_event(EVENT_CONTRACT, contract) self.on_event(EVENT_CONTRACT, contract)
def write_log(self, msg: str, level: int = INFO): def write_log(self, msg: str, level: int = INFO, on_log: bool = False):
""" """
Write a log event from gateway. Write a log event from gateway.
""" """
log = LogData(msg=msg, level=level, gateway_name=self.gateway_name) if self.logger:
self.on_log(log) self.logger.log(level, msg)
if on_log:
log = LogData(msg=msg, level=level, gateway_name=self.gateway_name)
self.on_log(log)
def write_error(self, msg: str, error: dict = {}):
"""
write error log
:param msg:
:return:
"""
if len(error) > 0:
error_id = error.get("ErrorID", '')
error_msg = error.get("ErrorMsg", '')
msg = f"{msg},代码:{error_id},信息:{error_msg}"
self.write_log(msg, level=ERROR, on_log=True)
print(msg, file=sys.stderr)
@abstractmethod @abstractmethod
def connect(self, setting: dict): def connect(self, setting: dict):
@ -273,7 +309,7 @@ class LocalOrderManager:
# For generating local orderid # For generating local orderid
self.order_prefix = order_prefix self.order_prefix = order_prefix
self.order_count = 0 self.order_count = 0
self.orders = {} # local_orderid:order self.orders = {} # local_orderid:order
# Map between local and system orderid # Map between local and system orderid
self.local_sys_orderid_map = {} self.local_sys_orderid_map = {}
@ -286,7 +322,7 @@ class LocalOrderManager:
self.push_data_callback = None self.push_data_callback = None
# Cancel request buf # Cancel request buf
self.cancel_request_buf = {} # local_orderid:req self.cancel_request_buf = {} # local_orderid:req
# Hook cancel order function # Hook cancel order function
self._cancel_order = gateway.cancel_order self._cancel_order = gateway.cancel_order

View File

@ -33,8 +33,8 @@ class TickData(BaseData):
symbol: str symbol: str
exchange: Exchange exchange: Exchange
datetime: datetime datetime: datetime
date: str = "" # '%Y-%m-%d' date: str = "" # '%Y-%m-%d'
time: str = "" # '%H:%M:%S.%f' time: str = "" # '%H:%M:%S.%f'
trading_day: str = "" # '%Y-%m-%d' trading_day: str = "" # '%Y-%m-%d'
name: str = "" name: str = ""
@ -108,14 +108,15 @@ class RenkoBarData(BarData):
""" """
Renko bar data of a certain trading period. Renko bar data of a certain trading period.
""" """
seconds: int = 0 # 当前Bar的秒数针对RenkoBar) seconds: int = 0 # 当前Bar的秒数针对RenkoBar)
high_seconds: int = -1 # 当前Bar的上限秒数 high_seconds: int = -1 # 当前Bar的上限秒数
low_seconds: int = -1 # 当前bar的下限秒数 low_seconds: int = -1 # 当前bar的下限秒数
height: float = 3 # 当前Bar的高度限制针对RenkoBar和RangeBar类 height: float = 3 # 当前Bar的高度限制针对RenkoBar和RangeBar类
up_band: float = 0 # 高位区域的基线 up_band: float = 0 # 高位区域的基线
down_band: float = 0 # 低位区域的基线 down_band: float = 0 # 低位区域的基线
low_time = None # 最后一次进入低位区域的时间 low_time = None # 最后一次进入低位区域的时间
high_time = None # 最后一次进入高位区域的时间 high_time = None # 最后一次进入高位区域的时间
@dataclass @dataclass
class OrderData(BaseData): class OrderData(BaseData):
@ -252,15 +253,15 @@ class ContractData(BaseData):
product: Product product: Product
size: int size: int
pricetick: float pricetick: float
margin_rate: float = 0.1 # 保证金比率 margin_rate: float = 0.1 # 保证金比率
min_volume: float = 1 # minimum trading volume of the contract min_volume: float = 1 # minimum trading volume of the contract
stop_supported: bool = False # whether server supports stop order stop_supported: bool = False # whether server supports stop order
net_position: bool = False # whether gateway uses net position volume net_position: bool = False # whether gateway uses net position volume
history_data: bool = False # whether gateway provides bar history data history_data: bool = False # whether gateway provides bar history data
option_strike: float = 0 option_strike: float = 0
option_underlying: str = "" # vt_symbol of underlying contract option_underlying: str = "" # vt_symbol of underlying contract
option_type: OptionType = None option_type: OptionType = None
option_expiry: datetime = None option_expiry: datetime = None

View File

@ -28,6 +28,7 @@ def func_time(over_ms: int = 0):
:param :over_ms 超过多少毫秒, 提示信息 :param :over_ms 超过多少毫秒, 提示信息
:return: :return:
""" """
def run(func): def run(func):
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
@ -38,7 +39,9 @@ def func_time(over_ms: int = 0):
if execute_ms > over_ms: if execute_ms > over_ms:
print('{} took {} ms'.format(func.__qualname__, execute_ms)) print('{} took {} ms'.format(func.__qualname__, execute_ms))
return result return result
return wrapper return wrapper
return run return run
@ -125,7 +128,7 @@ def get_real_symbol_by_exchange(full_symbol, vn_exchange):
if vn_exchange == Exchange.CFFEX: if vn_exchange == Exchange.CFFEX:
return full_symbol.upper() return full_symbol.upper()
if vn_exchange in [Exchange.DCE, Exchange.SHFE, Exchange.INE]: if vn_exchange in [Exchange.DCE, Exchange.SHFE, Exchange.INE]:
return full_symbol.lower() return full_symbol.lower()
if vn_exchange == Exchange.CZCE: if vn_exchange == Exchange.CZCE:
@ -135,6 +138,7 @@ def get_real_symbol_by_exchange(full_symbol, vn_exchange):
return full_symbol return full_symbol
def get_trading_date(dt: datetime = None): def get_trading_date(dt: datetime = None):
""" """
根据输入的时间返回交易日的日期 根据输入的时间返回交易日的日期
@ -304,11 +308,11 @@ class BarGenerator:
""" """
def __init__( def __init__(
self, self,
on_bar: Callable, on_bar: Callable,
window: int = 0, window: int = 0,
on_window_bar: Callable = None, on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE interval: Interval = Interval.MINUTE
): ):
"""Constructor""" """Constructor"""
self.bar = None self.bar = None