@lru_cache()
def get_tdx_market_code(code):
    """Return the TDX market id for a stock code.

    :param code: stock code; converted with ``str()`` before inspection.
        It must be hashable because results are memoised by ``lru_cache``.
    :return: ``1`` (Shanghai Stock Exchange) when the first character is
        5, 6 or 9, or the first three characters are one of the listed
        prefixes; ``0`` (Shenzhen Stock Exchange) otherwise.
    :raises IndexError: if the code converts to an empty string.
    """
    text = str(code)
    shanghai_first_chars = ('5', '6', '9')
    shanghai_prefixes = ("009", "126", "110", "201", "202", "203", "204")

    is_shanghai = text[0] in shanghai_first_chars or text[:3] in shanghai_prefixes
    return 1 if is_shanghai else 0
# Approximate number of minutes covered by one bar of each period.
# These are estimates: night sessions and the 10:15 break are not considered.
NUM_MINUTE_MAPPING = {
    '1min': 1,
    '5min': 5,
    '15min': 15,
    '30min': 30,
    '1hour': 60,
    '1day': 60 * 24,
    '1week': 60 * 24 * 7,
    # Was 60*24*7*30, i.e. thirty weeks' worth of minutes; a month is ~30 days.
    '1month': 60 * 24 * 30,
}
def __init__(self, strategy, best_ip=None):
    """
    Create the futures data client without connecting yet.

    :param strategy: upper-level strategy, used mainly for its write_log();
        may be falsy, in which case messages are printed instead
    :param best_ip: optional ``{'ip': ..., 'port': ...}`` dict of a server
        to use; when omitted or empty, connect() selects one
    """
    self.api = None
    self.connection_status = False  # connection state
    # A fresh dict per instance: the former ``best_ip={}`` default was one
    # object shared by every instance created without the argument.
    self.best_ip = {} if best_ip is None else best_ip
    self.symbol_exchange_dict = {}  # tdx symbol -> vn exchange
    self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP)  # tdx symbol -> tdx market id
    self.strategy = strategy
def ping(self, ip, port=7709):
    """
    Measure how long a quote server takes to answer an instrument-count query.

    :param ip: server address
    :param port: server port
    :return: the elapsed ``timedelta`` when the server reports more than
        10000 instruments; otherwise ``timedelta(9, 9, 0)`` (9 days and
        9 seconds) as an "unusable" sentinel, so that callers taking the
        minimum never prefer such a server over one that answered
    """
    unusable = timedelta(9, 9, 0)
    apix = TdxExHq_API()
    started_at = datetime.now()
    try:
        with apix.connect(ip, port):
            if apix.get_instrument_count() > 10000:
                elapsed = datetime.now() - started_at
                self.write_log('服务器{}:{},耗时:{}'.format(ip, port, elapsed))
                return elapsed
            self.write_log(u'该服务器IP {}无响应'.format(ip))
            return unusable
    except Exception:
        # Best-effort probe: any failure just marks the server unusable.
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate while servers are being probed.
        self.write_error(u'tdx ping服务器,异常的响应{}'.format(ip))
        return unusable
def get_bars(self, symbol, period, callback, bar_is_completed=False, bar_freq=1, start_dt=None):
    """
    Download K-line bars for a futures contract.

    The symbol is upper-cased, underscores are removed and ``99`` is replaced
    by ``L9`` before querying.  Bars are fetched newest-first in pages of
    ``QSIZE`` until the oldest fetched bar is not later than the start time,
    then converted to ``BarData`` objects.

    :param symbol: contract symbol, e.g. ``rb2005`` or ``IF99``
    :param period: one of the keys of ``PERIOD_MAPPING``
    :param callback: optional callable invoked as
        ``callback(bar, bar_is_completed, freq)`` for every returned bar
    :param bar_is_completed: kept for interface compatibility; the value
        passed to the callback is recomputed for every bar
    :param bar_freq: ``freq`` passed to the callback, except for an
        unfinished last bar where it is estimated from the current time
    :param start_dt: bars earlier than this are dropped; when ``None`` the
        download covers the last 10 days and nothing is dropped
    :return: ``(ok, bars)``; every failure path returns ``(False, bars)``
    """
    ret_bars = []
    tdx_symbol = symbol.upper().replace('_', '')
    tdx_symbol = tdx_symbol.replace('99', 'L9')
    self.connect()
    if self.api is None:
        return False, ret_bars

    if tdx_symbol not in self.symbol_exchange_dict:
        self.write_error(u'{} 合约{}/{}不在下载清单中: {}'.format(
            datetime.now(), symbol, tdx_symbol, self.symbol_exchange_dict.keys()))
        return False, ret_bars
    if period not in PERIOD_MAPPING:
        self.write_error(u'{} 周期{}不在下载清单中: {}'.format(
            datetime.now(), period, list(PERIOD_MAPPING.keys())))
        return False, ret_bars

    tdx_period = PERIOD_MAPPING[period]

    if start_dt is None:
        self.write_log(u'没有设置开始时间,缺省为10天前')
        qry_start_date = datetime.now() - timedelta(days=10)
    else:
        qry_start_date = start_dt
    # Query up to tomorrow's market close so an ongoing night session is included.
    end_date = datetime.combine(datetime.now() + timedelta(days=1), time(ALL_MARKET_END_HOUR, 0))
    if qry_start_date > end_date:
        qry_start_date = end_date
    self.write_log('{}开始下载tdx:{} {}数据, {} to {}.'.format(
        datetime.now(), tdx_symbol, period, qry_start_date, end_date))

    try:
        # Page backwards from the newest bar; each page is prepended so that
        # _bars stays in chronological order.
        _start_date = end_date
        _bars = []
        _pos = 0
        while _start_date > qry_start_date:
            _res = self.api.get_instrument_bars(
                tdx_period,
                self.symbol_market_dict.get(tdx_symbol, 0),
                tdx_symbol,
                _pos,
                QSIZE)
            if _res is not None:
                _bars = _res + _bars
            _pos += QSIZE
            if _res is not None and len(_res) > 0:
                _start_date = datetime.strptime(_res[0]['datetime'], '%Y-%m-%d %H:%M')
                self.write_log(u'分段取数据开始:{}'.format(_start_date))
            else:
                break
        if len(_bars) == 0:
            self.write_error('{} Handling {}, len1={}..., continue'.format(
                str(datetime.now()), tdx_symbol, len(_bars)))
            return False, ret_bars

        current_datetime = datetime.now()
        data = self.api.to_df(_bars)
        data = data.assign(datetime=to_datetime(data['datetime']))
        data = data.assign(ticker=symbol)
        data['instrument_id'] = data['ticker']
        data['symbol'] = symbol
        data = data.drop(
            ['year', 'month', 'day', 'hour', 'minute', 'price', 'amount', 'ticker'],
            errors='ignore',
            axis=1)
        data = data.rename(
            index=str,
            columns={
                'position': 'open_interest',
                'trade': 'volume',
            })
        if len(data) == 0:
            # Reported through write_error like the other failure paths
            # (this one used to go to print()).
            self.write_error('{} Handling {}, len2={}..., continue'.format(
                str(datetime.now()), tdx_symbol, len(data)))
            return False, ret_bars

        data['total_turnover'] = data['volume']
        data["limit_down"] = 0
        data["limit_up"] = 999999
        # The trading date is taken from the timestamp before the
        # night-session shift below moves it to the calendar date.
        data['trading_date'] = data['datetime']
        data['trading_date'] = data['trading_date'].apply(lambda x: (x.strftime('%Y-%m-%d')))
        # All masks are computed on the unshifted timestamps.
        monday_ts = data['datetime'].dt.weekday == 0  # Monday
        night_ts1 = data['datetime'].dt.hour > ALL_MARKET_END_HOUR
        night_ts2 = data['datetime'].dt.hour < ALL_MARKET_BEGIN_HOUR
        # Every evening session (21:00~24:00): minus one day.
        data.loc[night_ts1, 'datetime'] -= timedelta(days=1)
        # Monday evening session (21:00~24:00): two more days.
        monday_ts1 = monday_ts & night_ts1
        data.loc[monday_ts1, 'datetime'] -= timedelta(days=2)
        # Monday early-morning session (00:00~04:00): minus two days.
        monday_ts2 = monday_ts & night_ts2
        data.loc[monday_ts2, 'datetime'] -= timedelta(days=2)
        data['dt_datetime'] = data['datetime']
        data['date'] = data['datetime'].apply(lambda x: (x.strftime('%Y-%m-%d')))
        data['time'] = data['datetime'].apply(lambda x: (x.strftime('%H:%M:%S')))
        data['datetime'] = data['datetime'].apply(lambda x: float(x.strftime('%Y%m%d%H%M%S')))
        data = data.set_index('dt_datetime', drop=False)

        # Positional access: ``series[-1]`` on a datetime-indexed Series relies
        # on a deprecated label/position fallback.  Computed once, not per row.
        last_index = data['dt_datetime'].iloc[-1]

        for index, row in data.iterrows():
            add_bar = BarData()
            try:
                add_bar.symbol = row['symbol']
                add_bar.datetime = index
                add_bar.date = row['date']
                add_bar.time = row['time']
                add_bar.trading_date = row['trading_date']
                add_bar.open = float(row['open'])
                add_bar.high = float(row['high'])
                add_bar.low = float(row['low'])
                add_bar.close = float(row['close'])
                add_bar.volume = float(row['volume'])
                add_bar.openInterest = float(row['open_interest'])
            except Exception as ex:
                self.write_error('error when convert bar:{},ex:{},t:{}'.format(
                    row, str(ex), traceback.format_exc()))
                # Return the same (ok, bars) shape as every other path; a bare
                # ``False`` broke callers that unpack ``ret, bars = get_bars(...)``.
                return False, ret_bars

            if start_dt is not None and index < start_dt:
                continue
            ret_bars.append(add_bar)

            if callback is not None:
                freq = bar_freq
                bar_is_completed = True
                if period != '1min' and index == last_index:
                    # The last bar may be unfinished.  The estimate is roughly
                    # right for 5min and harmless for 1day, but may be wrong
                    # for other minute periods (session breaks differ per product).
                    if index > current_datetime:
                        bar_is_completed = False
                        # Minutes already elapsed inside the unfinished bar.
                        freq = NUM_MINUTE_MAPPING[period] - int(
                            (index - current_datetime).total_seconds() / 60)
                callback(add_bar, bar_is_completed, freq)

        return True, ret_bars
    except Exception as ex:
        self.write_error('exception in get:{},{},{}'.format(tdx_symbol, str(ex), traceback.format_exc()))
        self.write_log(u'重置连接')
        # Drop the client and reconnect, re-selecting the server.
        self.api = None
        self.connect(is_reconnect=True)
        return False, ret_bars
def get_transaction_data(self, symbol):
    """
    Download the tick-by-tick transactions of the current trading day.

    The symbol is upper-cased and ``99`` is replaced by ``L9``.  Records are
    fetched in pages of ``QSIZE * 5`` until a page comes back empty or
    1,000,000 records have been collected.  In every record the ``date``
    key is replaced by ``datetime`` (shifted back for night-session
    timestamps) and ``price`` is divided by 1000.

    :param symbol: contract symbol, e.g. ``RB99`` or ``ni1905``
    :return: ``(True, records)`` in chronological order (``records`` may be
        empty), or ``(False, [])`` after an exception, in which case the
        connection is reset
    """
    ret_datas = []
    symbol = symbol.upper()
    if '99' in symbol:
        symbol = symbol.replace('99', 'L9')

    self.connect()

    q_size = QSIZE * 5
    # About 2 ticks per second for 10 hours.
    max_data_size = 1000000

    self.write_log(u'开始下载{}当日分笔数据'.format(symbol))

    try:
        _datas = []
        _pos = 0

        while True:
            _res = self.api.get_transaction_data(
                market=self.symbol_market_dict.get(symbol, 0),
                code=symbol,
                start=_pos,
                count=q_size)
            if _res is not None:
                for d in _res:
                    dt = d.pop('date')
                    weekday = dt.isoweekday()
                    is_evening = dt.hour >= 20
                    if is_evening and 1 < weekday <= 6:
                        # Tuesday..Saturday evening: previous day
                        dt = dt - timedelta(days=1)
                    elif is_evening and weekday == 1:
                        # Monday, after 20:00
                        dt = dt - timedelta(days=3)
                    elif dt.hour < 8 and weekday == 1:
                        # Monday, before 08:00
                        dt = dt - timedelta(days=3)
                    elif is_evening and weekday == 7:
                        # Sunday after 20:00 must be Friday's night session
                        dt = dt - timedelta(days=2)
                    elif weekday == 7:
                        # Any other Sunday time must be Saturday early morning
                        dt = dt - timedelta(days=1)

                    d.update({'datetime': dt})
                    # The interface returns price * 1000, so scale it back.
                    d.update({'price': d.get('price', 0) / 1000})
                # Pages arrive newest-first: sort each page and prepend it.
                _datas = sorted(_res, key=lambda s: s['datetime']) + _datas
                _pos += min(q_size, len(_res))

            if _res is not None and len(_res) > 0:
                self.write_log(u'分段取分笔数据:{} ~{}, {}条,累计:{}条'.format(
                    _res[0]['datetime'], _res[-1]['datetime'], len(_res), _pos))
            else:
                break

            if len(_datas) >= max_data_size:
                break

        if len(_datas) == 0:
            # The placeholder was never filled in before, so the log showed a literal '{}'.
            self.write_error(u'{}分笔成交数据获取为空'.format(symbol))

        return True, _datas

    except Exception as ex:
        self.write_error('exception in get_transaction_data:{},{},{}'.format(
            symbol, str(ex), traceback.format_exc()))
        self.write_error(u'当前异常服务器信息:{}'.format(self.best_ip))
        self.write_log(u'重置连接')
        self.api = None
        self.connect(is_reconnect=True)
        return False, ret_datas
'{}_{}.pkz2'.format(cache_symbol, cache_date)) + if not os.path.isfile(cache_file): + self.write_error('缓存文件:{}不存在,不能读取'.format(cache_file)) + return None + with bz2.BZ2File(cache_file, 'rb') as f: + data = pickle.load(f) + return data + + return None + + def get_history_transaction_data(self, symbol, date, cache_folder=None): + """获取当某一交易日的历史成交记录""" + ret_datas = [] + if isinstance(date, datetime): + date = date.strftime('%Y%m%d') + if isinstance(date, str): + date = int(date) + + self.connect() + + cache_symbol = symbol + cache_date = str(date) + + max_data_size = sys.maxsize + symbol = symbol.upper() + if '99' in symbol: + symbol = symbol.replace('99', 'L9') + + q_size = QSIZE * 5 + # 每秒 2个, 10小时 + max_data_size = 1000000 + + # 优先从缓存加载 + if cache_folder: + buffer_data = self.load_cache(cache_folder, cache_symbol, cache_date) + if buffer_data: + self.write_log(u'使用缓存文件') + return True, buffer_data + + self.write_log(u'开始下载{} 历史{}分笔数据'.format(date, symbol)) + cur_trading_date = get_trading_date() + if date == int(cur_trading_date.replace('-', '')): + return self.get_transaction_data(symbol) + try: + _datas = [] + _pos = 0 + + while(True): + _res = self.api.get_history_transaction_data( + market=self.symbol_market_dict.get(symbol,0), + date=date, + code=symbol, + start=_pos, + count=q_size) + if _res is not None: + for d in _res: + dt = d.pop('date') + # 星期1~星期6 + if dt.hour >= 20 and 1< dt.isoweekday()<=6: + dt = dt - timedelta(days=1) + d.update({'datetime':dt}) + elif dt.hour >= 20 and dt.isoweekday() == 1: + # 星期一取得20点后数据 + dt = dt - timedelta(days=3) + d.update({'datetime': dt}) + elif dt.hour < 8 and dt.isoweekday() == 1: + # 星期一取得8点前数据 + dt = dt - timedelta(days=3) + d.update({'datetime': dt}) + elif dt.hour >= 20 and dt.isoweekday() == 7: + # 星期天取得20点后数据,肯定是星期五夜盘 + dt = dt - timedelta(days=2) + d.update({'datetime': dt}) + elif dt.isoweekday() == 7: + # 星期日取得其他时间,必然是 星期六凌晨的数据 + dt = dt - timedelta(days=1) + d.update({'datetime': dt}) + else: + 
class FakeStrategy(object):
    """Minimal strategy stand-in that prints instead of logging."""

    def write_log(self, content, level=INFO):
        """Print ``content`` to stdout for INFO, to stderr for any other level."""
        stream = sys.stdout if level == INFO else sys.stderr
        print(content, file=stream)

    def display_bar(self, bar, bar_is_completed=True, freq=1):
        """Print the bar's ``vtSymbol`` and ``datetime``; the other arguments are ignored."""
        line = u'{} {}'.format(bar.vtSymbol, bar.datetime)
        print(line)
#print(u'一共{}个记录:{}'.format(len(result), [c.get('code') for c in result])) + #str_result = json.dumps(result,indent=1, ensure_ascii=False) + #print(str_result) + + #all_99_ticks= api_01.get_99_contracts() + #str_99_ticks = json.dumps(all_99_ticks, indent=1, ensure_ascii=False) + #print(u'{}'.format(str_99_ticks)) + + # 获取历史分钟线 + """ + ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0)) + line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars] + import pandas as pd + df = pd.DataFrame(line_close_oi) + corr = df.corr() + print(corr) + corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2) + """ + # api.get_bars(symbol, period='5min', callback=display_bar) + #api_01.get_bars('IF99', period='1day', callback=t1.display_bar) + #result,datas = api_01.get_transaction_data(symbol='ni1905') + #api_02 = TdxFutureData(t2) + #api_02.get_bars('IF99', period='1min', callback=t1.display_bar) + + # 获取当前交易日分时数据 + #ret,result = api_01.get_transaction_data('RB99') + #for r in result[0:10] + result[-10:]: + # print(r) + + # 获取历史分时数据 + ret,result = api_01.get_history_transaction_data('J99', '20191009') + for r in result[0:10] + result[-10:]: + print(r) + diff --git a/vnpy/data/tdx/tdx_stock_data.py b/vnpy/data/tdx/tdx_stock_data.py new file mode 100644 index 00000000..fb2b4b98 --- /dev/null +++ b/vnpy/data/tdx/tdx_stock_data.py @@ -0,0 +1,402 @@ +# encoding: UTF-8 + +# 从tdx下载股票数据. +# 收盘后的数据基本正确, 但盘中实时拿数据时: +# 1. 1Min的Bar可能不是最新的, 会缺几分钟. +# 2. 
def write_log(self, content, level=INFO):
    """
    Write a log message through the strategy, or print it when there is none.

    :param content: message text
    :param level: logging level, INFO by default.  get_bars() in this class
        calls ``write_log(..., level=ERROR)``; without this parameter that
        call raised TypeError.  A non-INFO level is forwarded to the
        strategy, or printed to stderr when there is no strategy.
    """
    if self.strategy:
        if level == INFO:
            # Same call as before for the default level.
            self.strategy.write_log(content)
        else:
            self.strategy.write_log(content, level=level)
    elif level == INFO:
        print(content)
    else:
        print(content, file=sys.stderr)
def disconnect(self):
    """
    Close the quote-server connection, if any, and forget the client.

    Previously the client reference was simply dropped, leaving the
    underlying connection open.  The client's own disconnect() is now called
    first; a failure there is logged and otherwise ignored, and the client
    reference and connection flag are cleared in all cases.
    """
    if self.api is None:
        return
    try:
        self.api.disconnect()
    except Exception as ex:
        # Best-effort close: the connection may already be broken.
        self.write_error(u'断开tdx连接异常:{}'.format(str(ex)))
    finally:
        self.api = None
        self.connection_status = False
self.api.get_security_bars(category=PERIOD_MAPPING[period], + market=market_code, + code=tdx_code, + start=_pos, + count=QSIZE) + if _res is not None: + _bars = _res + _bars + _pos += QSIZE + if _res is not None and len(_res) > 0: + _start_date = _res[0]['datetime'] + _start_date = datetime.strptime(_start_date, '%Y-%m-%d %H:%M') + self.write_log(u'分段取数据开始:{}'.format(_start_date)) + else: + break + if len(_bars) == 0: + self.write_error('{} Handling {}, len1={}..., continue'.format( + str(datetime.now()), tdx_code, len(_bars))) + return False, ret_bars + + current_datetime = datetime.now() + data = self.api.to_df(_bars) + data = data.assign(datetime=to_datetime(data['datetime'])) + data = data.assign(ticker=symbol) + data['symbol'] = symbol + data = data.drop( + ['year', 'month', 'day', 'hour', 'minute', 'price', 'ticker'], + errors='ignore', + axis=1) + data = data.rename( + index=str, + columns={'amount': 'volume', + }) + if len(data) == 0: + print('{} Handling {}, len2={}..., continue'.format( + str(datetime.now()), tdx_code, len(data))) + return False, ret_bars + + # 通达信是以bar的结束时间标记的,vnpy是以bar开始时间标记的,所以要扣减bar本身的分钟数 + data['datetime'] = data['datetime'].apply(lambda x:x-timedelta(minutes=NUM_MINUTE_MAPPING.get(period,1))) + data['trading_date'] = data['datetime'].apply(lambda x: (x.strftime('%Y-%m-%d'))) + data['date'] = data['datetime'].apply(lambda x: (x.strftime('%Y-%m-%d'))) + data['time'] = data['datetime'].apply(lambda x: (x.strftime('%H:%M:%S'))) + data = data.set_index('datetime', drop=False) + + for index, row in data.iterrows(): + add_bar = BarData() + try: + add_bar.symbol = symbol + add_bar.datetime = index + add_bar.date = row['date'] + add_bar.time = row['time'] + add_bar.trading_date = row['trading_date'] + add_bar.open = float(row['open']) + add_bar.high = float(row['high']) + add_bar.low = float(row['low']) + add_bar.close = float(row['close']) + add_bar.volume = float(row['volume']) + except Exception as ex: + self.write_error('error when 
def save_cache(self, cache_folder, cache_symbol, cache_date, data_list):
    """
    Pickle ``data_list`` into a bz2-compressed cache file.

    The file is written to
    ``<cache_folder>/<first 6 chars of cache_date>/<cache_symbol>_<cache_date>.pkb2``;
    missing folders are created.

    :param cache_folder: root folder of the cache
    :param cache_symbol: symbol used in the file name
    :param cache_date: date string; its first six characters name the sub-folder
    :param data_list: object to pickle
    """
    os.makedirs(cache_folder, exist_ok=True)
    if not os.path.exists(cache_folder):
        self.write_error('缓存目录不存在:{},不能保存'.format(cache_folder))
        return

    month_folder = os.path.join(cache_folder, cache_date[:6])
    os.makedirs(month_folder, exist_ok=True)

    file_name = '{}_{}.pkb2'.format(cache_symbol, cache_date)
    save_file = os.path.join(month_folder, file_name)
    with bz2.BZ2File(save_file, 'wb') as cache_file:
        pickle.dump(data_list, cache_file)
        self.write_log(u'缓存成功:{}'.format(save_file))
def load_cache(self, cache_folder, cache_symbol, cache_date):
    """Read one cached data set from the bz2-pickle cache tree.

    The file looked up is
    ``<cache_folder>/<YYYYMM>/<cache_symbol>_<cache_date>.pkb2``.

    :param cache_folder: root directory of the cache.
    :param cache_symbol: symbol used as the file-name prefix.
    :param cache_date: trading date in ``YYYYMMDD`` form (str or int); its
        first six characters select the year-month sub-folder.
    :return: the unpickled object, or None when the folder or file is
        missing or the file cannot be decompressed / unpickled. Every miss
        is reported through ``self.write_error``.

    NOTE: ``pickle.load`` executes code embedded in the file, so the cache
    folder must only ever contain files this program wrote itself.
    """
    if not os.path.exists(cache_folder):
        self.write_error('缓存目录:{}不存在,不能读取'.format(cache_folder))
        return None

    cache_date = str(cache_date)
    cache_folder_year_month = os.path.join(cache_folder, cache_date[:6])
    if not os.path.exists(cache_folder_year_month):
        self.write_error('缓存目录:{}不存在,不能读取'.format(cache_folder_year_month))
        return None

    cache_file = os.path.join(cache_folder_year_month, '{}_{}.pkb2'.format(cache_symbol, cache_date))
    if not os.path.isfile(cache_file):
        self.write_error('缓存文件:{}不存在,不能读取'.format(cache_file))
        return None

    try:
        with bz2.BZ2File(cache_file, 'rb') as f:
            return pickle.load(f)
    except (OSError, EOFError, pickle.UnpicklingError) as ex:
        # A damaged or truncated file is a cache miss, not a fatal error:
        # returning None lets the caller fall back to fetching fresh data.
        self.write_error(u'缓存文件:{}读取失败:{}'.format(cache_file, str(ex)))
        return None
def get_history_transaction_data(self, symbol, date, cache_folder=None):
    """Fetch the tick-by-tick transaction records of one trading day.

    :param symbol: bare TDX code (e.g. ``'600410'``) or ``'<code>.<exchange>'``.
        With a suffix, ``XSHG`` (case-insensitive) selects market 1 and any
        other suffix market 0; without one the market comes from
        ``get_tdx_market_code``.
    :param date: trading day as ``datetime``, ``'YYYYMMDD'`` string or int.
    :param cache_folder: optional cache root. When given, a cached copy is
        returned if present, and freshly downloaded data for days other
        than today is written back.
    :return: ``(True, records)`` on success, where every record dict has
        gained ``'datetime'``, ``'volume'`` (moved from ``'vol'``) and
        ``'trading_date'`` and the list is sorted by ``'datetime'``;
        ``(False, [])`` when nothing was received or an error occurred.
    """
    ret_datas = []
    if isinstance(date, datetime):
        date = date.strftime('%Y%m%d')
    if isinstance(date, str):
        date = int(date)

    cache_symbol = symbol
    cache_date = str(date)

    # Split "code.exchange" into the TDX code and the TDX market code.
    if '.' in symbol:
        tdx_code, market_str = symbol.split('.')
        market_code = 1 if market_str.upper() == 'XSHG' else 0
        self.symbol_exchange_dict.update({tdx_code: symbol})  # tdx code -> vn symbol
        self.symbol_market_dict.update({tdx_code: market_code})  # tdx code -> tdx market
    else:
        market_code = get_tdx_market_code(symbol)
        tdx_code = symbol
        self.symbol_exchange_dict.update({symbol: symbol})  # tdx code -> vn symbol
        self.symbol_market_dict.update({symbol: market_code})  # tdx code -> tdx market

    q_size = QSIZE * 5
    # Upper bound on the number of records kept for one day.
    max_data_size = 1000000

    # Prefer the cache; an unreadable cache only means "download again".
    if cache_folder:
        try:
            buffer_data = self.load_cache(cache_folder, cache_symbol, cache_date)
        except Exception as ex:
            self.write_error(u'读取缓存异常:{},{}'.format(cache_symbol, str(ex)))
            buffer_data = None
        if buffer_data:
            return True, buffer_data

    self.write_log(u'开始下载{} 历史{}分笔数据'.format(date, symbol))

    is_today = date == int(datetime.now().strftime('%Y%m%d'))

    try:
        _datas = []
        _pos = 0

        while True:
            # Always query with the bare TDX code and the market resolved
            # above; the raw symbol may still carry its ".exchange" suffix
            # and is not a key of symbol_market_dict in that case.
            if is_today:
                _res = self.api.get_transaction_data(
                    market=market_code,
                    code=tdx_code,
                    start=_pos,
                    count=q_size)
            else:
                _res = self.api.get_history_transaction_data(
                    market=market_code,
                    date=date,
                    code=tdx_code,
                    start=_pos,
                    count=q_size)

            if not _res:
                break

            # Each page is prepended - presumably start=0 addresses the
            # most recent records, so this keeps time order; TODO confirm.
            _datas = _res + _datas
            _pos += min(q_size, len(_res))
            self.write_log(u'分段取{}分笔数据:{} ~{}, {}条,累计:{}条'.format(date, _res[0]['time'],_res[-1]['time'], len(_res),_pos))

            if len(_datas) >= max_data_size:
                break

        if len(_datas) == 0:
            self.write_error(u'{}分笔成交数据获取为空'.format(date))
            return False, _datas

        # Records only carry HH:MM. Records sharing a minute get
        # successive seconds so that their timestamps stay ordered; the
        # added seconds stop growing once they reach :59.
        last_dt = None
        for d in _datas:
            dt = datetime.strptime(str(date) + ' ' + d.get('time'), '%Y%m%d %H:%M')
            if last_dt is None or last_dt < dt:
                last_dt = dt
            elif last_dt < dt + timedelta(seconds=59):
                last_dt = last_dt + timedelta(seconds=1)
            d.update({'datetime': last_dt})
            d.update({'volume': d.pop('vol', 0)})
            d.update({'trading_date': last_dt.strftime('%Y-%m-%d')})

        _datas = sorted(_datas, key=lambda s: s['datetime'])

        # Today's records are still growing, so only finished days are cached.
        if cache_folder and not is_today:
            try:
                self.save_cache(cache_folder, cache_symbol, cache_date, _datas)
            except Exception as ex:
                # Failing to cache must not throw away the downloaded data.
                self.write_error(u'保存缓存异常:{},{}'.format(cache_symbol, str(ex)))

        return True, _datas

    except Exception as ex:
        self.write_error('exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc()))
        return False, ret_datas
if __name__ == "__main__":
    class T(object):
        """Tiny stand-in for the owner object handed to ``TdxStockData``."""

        def write_log(self, content, level=INFO):
            # INFO goes to stdout, every other level to stderr.
            if level != INFO:
                print(content, file=sys.stderr)
                return
            print(content)

        def display_bar(self, bar, bar_is_completed=True, freq=1):
            # Bar callback: print the symbol and timestamp of the bar.
            line = u'{} {}'.format(bar.vtSymbol, bar.datetime)
            print(line)

    t1 = T()
    t2 = T()

    # Create the API object.
    api_01 = TdxStockData(t1)

    # Examples kept for manual experiments:
    #   bars:  api_01.get_bars('002024', period='1hour', callback=t1.display_bar)
    #   bars:  api_02 = TdxData(t2)
    #          api_02.get_bars('601390', period='1day', callback=t1.display_bar)
    #   ticks: api_01.get_history_transaction_data('RB99', '20190909')

    # Download one day of tick data and show the first and last ten records.
    ret, result = api_01.get_history_transaction_data('600410', '20190925')
    head_and_tail = result[0:10] + result[-10:]
    for record in head_and_tail:
        print(record)