diff --git a/vnpy/amqp/consumer.py b/vnpy/amqp/consumer.py index 13a1ae90..d7611df1 100644 --- a/vnpy/amqp/consumer.py +++ b/vnpy/amqp/consumer.py @@ -32,7 +32,7 @@ class receiver(base_broker): # self.channel.basic_qos(prefetch_count=1) def callback(self, chan, method_frame, _header_frame, body, userdata=None): - print(1) + #print(1) print(" [x] received: %r" % body) def subscribe(self): @@ -44,7 +44,7 @@ class receiver(base_broker): try: self.subscribe() except Exception as e: - print(e) + print('start consumer exception:{}'.format(str(e))) self.start() @@ -72,7 +72,7 @@ class worker(base_broker): self.channel.basic_qos(prefetch_count=1) def callback(self, chan, method_frame, _header_frame, body, userdata=None): - print(1) + #print(1) print(" [x] received task: %r" % body) chan.basic_ack(delivery_tag=method_frame.delivery_tag) print(" [x] task finished ") @@ -120,7 +120,7 @@ class subscriber(base_broker): self.cb_func = cb_func def callback(self, chan, method_frame, _header_frame, body, userdata=None): - print(1) + #print(1) print(" [x] %r" % body) def subscribe(self): @@ -160,7 +160,7 @@ class subscriber_routing(base_broker): routing_key=routing_key) def callback(self, chan, method_frame, _header_frame, body, userdata=None): - print(1) + #print(1) print(" [x] %r" % body) def subscribe(self): @@ -198,7 +198,7 @@ class subscriber_topic(base_broker): routing_key=routing_key) def callback(self, chan, method_frame, _header_frame, body, userdata=None): - print(1) + #print(1) print(" [x] %r" % body) def subscribe(self): diff --git a/vnpy/app/account_recorder/engine.py b/vnpy/app/account_recorder/engine.py index 9f660049..29771279 100644 --- a/vnpy/app/account_recorder/engine.py +++ b/vnpy/app/account_recorder/engine.py @@ -93,6 +93,8 @@ class AccountRecorder(BaseEngine): # 账号的同步记录 self.account_dict = {} # gateway_name: setting + self.is_7x24 = False # 7 x 24 运行的账号( 数字货币) + self.last_qry_dict = {} self.copy_history_orders = [] # 需要复制至历史成交的gateway名称 self.copy_history_trades = [] # 需要复制至历史成交的gateway名称 @@ -101,7 +103,7 @@ class AccountRecorder(BaseEngine): self.gw_name_acct_id = {} - self.is_remove_pre_data = False + self.cur_trading_date = "" self.scaning_gw = [] @@ -143,6 +145,7 @@ class AccountRecorder(BaseEngine): # 获取需要处理处理得账号配置 self.account_dict = d.get('accounts', {}) + self.is_7x24 = d.get('is_7x24', False) # 识别配置,检查账号是否需要复制委托/成交到历史表 for gateway_name, account_setting in self.account_dict.items(): @@ -288,7 +291,7 @@ class AccountRecorder(BaseEngine): :param trading_day: :return: """ - if self.is_remove_pre_data: + if self.cur_trading_date == trading_day: return # 移除非当日得交易/持仓 @@ -322,7 +325,7 @@ class AccountRecorder(BaseEngine): col_name=TODAY_STRATEGY_POS_COL, flt=flt) - self.is_remove_pre_data = True + self.cur_trading_date = trading_day def update_order(self, event: Event): """更新当日记录""" @@ -374,10 +377,16 @@ class AccountRecorder(BaseEngine): self.update_data(db_name=ACCOUNT_DB_NAME, col_name=HISTORY_ORDER_COL, fld=fld2, data=history_data) + def get_trading_date(self, dt:datetime): + if self.is_7x24: + return dt.strftime('%Y-%m-%d') + else: + return get_trading_date(dt) + def update_trade(self, event: Event): """更新当日成交""" trade = event.data - trade_date = get_trading_date(datetime.now()) + trade_date = self.get_trading_date(datetime.now()) fld = {'vt_symbol': trade.vt_symbol, 'account_id': trade.accountid, @@ -413,7 +422,7 @@ class AccountRecorder(BaseEngine): def update_position(self, event: Event): """更新当日持仓""" pos = event.data - trade_date = get_trading_date(datetime.now()) + trade_date = self.get_trading_date(datetime.now()) # 不处理交易所返回得套利合约 if pos.symbol.startswith('SP') and '&' in pos.symbol and ' ' in pos.symbol: @@ -532,7 +541,7 @@ class AccountRecorder(BaseEngine): d.update({'msg': data.msg}) d.update({'additional_info': data.additional_info}) d.update({'log_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}) - d.update({'trading_day': get_trading_date()}) + d.update({'trading_day': self.get_trading_date(datetime.now())}) account_id = self.gw_name_acct_id.get(data.gateway_name, None) if account_id: @@ -584,7 +593,7 @@ class AccountRecorder(BaseEngine): d.update({'msg': data.msg}) d.update({'additional_info': data.additional_info}) d.update({'log_time': datetime.now().strftime("%Y-%m-%d %H:%M:%S")}) - d.update({'trading_day': get_trading_date()}) + d.update({'trading_day': self.get_trading_date(datetime.now())}) account_id = self.gw_name_acct_id.get(data.gateway_name, None) if account_id: diff --git a/vnpy/app/cta_crypto/engine.py b/vnpy/app/cta_crypto/engine.py index ceb46b52..07ce4560 100644 --- a/vnpy/app/cta_crypto/engine.py +++ b/vnpy/app/cta_crypto/engine.py @@ -58,7 +58,8 @@ from vnpy.trader.utility import ( TRADER_DIR, get_folder_path, get_underlying_symbol, - append_data) + append_data, + import_module_by_str) from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.util_wechat import send_wx_msg @@ -1128,10 +1129,22 @@ class CtaEngine(BaseEngine): module_name = self.class_module_map[class_name] # 重新load class module - if not self.load_strategy_class_from_module(module_name): - err_msg = f'不能加载模块:{module_name}' - self.write_error(err_msg) - return False, err_msg + #if not self.load_strategy_class_from_module(module_name): + # err_msg = f'不能加载模块:{module_name}' + # self.write_error(err_msg) + # return False, err_msg + if module_name: + new_class_name = module_name + '.' + class_name + self.write_log(u'转换策略为全路径:{}'.format(new_class_name)) + + strategy_class = import_module_by_str(new_class_name) + if strategy_class is None: + err_msg = u'加载策略模块失败:{}'.format(class_name) + self.write_error(err_msg) + return False, err_msg + + self.write_log(f'重新加载模块成功,使用新模块:{new_class_name}') + self.classes[class_name] = strategy_class # 停止当前策略实例的运行,撤单 self.stop_strategy(strategy_name) diff --git a/vnpy/app/cta_stock/engine.py b/vnpy/app/cta_stock/engine.py index 9d33fa15..63af14db 100644 --- a/vnpy/app/cta_stock/engine.py +++ b/vnpy/app/cta_stock/engine.py @@ -59,7 +59,8 @@ from vnpy.trader.utility import ( TRADER_DIR, get_folder_path, get_underlying_symbol, - append_data) + append_data, + import_module_by_str) from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.util_wechat import send_wx_msg @@ -210,7 +211,7 @@ class CtaEngine(BaseEngine): all_trading = False dt = datetime.now() - + # 每分钟执行的逻辑 if self.last_minute != dt.minute: self.last_minute = dt.minute @@ -218,6 +219,7 @@ class CtaEngine(BaseEngine): # 主动获取所有策略得持仓信息 all_strategy_pos = self.get_all_strategy_pos() + # 每5分钟检查一次 if dt.minute % 5 == 0: # 比对仓位,使用上述获取得持仓信息,不用重复获取 self.compare_pos(strategy_pos_list=copy(all_strategy_pos)) @@ -1148,10 +1150,22 @@ class CtaEngine(BaseEngine): module_name = self.class_module_map[class_name] # 重新load class module - if not self.load_strategy_class_from_module(module_name): - err_msg = f'不能加载模块:{module_name}' - self.write_error(err_msg) - return False, err_msg + #if not self.load_strategy_class_from_module(module_name): + # err_msg = f'不能加载模块:{module_name}' + # self.write_error(err_msg) + # return False, err_msg + if module_name: + new_class_name = module_name + '.' + class_name + self.write_log(u'转换策略为全路径:{}'.format(new_class_name)) + + strategy_class = import_module_by_str(new_class_name) + if strategy_class is None: + err_msg = u'加载策略模块失败:{}'.format(class_name) + self.write_error(err_msg) + return False, err_msg + + self.write_log(f'重新加载模块成功,使用新模块:{new_class_name}') + self.classes[class_name] = strategy_class # 停止当前策略实例的运行,撤单 self.stop_strategy(strategy_name) diff --git a/vnpy/app/cta_stock/template.py b/vnpy/app/cta_stock/template.py index f91be1d4..a3514069 100644 --- a/vnpy/app/cta_stock/template.py +++ b/vnpy/app/cta_stock/template.py @@ -371,7 +371,7 @@ class StockPolicy(CtaPolicy): super().from_json(json_data) self.cur_trading_date = json_data.get('cur_trading_date', None) - self.sub_tns = json_data.get('sub_tns') + self.sub_tns = json_data.get('sub_tns',{}) signals = json_data.get('signals', {}) for kline_name, signal in signals: last_signal = signal.get('last_signal', "") @@ -872,7 +872,7 @@ class CtaStockTemplate(CtaTemplate): continue cur_price = self.cta_engine.get_price(lg.vt_symbol) - if not lg.stop_price and lg.stop_price > cur_price > 0: + if lg.stop_price != 0 and lg.stop_price > cur_price > 0: # 调用平仓模块 self.write_log(u'{} {}当前价:{} 触发止损线{},开仓价:{},v:{}'. format(self.cur_datetime, diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index 54c33e71..211ab1fe 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -55,7 +55,8 @@ from vnpy.trader.utility import ( TRADER_DIR, get_folder_path, get_underlying_symbol, - append_data) + append_data, + import_module_by_str) from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.util_wechat import send_wx_msg @@ -351,7 +352,7 @@ class CtaEngine(BaseEngine): self.put_strategy_event(strategy) if self.engine_config.get('trade_2_wx', False): - accountid = self.engine_config.get('accountid', '-') + accountid = self.engine_config.get('accountid', 'XXX') d = { 'account': accountid, 'strategy': strategy_name, @@ -362,7 +363,7 @@ class CtaEngine(BaseEngine): 'remark': f'{accountid}:{strategy_name}', 'timestamp': trade.time } - send_wx_msg(content=d, target=accountid) + send_wx_msg(content=d, target=accountid, msg_type='TRADE') def process_position_event(self, event: Event): """""" @@ -829,6 +830,9 @@ class CtaEngine(BaseEngine): return None + def get_contract(self, vt_symbol): + return self.main_engine.get_contract(vt_symbol) + def get_account(self, vt_accountid: str = ""): """ 查询账号的资金""" # 如果启动风控,则使用风控中的最大仓位 @@ -1115,10 +1119,22 @@ class CtaEngine(BaseEngine): module_name = self.class_module_map[class_name] # 重新load class module - if not self.load_strategy_class_from_module(module_name): - err_msg = f'不能加载模块:{module_name}' - self.write_error(err_msg) - return False, err_msg + #if not self.load_strategy_class_from_module(module_name): + # err_msg = f'不能加载模块:{module_name}' + # self.write_error(err_msg) + # return False, err_msg + if module_name: + new_class_name = module_name + '.' + class_name + self.write_log(u'转换策略为全路径:{}'.format(new_class_name)) + + strategy_class = import_module_by_str(new_class_name) + if strategy_class is None: + err_msg = u'加载策略模块失败:{}'.format(new_class_name) + self.write_error(err_msg) + return False, err_msg + + self.write_log(f'重新加载模块成功,使用新模块:{new_class_name}') + self.classes[class_name] = strategy_class # 停止当前策略实例的运行,撤单 self.stop_strategy(strategy_name) @@ -1839,7 +1855,7 @@ class CtaEngine(BaseEngine): print(f"{strategy_name}: {msg}" if strategy_name else msg, file=sys.stderr) if level in [logging.CRITICAL, logging.WARN, logging.WARNING]: - send_wx_msg(content=f"{strategy_name}: {msg}" if strategy_name else msg) + send_wx_msg(content=f"{strategy_name}: {msg}" if strategy_name else msg, target=self.engine_config.get('accountid', 'XXX')) def write_error(self, msg: str, strategy_name: str = '', level: int = logging.ERROR): """写入错误日志""" diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index 02516a94..b1c0e3c2 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -636,7 +636,6 @@ class CtaProTemplate(CtaTemplate): self.symbol_size = self.cta_engine.get_size(self.vt_symbol) self.margin_rate = self.cta_engine.get_margin_rate(self.vt_symbol) - def sync_data(self): """同步更新数据""" if not self.backtesting: @@ -1111,7 +1110,6 @@ class CtaProFutureTemplate(CtaProTemplate): self.trading = True self.put_event() - # ---------------------------------------------------------------------- def on_stop(self): """停止策略(必须由用户继承实现)""" self.active_orders.clear() @@ -1290,7 +1288,6 @@ class CtaProFutureTemplate(CtaProTemplate): order_vt_symbol = copy(old_order['vt_symbol']) order_volume = old_order['volume'] - old_order['traded'] - order_price = old_order['price'] order_type = old_order.get('order_type', OrderType.LIMIT) order_retry = old_order.get('retry', 0) @@ -1471,7 +1468,6 @@ class CtaProFutureTemplate(CtaProTemplate): self.active_orders.pop(order.vt_orderid, None) return - if order_retry > 20: msg = u'{} 平仓撤单 {}/{}手, 重试平仓次数{}>20' \ .format(self.strategy_name, order_vt_symbol, order_volume, order_retry) diff --git a/vnpy/data/tdx/tdx_common.py b/vnpy/data/tdx/tdx_common.py index f55a973b..aa5fae10 100644 --- a/vnpy/data/tdx/tdx_common.py +++ b/vnpy/data/tdx/tdx_common.py @@ -27,6 +27,8 @@ TDX_FUTURE_CONFIG = 'tdx_future_config.json' # } } TDX_STOCK_CONFIG = 'tdx_stock_config.pkb2' +TDX_PROXY_CONFIG = 'tdx_proxy_config.json' + @lru_cache() def get_tdx_market_code(code): @@ -94,7 +96,6 @@ def save_future_contracts(future_contracts_dict: dict): """保存期货合约信息""" save_cache_json(future_contracts_dict, 'future_contracts.json') - def get_cache_config(config_file_name): """获取本地缓存的配置地址信息""" config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), config_file_name)) diff --git a/vnpy/data/tdx/tdx_future_data.py b/vnpy/data/tdx/tdx_future_data.py index 9ceb37c5..824ee045 100644 --- a/vnpy/data/tdx/tdx_future_data.py +++ b/vnpy/data/tdx/tdx_future_data.py @@ -91,11 +91,13 @@ def get_tdx_marketid(symbol): class TdxFutureData(object): # ---------------------------------------------------------------------- - def __init__(self, strategy=None, best_ip={}): + def __init__(self, strategy=None, best_ip={}, proxy_ip="", proxy_port=0): """ 构造函数 :param strategy: 上层策略,主要用与使用write_log() """ + self.proxy_ip = proxy_ip + self.proxy_port = proxy_port self.api = None self.connection_status = False # 连接状态 self.best_ip = best_ip @@ -145,7 +147,14 @@ class TdxFutureData(object): if len(self.best_ip) == 0: self.best_ip = self.select_best_ip() - self.api.connect(self.best_ip['ip'], self.best_ip['port']) + # 如果配置proxy5,使用vnpy项目下的pytdx + if len(self.proxy_ip) > 0 and self.proxy_port > 0: + self.api.connect(ip=self.best_ip['ip'], port=self.best_ip['port'], + proxy_ip=self.proxy_ip, proxy_port=self.proxy_port) + else: + # 使用pip install pytdx + self.api.connect(ip=self.best_ip['ip'], port=self.best_ip['port']) + # 尝试获取市场合约统计 c = self.api.get_instrument_count() if c < 10: @@ -176,7 +185,7 @@ class TdxFutureData(object): apix = TdxExHq_API() __time1 = datetime.now() try: - with apix.connect(ip, port): + with apix.connect(ip=ip, port=port, proxy_ip=self.proxy_ip, proxy_port=self.proxy_port): if apix.get_instrument_count() > 10000: _timestamp = datetime.now() - __time1 self.write_log(f'服务器{ip}:{port},耗时:{_timestamp}') @@ -534,6 +543,8 @@ class TdxFutureData(object): # 查询的是指数合约 symbol = symbol.replace('99', 'L9') tdx_index_symbol = symbol + elif symbol.endswith('L9'): + tdx_index_symbol = symbol else: # 查询的是普通合约 tdx_index_symbol = get_underlying_symbol(symbol).upper() + 'L9' @@ -543,8 +554,8 @@ class TdxFutureData(object): q_size = QSIZE * 5 # 每秒 2个, 10小时 max_data_size = 1000000 - - self.write_log(u'开始下载{}当日分笔数据'.format(symbol)) + market_id = INIT_TDX_MARKET_MAP.get(tdx_index_symbol, 0) + self.write_log(u'开始下载{}=>{}, market_id={} 当日分笔数据'.format(symbol,tdx_index_symbol, market_id)) try: _datas = [] @@ -552,7 +563,7 @@ class TdxFutureData(object): while True: _res = self.api.get_transaction_data( - market=self.symbol_market_dict.get(tdx_index_symbol, 0), + market=market_id, code=symbol, start=_pos, count=q_size) @@ -667,6 +678,8 @@ class TdxFutureData(object): # 查询的是指数合约 symbol = symbol.replace('99', 'L9') tdx_index_symbol = symbol + elif symbol.endswith('L9'): + tdx_index_symbol = symbol else: # 查询的是普通合约 tdx_index_symbol = get_underlying_symbol(symbol).upper() + 'L9' diff --git a/vnpy/data/tdx/tdx_stock_data.py b/vnpy/data/tdx/tdx_stock_data.py index 711424ba..ad4348ca 100644 --- a/vnpy/data/tdx/tdx_stock_data.py +++ b/vnpy/data/tdx/tdx_stock_data.py @@ -120,7 +120,14 @@ class TdxStockData(object): self.config.update({'best_ip': self.best_ip}) save_cache_config(self.config, TDX_STOCK_CONFIG) - self.api.connect(self.best_ip.get('ip'), self.best_ip.get('port')) + # 如果配置proxy5,使用vnpy项目下的pytdx + if len(self.proxy_ip) > 0 and self.proxy_port > 0: + self.api.connect(ip=self.best_ip['ip'], port=self.best_ip['port'], + proxy_ip=self.proxy_ip, proxy_port=self.proxy_port) + else: + # 使用pip install pytdx + self.api.connect(ip=self.best_ip['ip'], port=self.best_ip['port']) + self.write_log(f'创建tdx连接, : {self.best_ip}') self.connection_status = True diff --git a/vnpy/data/tdx/test_tdx_future.py b/vnpy/data/tdx/test_tdx_future.py index 9a99b9f9..24afe61d 100644 --- a/vnpy/data/tdx/test_tdx_future.py +++ b/vnpy/data/tdx/test_tdx_future.py @@ -14,7 +14,8 @@ from vnpy.data.tdx.tdx_future_data import * t1 = FakeStrategy() t2 = FakeStrategy() # 创建API对象 -api_01 = TdxFutureData(t1) +#api_01 = TdxFutureData(strategy=t1, proxy_ip='localhost', proxy_port=1080) +api_01 = TdxFutureData(strategy=t1) # 获取所有市场信息 markets = api_01.get_markets() @@ -25,8 +26,8 @@ print(u'{}'.format(str_markets)) api_01.qry_instrument() # 获取某个合约得最新价 -price = api_01.get_price('rb2005') -print('price={}'.format(price)) +#price = api_01.get_price('rb2010') +#print('price={}'.format(price)) # 获取主力合约 @@ -63,6 +64,7 @@ corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2) # api_01.get_bars('IF99', period='1min', callback=t1.display_bar, bar_freq=1) # 获取bar,只返回 list[dict] +""" result, bars = api_01.get_bars('IF99', period='1min', return_bar=False) if result: print('前十根bar') @@ -71,15 +73,15 @@ if result: print('后十根bar') for bar in bars[-10:]: print(bar) - +""" # result,datas = api_01.get_transaction_data(symbol='ni1905') # api_02 = TdxFutureData(t2) # api_02.get_bars('IF99', period='1min', callback=t1.display_bar) # 获取当前交易日分时数据 -# ret,result = api_01.get_transaction_data('RB99') -# for r in result[0:10] + result[-10:]: -# print(r) +ret,result = api_01.get_transaction_data('NI99') +for r in result[0:10] + result[-10:]: + print(r) # 获取历史分时数据 # ret, result = api_01.get_history_transaction_data('RB99', '20190109') @@ -87,4 +89,4 @@ if result: # print(r) # 更新本地合约缓存信息 -api_01.update_mi_contracts() +#api_01.update_mi_contracts() diff --git a/vnpy/data/tdx/test_tdx_stock.py b/vnpy/data/tdx/test_tdx_stock.py index 3083a2c3..49c372c5 100644 --- a/vnpy/data/tdx/test_tdx_stock.py +++ b/vnpy/data/tdx/test_tdx_stock.py @@ -17,8 +17,11 @@ import json t1 = FakeStrategy() t2 = FakeStrategy() -# 创建API对象 -api_01 = TdxStockData(t1) + +# 创建API对象(使用本地socket5代理) +api_01 = TdxStockData(strategy=t1, proxy_ip='localhost', proxy_port=1080) +# 不使用代理 +#api_01 = TdxStockData(strategy=t1) # 获取市场下股票 for market_id in range(2): @@ -48,6 +51,6 @@ for market_id in range(2): # print(r) # 获取历史分时数据 -ret, result = api_01.get_history_transaction_data('600410', '20190925') +ret, result = api_01.get_history_transaction_data('110031', '20200504') for r in result[0:10] + result[-10:]: print(r) diff --git a/vnpy/gateway/rohon/libLinuxDataCollect.so b/vnpy/gateway/rohon/libLinuxDataCollect.so new file mode 100644 index 00000000..c4879ff8 Binary files /dev/null and b/vnpy/gateway/rohon/libLinuxDataCollect.so differ diff --git a/vnpy/gateway/rohon/libthostmduserapi_se.so b/vnpy/gateway/rohon/libthostmduserapi_se.so new file mode 100644 index 00000000..8c5f665d Binary files /dev/null and b/vnpy/gateway/rohon/libthostmduserapi_se.so differ diff --git a/vnpy/gateway/rohon/libthosttraderapi_se.so b/vnpy/gateway/rohon/libthosttraderapi_se.so new file mode 100644 index 00000000..68dee150 Binary files /dev/null and b/vnpy/gateway/rohon/libthosttraderapi_se.so differ diff --git a/vnpy/gateway/rohon/rohon_gateway.py b/vnpy/gateway/rohon/rohon_gateway.py index c41744eb..cf87503a 100644 --- a/vnpy/gateway/rohon/rohon_gateway.py +++ b/vnpy/gateway/rohon/rohon_gateway.py @@ -1,7 +1,10 @@ """ """ - -from datetime import datetime +import sys +import json +import traceback +from datetime import datetime, timedelta +from copy import copy,deepcopy from .vnctpmd import MdApi from .vnctptd import TdApi @@ -58,9 +61,31 @@ from vnpy.trader.object import ( CancelRequest, SubscribeRequest, ) -from vnpy.trader.utility import get_folder_path +from vnpy.trader.utility import ( + extract_vt_symbol, + get_folder_path, + get_trading_date, + get_underlying_symbol, + round_to, + BarGenerator +) from vnpy.trader.event import EVENT_TIMER +# 增加通达信指数接口行情 +from time import sleep +from threading import Thread +from pytdx.exhq import TdxExHq_API +from vnpy.amqp.consumer import subscriber +from vnpy.data.tdx.tdx_common import ( + TDX_FUTURE_HOSTS, + get_future_contracts, + save_future_contracts, + get_cache_json, + save_cache_json, + TDX_FUTURE_CONFIG) +from vnpy.component.base import ( + MARKET_DAY_ONLY, NIGHT_MARKET_23, NIGHT_MARKET_SQ2 +) STATUS_ROHON2VT = { THOST_FTDC_OAS_Submitted: Status.SUBMITTING, @@ -99,7 +124,9 @@ EXCHANGE_ROHON2VT = { "SHFE": Exchange.SHFE, "CZCE": Exchange.CZCE, "DCE": Exchange.DCE, - "INE": Exchange.INE + "INE": Exchange.INE, + "SPD": Exchange.SPD + } PRODUCT_ROHON2VT = { @@ -113,10 +140,14 @@ OPTIONTYPE_ROHON2VT = { THOST_FTDC_CP_PutOptions: OptionType.PUT } +MAX_FLOAT = sys.float_info.max symbol_exchange_map = {} symbol_name_map = {} symbol_size_map = {} +index_contracts = {} +# tdx 期货配置本地缓存 +future_contracts = get_future_contracts() class RohonGateway(BaseGateway): @@ -134,15 +165,30 @@ class RohonGateway(BaseGateway): "授权编码": "", "产品信息": "" } - + # 注 + # 如果采用rabbit_mq拓展tdx指数行情,default_setting中,需要增加: + # "rabbit": + # { + # "host": "192.168.1.211", + # "exchange": "x_fanout_idx_tick" + # } exchanges = list(EXCHANGE_ROHON2VT.values()) - def __init__(self, event_engine): + def __init__(self, event_engine, gateway_name="ROHON"): """Constructor""" - super().__init__(event_engine, "ROHON") + super().__init__(event_engine, gateway_name) - self.td_api = RohonTdApi(self) - self.md_api = RohonMdApi(self) + self.td_api = None + self.md_api = None + self.tdx_api = None + self.rabbit_api = None + + self.subscribed_symbols = set() # 已订阅合约代码 + + self.combiner_conf_dict = {} # 保存合成器配置 + # 自定义价差/加比的tick合成器 + self.combiners = {} + self.tick_combiner_map = {} def connect(self, setting: dict): """""" @@ -154,20 +200,156 @@ class RohonGateway(BaseGateway): appid = setting["产品名称"] auth_code = setting["授权编码"] product_info = setting["产品信息"] + rabbit_dict = setting.get('rabbit', None) if not td_address.startswith("tcp://"): td_address = "tcp://" + td_address if not md_address.startswith("tcp://"): md_address = "tcp://" + md_address + # 获取自定义价差/价比合约的配置 + try: + from vnpy.trader.engine import CustomContract + c = CustomContract() + self.combiner_conf_dict = c.get_config() + if len(self.combiner_conf_dict) > 0: + self.write_log(u'加载的自定义价差/价比配置:{}'.format(self.combiner_conf_dict)) + except Exception as ex: # noqa + pass + + if not self.td_api: + self.td_api = RohonTdApi(self) self.td_api.connect(td_address, userid, password, brokerid, auth_code, appid, product_info) + if not self.md_api: + self.md_api = RohonMdApi(self) self.md_api.connect(md_address, userid, password, brokerid) + if rabbit_dict: + self.rabbit_api = SubMdApi(gateway=self) + self.rabbit_api.connect(rabbit_dict) + else: + self.tdx_api = TdxMdApi(gateway=self) + self.tdx_api.connect() + self.init_query() + for (vt_symbol, is_bar) in self.subscribed_symbols: + symbol, exchange = extract_vt_symbol(vt_symbol) + req = SubscribeRequest( + symbol=symbol, + exchange=exchange, + is_bar=is_bar + ) + # 指数合约,从tdx行情订阅 + if req.symbol[-2:] in ['99']: + req.symbol = req.symbol.upper() + if self.tdx_api is not None: + self.write_log(u'有指数订阅,连接通达信行情服务器') + self.tdx_api.connect() + self.tdx_api.subscribe(req) + elif self.rabbit_api is not None: + self.rabbit_api.subscribe(req) + else: + self.md_api.subscribe(req) + + def check_status(self): + """检查状态""" + + if self.td_api.connect_status and self.md_api.connect_status: + self.status.update({'con': True}) + + if self.tdx_api: + self.tdx_api.check_status() + if self.tdx_api is None or self.md_api is None: + return False + + if not self.td_api.connect_status or self.md_api.connect_status: + return False + + return True + def subscribe(self, req: SubscribeRequest): """""" - self.md_api.subscribe(req) + try: + if self.md_api: + # 如果是自定义的套利合约符号 + if req.symbol in self.combiner_conf_dict: + self.write_log(u'订阅自定义套利合约:{}'.format(req.symbol)) + # 创建合成器 + if req.symbol not in self.combiners: + setting = self.combiner_conf_dict.get(req.symbol) + setting.update({"symbol": req.symbol}) + combiner = TickCombiner(self, setting) + # 更新合成器 + self.write_log(u'添加{}与合成器映射'.format(req.symbol)) + self.combiners.update({setting.get('symbol'): combiner}) + + # 增加映射( leg1 对应的合成器列表映射) + leg1_symbol = setting.get('leg1_symbol') + combiner_list = self.tick_combiner_map.get(leg1_symbol, []) + if combiner not in combiner_list: + self.write_log(u'添加Leg1:{}与合成器得映射'.format(leg1_symbol)) + combiner_list.append(combiner) + self.tick_combiner_map.update({leg1_symbol: combiner_list}) + + # 增加映射( leg2 对应的合成器列表映射) + leg2_symbol = setting.get('leg2_symbol') + combiner_list = self.tick_combiner_map.get(leg2_symbol, []) + if combiner not in combiner_list: + self.write_log(u'添加Leg2:{}与合成器得映射'.format(leg2_symbol)) + combiner_list.append(combiner) + self.tick_combiner_map.update({leg2_symbol: combiner_list}) + + self.write_log(u'订阅leg1:{}'.format(leg1_symbol)) + leg1_req = SubscribeRequest( + symbol=leg1_symbol, + exchange=symbol_exchange_map.get(leg1_symbol, Exchange.LOCAL) + ) + self.subscribe(leg1_req) + + self.write_log(u'订阅leg2:{}'.format(leg2_symbol)) + leg2_req = SubscribeRequest( + symbol=leg2_symbol, + exchange=symbol_exchange_map.get(leg1_symbol, Exchange.LOCAL) + ) + self.subscribe(leg2_req) + + self.subscribed_symbols.add((req.vt_symbol, req.is_bar)) + else: + self.write_log(u'{}合成器已经在存在'.format(req.symbol)) + return + elif req.exchange == Exchange.SPD: + self.write_error(u'自定义合约{}不在CTP设置中'.format(req.symbol)) + + # 指数合约,从tdx行情订阅 + if req.symbol[-2:] in ['99']: + req.symbol = req.symbol.upper() + if self.tdx_api: + self.tdx_api.subscribe(req) + elif self.rabbit_api: + self.rabbit_api.subscribe(req) + else: + self.md_api.subscribe(req) + + # Allow the strategies to start before the connection + self.subscribed_symbols.add((req.vt_symbol, req.is_bar)) + if req.is_bar: + self.subscribe_bar(req) + + except Exception as ex: + self.write_error(u'订阅合约异常:{},{}'.format(str(ex), traceback.format_exc())) + + def subscribe_bar(self, req: SubscribeRequest): + """订阅1分钟行情""" + + vt_symbol = req.vt_symbol + if vt_symbol in self.klines: + return + + # 创建1分钟bar产生器 + self.write_log(u'创建:{}的一分钟行情产生器'.format(vt_symbol)) + bg = BarGenerator(on_bar=self.on_bar) + self.klines.update({vt_symbol: bg}) def send_order(self, req: OrderRequest): """""" @@ -176,6 +358,7 @@ class RohonGateway(BaseGateway): def cancel_order(self, req: CancelRequest): """""" self.td_api.cancel_order(req) + return True def query_account(self): """""" @@ -187,15 +370,29 @@ class RohonGateway(BaseGateway): def close(self): """""" - self.td_api.close() - self.md_api.close() + if self.md_api: + self.write_log('断开行情API') + tmp1 = self.md_api + self.md_api = None + tmp1.close() - def write_error(self, msg: str, error: dict): - """""" - error_id = error["ErrorID"] - error_msg = error["ErrorMsg"] - msg = f"{msg},代码:{error_id},信息:{error_msg}" - self.write_log(msg) + if self.td_api: + self.write_log('断开交易API') + tmp2 = self.td_api + self.td_api = None + tmp2.close() + + if self.tdx_api: + self.write_log(u'断开tdx指数行情API') + tmp3 = self.tdx_api + self.tdx_api = None + tmp3.close() + + if self.rabbit_api: + self.write_log(u'断开rabbit MQ tdx指数行情API') + tmp4 = self.rabbit_api + self.rabbit_api = None + tmp4.close() def process_timer_event(self, event): """""" @@ -214,6 +411,13 @@ class RohonGateway(BaseGateway): self.query_functions = [self.query_account, self.query_position] self.event_engine.register(EVENT_TIMER, self.process_timer_event) + def on_custom_tick(self, tick): + """推送自定义合约行情""" + # 自定义合约行情 + + for combiner in self.tick_combiner_map.get(tick.symbol, []): + tick = copy(tick) + combiner.on_tick(tick) class RohonMdApi(MdApi): """""" @@ -241,6 +445,7 @@ class RohonMdApi(MdApi): """ self.gateway.write_log("行情服务器连接成功") self.login() + self.gateway.status.update({'md_con': True, 'md_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onFrontDisconnected(self, reason: int): """ @@ -248,6 +453,7 @@ class RohonMdApi(MdApi): """ self.login_status = False self.gateway.write_log(f"行情服务器连接断开,原因{reason}") + self.gateway.status.update({'md_con': False, 'md_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool): """ @@ -283,30 +489,65 @@ class RohonMdApi(MdApi): exchange = symbol_exchange_map.get(symbol, "") if not exchange: return + # 取当前时间 + dt = datetime.now() + s_date = dt.strftime('%Y-%m-%d') + timestamp = f"{s_date} {data['UpdateTime']}.{int(data['UpdateMillisec'] / 100)}" + dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S.%f") - timestamp = f"{data['ActionDay']} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}" + # 不处理开盘前的tick数据 + if dt.hour in [8, 20] and dt.minute < 59: + return + if exchange is Exchange.CFFEX and dt.hour == 9 and dt.minute < 14: + return tick = TickData( symbol=symbol, exchange=exchange, - datetime=datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f"), + datetime=dt, + date=s_date, + time=dt.strftime('%H:%M:%S.%f'), + trading_day=get_trading_date(dt), name=symbol_name_map[symbol], volume=data["Volume"], open_interest=data["OpenInterest"], last_price=data["LastPrice"], limit_up=data["UpperLimitPrice"], limit_down=data["LowerLimitPrice"], - open_price=data["OpenPrice"], - high_price=data["HighestPrice"], - low_price=data["LowestPrice"], - pre_close=data["PreClosePrice"], - bid_price_1=data["BidPrice1"], - ask_price_1=data["AskPrice1"], + open_price=adjust_price(data["OpenPrice"]), + high_price=adjust_price(data["HighestPrice"]), + low_price=adjust_price(data["LowestPrice"]), + pre_close=adjust_price(data["PreClosePrice"]), + bid_price_1=adjust_price(data["BidPrice1"]), + ask_price_1=adjust_price(data["AskPrice1"]), bid_volume_1=data["BidVolume1"], ask_volume_1=data["AskVolume1"], gateway_name=self.gateway_name ) + + if data["BidVolume2"] or data["AskVolume2"]: + tick.bid_price_2 = adjust_price(data["BidPrice2"]) + tick.bid_price_3 = adjust_price(data["BidPrice3"]) + tick.bid_price_4 = adjust_price(data["BidPrice4"]) + tick.bid_price_5 = adjust_price(data["BidPrice5"]) + + tick.ask_price_2 = adjust_price(data["AskPrice2"]) + tick.ask_price_3 = adjust_price(data["AskPrice3"]) + tick.ask_price_4 = adjust_price(data["AskPrice4"]) + tick.ask_price_5 = adjust_price(data["AskPrice5"]) + + tick.bid_volume_2 = adjust_price(data["BidVolume2"]) + tick.bid_volume_3 = adjust_price(data["BidVolume3"]) + tick.bid_volume_4 = adjust_price(data["BidVolume4"]) + tick.bid_volume_5 = adjust_price(data["BidVolume5"]) + + tick.ask_volume_2 = adjust_price(data["AskVolume2"]) + tick.ask_volume_3 = adjust_price(data["AskVolume3"]) + tick.ask_volume_4 = adjust_price(data["AskVolume4"]) + tick.ask_volume_5 = adjust_price(data["AskVolume5"]) + self.gateway.on_tick(tick) + self.gateway.on_custom_tick(tick) def connect(self, address: str, userid: str, password: str, brokerid: int): """ @@ -347,6 +588,7 @@ class RohonMdApi(MdApi): Subscribe to tick data update. """ if self.login_status: + self.gateway.write_log(f'订阅:{req.exchange} {req.symbol}') self.subscribeMarketData(req.symbol) self.subscribed.add(req.symbol) @@ -390,6 +632,9 @@ class RohonTdApi(TdApi): self.trade_data = [] self.positions = {} self.sysid_orderid_map = {} + self.future_contract_changed = False + + self.accountid = self.userid def onFrontConnected(self): """""" @@ -399,11 +644,13 @@ class RohonTdApi(TdApi): self.authenticate() else: self.login() + self.gateway.status.update({'td_con': True, 'td_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onFrontDisconnected(self, reason: int): """""" self.login_status = False self.gateway.write_log(f"交易服务器连接断开,原因{reason}") + self.gateway.status.update({'td_con': True, 'td_dis_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) def onRspAuthenticate(self, data: dict, error: dict, reqid: int, last: bool): """""" @@ -442,10 +689,22 @@ class RohonTdApi(TdApi): symbol = data["InstrumentID"] exchange = symbol_exchange_map[symbol] + order_type = OrderType.LIMIT + if data["OrderPriceType"] == THOST_FTDC_OPT_LimitPrice and data["TimeCondition"] == THOST_FTDC_TC_IOC: + if data["VolumeCondition"] == THOST_FTDC_VC_AV: + order_type = OrderType.FAK + elif data["VolumeCondition"] == THOST_FTDC_VC_CV: + order_type = OrderType.FOK + + if data["OrderPriceType"] == THOST_FTDC_OPT_AnyPrice: + order_type = OrderType.MARKET + order = OrderData( symbol=symbol, exchange=exchange, + accountid=self.accountid, orderid=orderid, + type=order_type, direction=DIRECTION_ROHON2VT[data["Direction"]], offset=OFFSET_ROHON2VT.get(data["CombOffsetFlag"], Offset.NONE), price=data["LimitPrice"], @@ -471,19 +730,29 @@ class RohonTdApi(TdApi): """ self.gateway.write_log("结算信息确认成功") - self.reqid += 1 - self.reqQryInstrument({}, self.reqid) + while True: + self.reqid += 1 + n = self.reqQryInstrument({}, self.reqid) + + if not n: + break + else: + sleep(1) def onRspQryInvestorPosition(self, data: dict, error: dict, reqid: int, last: bool): """""" if not data: return + if data.get("InstrumentID") not in symbol_exchange_map: + return + # Get buffered position object key = f"{data['InstrumentID'], data['PosiDirection']}" position = self.positions.get(key, None) if not position: position = PositionData( + accountid=self.accountid, symbol=data["InstrumentID"], exchange=symbol_exchange_map[data["InstrumentID"]], direction=DIRECTION_ROHON2VT[data["PosiDirection"]], @@ -492,7 +761,7 @@ class RohonTdApi(TdApi): self.positions[key] = position # For SHFE position data update - if position.exchange == Exchange.SHFE: + if position.exchange in [Exchange.SHFE, Exchange.INE]: if data["YdPosition"] and not data["TodayPosition"]: position.yd_volume = data["Position"] # For other exchange position data update @@ -530,14 +799,30 @@ class RohonTdApi(TdApi): """""" if "AccountID" not in data: return + if len(self.accountid)== 0: + self.accountid = data['AccountID'] account = AccountData( accountid=data["AccountID"], - balance=data["Balance"], - frozen=data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"], + pre_balance=round(float(data['PreBalance']), 7), + balance=round(float(data["Balance"]), 7), + frozen=round(data["FrozenMargin"] + data["FrozenCash"] + data["FrozenCommission"], 7), gateway_name=self.gateway_name ) - account.available = data["Available"] + account.available = round(float(data["Available"]), 7) + account.commission = round(float(data['Commission']), 7) + account.margin = round(float(data['CurrMargin']), 7) + account.close_profit = round(float(data['CloseProfit']), 7) + account.holding_profit = round(float(data['PositionProfit']), 7) + account.trading_day = str(data['TradingDay']) + if '-' not in account.trading_day and len(account.trading_day) == 8: + account.trading_day = '-'.join( + [ + account.trading_day[0:4], + account.trading_day[4:6], + account.trading_day[6:8] + ] + ) self.gateway.on_account(account) @@ -556,12 +841,23 @@ class RohonTdApi(TdApi): pricetick=data["PriceTick"], gateway_name=self.gateway_name ) + # 保证金费率 + contract.margin_rate = max(data.get('LongMarginRatio', 0), data.get('ShortMarginRatio', 0)) + if contract.margin_rate == 0: + contract.margin_rate = 0.1 # For option only if contract.product == Product.OPTION: + # Remove C/P suffix of CZCE option product name + if contract.exchange == Exchange.CZCE: + contract.option_portfolio = data["ProductID"][:-1] + else: + contract.option_portfolio = data["ProductID"] + contract.option_underlying = data["UnderlyingInstrID"], contract.option_type = OPTIONTYPE_ROHON2VT.get(data["OptionsType"], None), contract.option_strike = data["StrikePrice"], + contract.option_index = str(data["StrikePrice"]) contract.option_expiry = datetime.strptime(data["ExpireDate"], "%Y%m%d"), self.gateway.on_contract(contract) @@ -570,9 +866,42 @@ class RohonTdApi(TdApi): symbol_name_map[contract.symbol] = contract.name symbol_size_map[contract.symbol] = contract.size + if contract.product == Product.FUTURES: + # 生成指数合约信息 + underlying_symbol = data["ProductID"] # 短合约名称 + underlying_symbol = underlying_symbol.upper() + # 只推送普通合约的指数 + if len(underlying_symbol) <= 2: + idx_contract = index_contracts.get(underlying_symbol, None) + if idx_contract is None: + idx_contract = deepcopy(contract) + idx_contract.symbol = '{}99'.format(underlying_symbol) + idx_contract.name = u'{}指数'.format(underlying_symbol) + idx_contract.vt_symbol = f'{idx_contract.symbol}.{idx_contract.exchange.value}' + self.gateway.on_contract(idx_contract) + + # 获取data/tdx/future_contracts.json中的合约记录 + future_contract = future_contracts.get(underlying_symbol, {}) + mi_contract_symbol = future_contract.get('mi_symbol', '') + margin_rate = float(future_contract.get('margin_rate', 0)) + mi_margin_rate = round(idx_contract.margin_rate, 4) + if mi_contract_symbol == contract.symbol: + if margin_rate != mi_margin_rate: + self.gateway.write_log( + f"{underlying_symbol}合约主力{mi_contract_symbol} 保证金{margin_rate}=>{mi_margin_rate}") + future_contract.update({'margin_rate': mi_margin_rate}) + future_contract.update({'symbol_size': idx_contract.size}) + future_contract.update({'price_tick': idx_contract.pricetick}) + future_contracts.update({underlying_symbol: future_contract}) + self.future_contract_changed = True + index_contracts.update({underlying_symbol: idx_contract}) if last: self.gateway.write_log("合约信息查询成功") + if self.future_contract_changed: + self.gateway.write_log('更新vnpy/data/tdx/future_contracts.json') + save_future_contracts(future_contracts) + for data in self.order_data: self.onRtnOrder(data) self.order_data.clear() @@ -596,11 +925,23 @@ class RohonTdApi(TdApi): order_ref = data["OrderRef"] orderid = f"{frontid}_{sessionid}_{order_ref}" + order_type = OrderType.LIMIT + if data["OrderPriceType"] == THOST_FTDC_OPT_LimitPrice and data["TimeCondition"] == THOST_FTDC_TC_IOC: + if data["VolumeCondition"] == THOST_FTDC_VC_AV: + order_type = OrderType.FAK + elif data["VolumeCondition"] == THOST_FTDC_VC_CV: + order_type = OrderType.FOK + + if data["OrderPriceType"] == THOST_FTDC_OPT_AnyPrice: + order_type = OrderType.MARKET + order = OrderData( + accountid=self.accountid, symbol=symbol, exchange=exchange, orderid=orderid, - type=ORDERTYPE_ROHON2VT[data["OrderPriceType"]], + sys_orderid=data.get('OrderSysID', orderid), + type=order_type, direction=DIRECTION_ROHON2VT[data["Direction"]], offset=OFFSET_ROHON2VT[data["CombOffsetFlag"]], price=data["LimitPrice"], @@ -626,16 +967,25 @@ class RohonTdApi(TdApi): orderid = self.sysid_orderid_map[data["OrderSysID"]] + trade_date = data['TradeDate'] + if '-' not in trade_date and len(trade_date) == 8: + trade_date = trade_date[0:4] + '-' + trade_date[4:6] + '-' + trade_date[6:8] + trade_time = data['TradeTime'] + trade_datetime = datetime.strptime(f'{trade_date} {trade_time}', '%Y-%m-%d %H:%M:%S') + tradeid = data["TradeID"] trade = TradeData( + accountid=self.accountid, symbol=symbol, exchange=exchange, orderid=orderid, - tradeid=data["TradeID"], + sys_orderid=data.get("OrderSysID", orderid), + tradeid=tradeid.replace(' ',''), direction=DIRECTION_ROHON2VT[data["Direction"]], offset=OFFSET_ROHON2VT[data["OffsetFlag"]], price=data["Price"], volume=data["Volume"], time=data["TradeTime"], + datetime=trade_datetime, gateway_name=self.gateway_name ) self.gateway.on_trade(trade) @@ -704,7 +1054,7 @@ class RohonTdApi(TdApi): "BrokerID": self.brokerid, "AppID": self.appid } - + self.accountid = copy(self.userid) if self.product_info: req["UserProductInfo"] = self.product_info @@ -715,6 +1065,10 @@ class RohonTdApi(TdApi): """ Send new order. """ + if req.offset not in OFFSET_VT2ROHON: + self.gateway.write_log("请选择开平方向") + return "" + self.order_ref += 1 rohon_req = { @@ -752,6 +1106,8 @@ class RohonTdApi(TdApi): orderid = f"{self.frontid}_{self.sessionid}_{self.order_ref}" order = req.create_order_data(orderid, self.gateway_name) + order.accountid = self.accountid + order.vt_accountid = f"{self.gateway_name}.{self.accountid}" self.gateway.on_order(order) return order.vt_orderid @@ -802,3 +1158,763 @@ class RohonTdApi(TdApi): """""" if self.connect_status: self.exit() + + +def adjust_price(price: float) -> float: + """""" + if price == MAX_FLOAT: + price = 0 + return price + + +class TdxMdApi(): + """ + 通达信数据行情API实现 + 订阅的指数行情,更新合约的数据 + + """ + + def __init__(self, gateway): + self.gateway = gateway # gateway对象 + self.gateway_name = gateway.gateway_name # gateway对象名称 + + self.req_interval = 0.5 # 操作请求间隔500毫秒 + self.req_id = 0 # 操作请求编号 + self.connection_status = False # 连接状态 + + self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 + self.symbol_market_dict = {} # tdx合约与tdx市场的字典 + self.symbol_vn_dict = {} # tdx合约与vt_symbol的对应 + self.symbol_tick_dict = {} # tdx合约与最后一个Tick得字典 + + self.registered_symbol_set = set() + + self.thread = None # 查询线程 + + self.ip_list = TDX_FUTURE_HOSTS + + # 调出 + self.best_ip = {} # 最佳IP地址和端口 + self.api = None # API 的连接会话对象 + self.last_tick_dt = datetime.now() # 记录该会话对象的最后一个tick时间 + + self.instrument_count = 50000 + + self.has_qry_instrument = False + + # ---------------------------------------------------------------------- + def ping(self, ip, port=7709): + """ + ping行情服务器 + :param ip: + :param port: + :param type_: + :return: + """ + apix = TdxExHq_API() + __time1 = datetime.now() + try: + with apix.connect(ip, port): + if apix.get_instrument_count() > 10000: + _timestamp = (datetime.now() - __time1).total_seconds() * 1000 + self.gateway.write_log('服务器{}:{},耗时:{}ms'.format(ip, port, _timestamp)) + return _timestamp + else: + self.gateway.write_log(u'该服务器IP {}无响应.'.format(ip)) + return timedelta(seconds=10).total_seconds() * 1000 + except Exception as ex: + self.gateway.write_log(u'tdx ping服务器{},异常的响应{}'.format(ip, str(ex))) + return timedelta(seconds=10).total_seconds() * 1000 + + def sort_ip_speed(self): + """ + 对所有服务器进行速度排序 + :return: + """ + + speed_result = [] + for x in self.ip_list: + speed = self.ping(x['ip'], x['port']) + x.update({'speed': speed}) + speed_result.append(copy(x)) + + # 更新服务器,按照速度排序 + speed_result = sorted(speed_result, key=lambda s: s['speed']) + self.gateway.write_log(u'服务器访问速度排序:{}'.format(speed_result)) + return speed_result + + # ---------------------------------------------------------------------- + def select_best_ip(self, exclude_ip: str = None): + """ + 选择行情服务器 + :param: exclude_ip, 排除的ip地址 + :return: + """ + self.gateway.write_log(u'选择通达信行情服务器') + + ip_list = self.sort_ip_speed() + + valid_ip_list = [x for x in ip_list if x.get('speed', 10000) < 10000 and x.get('ip') != exclude_ip] + + if len(valid_ip_list) == 0: + self.gateway.write_error(u'未能找到合适速度得行情服务器') + return None + best_future_ip = valid_ip_list[0] + save_cache_json(best_future_ip, TDX_FUTURE_CONFIG) + return best_future_ip + + def connect(self, is_reconnect=False): + """ + 连接通达讯行情服务器 + :param is_reconnect:是否重连 + :return: + """ + # 创建api连接对象实例 + try: + if self.api is None or not self.connection_status: + self.gateway.write_log(u'开始连接通达信行情服务器') + self.api = TdxExHq_API(heartbeat=True, auto_retry=True, raise_exception=True) + + # 选取最佳服务器 + if is_reconnect or len(self.best_ip) == 0: + self.best_ip = get_cache_json(TDX_FUTURE_CONFIG) + + if len(self.best_ip) == 0: + self.best_ip = self.select_best_ip() + + self.api.connect(self.best_ip['ip'], self.best_ip['port']) + # 尝试获取市场合约统计 + c = self.api.get_instrument_count() + if c < 10: + err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'], self.best_ip['port']) + self.gateway.write_error(err_msg) + else: + self.gateway.write_log(u'创建tdx连接, IP: {}/{}'.format(self.best_ip['ip'], self.best_ip['port'])) + self.connection_status = True + self.gateway.status.update( + {'tdx_con': True, 'tdx_con_time': datetime.now().strftime('%Y-%m-%d %H:%M%S')}) + self.thread = Thread(target=self.run) + self.thread.start() + + except Exception as ex: + self.gateway.write_log(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc())) + return + + def close(self): + """退出API""" + self.gateway.write_log(u'退出tdx API') + self.connection_status = False + + if self.thread: + self.thread.join() + + # ---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅合约""" + # 这里的设计是,如果尚未登录就调用了订阅方法 + # 则先保存订阅请求,登录完成后会自动订阅 + vn_symbol = str(subscribeReq.symbol) + vn_symbol = vn_symbol.upper() + self.gateway.write_log(u'通达信行情订阅 {}'.format(str(vn_symbol))) + + if vn_symbol[-2:] != '99': + self.gateway.write_log(u'{}不是指数合约,不能订阅'.format(vn_symbol)) + return + + tdx_symbol = vn_symbol[0:-2] + 'L9' + tdx_symbol = tdx_symbol.upper() + self.gateway.write_log(u'{}=>{}'.format(vn_symbol, tdx_symbol)) + self.symbol_vn_dict[tdx_symbol] = vn_symbol + + if tdx_symbol not in self.registered_symbol_set: + self.registered_symbol_set.add(tdx_symbol) + + self.check_status() + + def check_status(self): + # self.write_log(u'检查tdx接口状态') + if len(self.registered_symbol_set) == 0: + return + + # 若还没有启动连接,就启动连接 + over_time = (datetime.now() - self.last_tick_dt).total_seconds() > 60 + if not self.connection_status or self.api is None or over_time: + self.gateway.write_log(u'tdx还没有启动连接,就启动连接') + self.close() + self.thread = None + self.connect(is_reconnect=True) + + def qry_instrument(self): + """ + 查询/更新合约信息 + :return: + """ + if not self.connection_status: + self.gateway.write_error(u'tdx连接状态为断开,不能查询和更新合约信息') + return + + if self.has_qry_instrument: + self.gateway.write_error(u'已经查询过一次合约信息,不再查询') + return + + # 取得所有的合约信息 + num = self.api.get_instrument_count() + if not isinstance(num, int): + return + + all_contacts = sum( + [self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], []) + # [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}] + + # 对所有合约处理,更新字典 指数合约-tdx市场,指数合约-交易所 + for tdx_contract in all_contacts: + tdx_symbol = tdx_contract.get('code', None) + if tdx_symbol is None or tdx_symbol[-2:] not in ['L9']: + continue + tdx_market_id = tdx_contract.get('market') + self.symbol_market_dict[tdx_symbol] = tdx_market_id + if tdx_market_id == 47: # 中金所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.CFFEX + elif tdx_market_id == 28: # 郑商所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.CZCE + elif tdx_market_id == 29: # 大商所 + self.symbol_exchange_dict[tdx_symbol] = Exchange.DCE + elif tdx_market_id == 30: # 上期所+能源 + self.symbol_exchange_dict[tdx_symbol] = Exchange.SHFE + elif tdx_market_id == 60: # 主力合约 + self.gateway.write_log(u'主力合约:{}'.format(tdx_contract)) + self.has_qry_instrument = True + + def run(self): + # 直接查询板块 + try: + last_dt = datetime.now() + self.gateway.write_log(u'开始运行tdx查询指数行情线程,{}'.format(last_dt)) + while self.connection_status: + if len(self.registered_symbol_set) > 0: + try: + self.process_index_req() + except BrokenPipeError as bex: + self.gateway.write_error(u'BrokenPipeError{},重试重连tdx[{}]'.format(str(bex), 0)) + self.connect(is_reconnect=True) + sleep(5) + break + except Exception as ex: + self.gateway.write_error(u'tdx exception:{},{}'.format(str(ex), traceback.format_exc())) + self.gateway.write_error(u'重试重连tdx') + self.connect(is_reconnect=True) + + sleep(self.req_interval) + dt = datetime.now() + if last_dt.minute != dt.minute: + self.gateway.write_log( + 'tdx check point. {}, process symbols:{}'.format(dt, self.registered_symbol_set)) + last_dt = dt + except Exception as ex: + self.gateway.write_error(u'tdx thead.run exception:{},{}'.format(str(ex), traceback.format_exc())) + + self.gateway.write_error(u'tdx查询线程 {}退出'.format(datetime.now())) + + def process_index_req(self): + """处理板块获取指数行情tick""" + + # 获取通达信指数板块所有行情 + rt_list = self.api.get_instrument_quote_list(42, 3, 0, 100) + + if rt_list is None or len(rt_list) == 0: + self.gateway.write_log(u'tdx: rt_list为空') + return + + # 记录该接口的行情最后更新时间 + self.last_tick_dt = datetime.now() + + for d in list(rt_list): + tdx_symbol = d.get('code', None) + if tdx_symbol not in self.registered_symbol_set and tdx_symbol is not None: + continue + # tdx_symbol => vn_symbol + vn_symbol = self.symbol_vn_dict.get(tdx_symbol, None) + if vn_symbol is None: + self.gateway.write_error(u'self.symbol_vn_dict 取不到映射得:{}'.format(tdx_symbol)) + continue + # vn_symbol => exchange + exchange = self.symbol_exchange_dict.get(tdx_symbol, None) + underlying_symbol = get_underlying_symbol(vn_symbol) + + if exchange is None: + symbol_info = future_contracts.get(underlying_symbol, None) + if not symbol_info: + continue + exchange_value = symbol_info.get('exchange', None) + exchange = Exchange(exchange_value) + if exchange is None: + continue + self.symbol_exchange_dict.update({tdx_symbol: exchange}) + + tick_datetime = datetime.now() + # 修正毫秒 + last_tick = self.symbol_tick_dict.get(vn_symbol, None) + if (last_tick is not None) and tick_datetime.replace(microsecond=0) == last_tick.datetime: + # 与上一个tick的时间(去除毫秒后)相同,修改为500毫秒 + tick_datetime = tick_datetime.replace(microsecond=500) + else: + tick_datetime = tick_datetime.replace(microsecond=0) + + tick = TickData(gateway_name=self.gateway_name, + symbol=vn_symbol, + exchange=exchange, + datetime=tick_datetime) + + tick.pre_close = float(d.get('ZuoJie', 0.0)) + tick.high_price = float(d.get('ZuiGao', 0.0)) + tick.open_price = float(d.get('JinKai', 0.0)) + tick.low_price = float(d.get('ZuiDi', 0.0)) + tick.last_price = float(d.get('MaiChu', 0.0)) + tick.volume = int(d.get('XianLiang', 0)) + tick.open_interest = d.get('ChiCangLiang') + + tick.time = tick.datetime.strftime('%H:%M:%S.%f')[0:12] + tick.date = tick.datetime.strftime('%Y-%m-%d') + + tick.trading_day = get_trading_date(tick.datetime) + + # 指数没有涨停和跌停,就用昨日收盘价正负10% + tick.limit_up = tick.pre_close * 1.1 + tick.limit_down = tick.pre_close * 0.9 + + # CTP只有一档行情 + tick.bid_price_1 = float(d.get('MaiRuJia', 0.0)) + tick.bid_volume_1 = int(d.get('MaiRuLiang', 0)) + tick.ask_price_1 = float(d.get('MaiChuJia', 0.0)) + tick.ask_volume_1 = int(d.get('MaiChuLiang', 0)) + + # 排除非交易时间得tick + if tick.exchange is Exchange.CFFEX: + if tick.datetime.hour not in [9, 10, 11, 13, 14, 15]: + continue + if tick.datetime.hour == 9 and tick.datetime.minute < 15: + continue + # 排除早盘 11:30~12:00 + if tick.datetime.hour == 11 and tick.datetime.minute >= 30: + continue + if tick.datetime.hour == 15 and tick.datetime.minute >= 15 and underlying_symbol in ['T', 'TF', 'TS']: + continue + if tick.datetime.hour == 15 and underlying_symbol in ['IH', 'IF', 'IC']: + continue + else: # 大商所/郑商所,上期所,上海能源 + # 排除非开盘小时 + if tick.datetime.hour in [3, 4, 5, 6, 7, 8, 12, 15, 16, 17, 18, 19, 20]: + continue + # 排除早盘 10:15~10:30 + if tick.datetime.hour == 10 and 15 <= tick.datetime.minute < 30: + continue + # 排除早盘 11:30~12:00 + if tick.datetime.hour == 11 and tick.datetime.minute >= 30: + continue + # 排除午盘 13:00 ~13:30 + if tick.datetime.hour == 13 and tick.datetime.minute < 30: + continue + # 排除凌晨2:30~3:00 + if tick.datetime.hour == 2 and tick.datetime.minute >= 30: + continue + + # 排除大商所/郑商所夜盘数据上期所夜盘数据 23:00 收盘 + if underlying_symbol in NIGHT_MARKET_23: + if tick.datetime.hour in [23, 0, 1, 2]: + continue + # 排除上期所夜盘数据 1:00 收盘 + if underlying_symbol in NIGHT_MARKET_SQ2: + if tick.datetime.hour in [1, 2]: + continue + + # 排除日盘合约在夜盘得数据 + if underlying_symbol in MARKET_DAY_ONLY and (tick.datetime.hour < 9 or tick.datetime.hour > 16): + # self.write_log(u'排除日盘合约{}在夜盘得数据'.format(short_symbol)) + continue + + # self.gateway.write_log(f'{tick.__dict__}') + self.symbol_tick_dict[tick.symbol] = tick + + self.gateway.on_tick(tick) + self.gateway.on_custom_tick(tick) + + +class SubMdApi(): + """ + RabbitMQ Subscriber 数据行情接收API + """ + + def __init__(self, gateway): + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.symbol_tick_dict = {} # 合约与最后一个Tick得字典 + self.registed_symbol_set = set() # 订阅的合约记录集 + + self.sub = None + self.setting = {} + self.connect_status = False + self.thread = None + + def connect(self, setting={}): + """连接""" + self.setting = setting + try: + self.sub = subscriber( + host=self.setting.get('host', 'localhost'), + port=self.setting.get('port', 5672), + user=self.setting.get('user', 'admin'), + password=self.setting.get('password', 'admin'), + exchange=self.setting.get('exchange', 'x_fanout_idx_tick')) + + self.sub.set_callback(self.on_message) + self.thread = Thread(target=self.sub.start) + self.thread.start() + self.connect_status = True + self.gateway.status.update({'sub_con': True, 'sub_con_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) + except Exception as ex: + self.gateway.write_error(u'连接RabbitMQ {} 异常:{}'.format(self.setting, str(ex))) + self.gateway.write_error(traceback.format_exc()) + self.connect_status = False + + def on_message(self, chan, method_frame, _header_frame, body, userdata=None): + # print(" [x] %r" % body) + try: + str_tick = body.decode('utf-8') + d = json.loads(str_tick) + d.pop('rawData', None) + + d = self.conver_update(d) + + symbol = d.pop('symbol', None) + str_datetime = d.pop('datetime', None) + if symbol not in self.registed_symbol_set or str_datetime is None: + return + if '.' in str_datetime: + dt = datetime.strptime(str_datetime, '%Y-%m-%d %H:%M:%S.%f') + else: + dt = datetime.strptime(str_datetime, '%Y-%m-%d %H:%M:%S') + + tick = TickData(gateway_name=self.gateway_name, + exchange=Exchange(d.get('exchange')), + symbol=d.get('symbol'), + datetime=dt) + d.pop('exchange', None) + d.pop('symbol', None) + tick.__dict__.update(d) + + self.symbol_tick_dict[symbol] = tick + self.gateway.on_tick(tick) + self.gateway.on_custom_tick(tick) + + except Exception as ex: + self.gateway.write_error(u'RabbitMQ on_message 异常:{}'.format(str(ex))) + self.gateway.write_error(traceback.format_exc()) + + def conver_update(self, d): + """转换dict, vnpy1 tick dict => vnpy2 tick dict""" + if 'vtSymbol' not in d: + return d + symbol= d.get('symbol') + exchange = d.get('exchange') + vtSymbol = d.pop('vtSymbol', symbol) + if '.' not in symbol: + d.update({'vt_symbol': f'{symbol}.{exchange}'}) + else: + d.update({'vt_symbol': f'{symbol}.{Exchange.LOCAL.value}'}) + + # 成交数据 + d.update({'last_price': d.pop('lastPrice',0.0)}) # 最新成交价 + d.update({'last_volume': d.pop('lastVolume', 0)}) # 最新成交量 + + d.update({'open_interest': d.pop('openInterest', 0)}) # 昨持仓量 + + d.update({'open_interest': d.pop('tradingDay', get_trading_date())}) + + + # 常规行情 + d.update({'open_price': d.pop('openPrice', 0)}) # 今日开盘价 + d.update({'high_price': d.pop('highPrice', 0)}) # 今日最高价 + d.update({'low_price': d.pop('lowPrice', 0)}) # 今日最低价 + d.update({'pre_close': d.pop('preClosePrice', 0)}) # 昨收盘价 + d.update({'limit_up': d.pop('upperLimit', 0)}) # 涨停价 + d.update({'limit_down': d.pop('lowerLimit', 0)}) # 跌停价 + + # 五档行情 + d.update({'bid_price_1': d.pop('bidPrice1', 0.0)}) + d.update({'bid_price_2': d.pop('bidPrice2', 0.0)}) + d.update({'bid_price_3': d.pop('bidPrice3', 0.0)}) + d.update({'bid_price_4': d.pop('bidPrice4', 0.0)}) + d.update({'bid_price_5': d.pop('bidPrice5', 0.0)}) + + d.update({'ask_price_1': d.pop('askPrice1', 0.0)}) + d.update({'ask_price_2': d.pop('askPrice2', 0.0)}) + d.update({'ask_price_3': d.pop('askPrice3', 0.0)}) + d.update({'ask_price_4': d.pop('askPrice4', 0.0)}) + d.update({'ask_price_5': d.pop('askPrice5', 0.0)}) + + d.update({'bid_volume_1': d.pop('bidVolume1', 0.0)}) + d.update({'bid_volume_2': d.pop('bidVolume2', 0.0)}) + d.update({'bid_volume_3': d.pop('bidVolume3', 0.0)}) + d.update({'bid_volume_4': d.pop('bidVolume4', 0.0)}) + d.update({'bid_volume_5': d.pop('bidVolume5', 0.0)}) + + d.update({'ask_volume_1': d.pop('askVolume1', 0.0)}) + d.update({'ask_volume_2': d.pop('askVolume2', 0.0)}) + d.update({'ask_volume_3': d.pop('askVolume3', 0.0)}) + d.update({'ask_volume_4': d.pop('askVolume4', 0.0)}) + d.update({'ask_volume_5': d.pop('askVolume5', 0.0)}) + + return d + + def close(self): + """退出API""" + self.gateway.write_log(u'退出rabbit行情订阅API') + self.connection_status = False + + try: + if self.sub: + self.gateway.write_log(u'关闭订阅器') + self.sub.close() + + if self.thread is not None: + self.gateway.write_log(u'关闭订阅器接收线程') + self.thread.join() + except Exception as ex: + self.gateway.write_error(u'退出rabbitMQ行情api异常:{}'.format(str(ex))) + + # ---------------------------------------------------------------------- + def subscribe(self, subscribeReq): + """订阅合约""" + # 这里的设计是,如果尚未登录就调用了订阅方法 + # 则先保存订阅请求,登录完成后会自动订阅 + vn_symbol = str(subscribeReq.symbol) + vn_symbol = vn_symbol.upper() + + if vn_symbol not in self.registed_symbol_set: + self.registed_symbol_set.add(vn_symbol) + self.gateway.write_log(u'RabbitMQ行情订阅 {}'.format(str(vn_symbol))) + + +class TickCombiner(object): + """ + Tick合成类 + """ + + def __init__(self, gateway, setting): + self.gateway = gateway + self.gateway_name = self.gateway.gateway_name + self.gateway.write_log(u'创建tick合成类:{}'.format(setting)) + + self.symbol = setting.get('symbol', None) + self.leg1_symbol = setting.get('leg1_symbol', None) + self.leg2_symbol = setting.get('leg2_symbol', None) + self.leg1_ratio = setting.get('leg1_ratio', 1) # 腿1的数量配比 + self.leg2_ratio = setting.get('leg2_ratio', 1) # 腿2的数量配比 + self.price_tick = setting.get('price_tick', 1) # 合成价差加比后的最小跳动 + # 价差 + self.is_spread = setting.get('is_spread', False) + # 价比 + self.is_ratio = setting.get('is_ratio', False) + + self.last_leg1_tick = None + self.last_leg2_tick = None + + # 价差日内最高/最低价 + self.spread_high = None + self.spread_low = None + + # 价比日内最高/最低价 + self.ratio_high = None + self.ratio_low = None + + # 当前交易日 + self.trading_day = None + + if self.is_ratio and self.is_spread: + self.gateway.write_error(u'{}参数有误,不能同时做价差/加比.setting:{}'.format(self.symbol, setting)) + return + + self.gateway.write_log(u'初始化{}合成器成功'.format(self.symbol)) + if self.is_spread: + self.gateway.write_log( + u'leg1:{} * {} - leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, + self.leg2_ratio)) + if self.is_ratio: + self.gateway.write_log( + u'leg1:{} * {} / leg2:{} * {}'.format(self.leg1_symbol, self.leg1_ratio, self.leg2_symbol, + self.leg2_ratio)) + + def on_tick(self, tick): + """OnTick处理""" + combinable = False + + if tick.symbol == self.leg1_symbol: + # leg1合约 + self.last_leg1_tick = tick + if self.last_leg2_tick is not None: + if self.last_leg1_tick.datetime.replace(microsecond=0) == self.last_leg2_tick.datetime.replace( + microsecond=0): + combinable = True + + elif tick.symbol == self.leg2_symbol: + # leg2合约 + self.last_leg2_tick = tick + if self.last_leg1_tick is not None: + if self.last_leg2_tick.datetime.replace(microsecond=0) == self.last_leg1_tick.datetime.replace( + microsecond=0): + combinable = True + + # 不能合并 + if not combinable: + return + + if not self.is_ratio and not self.is_spread: + return + + # 以下情况,基本为单腿涨跌停,不合成价差/价格比 Tick + if (self.last_leg1_tick.ask_price_1 == 0 or self.last_leg1_tick.bid_price_1 == self.last_leg1_tick.limit_up) \ + and self.last_leg1_tick.ask_volume_1 == 0: + self.gateway.write_log( + u'leg1:{0}涨停{1},不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.bid_price_1)) + return + if (self.last_leg1_tick.bid_price_1 == 0 or self.last_leg1_tick.ask_price_1 == self.last_leg1_tick.limit_down) \ + and self.last_leg1_tick.bid_volume_1 == 0: + self.gateway.write_log( + u'leg1:{0}跌停{1},不合成价差Tick'.format(self.last_leg1_tick.vt_symbol, self.last_leg1_tick.ask_price_1)) + return + if (self.last_leg2_tick.ask_price_1 == 0 or self.last_leg2_tick.bid_price_1 == self.last_leg2_tick.limit_up) \ + and self.last_leg2_tick.ask_volume_1 == 0: + self.gateway.write_log( + u'leg2:{0}涨停{1},不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.bid_price_1)) + return + if (self.last_leg2_tick.bid_price_1 == 0 or self.last_leg2_tick.ask_price_1 == self.last_leg2_tick.limit_down) \ + and self.last_leg2_tick.bid_volume_1 == 0: + self.gateway.write_log( + u'leg2:{0}跌停{1},不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1)) + return + + if self.trading_day != tick.trading_day: + self.trading_day = tick.trading_day + self.spread_high = None + self.spread_low = None + self.ratio_high = None + self.ratio_low = None + + if self.is_spread: + spread_tick = TickData(gateway_name=self.gateway_name, + symbol=self.symbol, + exchange=Exchange.SPD, + datetime=tick.datetime) + + spread_tick.trading_day = tick.trading_day + spread_tick.date = tick.date + spread_tick.time = tick.time + + # 叫卖价差=leg1.ask_price_1 * 配比 - leg2.bid_price_1 * 配比,volume为两者最小 + spread_tick.ask_price_1 = round_to(target=self.price_tick, + value=self.last_leg1_tick.ask_price_1 * self.leg1_ratio - self.last_leg2_tick.bid_price_1 * self.leg2_ratio) + spread_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) + + # 叫买价差=leg1.bid_price_1 * 配比 - leg2.ask_price_1 * 配比,volume为两者最小 + spread_tick.bid_price_1 = round_to(target=self.price_tick, + value=self.last_leg1_tick.bid_price_1 * self.leg1_ratio - self.last_leg2_tick.ask_price_1 * self.leg2_ratio) + spread_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) + + # 最新价 + spread_tick.last_price = round_to(target=self.price_tick, + value=(spread_tick.ask_price_1 + spread_tick.bid_price_1) / 2) + # 昨收盘价 + if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: + spread_tick.pre_close = round_to(target=self.price_tick, + value=self.last_leg1_tick.pre_close * self.leg1_ratio - self.last_leg2_tick.pre_close * self.leg2_ratio) + # 开盘价 + if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: + spread_tick.open_price = round_to(target=self.price_tick, + value=self.last_leg1_tick.open_price * self.leg1_ratio - self.last_leg2_tick.open_price * self.leg2_ratio) + # 最高价 + if self.spread_high: + self.spread_high = max(self.spread_high, spread_tick.ask_price_1) + else: + self.spread_high = spread_tick.ask_price_1 + spread_tick.high_price = self.spread_high + + # 最低价 + if self.spread_low: + self.spread_low = min(self.spread_low, spread_tick.bid_price_1) + else: + self.spread_low = spread_tick.bid_price_1 + + spread_tick.low_price = self.spread_low + + self.gateway.on_tick(spread_tick) + + if self.is_ratio: + ratio_tick = TickData( + gateway_name=self.gateway_name, + symbol=self.symbol, + exchange=Exchange.SPD, + datetime=tick.datetime + ) + + ratio_tick.trading_day = tick.trading_day + ratio_tick.date = tick.date + ratio_tick.time = tick.time + + # 比率tick + ratio_tick.ask_price_1 = 100 * self.last_leg1_tick.ask_price_1 * self.leg1_ratio \ + / (self.last_leg2_tick.bid_price_1 * self.leg2_ratio) # noqa + ratio_tick.ask_price_1 = round_to( + target=self.price_tick, + value=ratio_tick.ask_price_1 + ) + + ratio_tick.ask_volume_1 = min(self.last_leg1_tick.ask_volume_1, self.last_leg2_tick.bid_volume_1) + ratio_tick.bid_price_1 = 100 * self.last_leg1_tick.bid_price_1 * self.leg1_ratio \ + / (self.last_leg2_tick.ask_price_1 * self.leg2_ratio) # noqa + ratio_tick.bid_price_1 = round_to( + target=self.price_tick, + value=ratio_tick.bid_price_1 + ) + + ratio_tick.bid_volume_1 = min(self.last_leg1_tick.bid_volume_1, self.last_leg2_tick.ask_volume_1) + ratio_tick.last_price = (ratio_tick.ask_price_1 + ratio_tick.bid_price_1) / 2 + ratio_tick.last_price = round_to( + target=self.price_tick, + value=ratio_tick.last_price + ) + + # 昨收盘价 + if self.last_leg2_tick.pre_close > 0 and self.last_leg1_tick.pre_close > 0: + ratio_tick.pre_close = 100 * self.last_leg1_tick.pre_close * self.leg1_ratio / ( + self.last_leg2_tick.pre_close * self.leg2_ratio) # noqa + ratio_tick.pre_close = round_to( + target=self.price_tick, + value=ratio_tick.pre_close + ) + + # 开盘价 + if self.last_leg2_tick.open_price > 0 and self.last_leg1_tick.open_price > 0: + ratio_tick.open_price = 100 * self.last_leg1_tick.open_price * self.leg1_ratio / ( + self.last_leg2_tick.open_price * self.leg2_ratio) # noqa + ratio_tick.open_price = round_to( + target=self.price_tick, + value=ratio_tick.open_price + ) + + # 最高价 + if self.ratio_high: + self.ratio_high = max(self.ratio_high, ratio_tick.ask_price_1) + else: + self.ratio_high = ratio_tick.ask_price_1 + ratio_tick.high_price = self.spread_high + + # 最低价 + if self.ratio_low: + self.ratio_low = min(self.ratio_low, ratio_tick.bid_price_1) + else: + self.ratio_low = ratio_tick.bid_price_1 + + ratio_tick.low_price = self.spread_low + + self.gateway.on_tick(ratio_tick) diff --git a/vnpy/gateway/xtp/xtp_gateway.py b/vnpy/gateway/xtp/xtp_gateway.py index 6f8ebf52..777b93c1 100644 --- a/vnpy/gateway/xtp/xtp_gateway.py +++ b/vnpy/gateway/xtp/xtp_gateway.py @@ -1,5 +1,6 @@ from typing import Any, Dict, List from datetime import datetime +from functools import lru_cache from vnpy.api.xtp import MdApi, TdApi from vnpy.event import EventEngine @@ -127,6 +128,9 @@ symbol_name_map: Dict[str, str] = {} # 代码 <=> 交易所 symbol_exchange_map: Dict[str, Exchange] = {} +@lru_cache() +def get_vt_symbol_name(vt_symbol): + return symbol_name_map.get(vt_symbol, vt_symbol.split('.')[0]) class XtpGateway(BaseGateway): @@ -238,6 +242,7 @@ class XtpMdApi(MdApi): self.connect_status: bool = False self.login_status: bool = False + def onDisconnected(self, reason: int) -> None: """""" self.connect_status = False @@ -298,7 +303,7 @@ class XtpMdApi(MdApi): tick.bid_volume_1, tick.bid_volume_2, tick.bid_volume_3, tick.bid_volume_4, tick.bid_volume_5 = data["bid_qty"][0:5] tick.ask_volume_1, tick.ask_volume_2, tick.ask_volume_3, tick.ask_volume_4, tick.ask_volume_5 = data["ask_qty"][0:5] - tick.name = symbol_name_map.get(tick.vt_symbol, tick.symbol) + tick.name = get_vt_symbol_name(tick.vt_symbol) self.gateway.prices.update({tick.vt_symbol: tick.last_price}) self.gateway.on_tick(tick) @@ -540,6 +545,7 @@ class XtpTdApi(TdApi): insert_time = str(data["insert_time"]) dt = datetime.strptime(insert_time, '%Y%m%d%H%M%S%f') order = OrderData( + accountid=self.userid, symbol=symbol, exchange=MARKET_XTP2VT[data["market"]], orderid=str(data["order_xtp_id"]), @@ -571,6 +577,7 @@ class XtpTdApi(TdApi): dt = datetime.strptime(trade_time,'%Y%m%d%H%M%S%f') trade = TradeData( + accountid=self.userid, symbol=symbol, exchange=MARKET_XTP2VT[data["market"]], orderid=str(data["order_xtp_id"]), @@ -615,19 +622,23 @@ class XtpTdApi(TdApi): if data["market"] == 0: return - + vt_symbol = '{}.{}'.format(data["ticker"], MARKET_XTP2VT[data["market"]].value) position = PositionData( + accountid=self.userid, symbol=data["ticker"], exchange=MARKET_XTP2VT[data["market"]], + name=data["ticker_name"], direction=POSITION_DIRECTION_XTP2VT[data["position_direction"]], volume=data["total_qty"], frozen=data["locked_position"], price=data["avg_price"], pnl=data["unrealized_pnl"], yd_volume=data["yesterday_position"], - gateway_name=self.gateway_name + gateway_name=self.gateway_name, + cur_price=self.gateway.prices.get(vt_symbol,0) ) - vt_symbol = position.vt_symbol + if position.volume > 0 and position.cur_price > 0: + position.pnl = round(position.volume * (position.cur_price - position.price),2) self.gateway.on_position(position) # 如果持仓>0 获取持仓对应的当前最新价 @@ -684,7 +695,8 @@ class XtpTdApi(TdApi): balance=balance, # 总资产 margin=self.security_asset, # 证券资产 frozen=data["withholding_amount"], - gateway_name=self.gateway_name + gateway_name=self.gateway_name, + trading_day=datetime.now().strftime('%Y-%m-%d') ) # AccountData缺省的available 计算方法有误,这里直接取可用资金 account.available = cash_asset @@ -745,10 +757,12 @@ class XtpTdApi(TdApi): position = self.short_positions.get(symbol, None) if not position: position = PositionData( + accountid=self.userid, symbol=symbol, exchange=exchange, direction=Direction.SHORT, - gateway_name=self.gateway_name + gateway_name=self.gateway_name, + cur_price=self.gateway.prices.get(f'{symbol}.{exchange.value}',0.0) ) self.short_positions[symbol] = position @@ -855,6 +869,10 @@ class XtpTdApi(TdApi): orderid = self.insertOrder(xtp_req, self.session_id) order = req.create_order_data(str(orderid), self.gateway_name) + order.accountid = self.userid + if order.datetime is None: + order.datetime = datetime.now() + order.time = order.datetime.strftime('%H:%M:%S.%f') self.gateway.on_order(order) return order.vt_orderid diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index cd47a3ad..a94016da 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -219,6 +219,7 @@ class PositionData(BaseData): exchange: Exchange direction: Direction accountid: str = "" # 账号id + name: str = "" volume: float = 0 frozen: float = 0 price: float = 0 @@ -234,6 +235,8 @@ class PositionData(BaseData): self.vt_symbol = f"{self.symbol}.{self.exchange.value}" self.vt_positionid = f"{self.gateway_name}.{self.vt_symbol}.{self.direction.value}" self.vt_accountid = f"{self.gateway_name}.{self.accountid}" + if self.name == "": + self.name = self.vt_symbol @dataclass class AccountData(BaseData): diff --git a/vnpy/trader/ui/widget.py b/vnpy/trader/ui/widget.py index 982bb7cf..646a737e 100644 --- a/vnpy/trader/ui/widget.py +++ b/vnpy/trader/ui/widget.py @@ -460,6 +460,7 @@ class PositionMonitor(BaseMonitor): sorting = True headers = { + "name": {"display": "名称", "cell": BaseCell, "update": False}, "symbol": {"display": "代码", "cell": BaseCell, "update": False}, "exchange": {"display": "交易所", "cell": EnumCell, "update": False}, "direction": {"display": "方向", "cell": DirectionCell, "update": False},