diff --git a/vnpy/data/tdx/tdx_future_data.py b/vnpy/data/tdx/tdx_future_data.py index b59c4b37..dc31a428 100644 --- a/vnpy/data/tdx/tdx_future_data.py +++ b/vnpy/data/tdx/tdx_future_data.py @@ -90,6 +90,7 @@ def get_tdx_marketid(symbol): class TdxFutureData(object): exclude_ips = [] + # ---------------------------------------------------------------------- def __init__(self, strategy=None, best_ip={}, proxy_ip="", proxy_port=0): """ @@ -108,7 +109,6 @@ class TdxFutureData(object): # 所有期货合约的本地缓存 self.future_contracts = get_future_contracts() - def write_log(self, content): if self.strategy: self.strategy.write_log(content) @@ -152,7 +152,7 @@ class TdxFutureData(object): if (datetime.now() - last_datetime).total_seconds() > 60 * 60 * 2: self.best_ip = {} self.exclude_ips = [] - except Exception as ex: # noqa + except Exception as ex: # noqa self.best_ip = {} else: self.best_ip = {} @@ -186,7 +186,7 @@ class TdxFutureData(object): # self.qryInstrument() except Exception as ex: ip = self.best_ip.get('ip', '') - self.write_log(u'连接服务器tdx:{} 异常:{},{}'.format(ip,str(ex), traceback.format_exc())) + self.write_log(u'连接服务器tdx:{} 异常:{},{}'.format(ip, str(ex), traceback.format_exc())) if ip not in self.exclude_ips: self.write_log(f'添加{ip}到异常列表中') self.exclude_ips.append(ip) @@ -550,9 +550,11 @@ class TdxFutureData(object): for contract in contracts: # 排除指数合约 code = contract.get('code') + if code[0:2] in ['IF', 'IC']: + a = 1 if code[-2:] in ['L9', 'L8', 'L0', 'L1', 'L2', 'L3', '50'] or \ (exchange == Exchange.CFFEX and code[-3:] in ['300', '500']): - #self.write_log(f'过滤:{exchange.value}:{code}') + # self.write_log(f'过滤:{exchange.value}:{code}') continue short_symbol = get_underlying_symbol(code).upper() contract_list = short_contract_dict.get(short_symbol, []) @@ -567,9 +569,18 @@ class TdxFutureData(object): # 缓存的期货合约配置 cache_info = self.future_contracts.get(k, {}) - # 缓存的所有当前合约清单 - cache_symbols = cache_info.get('symbols', []) - new_symbols = sorted([c.get('code') for c in v]) + + # 缓存的所有当前合约字典 + cache_symbols = cache_info.get('symbols', {}) + + if isinstance(cache_symbols, list): + self.write_log(f'移除旧版symbols列表') + cache_symbols = {} + + # 获取 {合约:总量} + new_symbols = {c.get('code'): c.get('ZongLiang') for c in v} + # 更新字典 + cache_symbols.update(new_symbols) # 检查交易所是否一致 cache_exchange = cache_info.get('exchange', '') @@ -577,45 +588,41 @@ class TdxFutureData(object): if not (cache_exchange == 'INE' and exchange == Exchange.SHFE): continue - # 判断前置条件1:缓存的清单数量, - if len(cache_symbols) > 0: - if len(new_symbols) < len(cache_symbols) * 0.8: - self.write_error(f'查询的期货合约{new_symbols} 总数小于 缓存 {cache_symbols} 的80%数量,不做处理') - continue + # 获取短合约 + underlying_symbol = cache_info.get('underlying_symbol', None) + if not underlying_symbol: + self.write_error(f'不能获取短合约') + cache_info.update({'underlying_symbol': k}) + underlying_symbol = k + + # 当前月份的合约 + last_full_symbol = '{}{}'.format(underlying_symbol.upper(), datetime.now().strftime('%y%m')) + # 排除旧的合约 + cache_symbols = {k: v for k, v in cache_symbols.items() if + k >= last_full_symbol and len(k) == len(last_full_symbol)} # 判断前置条件2: cache_mi_symbol = cache_info.get('full_symbol') - # 之前的主力合约不在当前所有合约清单中 - if cache_mi_symbol and cache_mi_symbol not in new_symbols: - # 之前的主力合约,必须小于所有的合约 - if not all([cache_mi_symbol 0: cache_info.update({'symbols': new_symbols}) + cache_info.update({'open_interesting': new_oi}) + self.future_contracts.update({k: cache_info}) # 更新 - mi_contracts.append(select_data) - + mi_contracts.append({'code': new_mi_symbol, 'ZongLiang': new_oi, "market": Vn_Tdx_Exchange_Map[exchange]}) return mi_contracts @@ -649,7 +656,7 @@ class TdxFutureData(object): # 每秒 2个, 10小时 max_data_size = 1000000 market_id = INIT_TDX_MARKET_MAP.get(tdx_index_symbol, 0) - self.write_log(u'开始下载{}=>{}, market_id={} 当日分笔数据'.format(symbol,tdx_index_symbol, market_id)) + self.write_log(u'开始下载{}=>{}, market_id={} 当日分笔数据'.format(symbol, tdx_index_symbol, market_id)) try: _datas = [] @@ -882,12 +889,14 @@ class TdxFutureData(object): vn_exchange = Tdx_Vn_Exchange_Map.get(str(tdx_market_id)) mi_symbol = get_real_symbol_by_exchange(full_symbol, vn_exchange) + # if underlying_symbol == 'IC': + # debug = 1 # 更新登记 短合约:真实主力合约 self.write_log( '{},{},{},{},{}'.format(tdx_market_id, full_symbol, underlying_symbol, mi_symbol, vn_exchange)) if underlying_symbol in self.future_contracts: info = self.future_contracts.get(underlying_symbol) - if mi_symbol > info.get('mi_symbol', ''): + if mi_symbol != info.get('mi_symbol', ''): self.write_log(u'主力合约变化:{} =>{}'.format(info.get('mi_symbol'), mi_symbol)) info.update({'mi_symbol': mi_symbol, 'full_symbol': full_symbol}) self.future_contracts.update({underlying_symbol: info})