[新功能] tdx免费数据源

This commit is contained in:
msincenselee 2019-11-30 10:42:06 +08:00
parent f17e6537f9
commit da0e116e16
4 changed files with 1212 additions and 0 deletions

14
vnpy/data/tdx/README.md Normal file
View File

@ -0,0 +1,14 @@
通达信数据接口封装
#安装
pip install -U pytdx
#修改tdx的bug:
# 接口说明
1.tdx_stock_data, 股票数据接口
2.tdx_future_data, 期货数据接口

View File

@ -0,0 +1,62 @@
# encoding: UTF-8
from functools import lru_cache
@lru_cache()
def get_tdx_market_code(code):
# 获取通达信股票的market code
code = str(code)
if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204"]:
# 上海证券交易所
return 1
# 深圳证券交易所
return 0
# 通达信 K 线种类
# 0 - 5 分钟K 线
# 1 - 15 分钟K 线
# 2 - 30 分钟K 线
# 3 - 1 小时K 线
# 4 - 日K 线
# 5 - 周K 线
# 6 - 月K 线
# 7 - 1 分钟
# 8 - 1 分钟K 线
# 9 - 日K 线
# 10 - 季K 线
# 11 - 年K 线
PERIOD_MAPPING = {}
PERIOD_MAPPING['1min'] = 8
PERIOD_MAPPING['5min'] = 0
PERIOD_MAPPING['15min'] = 1
PERIOD_MAPPING['30min'] = 2
PERIOD_MAPPING['1hour'] = 3
PERIOD_MAPPING['1day'] = 4
PERIOD_MAPPING['1week'] = 5
PERIOD_MAPPING['1month'] = 6
# 期货行情服务器清单
TDX_FUTURE_HOSTS = [
{"ip": "112.74.214.43", "port": 7727, "name": "扩展市场深圳双线1"},
{"ip": "120.24.0.77", "port": 7727, "name": "扩展市场深圳双线2"},
{"ip": "47.107.75.159", "port": 7727, "name": "扩展市场深圳双线3"},
{"ip": "113.105.142.136", "port": 443, "name": "扩展市场东莞主站"},
{"ip": "113.105.142.133", "port": 443, "name": "港股期货东莞电信"},
{"ip": "119.97.185.5", "port": 7727, "name": "扩展市场武汉主站1"},
{"ip": "119.97.185.7", "port": 7727, "name": "港股期货武汉主站1"},
{"ip": "119.97.185.9", "port": 7727, "name": "港股期货武汉主站2"},
{"ip": "59.175.238.38", "port": 7727, "name": "扩展市场武汉主站3"},
{"ip": "202.103.36.71", "port": 443, "name": "扩展市场武汉主站2"},
{"ip": "47.92.127.181", "port": 7727, "name": "扩展市场北京主站"},
{"ip": "106.14.95.149", "port": 7727, "name": "扩展市场上海双线"},
{"ip": '218.80.248.229', 'port': 7721 ,"name":"备用服务器1"},
{"ip": '124.74.236.94', 'port': 7721, "name": "备用服务器2"},
{'ip': '58.246.109.27', 'port': 7721,"name": "备用服务器3"}
]

View File

@ -0,0 +1,734 @@
# encoding: UTF-8
# 从tdx下载期货数据.
# 收盘后的数据基本正确, 但盘中实时拿数据时:
# 1. 1Min的Bar可能不是最新的, 会缺几分钟.
# 2. 当周期>1Min时, 最后一根Bar可能不是完整的, 强制修改后
# - 5min修改后freq基本正确
# - 1day在VNPY合成时不关心已经收到多少Bar, 所以影响也不大
# - 但其它分钟周期因为不好精确到每个品种, 修改后的freq可能有错
from datetime import datetime, timezone, timedelta, time
import sys, os, csv, pickle, bz2, copy
import json
import traceback
from logging import ERROR, INFO
from pandas import to_datetime
from pytdx.exhq import TdxExHq_API
from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData
from vnpy.trader.utility import get_underlying_symbol, get_full_symbol, get_trading_date
from vnpy.data.tdx.tdx_common import TDX_FUTURE_HOSTS, PERIOD_MAPPING
# 每个周期包含多少分钟 (估算值, 没考虑夜盘和10:15的影响)
NUM_MINUTE_MAPPING = {}
NUM_MINUTE_MAPPING['1min'] = 1
NUM_MINUTE_MAPPING['5min'] = 5
NUM_MINUTE_MAPPING['15min'] = 15
NUM_MINUTE_MAPPING['30min'] = 30
NUM_MINUTE_MAPPING['1hour'] = 60
NUM_MINUTE_MAPPING['1day'] = 60*24
NUM_MINUTE_MAPPING['1week'] = 60*24*7
NUM_MINUTE_MAPPING['1month'] = 60*24*7*30
Tdx_Vn_Exchange_Map = {}
Tdx_Vn_Exchange_Map['47'] = Exchange.CFFEX
Tdx_Vn_Exchange_Map['28'] = Exchange.CZCE
Tdx_Vn_Exchange_Map['29'] = Exchange.DCE
Tdx_Vn_Exchange_Map['30'] = Exchange.SHFE
Vn_Tdx_Exchange_Map = {v:k for k,v in Tdx_Vn_Exchange_Map.items()}
# 能源所与上期所,都归纳到 30
Vn_Tdx_Exchange_Map[Exchange.INE] = '30'
INIT_TDX_MARKET_MAP = {'URL9': 28,'WHL9':28,'ZCL9':28,'AL9':29,'BBL9':29,'BL9':29,'CL9':29,'CSL9':29,'EBL9':29,'EGL9':29,'FBL9':29,'IL9':29,
'JDL9':29,'JL9':29,'JML9':29,'LL9':29,'ML9':29,'PL9':29,'PPL9':29,'RRL9':29,'VL9':29,'YL9':29,'AGL9':30,'ALL9':30,'AUL9':30,
'BUL9':30,'CUL9':30,'FUL9':30,'HCL9':30,'NIL9':30,'NRL9':30,'PBL9':30,'RBL9':30,'RUL9':30,'SCL9':30,'SNL9':30,'SPL9':30,'SSL9':30,'WRL9':30,
'ZNL9':30,'APL9':28,'CFL9':28,'CJL9':28,'CYL9':28,'FGL9':28,'JRL9':28,'LRL9':28,'MAL9':28,'OIL9':28,'PML9':28,'RIL9':28,'RML9':28,'RSL9':28,'SFL9':28,
'SML9':28,'SRL9':28,'TAL9':28,'ICL9':47,'IFL9':47,'IHL9':47,'TFL9':47,'TL9':47,'TSL9':47}
# 常量
QSIZE = 500
ALL_MARKET_BEGIN_HOUR = 8
ALL_MARKET_END_HOUR = 16
class TdxFutureData(object):
# ----------------------------------------------------------------------
def __init__(self, strategy, best_ip={}):
"""
构造函数
:param strategy: 上层策略主要用与使用write_log
"""
self.api = None
self.connection_status = False # 连接状态
self.best_ip = best_ip
self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典
self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP) # tdx合约与tdx市场的字典
self.strategy = strategy
def write_log(self, content):
if self.strategy:
self.strategy.write_log(content)
else:
print(content)
def write_error(self, content):
if self.strategy:
self.strategy.write_log(content, level=ERROR)
else:
print(content, file=sys.stderr)
def connect(self, is_reconnect=False):
"""
连接API
:return:
"""
# 创建api连接对象实例
try:
if self.api is None or self.connection_status == False:
self.write_log(u'开始连接通达信行情服务器')
self.api = TdxExHq_API(heartbeat=True, auto_retry=True, raise_exception=True)
# 选取最佳服务器
if is_reconnect or len(self.best_ip) == 0:
self.best_ip = self.select_best_ip()
self.api.connect(self.best_ip['ip'], self.best_ip['port'])
# 尝试获取市场合约统计
c = self.api.get_instrument_count()
if c < 10:
err_msg = u'该服务器IP {}/{}无响应'.format(self.best_ip['ip'], self.best_ip['port'])
self.write_error(err_msg)
else:
self.write_log(u'创建tdx连接, IP: {}/{}'.format(self.best_ip['ip'], self.best_ip['port']))
# print(u'创建tdx连接, IP: {}/{}'.format(self.best_ip['ip'], self.best_ip['port']))
self.connection_status = True
if not is_reconnect:
# 更新 symbol_exchange_dict , symbol_market_dict
self.qryInstrument()
except Exception as ex:
self.write_log(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc()))
return
# ----------------------------------------------------------------------
def ping(self, ip, port=7709):
"""
ping行情服务器
:param ip:
:param port:
:param type_:
:return:
"""
apix = TdxExHq_API()
__time1 = datetime.now()
try:
with apix.connect(ip, port):
if apix.get_instrument_count() > 10000:
_timestamp = datetime.now() - __time1
self.write_log('服务器{}:{},耗时:{}'.format(ip,port,_timestamp))
return _timestamp
else:
self.write_log(u'该服务器IP {}无响应'.format(ip))
return timedelta(9, 9, 0)
except:
self.write_error(u'tdx ping服务器异常的响应{}'.format(ip))
return timedelta(9, 9, 0)
# ----------------------------------------------------------------------
def select_best_ip(self):
"""
选择行情服务器
:return:
"""
self.write_log(u'选择通达信行情服务器')
data_future = [self.ping(x['ip'], x['port']) for x in TDX_FUTURE_HOSTS]
best_future_ip = TDX_FUTURE_HOSTS[data_future.index(min(data_future))]
self.write_log(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port']))
# print(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port']))
return best_future_ip
# ----------------------------------------------------------------------
def qryInstrument(self):
"""
查询/更新合约信息
:return:
"""
# 取得所有的合约信息
num = self.api.get_instrument_count()
if not isinstance(num,int):
return
all_contacts = sum([self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)],[])
#[{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}]
# 对所有合约处理,更新字典 指数合约-tdx市场指数合约-交易所
for tdx_contract in all_contacts:
tdx_symbol = tdx_contract.get('code', None)
if tdx_symbol is None:
continue
tdx_market_id = tdx_contract.get('market')
if str(tdx_market_id) in Tdx_Vn_Exchange_Map:
self.symbol_exchange_dict.update({tdx_symbol: Tdx_Vn_Exchange_Map.get(str(tdx_market_id))})
self.symbol_market_dict.update({tdx_symbol: tdx_market_id})
#if 'L9' in tdx_symbol:
# print('\'{}\':{},'.format(tdx_symbol, Tdx_Vn_Exchange_Map.get(str(tdx_market_id))))
# ----------------------------------------------------------------------
def get_bars(self, symbol, period, callback, bar_is_completed=False, bar_freq=1, start_dt=None):
"""
返回k线数据
symbol合约
period: 周期: 1min,3min,5min,15min,30min,1day,3day,1hour,2hour,4hour,6hour,12hour
"""
ret_bars = []
tdx_symbol = symbol.upper().replace('_' , '')
tdx_symbol = tdx_symbol.replace('99' , 'L9')
self.connect()
if self.api is None:
return False, ret_bars
if tdx_symbol not in self.symbol_exchange_dict.keys():
self.write_error(u'{} 合约{}/{}不在下载清单中: {}'.format(datetime.now(), symbol, tdx_symbol, self.symbol_exchange_dict.keys()))
return False,ret_bars
if period not in PERIOD_MAPPING.keys():
self.write_error(u'{} 周期{}不在下载清单中: {}'.format(datetime.now(), period, list(PERIOD_MAPPING.keys())))
return False,ret_bars
tdx_period = PERIOD_MAPPING.get(period)
if start_dt is None:
self.write_log(u'没有设置开始时间缺省为10天前')
qry_start_date = datetime.now() - timedelta(days=10)
else:
qry_start_date = start_dt
end_date = datetime.combine(datetime.now() + timedelta(days=1),time(ALL_MARKET_END_HOUR, 0))
if qry_start_date > end_date:
qry_start_date = end_date
self.write_log('{}开始下载tdx:{} {}数据, {} to {}.'.format(datetime.now(), tdx_symbol, period, qry_start_date, end_date))
# print('{}开始下载tdx:{} {}数据, {} to {}.'.format(datetime.now(), tdx_symbol, tdx_period, last_date, end_date))
try:
_start_date = end_date
_bars = []
_pos = 0
while _start_date > qry_start_date:
_res = self.api.get_instrument_bars(
PERIOD_MAPPING[period],
self.symbol_market_dict.get(tdx_symbol,0),
tdx_symbol,
_pos,
QSIZE)
if _res is not None:
_bars = _res + _bars
_pos += QSIZE
if _res is not None and len(_res) > 0:
_start_date = _res[0]['datetime']
_start_date = datetime.strptime(_start_date, '%Y-%m-%d %H:%M')
self.write_log(u'分段取数据开始:{}'.format(_start_date))
else:
break
if len(_bars) == 0:
self.write_error('{} Handling {}, len1={}..., continue'.format(
str(datetime.now()), tdx_symbol, len(_bars)))
return False, ret_bars
current_datetime = datetime.now()
data = self.api.to_df(_bars)
data = data.assign(datetime=to_datetime(data['datetime']))
data = data.assign(ticker=symbol)
data['instrument_id'] = data['ticker']
# if future['market'] == 28 or future['market'] == 47:
# # 大写字母: 郑州商品 or 中金所期货
# data['instrument_id'] = data['ticker']
# else:
# data['instrument_id'] = data['ticker'].apply(lambda x: x.lower())
data['symbol'] = symbol
data = data.drop(
['year', 'month', 'day', 'hour', 'minute', 'price', 'amount', 'ticker'],
errors='ignore',
axis=1)
data = data.rename(
index=str,
columns={
'position': 'open_interest',
'trade': 'volume',
})
if len(data) == 0:
print('{} Handling {}, len2={}..., continue'.format(
str(datetime.now()), tdx_symbol, len(data)))
return False, ret_bars
data['total_turnover'] = data['volume']
data["limit_down"] = 0
data["limit_up"] = 999999
data['trading_date'] = data['datetime']
data['trading_date'] = data['trading_date'].apply(lambda x: (x.strftime('%Y-%m-%d')))
monday_ts = data['datetime'].dt.weekday == 0 # 星期一
night_ts1 = data['datetime'].dt.hour > ALL_MARKET_END_HOUR
night_ts2 = data['datetime'].dt.hour < ALL_MARKET_BEGIN_HOUR
data.loc[night_ts1, 'datetime'] -= timedelta(days=1) # 所有日期的夜盘(21:00~24:00), 减一天
monday_ts1 = monday_ts & night_ts1 # 星期一的夜盘(21:00~24:00), 再减两天
data.loc[monday_ts1, 'datetime'] -= timedelta(days=2)
monday_ts2 = monday_ts & night_ts2 # 星期一的夜盘(00:00~04:00), 再减两天
data.loc[monday_ts2, 'datetime'] -= timedelta(days=2)
# data['datetime'] -= timedelta(minutes=1) # 直接给Strategy使用, RiceQuant格式, 不需要减1分钟
data['dt_datetime'] = data['datetime']
data['date'] = data['datetime'].apply(lambda x: (x.strftime('%Y-%m-%d')))
data['time'] = data['datetime'].apply(lambda x: (x.strftime('%H:%M:%S')))
data['datetime'] = data['datetime'].apply(lambda x: float(x.strftime('%Y%m%d%H%M%S')))
data = data.set_index('dt_datetime', drop=False)
# data = data[int(last_date.strftime('%Y%m%d%H%M%S')):int(end_date.strftime('%Y%m%d%H%M%S'))]
# data = data[str(last_date):str(end_date)]
for index, row in data.iterrows():
add_bar = BarData()
try:
add_bar.symbol = row['symbol']
add_bar.datetime = index
add_bar.date = row['date']
add_bar.time = row['time']
add_bar.trading_date = row['trading_date']
add_bar.open = float(row['open'])
add_bar.high = float(row['high'])
add_bar.low = float(row['low'])
add_bar.close = float(row['close'])
add_bar.volume = float(row['volume'])
add_bar.openInterest = float(row['open_interest'])
except Exception as ex:
self.write_error('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc()))
# print('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc()))
return False
if start_dt is not None and index < start_dt:
continue
ret_bars.append(add_bar)
if callback is not None:
freq = bar_freq
bar_is_completed = True
if period != '1min' and index == data['dt_datetime'][-1]:
# 最后一个bar可能是不完整的强制修改
# - 5min修改后freq基本正确
# - 1day在VNPY合成时不关心已经收到多少Bar, 所以影响也不大
# - 但其它分钟周期因为不好精确到每个品种, 修改后的freq可能有错
if index > current_datetime:
bar_is_completed = False
# 根据秒数算的话,要+1例如13:31,freq=31第31根bar
freq = NUM_MINUTE_MAPPING[period] - int((index - current_datetime).total_seconds() / 60)
callback(add_bar, bar_is_completed, freq)
return True, ret_bars
except Exception as ex:
self.write_error('exception in get:{},{},{}'.format(tdx_symbol,str(ex), traceback.format_exc()))
# print('exception in get:{},{},{}'.format(tdx_symbol,str(ex), traceback.format_exc()))
self.write_log(u'重置连接')
self.api = None
self.connect(is_reconnect=True)
return False, ret_bars
def get_price(self, symbol):
"""获取最新价格"""
tdx_symbol = symbol.upper().replace('_', '')
short_symbol = get_underlying_symbol(tdx_symbol).upper()
if tdx_symbol.endswith('99'):
query_symbol = tdx_symbol.replace('99', 'L9')
else:
query_symbol = get_full_symbol(tdx_symbol)
if query_symbol != tdx_symbol:
self.write_log('转换合约:{}=>{}'.format(tdx_symbol, query_symbol))
index_symbol = short_symbol+'L9'
self.connect()
if self.api is None:
return 0
market_id = self.symbol_market_dict.get(index_symbol,0)
_res = self.api.get_instrument_quote(market_id,query_symbol)
if not isinstance(_res, list):
return 0
if len(_res) == 0:
return 0
return float(_res[0].get('price', 0))
def get_99_contracts(self):
"""
获取指数合约
:return: dict list
"""
self.connect()
result = self.api.get_instrument_quote_list(42, 3, 0, 100)
return result
def get_mi_contracts(self):
"""
获取主力合约
:return: dict list
"""
self.connect()
result = self.api.get_instrument_quote_list(60, 3, 0, 100)
return result
def get_contracts(self, exchange):
self.connect()
market_id = Vn_Tdx_Exchange_Map.get(exchange,None)
if market_id is None:
print(u'市场:{}配置不在Vn_Tdx_Exchange_Map:{}中,不能取市场下所有合约'.format(exchange, Vn_Tdx_Exchange_Map))
return []
index = 0
count = 100
results = []
while(True):
print(u'查询{}下:{}~{}个合约'.format(exchange, index, index+count))
result = self.api.get_instrument_quote_list(int(market_id), 3, index, count)
results.extend(result)
index += count
if len(result) < count:
break
return results
def get_mi_contracts2(self):
""" 获取主力合约"""
self.connect()
contracts = []
for exchange in Vn_Tdx_Exchange_Map.keys():
contracts.extend(self.get_mi_contracts_from_exchange(exchange))
return contracts
def get_mi_contracts_from_exchange(self, exchange):
contracts = self.get_contracts(exchange)
if len(contracts) == 0:
print(u'异常,未能获取{}下合约信息'.format(exchange))
return []
mi_contracts = []
short_contract_dict = {}
for contract in contracts:
# 排除指数合约
code = contract.get('code')
if code[-2:] in ['L9','L8','L0','L1','L2','L3','50'] or (exchange == Exchange.CFFEX and code[-3:] in ['300', '500']):
continue
short_symbol = get_underlying_symbol(code).upper()
contract_list = short_contract_dict.get(short_symbol,[])
contract_list.append(contract)
short_contract_dict.update({short_symbol:contract_list})
for k, v in short_contract_dict.items():
sorted_list = sorted(v, key=lambda c: c['ZongLiang'])
mi_contracts.append(sorted_list[-1])
return mi_contracts
def get_markets(self):
"""
获取市场代码
:return:
"""
self.connect()
result = self.api.get_markets()
return result
def get_transaction_data(self, symbol):
"""获取当前交易日的历史成交记录"""
ret_datas = []
max_data_size = sys.maxsize
symbol = symbol.upper()
if '99' in symbol:
symbol = symbol.replace('99', 'L9')
self.connect()
q_size = QSIZE * 5
# 每秒 2个 10小时
max_data_size = 1000000
self.write_log(u'开始下载{}当日分笔数据'.format(symbol))
try:
_datas = []
_pos = 0
while(True):
_res = self.api.get_transaction_data(
market=self.symbol_market_dict.get(symbol,0),
code=symbol,
start=_pos,
count=q_size)
if _res is not None:
for d in _res:
dt = d.pop('date')
# 星期1~星期6
if dt.hour >= 20 and 1< dt.isoweekday()<=6:
dt = dt - timedelta(days=1)
elif dt.hour >= 20 and dt.isoweekday() == 1:
# 星期一取得20点后数据
dt = dt - timedelta(days=3)
elif dt.hour < 8 and dt.isoweekday() == 1:
# 星期一取得8点前数据
dt = dt - timedelta(days=3)
elif dt.hour >= 20 and dt.isoweekday() == 7:
# 星期天取得20点后数据肯定是星期五夜盘
dt = dt - timedelta(days=2)
elif dt.isoweekday() == 7:
# 星期日取得其他时间,必然是 星期六凌晨的数据
dt = dt - timedelta(days=1)
d.update({'datetime': dt})
# 接口有bug返回价格*1000所以要除以1000
d.update({'price': d.get('price', 0) / 1000})
_datas = sorted(_res, key=lambda s: s['datetime']) + _datas
_pos += min(q_size,len(_res))
if _res is not None and len(_res) > 0:
self.write_log(u'分段取分笔数据:{} ~{}, {}条,累计:{}'.format( _res[0]['datetime'],_res[-1]['datetime'], len(_res),_pos))
else:
break
if len(_datas) >= max_data_size:
break
if len(_datas) == 0:
self.write_error(u'{}分笔成交数据获取为空')
return True, _datas
except Exception as ex:
self.write_error('exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc()))
self.write_error(u'当前异常服务器信息:{}'.format(self.best_ip))
self.write_log(u'重置连接')
self.api = None
self.connect(is_reconnect=True)
return False, ret_datas
def save_cache(self, cache_folder, cache_symbol, cache_date, data_list):
"""保存文件到缓存"""
os.makedirs(cache_folder, exist_ok=True)
if not os.path.exists(cache_folder):
self.write_error('缓存目录不存在:{},不能保存'.format(cache_folder))
return
cache_folder_year_month = os.path.join(cache_folder, cache_date[:6])
os.makedirs(cache_folder_year_month, exist_ok=True)
save_file = os.path.join(cache_folder_year_month, '{}_{}.pkz2'.format(cache_symbol, cache_date))
try:
with bz2.BZ2File(save_file, 'wb') as f:
pickle.dump(data_list, f)
self.write_log(u'缓存成功:{}'.format(save_file))
except Exception as ex:
self.write_error(u'缓存写入异常:{}'.format(str(ex)))
def load_cache(self, cache_folder, cache_symbol, cache_date):
"""加载缓存数据"""
if not os.path.exists(cache_folder):
self.write_error('缓存目录:{}不存在,不能读取'.format(cache_folder))
return None
cache_folder_year_month = os.path.join(cache_folder, cache_date[:6])
if not os.path.exists(cache_folder_year_month):
self.write_error('缓存目录:{}不存在,不能读取'.format(cache_folder_year_month))
return None
cache_file = os.path.join(cache_folder_year_month, '{}_{}.pkz2'.format(cache_symbol, cache_date))
if not os.path.isfile(cache_file):
self.write_error('缓存文件:{}不存在,不能读取'.format(cache_file))
return None
with bz2.BZ2File(cache_file, 'rb') as f:
data = pickle.load(f)
return data
return None
def get_history_transaction_data(self, symbol, date, cache_folder=None):
"""获取当某一交易日的历史成交记录"""
ret_datas = []
if isinstance(date, datetime):
date = date.strftime('%Y%m%d')
if isinstance(date, str):
date = int(date)
self.connect()
cache_symbol = symbol
cache_date = str(date)
max_data_size = sys.maxsize
symbol = symbol.upper()
if '99' in symbol:
symbol = symbol.replace('99', 'L9')
q_size = QSIZE * 5
# 每秒 2个 10小时
max_data_size = 1000000
# 优先从缓存加载
if cache_folder:
buffer_data = self.load_cache(cache_folder, cache_symbol, cache_date)
if buffer_data:
self.write_log(u'使用缓存文件')
return True, buffer_data
self.write_log(u'开始下载{} 历史{}分笔数据'.format(date, symbol))
cur_trading_date = get_trading_date()
if date == int(cur_trading_date.replace('-', '')):
return self.get_transaction_data(symbol)
try:
_datas = []
_pos = 0
while(True):
_res = self.api.get_history_transaction_data(
market=self.symbol_market_dict.get(symbol,0),
date=date,
code=symbol,
start=_pos,
count=q_size)
if _res is not None:
for d in _res:
dt = d.pop('date')
# 星期1~星期6
if dt.hour >= 20 and 1< dt.isoweekday()<=6:
dt = dt - timedelta(days=1)
d.update({'datetime':dt})
elif dt.hour >= 20 and dt.isoweekday() == 1:
# 星期一取得20点后数据
dt = dt - timedelta(days=3)
d.update({'datetime': dt})
elif dt.hour < 8 and dt.isoweekday() == 1:
# 星期一取得8点前数据
dt = dt - timedelta(days=3)
d.update({'datetime': dt})
elif dt.hour >= 20 and dt.isoweekday() == 7:
# 星期天取得20点后数据肯定是星期五夜盘
dt = dt - timedelta(days=2)
d.update({'datetime': dt})
elif dt.isoweekday() == 7:
# 星期日取得其他时间,必然是 星期六凌晨的数据
dt = dt - timedelta(days=1)
d.update({'datetime': dt})
else:
d.update({'datetime': dt})
# 接口有bug返回价格*1000所以要除以1000
d.update({'price': d.get('price', 0)/1000})
_datas = sorted(_res, key=lambda s: s['datetime']) + _datas
_pos += min(q_size, len(_res))
if _res is not None and len(_res) > 0:
self.write_log(u'分段取分笔数据:{} ~{}, {}条,累计:{}'.format( _res[0]['datetime'],_res[-1]['datetime'], len(_res),_pos))
else:
break
if len(_datas) >= max_data_size:
break
if len(_datas) == 0:
self.write_error(u'{}分笔成交数据获取为空'.format(date))
return False,_datas
# 缓存文件
if cache_folder:
self.save_cache(cache_folder, cache_symbol, cache_date, _datas)
return True, _datas
except Exception as ex:
self.write_error('exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc()))
self.write_error(u'当前异常服务器信息:{}'.format(self.best_ip))
self.write_log(u'重置连接')
self.api = None
self.connect(is_reconnect=True)
return False, ret_datas
class FakeStrategy(object):
def write_log(self, content, level=INFO):
if level == INFO:
print(content)
else:
print(content, file=sys.stderr)
def display_bar(self, bar, bar_is_completed=True, freq=1):
print(u'{} {}'.format(bar.vtSymbol, bar.datetime))
if __name__ == "__main__":
t1 = FakeStrategy()
t2 = FakeStrategy()
# 创建API对象
api_01 = TdxFutureData(t1)
markets = api_01.get_markets()
str_markets = json.dumps(markets, indent=1, ensure_ascii=False)
print(u'{}'.format(str_markets))
# 获取所有的期货合约明细
api_01.qryInstrument()
# 获取某个合约得最新价
price = api_01.get_price('rb2005')
print('price={}'.format(price))
exit(0)
# 获取主力合约
#result = api_01.get_mi_contracts()
#str_result = json.dumps(result,indent=1, ensure_ascii=False)
#print(str_result)
# 获取某个板块的合约
#result = api_01.get_contracts(exchange=EXCHANGE_CZCE)
# 获取某个板块的主力合约
#result = api_01.get_mi_contracts_from_exchange(exchange=EXCHANGE_CZCE)
# 获取主力合约(从各个板块组合获取)
#result = api_01.get_mi_contracts2()
#print(u'一共{}个记录:{}'.format(len(result), [c.get('code') for c in result]))
#str_result = json.dumps(result,indent=1, ensure_ascii=False)
#print(str_result)
#all_99_ticks= api_01.get_99_contracts()
#str_99_ticks = json.dumps(all_99_ticks, indent=1, ensure_ascii=False)
#print(u'{}'.format(str_99_ticks))
# 获取历史分钟线
"""
ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0))
line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars]
import pandas as pd
df = pd.DataFrame(line_close_oi)
corr = df.corr()
print(corr)
corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2)
"""
# api.get_bars(symbol, period='5min', callback=display_bar)
#api_01.get_bars('IF99', period='1day', callback=t1.display_bar)
#result,datas = api_01.get_transaction_data(symbol='ni1905')
#api_02 = TdxFutureData(t2)
#api_02.get_bars('IF99', period='1min', callback=t1.display_bar)
# 获取当前交易日分时数据
#ret,result = api_01.get_transaction_data('RB99')
#for r in result[0:10] + result[-10:]:
# print(r)
# 获取历史分时数据
ret,result = api_01.get_history_transaction_data('J99', '20191009')
for r in result[0:10] + result[-10:]:
print(r)

View File

@ -0,0 +1,402 @@
# encoding: UTF-8
# 从tdx下载股票数据.
# 收盘后的数据基本正确, 但盘中实时拿数据时:
# 1. 1Min的Bar可能不是最新的, 会缺几分钟.
# 2. 当周期>1Min时, 最后一根Bar可能不是完整的, 强制修改后
# - 5min修改后freq基本正确
# - 1day在VNPY合成时不关心已经收到多少Bar, 所以影响也不大
# - 但其它分钟周期因为不好精确到每个品种, 修改后的freq可能有错
# https://rainx.gitbooks.io/pytdx/content/pytdx_hq.html
# 华富资产
import sys, os, pickle, bz2, traceback
from datetime import datetime, timedelta
from logging import ERROR, INFO
from pytdx.hq import TdxHq_API
from pandas import to_datetime
from vnpy.trader.object import BarData
from vnpy.data.tdx.tdx_common import PERIOD_MAPPING, get_tdx_market_code
# 每个周期包含多少分钟
NUM_MINUTE_MAPPING = {}
NUM_MINUTE_MAPPING['1min'] = 1
NUM_MINUTE_MAPPING['5min'] = 5
NUM_MINUTE_MAPPING['15min'] = 15
NUM_MINUTE_MAPPING['30min'] = 30
NUM_MINUTE_MAPPING['1hour'] = 60
NUM_MINUTE_MAPPING['1day'] = 60*5.5 # 股票收盘时间是1500开盘是930
# 常量
QSIZE = 800
class TdxStockData(object):
best_ip = None
symbol_exchange_dict = {} # tdx合约与vn交易所的字典
symbol_market_dict = {} # tdx合约与tdx市场的字典
# ----------------------------------------------------------------------
def __init__(self, strategy=None):
"""
构造函数
:param strategy: 上层策略主要用与使用write_log
"""
self.api = None
self.connection_status = False # 连接状态
self.strategy = strategy
self.connect()
def write_log(self, content):
if self.strategy:
self.strategy.write_log(content)
else:
print(content)
def write_error(self, content):
if self.strategy:
self.strategy.write_log(content, level=ERROR)
else:
print(content, file=sys.stderr)
def connect(self):
"""
连接API
:return:
"""
# 创建api连接对象实例
try:
if self.api is None or self.connection_status == False:
self.write_log(u'开始连接通达信股票行情服务器')
self.api = TdxHq_API(heartbeat=True, auto_retry=True, raise_exception=True)
# 选取最佳服务器
if TdxStockData.best_ip is None:
from pytdx.util.best_ip import select_best_ip
TdxStockData.best_ip = select_best_ip()
self.api.connect(self.best_ip.get('ip'), self.best_ip.get('port'))
self.write_log(f'创建tdx连接, : {self.best_ip}')
TdxStockData.connection_status = True
except Exception as ex:
self.write_log(u'连接服务器tdx异常:{},{}'.format(str(ex), traceback.format_exc()))
return
def disconnect(self):
if self.api is not None:
self.api= None
# ----------------------------------------------------------------------
def get_bars(self, symbol, period, callback, bar_is_completed=False, bar_freq=1, start_dt=None):
"""
返回k线数据
symbol股票 000001.XG
period: 周期: 1min,5min,15min,30min,1hour,1day,
"""
if self.api is None:
self.connect()
# 新版一劳永逸偷懒写法zzz
if '.' in symbol:
tdx_code,market_str = symbol.split('.')
market_code = 1 if market_str.upper()== 'XSHG' else 0
self.symbol_exchange_dict.update({tdx_code:symbol}) # tdx合约与vn交易所的字典
self.symbol_market_dict.update({tdx_code:market_code}) # tdx合约与tdx市场的字典
else:
market_code = get_tdx_market_code(symbol)
tdx_code = symbol
self.symbol_exchange_dict.update({symbol: symbol}) # tdx合约与vn交易所的字典
self.symbol_market_dict.update({symbol: market_code}) # tdx合约与tdx市场的字典
# https://github.com/rainx/pytdx/issues/33
# 0 - 深圳, 1 - 上海
ret_bars = []
if period not in PERIOD_MAPPING.keys():
self.write_log(u'{} 周期{}不在下载清单中: {}'.format(datetime.now(), period, list(PERIOD_MAPPING.keys())), level=ERROR)
# print(u'{} 周期{}不在下载清单中: {}'.format(datetime.now(), period, list(PERIOD_MAPPING.keys())))
return False,ret_bars
if self.api is None:
return False,ret_bars
tdx_period = PERIOD_MAPPING.get(period)
if start_dt is None:
self.write_log(u'没有设置开始时间缺省为10天前')
qry_start_date = datetime.now() - timedelta(days=10)
start_dt = qry_start_date
else:
qry_start_date = start_dt
end_date = datetime.now()
if qry_start_date > end_date:
qry_start_date = end_date
self.write_log('{}开始下载tdx股票:{} {}数据, {} to {}.'.format(datetime.now(), tdx_code, tdx_period, qry_start_date, end_date))
try:
_start_date = end_date
_bars = []
_pos = 0
while _start_date > qry_start_date:
_res = self.api.get_security_bars(category=PERIOD_MAPPING[period],
market=market_code,
code=tdx_code,
start=_pos,
count=QSIZE)
if _res is not None:
_bars = _res + _bars
_pos += QSIZE
if _res is not None and len(_res) > 0:
_start_date = _res[0]['datetime']
_start_date = datetime.strptime(_start_date, '%Y-%m-%d %H:%M')
self.write_log(u'分段取数据开始:{}'.format(_start_date))
else:
break
if len(_bars) == 0:
self.write_error('{} Handling {}, len1={}..., continue'.format(
str(datetime.now()), tdx_code, len(_bars)))
return False, ret_bars
current_datetime = datetime.now()
data = self.api.to_df(_bars)
data = data.assign(datetime=to_datetime(data['datetime']))
data = data.assign(ticker=symbol)
data['symbol'] = symbol
data = data.drop(
['year', 'month', 'day', 'hour', 'minute', 'price', 'ticker'],
errors='ignore',
axis=1)
data = data.rename(
index=str,
columns={'amount': 'volume',
})
if len(data) == 0:
print('{} Handling {}, len2={}..., continue'.format(
str(datetime.now()), tdx_code, len(data)))
return False, ret_bars
# 通达信是以bar的结束时间标记的vnpy是以bar开始时间标记的,所以要扣减bar本身的分钟数
data['datetime'] = data['datetime'].apply(lambda x:x-timedelta(minutes=NUM_MINUTE_MAPPING.get(period,1)))
data['trading_date'] = data['datetime'].apply(lambda x: (x.strftime('%Y-%m-%d')))
data['date'] = data['datetime'].apply(lambda x: (x.strftime('%Y-%m-%d')))
data['time'] = data['datetime'].apply(lambda x: (x.strftime('%H:%M:%S')))
data = data.set_index('datetime', drop=False)
for index, row in data.iterrows():
add_bar = BarData()
try:
add_bar.symbol = symbol
add_bar.datetime = index
add_bar.date = row['date']
add_bar.time = row['time']
add_bar.trading_date = row['trading_date']
add_bar.open = float(row['open'])
add_bar.high = float(row['high'])
add_bar.low = float(row['low'])
add_bar.close = float(row['close'])
add_bar.volume = float(row['volume'])
except Exception as ex:
self.write_error('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc()))
# print('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc()))
return False
if start_dt is not None and index < start_dt:
continue
ret_bars.append(add_bar)
if callback is not None:
freq = bar_freq
bar_is_completed = True
if period != '1min' and index == data['datetime'][-1]:
# 最后一个bar可能是不完整的强制修改
# - 5min修改后freq基本正确
# - 1day在VNPY合成时不关心已经收到多少Bar, 所以影响也不大
# - 但其它分钟周期因为不好精确到每个品种, 修改后的freq可能有错
if index > current_datetime:
bar_is_completed = False
# 根据秒数算的话,要+1例如13:31,freq=31第31根bar
freq = NUM_MINUTE_MAPPING[period] - int((index - current_datetime).total_seconds() / 60)
callback(add_bar, bar_is_completed, freq)
return True,ret_bars
except Exception as ex:
self.write_error('exception in get:{},{},{}'.format(tdx_code,str(ex), traceback.format_exc()))
# print('exception in get:{},{},{}'.format(tdx_symbol,str(ex), traceback.format_exc()))
self.write_log(u'重置连接')
TdxStockData.api = None
self.connect()
return False, ret_bars
def save_cache(self, cache_folder, cache_symbol, cache_date, data_list):
"""保存文件到缓存"""
os.makedirs(cache_folder,exist_ok=True)
if not os.path.exists(cache_folder):
self.write_error('缓存目录不存在:{},不能保存'.format(cache_folder))
return
cache_folder_year_month = os.path.join(cache_folder, cache_date[:6])
os.makedirs(cache_folder_year_month, exist_ok=True)
save_file = os.path.join(cache_folder_year_month, '{}_{}.pkb2'.format(cache_symbol, cache_date))
with bz2.BZ2File(save_file, 'wb') as f:
pickle.dump(data_list, f)
self.write_log(u'缓存成功:{}'.format(save_file))
def load_cache(self, cache_folder, cache_symbol, cache_date):
"""加载缓存数据"""
if not os.path.exists(cache_folder):
self.write_error('缓存目录:{}不存在,不能读取'.format(cache_folder))
return None
cache_folder_year_month = os.path.join(cache_folder, cache_date[:6])
if not os.path.exists(cache_folder_year_month):
self.write_error('缓存目录:{}不存在,不能读取'.format(cache_folder_year_month))
return None
cache_file = os.path.join(cache_folder_year_month, '{}_{}.pkb2'.format(cache_symbol, cache_date))
if not os.path.isfile(cache_file):
self.write_error('缓存文件:{}不存在,不能读取'.format(cache_file))
return None
with bz2.BZ2File(cache_file, 'rb') as f:
data = pickle.load(f)
return data
return None
def get_history_transaction_data(self, symbol, date, cache_folder=None):
"""获取当某一交易日的历史成交记录"""
ret_datas = []
if isinstance(date, datetime):
date = date.strftime('%Y%m%d')
if isinstance(date, str):
date = int(date)
cache_symbol = symbol
cache_date = str(date)
max_data_size = sys.maxsize
# symbol.exchange => tdx_code market_code
if '.' in symbol:
tdx_code, market_str = symbol.split('.')
market_code = 1 if market_str.upper() == 'XSHG' else 0
self.symbol_exchange_dict.update({tdx_code: symbol}) # tdx合约与vn交易所的字典
self.symbol_market_dict.update({tdx_code: market_code}) # tdx合约与tdx市场的字典
else:
market_code = get_tdx_market_code(symbol)
tdx_code = symbol
self.symbol_exchange_dict.update({symbol: symbol}) # tdx合约与vn交易所的字典
self.symbol_market_dict.update({symbol: market_code}) # tdx合约与tdx市场的字典
q_size = QSIZE * 5
# 每秒 2个 10小时
max_data_size = 1000000
# 优先从缓存加载
if cache_folder:
buffer_data = self.load_cache(cache_folder, cache_symbol, cache_date)
if buffer_data:
return True, buffer_data
self.write_log(u'开始下载{} 历史{}分笔数据'.format(date, symbol))
is_today = False
if date == int(datetime.now().strftime('%Y%m%d')):
is_today = True
try:
_datas = []
_pos = 0
while(True):
if is_today:
_res = self.api.get_transaction_data(
market=self.symbol_market_dict[symbol],
code=symbol,
start=_pos,
count=q_size)
else:
_res = self.api.get_history_transaction_data(
market=self.symbol_market_dict[symbol],
date=date,
code=symbol,
start=_pos,
count=q_size)
last_dt = None
if _res is not None:
_datas = _res + _datas
_pos += min(q_size, len(_res))
if _res is not None and len(_res) > 0:
self.write_log(u'分段取{}分笔数据:{} ~{}, {}条,累计:{}'.format(date, _res[0]['time'],_res[-1]['time'], len(_res),_pos))
else:
break
if len(_datas) >= max_data_size:
break
if len(_datas) == 0:
self.write_error(u'{}分笔成交数据获取为空'.format(date))
return False,_datas
for d in _datas:
dt = datetime.strptime(str(date) + ' ' + d.get('time'), '%Y%m%d %H:%M')
if last_dt is None or last_dt < dt:
last_dt = dt
else:
if last_dt < dt + timedelta(seconds=59):
last_dt = last_dt + timedelta(seconds=1)
d.update({'datetime': last_dt})
d.update({'volume': d.pop('vol',0)})
d.update({'trading_date': last_dt.strftime('%Y-%m-%d')})
_datas = sorted(_datas, key=lambda s: s['datetime'])
# 缓存文件
if cache_folder and not is_today:
self.save_cache(cache_folder, cache_symbol, cache_date, _datas)
return True, _datas
except Exception as ex:
self.write_error('exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc()))
return False, ret_datas
if __name__ == "__main__":
class T(object):
def write_log(self,content, level=INFO):
if level == INFO:
print(content)
else:
print(content,file=sys.stderr)
def display_bar(self,bar, bar_is_completed=True, freq=1):
print(u'{} {}'.format(bar.vtSymbol,bar.datetime))
t1 = T()
t2 = T()
# 创建API对象
api_01 = TdxStockData(t1)
# 获取历史分钟线
#api_01.get_bars('002024', period='1hour', callback=t1.display_bar)
# api.get_bars(symbol, period='5min', callback=display_bar)
# api.get_bars(symbol, period='1day', callback=display_bar)
#api_02 = TdxData(t2)
#api_02.get_bars('601390', period='1day', callback=t1.display_bar)
# 获取历史分时数据
# ret,result = api_01.get_history_transaction_data('RB99', '20190909')
# for r in result[0:10] + result[-10:]:
# print(r)
# 获取历史分时数据
ret, result = api_01.get_history_transaction_data('600410', '20190925')
for r in result[0:10] + result[-10:]:
print(r)