diff --git a/vnpy/data/tdx/tdx_common.py b/vnpy/data/tdx/tdx_common.py index 6d45f434..432dbe1c 100644 --- a/vnpy/data/tdx/tdx_common.py +++ b/vnpy/data/tdx/tdx_common.py @@ -6,6 +6,7 @@ import pickle import bz2 from functools import lru_cache from logging import INFO, ERROR +from vnpy.trader.utility import load_json, save_json @lru_cache() @@ -65,6 +66,15 @@ TDX_FUTURE_HOSTS = [ {'ip': '58.246.109.27', 'port': 7721, "name": "备用服务器3"}] +def get_future_contracts(): + """获取期货合约信息""" + return get_cache_json('future_contracts.json') + +def save_future_contracts(future_contracts_dict: dict): + """保存期货合约信息""" + save_cache_json(future_contracts_dict, 'future_contracts.json') + + def get_cache_config(config_file_name): """获取本地缓存的配置地址信息""" config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), config_file_name)) @@ -74,9 +84,9 @@ def get_cache_config(config_file_name): with bz2.BZ2File(config_file_name, 'rb') as f: config = pickle.load(f) return config - return config -def save_cache_config(data: dict, config_file_name ): + +def save_cache_config(data: dict, config_file_name): """保存本地缓存的配置地址信息""" config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), config_file_name)) @@ -84,13 +94,27 @@ def save_cache_config(data: dict, config_file_name ): pickle.dump(data, f) +def get_cache_json(json_file_name: str): + """获取本地缓存的json配置信息""" + config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), json_file_name)) + return load_json(config_file_name) + + +def save_cache_json(data_dict: dict, json_file_name: str): + """保存本地缓存的JSON配置信息""" + config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), json_file_name)) + save_json(filename=config_file_name, data=data_dict) + + class FakeStrategy(object): """制作一个假得策略,用于测试""" + def write_log(self, content, level=INFO): if level == INFO: print(content) else: print(content, file=sys.stderr) + def write_error(self, content): self.write_log(content, level=ERROR) diff --git a/vnpy/data/tdx/tdx_future_data.py b/vnpy/data/tdx/tdx_future_data.py index 428fb16b..1aa83b07 100644 --- a/vnpy/data/tdx/tdx_future_data.py +++ b/vnpy/data/tdx/tdx_future_data.py @@ -25,9 +25,19 @@ from pytdx.exhq import TdxExHq_API from vnpy.trader.constant import Exchange from vnpy.trader.object import BarData -from vnpy.trader.utility import (get_underlying_symbol, get_full_symbol, get_trading_date, load_json, save_json) -from vnpy.data.tdx.tdx_common import (lru_cache, TDX_FUTURE_HOSTS, PERIOD_MAPPING) - +from vnpy.trader.utility import ( + get_underlying_symbol, + get_full_symbol, + get_trading_date, + get_real_symbol_by_exchange) +from vnpy.data.tdx.tdx_common import ( + lru_cache, + TDX_FUTURE_HOSTS, + PERIOD_MAPPING, + get_future_contracts, + save_future_contracts, + get_cache_json, + save_cache_json) # 每个周期包含多少分钟 (估算值, 没考虑夜盘和10:15的影响) NUM_MINUTE_MAPPING: Dict[str, int] = {} @@ -66,17 +76,7 @@ QSIZE = 500 ALL_MARKET_BEGIN_HOUR = 8 ALL_MARKET_END_HOUR = 16 - -def get_cache_ip(): - """获取本地缓存的最快IP地址信息""" - config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tdx_config.json')) - return load_json(config_file_name) - - -def save_cache_ip(best_ip: dict): - """保存本地缓存的最快IP地址信息""" - config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tdx_config.json')) - save_json(filename=config_file_name, data=best_ip) +TDX_FUTURE_CONFIG = 'tdx_future_config.json' @lru_cache() def get_tdx_marketid(symbol): @@ -103,6 +103,7 @@ class TdxFutureData(object): self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP) # tdx合约与tdx市场的字典 self.strategy = strategy + self.future_contracts = get_future_contracts() def write_log(self, content): if self.strategy: @@ -130,7 +131,7 @@ class TdxFutureData(object): # 选取最佳服务器 if is_reconnect or len(self.best_ip) == 0: - self.best_ip = get_cache_ip() + self.best_ip = get_cache_json(TDX_FUTURE_CONFIG) if len(self.best_ip) == 0: self.best_ip = self.select_best_ip() @@ -192,7 +193,7 @@ class TdxFutureData(object): self.write_log(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port'])) # print(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port'])) - save_cache_ip(best_future_ip) + save_cache_json(best_future_ip, TDX_FUTURE_CONFIG) return best_future_ip # ---------------------------------------------------------------------- @@ -207,7 +208,8 @@ class TdxFutureData(object): if not isinstance(num, int): return - all_contacts = sum([self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], []) + all_contacts = sum( + [self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], []) # [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}] # 对所有合约处理,更新字典 指数合约-tdx市场,指数合约-交易所 @@ -348,7 +350,8 @@ class TdxFutureData(object): add_bar.volume = float(row['volume']) add_bar.openInterest = float(row['open_interest']) except Exception as ex: - self.write_error('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc())) + self.write_error( + 'error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc())) # print('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc())) return False @@ -434,7 +437,7 @@ class TdxFutureData(object): index = 0 count = 100 results = [] - while(True): + while (True): print(u'查询{}下:{}~{}个合约'.format(exchange, index, index + count)) result = self.api.get_instrument_quote_list(int(market_id), 3, index, count) results.extend(result) @@ -466,7 +469,7 @@ class TdxFutureData(object): for contract in contracts: # 排除指数合约 code = contract.get('code') - if code[-2:] in ['L9', 'L8', 'L0', 'L1', 'L2', 'L3', '50'] or\ + if code[-2:] in ['L9', 'L8', 'L0', 'L1', 'L2', 'L3', '50'] or \ (exchange == Exchange.CFFEX and code[-3:] in ['300', '500']): continue short_symbol = get_underlying_symbol(code).upper() @@ -561,7 +564,8 @@ class TdxFutureData(object): return True, _datas except Exception as ex: - self.write_error('exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc())) + self.write_error( + 'exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc())) self.write_error(u'当前异常服务器信息:{}'.format(self.best_ip)) self.write_log(u'重置连接') self.api = None @@ -611,7 +615,7 @@ class TdxFutureData(object): def get_history_transaction_data(self, symbol: str, trading_date, - cache_folder:str = None): + cache_folder: str = None): """获取当某一交易日的历史成交记录""" ret_datas = [] # trading_date, 转换为数字类型得日期 @@ -656,7 +660,7 @@ class TdxFutureData(object): while True: _res = self.api.get_history_transaction_data( market=self.symbol_market_dict.get(tdx_index_symbol, 0), - trading_date=trading_date, + date=trading_date, code=symbol, start=_pos, count=q_size) @@ -718,67 +722,47 @@ class TdxFutureData(object): self.connect(is_reconnect=True) return False, ret_datas + def update_mi_contracts(self): + # 连接通达信,获取主力合约 + if not self.api: + self.connect() -if __name__ == "__main__": - from .tdx_common import FakeStrategy - t1 = FakeStrategy() - t2 = FakeStrategy() - # 创建API对象 - api_01 = TdxFutureData(t1) + mi_contract_quote_list = self.get_mi_contracts2() - markets = api_01.get_markets() - str_markets = json.dumps(markets, indent=1, ensure_ascii=False) - print(u'{}'.format(str_markets)) + self.write_log(u'一共获取:{}个主力合约:{}'.format(len(mi_contract_quote_list), [c.get('code') for c in mi_contract_quote_list])) + should_save = False + # 逐一更新主力合约数据 + for mi_contract in mi_contract_quote_list: + tdx_market_id = mi_contract.get('market') + full_symbol = mi_contract.get('code') + underlying_symbol = get_underlying_symbol(full_symbol).upper() + if underlying_symbol in ['SC', 'NR']: + vn_exchange = Exchange.INE + else: + vn_exchange = Tdx_Vn_Exchange_Map.get(str(tdx_market_id)) + mi_symbol = get_real_symbol_by_exchange(full_symbol, vn_exchange) - # 获取所有的期货合约明细 - api_01.qryInstrument() + # 更新登记 短合约:真实主力合约 + self.write_log('{},{},{},{},{}'.format(tdx_market_id, full_symbol, underlying_symbol, mi_symbol, vn_exchange)) + if underlying_symbol in self.future_contracts: + info = self.future_contracts.get(underlying_symbol) + if mi_symbol > info.get('mi_symbol') : + self.write_log(u'主力合约变化:{} =>{}'.format(info.get('mi_symbol'), mi_symbol)) + info.update({'mi_symbol': mi_symbol, 'full_symbol': full_symbol}) + self.future_contracts.update({underlying_symbol: info}) + should_save = True + else: + # 添加到新合约中 + # todo 这里缺少size和price_tick + info = { + "underlying_symbol": underlying_symbol, + "mi_symbol": mi_symbol, + "full_symbol": full_symbol, + "exchange": vn_exchange.value + } + self.write_log(u'新合约:{}'.format(info)) + self.future_contracts.update({underlying_symbol: info}) + should_save = True - # 获取某个合约得最新价 - price = api_01.get_price('rb2005') - print('price={}'.format(price)) - exit(0) - # 获取主力合约 - # result = api_01.get_mi_contracts() - # str_result = json.dumps(result,indent=1, ensure_ascii=False) - # print(str_result) - - # 获取某个板块的合约 - # result = api_01.get_contracts(exchange=EXCHANGE_CZCE) - - # 获取某个板块的主力合约 - # result = api_01.get_mi_contracts_from_exchange(exchange=EXCHANGE_CZCE) - # 获取主力合约(从各个板块组合获取) - # result = api_01.get_mi_contracts2() - # print(u'一共{}个记录:{}'.format(len(result), [c.get('code') for c in result])) - # str_result = json.dumps(result,indent=1, ensure_ascii=False) - # print(str_result) - - # all_99_ticks= api_01.get_99_contracts() - # str_99_ticks = json.dumps(all_99_ticks, indent=1, ensure_ascii=False) - # print(u'{}'.format(str_99_ticks)) - - # 获取历史分钟线 - """ - ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0)) - line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars] - import pandas as pd - df = pd.DataFrame(line_close_oi) - corr = df.corr() - print(corr) - corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2) - """ - # api.get_bars(symbol, period='5min', callback=display_bar) - # api_01.get_bars('IF99', period='1day', callback=t1.display_bar) - # result,datas = api_01.get_transaction_data(symbol='ni1905') - # api_02 = TdxFutureData(t2) - # api_02.get_bars('IF99', period='1min', callback=t1.display_bar) - - # 获取当前交易日分时数据 - # ret,result = api_01.get_transaction_data('RB99') - # for r in result[0:10] + result[-10:]: - # print(r) - - # 获取历史分时数据 - ret, result = api_01.get_history_transaction_data('rb1905', '20190109') - for r in result[0:10] + result[-10:]: - print(r) + if should_save: + save_future_contracts(self.future_contracts) diff --git a/vnpy/data/tdx/tdx_stock_data.py b/vnpy/data/tdx/tdx_stock_data.py index 0375005c..96c7245b 100644 --- a/vnpy/data/tdx/tdx_stock_data.py +++ b/vnpy/data/tdx/tdx_stock_data.py @@ -41,7 +41,6 @@ NUM_MINUTE_MAPPING['1day'] = 60 * 5.5 # 股票,收盘时间是15:00,开 # 常量 QSIZE = 800 - STOCK_CONFIG_FILE = 'tdx_stock_config.pkb2' # 通达信 <=> 交易所代码 映射 @@ -61,8 +60,6 @@ RQ_TDX_STOCK_MARKET_MAP = {v: k for k, v in TDX_RQ_STOCK_MARKET_MAP.items()} class TdxStockData(object): - - # ---------------------------------------------------------------------- def __init__(self, strategy=None): """ 构造函数 @@ -172,6 +169,7 @@ class TdxStockData(object): results.extend(result) return results + # ---------------------------------------------------------------------- def get_bars(self, symbol: str, @@ -184,7 +182,7 @@ class TdxStockData(object): symbol:股票 000001.XG period: 周期: 1min,5min,15min,30min,1hour,1day, """ - if self.api is None: + if not self.api: self.connect() ret_bars = [] if self.api is None: @@ -375,7 +373,11 @@ class TdxStockData(object): :param cache_folder: :return: """ + if not self.api: + self.connect() + ret_datas = [] + # trading_date ,转为为查询数字类型 if isinstance(trading_date, datetime): trading_date = trading_date.strftime('%Y%m%d') @@ -429,7 +431,7 @@ class TdxStockData(object): # 获取历史交易记录 _res = self.api.get_history_transaction_data( market=self.symbol_market_dict[symbol], - trading_date=trading_date, + date=trading_date, code=symbol, start=_pos, count=q_size) @@ -474,45 +476,3 @@ class TdxStockData(object): self.write_error( 'exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc())) return False, ret_datas - - -if __name__ == "__main__": - from tdx_common import FakeStrategy - import json - t1 = FakeStrategy() - t2 = FakeStrategy() - # 创建API对象 - api_01 = TdxStockData(t1) - - # 获取市场下股票 - for market_id in range(2): - print('get market_id:{}'.format(market_id)) - security_list = api_01.get_security_list(market_id) - if len(security_list) == 0: - continue - for security in security_list: - if security.get('code', '').startswith('12') or u'转债' in security.get('name', ''): - str_security = json.dumps(security, indent=1, ensure_ascii=False) - print('market_id:{},{}'.format(market_id, str_security)) - - # str_markets = json.dumps(security_list, indent=1, ensure_ascii=False) - # print(u'{}'.format(str_markets)) - - - # 获取历史分钟线 - # api_01.get_bars('002024', period='1hour', callback=t1.display_bar) - - # api.get_bars(symbol, period='5min', callback=display_bar) - # api.get_bars(symbol, period='1day', callback=display_bar) - # api_02 = TdxData(t2) - # api_02.get_bars('601390', period='1day', callback=t1.display_bar) - - # 获取历史分时数据 - # ret,result = api_01.get_history_transaction_data('RB99', '20190909') - # for r in result[0:10] + result[-10:]: - # print(r) - - # 获取历史分时数据 - ret, result = api_01.get_history_transaction_data('600410', '20190925') - for r in result[0:10] + result[-10:]: - print(r) diff --git a/vnpy/data/tdx/test_tdx_future.py b/vnpy/data/tdx/test_tdx_future.py new file mode 100644 index 00000000..627ea4f6 --- /dev/null +++ b/vnpy/data/tdx/test_tdx_future.py @@ -0,0 +1,79 @@ +# flake8: noqa +import os +import sys + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.tdx.tdx_common import FakeStrategy +from vnpy.data.tdx.tdx_future_data import * + +t1 = FakeStrategy() +t2 = FakeStrategy() +# 创建API对象 +api_01 = TdxFutureData(t1) + +# 获取所有市场信息 +markets = api_01.get_markets() +str_markets = json.dumps(markets, indent=1, ensure_ascii=False) +print(u'{}'.format(str_markets)) + +# 获取所有的期货合约明细 +api_01.qryInstrument() + +# 获取某个合约得最新价 +# price = api_01.get_price('rb2005') +# print('price={}'.format(price)) + + +# 获取主力合约 +# result = api_01.get_mi_contracts() +# str_result = json.dumps(result,indent=1, ensure_ascii=False) +# print(str_result) + +# 获取某个板块的合约 +# result = api_01.get_contracts(exchange=Exchange.CZCE) + +# 获取某个板块的主力合约 +# result = api_01.get_mi_contracts_from_exchange(exchange=Exchange.CZCE) +# 获取主力合约(从各个板块组合获取) +# result = api_01.get_mi_contracts2() +# print(u'一共{}个记录:{}'.format(len(result), [c.get('code') for c in result])) +# str_result = json.dumps(result,indent=1, ensure_ascii=False) +# print(str_result) + +# all_99_ticks= api_01.get_99_contracts() +# str_99_ticks = json.dumps(all_99_ticks, indent=1, ensure_ascii=False) +# print(u'{}'.format(str_99_ticks)) + +# 获取历史分钟线 +""" +ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0)) +line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars] +import pandas as pd +df = pd.DataFrame(line_close_oi) +corr = df.corr() +print(corr) +corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2) +""" +# api.get_bars(symbol, period='5min', callback=display_bar) +# api_01.get_bars('IF99', period='1day', callback=t1.display_bar) +# result,datas = api_01.get_transaction_data(symbol='ni1905') +# api_02 = TdxFutureData(t2) +# api_02.get_bars('IF99', period='1min', callback=t1.display_bar) + +# 获取当前交易日分时数据 +# ret,result = api_01.get_transaction_data('RB99') +# for r in result[0:10] + result[-10:]: +# print(r) + +# 获取历史分时数据 +# ret, result = api_01.get_history_transaction_data('RB99', '20190109') +# for r in result[0:10] + result[-10:]: +# print(r) + +# 更新本地合约缓存信息 +api_01.update_mi_contracts() diff --git a/vnpy/data/tdx/test_tdx_stock.py b/vnpy/data/tdx/test_tdx_stock.py new file mode 100644 index 00000000..3083a2c3 --- /dev/null +++ b/vnpy/data/tdx/test_tdx_stock.py @@ -0,0 +1,53 @@ +# flake8: noqa +import os +import sys + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.tdx.tdx_common import FakeStrategy +from vnpy.data.tdx.tdx_stock_data import * + +os.environ["VNPY_TESTING"] = "1" + +import json + +t1 = FakeStrategy() +t2 = FakeStrategy() +# 创建API对象 +api_01 = TdxStockData(t1) + +# 获取市场下股票 +for market_id in range(2): + print('get market_id:{}'.format(market_id)) + security_list = api_01.get_security_list(market_id) + if len(security_list) == 0: + continue + for security in security_list: + if security.get('code', '').startswith('12') or u'转债' in security.get('name', ''): + str_security = json.dumps(security, indent=1, ensure_ascii=False) + print('market_id:{},{}'.format(market_id, str_security)) + + # str_markets = json.dumps(security_list, indent=1, ensure_ascii=False) + # print(u'{}'.format(str_markets)) + +# 获取历史分钟线 +# api_01.get_bars('002024', period='1hour', callback=t1.display_bar) + +# api.get_bars(symbol, period='5min', callback=display_bar) +# api.get_bars(symbol, period='1day', callback=display_bar) +# api_02 = TdxData(t2) +# api_02.get_bars('601390', period='1day', callback=t1.display_bar) + +# 获取历史分时数据 +# ret,result = api_01.get_history_transaction_data('RB99', '20190909') +# for r in result[0:10] + result[-10:]: +# print(r) + +# 获取历史分时数据 +ret, result = api_01.get_history_transaction_data('600410', '20190925') +for r in result[0:10] + result[-10:]: + print(r)