[增强] 增加主力合约查询/更新本地缓存

This commit is contained in:
msincenselee 2019-12-26 10:16:14 +08:00
parent 301326453a
commit 3912b31e83
5 changed files with 232 additions and 132 deletions

View File

@ -6,6 +6,7 @@ import pickle
import bz2 import bz2
from functools import lru_cache from functools import lru_cache
from logging import INFO, ERROR from logging import INFO, ERROR
from vnpy.trader.utility import load_json, save_json
@lru_cache() @lru_cache()
@ -65,6 +66,15 @@ TDX_FUTURE_HOSTS = [
{'ip': '58.246.109.27', 'port': 7721, "name": "备用服务器3"}] {'ip': '58.246.109.27', 'port': 7721, "name": "备用服务器3"}]
def get_future_contracts():
"""获取期货合约信息"""
return get_cache_json('future_contracts.json')
def save_future_contracts(future_contracts_dict: dict):
"""保存期货合约信息"""
save_cache_json(future_contracts_dict, 'future_contracts.json')
def get_cache_config(config_file_name): def get_cache_config(config_file_name):
"""获取本地缓存的配置地址信息""" """获取本地缓存的配置地址信息"""
config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), config_file_name)) config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), config_file_name))
@ -74,7 +84,7 @@ def get_cache_config(config_file_name):
with bz2.BZ2File(config_file_name, 'rb') as f: with bz2.BZ2File(config_file_name, 'rb') as f:
config = pickle.load(f) config = pickle.load(f)
return config return config
return config
def save_cache_config(data: dict, config_file_name): def save_cache_config(data: dict, config_file_name):
"""保存本地缓存的配置地址信息""" """保存本地缓存的配置地址信息"""
@ -84,13 +94,27 @@ def save_cache_config(data: dict, config_file_name ):
pickle.dump(data, f) pickle.dump(data, f)
def get_cache_json(json_file_name: str):
"""获取本地缓存的json配置信息"""
config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), json_file_name))
return load_json(config_file_name)
def save_cache_json(data_dict: dict, json_file_name: str):
"""保存本地缓存的JSON配置信息"""
config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), json_file_name))
save_json(filename=config_file_name, data=data_dict)
class FakeStrategy(object): class FakeStrategy(object):
"""制作一个假得策略,用于测试""" """制作一个假得策略,用于测试"""
def write_log(self, content, level=INFO): def write_log(self, content, level=INFO):
if level == INFO: if level == INFO:
print(content) print(content)
else: else:
print(content, file=sys.stderr) print(content, file=sys.stderr)
def write_error(self, content): def write_error(self, content):
self.write_log(content, level=ERROR) self.write_log(content, level=ERROR)

View File

@ -25,9 +25,19 @@ from pytdx.exhq import TdxExHq_API
from vnpy.trader.constant import Exchange from vnpy.trader.constant import Exchange
from vnpy.trader.object import BarData from vnpy.trader.object import BarData
from vnpy.trader.utility import (get_underlying_symbol, get_full_symbol, get_trading_date, load_json, save_json) from vnpy.trader.utility import (
from vnpy.data.tdx.tdx_common import (lru_cache, TDX_FUTURE_HOSTS, PERIOD_MAPPING) get_underlying_symbol,
get_full_symbol,
get_trading_date,
get_real_symbol_by_exchange)
from vnpy.data.tdx.tdx_common import (
lru_cache,
TDX_FUTURE_HOSTS,
PERIOD_MAPPING,
get_future_contracts,
save_future_contracts,
get_cache_json,
save_cache_json)
# 每个周期包含多少分钟 (估算值, 没考虑夜盘和10:15的影响) # 每个周期包含多少分钟 (估算值, 没考虑夜盘和10:15的影响)
NUM_MINUTE_MAPPING: Dict[str, int] = {} NUM_MINUTE_MAPPING: Dict[str, int] = {}
@ -66,17 +76,7 @@ QSIZE = 500
ALL_MARKET_BEGIN_HOUR = 8 ALL_MARKET_BEGIN_HOUR = 8
ALL_MARKET_END_HOUR = 16 ALL_MARKET_END_HOUR = 16
TDX_FUTURE_CONFIG = 'tdx_future_config.json'
def get_cache_ip():
"""获取本地缓存的最快IP地址信息"""
config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tdx_config.json'))
return load_json(config_file_name)
def save_cache_ip(best_ip: dict):
"""保存本地缓存的最快IP地址信息"""
config_file_name = os.path.abspath(os.path.join(os.path.dirname(__file__), 'tdx_config.json'))
save_json(filename=config_file_name, data=best_ip)
@lru_cache() @lru_cache()
def get_tdx_marketid(symbol): def get_tdx_marketid(symbol):
@ -103,6 +103,7 @@ class TdxFutureData(object):
self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典 self.symbol_exchange_dict = {} # tdx合约与vn交易所的字典
self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP) # tdx合约与tdx市场的字典 self.symbol_market_dict = copy.copy(INIT_TDX_MARKET_MAP) # tdx合约与tdx市场的字典
self.strategy = strategy self.strategy = strategy
self.future_contracts = get_future_contracts()
def write_log(self, content): def write_log(self, content):
if self.strategy: if self.strategy:
@ -130,7 +131,7 @@ class TdxFutureData(object):
# 选取最佳服务器 # 选取最佳服务器
if is_reconnect or len(self.best_ip) == 0: if is_reconnect or len(self.best_ip) == 0:
self.best_ip = get_cache_ip() self.best_ip = get_cache_json(TDX_FUTURE_CONFIG)
if len(self.best_ip) == 0: if len(self.best_ip) == 0:
self.best_ip = self.select_best_ip() self.best_ip = self.select_best_ip()
@ -192,7 +193,7 @@ class TdxFutureData(object):
self.write_log(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port'])) self.write_log(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port']))
# print(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port'])) # print(u'选取 {}:{}'.format(best_future_ip['ip'], best_future_ip['port']))
save_cache_ip(best_future_ip) save_cache_json(best_future_ip, TDX_FUTURE_CONFIG)
return best_future_ip return best_future_ip
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
@ -207,7 +208,8 @@ class TdxFutureData(object):
if not isinstance(num, int): if not isinstance(num, int):
return return
all_contacts = sum([self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], []) all_contacts = sum(
[self.api.get_instrument_info((int(num / 500) - i) * 500, 500) for i in range(int(num / 500) + 1)], [])
# [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}] # [{"category":category,"market": int,"code":sting,"name":string,"desc":string},{}]
# 对所有合约处理,更新字典 指数合约-tdx市场指数合约-交易所 # 对所有合约处理,更新字典 指数合约-tdx市场指数合约-交易所
@ -348,7 +350,8 @@ class TdxFutureData(object):
add_bar.volume = float(row['volume']) add_bar.volume = float(row['volume'])
add_bar.openInterest = float(row['open_interest']) add_bar.openInterest = float(row['open_interest'])
except Exception as ex: except Exception as ex:
self.write_error('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc())) self.write_error(
'error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc()))
# print('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc())) # print('error when convert bar:{},ex:{},t:{}'.format(row, str(ex), traceback.format_exc()))
return False return False
@ -561,7 +564,8 @@ class TdxFutureData(object):
return True, _datas return True, _datas
except Exception as ex: except Exception as ex:
self.write_error('exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc())) self.write_error(
'exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc()))
self.write_error(u'当前异常服务器信息:{}'.format(self.best_ip)) self.write_error(u'当前异常服务器信息:{}'.format(self.best_ip))
self.write_log(u'重置连接') self.write_log(u'重置连接')
self.api = None self.api = None
@ -656,7 +660,7 @@ class TdxFutureData(object):
while True: while True:
_res = self.api.get_history_transaction_data( _res = self.api.get_history_transaction_data(
market=self.symbol_market_dict.get(tdx_index_symbol, 0), market=self.symbol_market_dict.get(tdx_index_symbol, 0),
trading_date=trading_date, date=trading_date,
code=symbol, code=symbol,
start=_pos, start=_pos,
count=q_size) count=q_size)
@ -718,67 +722,47 @@ class TdxFutureData(object):
self.connect(is_reconnect=True) self.connect(is_reconnect=True)
return False, ret_datas return False, ret_datas
def update_mi_contracts(self):
# 连接通达信,获取主力合约
if not self.api:
self.connect()
if __name__ == "__main__": mi_contract_quote_list = self.get_mi_contracts2()
from .tdx_common import FakeStrategy
t1 = FakeStrategy()
t2 = FakeStrategy()
# 创建API对象
api_01 = TdxFutureData(t1)
markets = api_01.get_markets() self.write_log(u'一共获取:{}个主力合约:{}'.format(len(mi_contract_quote_list), [c.get('code') for c in mi_contract_quote_list]))
str_markets = json.dumps(markets, indent=1, ensure_ascii=False) should_save = False
print(u'{}'.format(str_markets)) # 逐一更新主力合约数据
for mi_contract in mi_contract_quote_list:
tdx_market_id = mi_contract.get('market')
full_symbol = mi_contract.get('code')
underlying_symbol = get_underlying_symbol(full_symbol).upper()
if underlying_symbol in ['SC', 'NR']:
vn_exchange = Exchange.INE
else:
vn_exchange = Tdx_Vn_Exchange_Map.get(str(tdx_market_id))
mi_symbol = get_real_symbol_by_exchange(full_symbol, vn_exchange)
# 获取所有的期货合约明细 # 更新登记 短合约:真实主力合约
api_01.qryInstrument() self.write_log('{},{},{},{},{}'.format(tdx_market_id, full_symbol, underlying_symbol, mi_symbol, vn_exchange))
if underlying_symbol in self.future_contracts:
info = self.future_contracts.get(underlying_symbol)
if mi_symbol > info.get('mi_symbol') :
self.write_log(u'主力合约变化:{} =>{}'.format(info.get('mi_symbol'), mi_symbol))
info.update({'mi_symbol': mi_symbol, 'full_symbol': full_symbol})
self.future_contracts.update({underlying_symbol: info})
should_save = True
else:
# 添加到新合约中
# todo 这里缺少size和price_tick
info = {
"underlying_symbol": underlying_symbol,
"mi_symbol": mi_symbol,
"full_symbol": full_symbol,
"exchange": vn_exchange.value
}
self.write_log(u'新合约:{}'.format(info))
self.future_contracts.update({underlying_symbol: info})
should_save = True
# 获取某个合约得最新价 if should_save:
price = api_01.get_price('rb2005') save_future_contracts(self.future_contracts)
print('price={}'.format(price))
exit(0)
# 获取主力合约
# result = api_01.get_mi_contracts()
# str_result = json.dumps(result,indent=1, ensure_ascii=False)
# print(str_result)
# 获取某个板块的合约
# result = api_01.get_contracts(exchange=EXCHANGE_CZCE)
# 获取某个板块的主力合约
# result = api_01.get_mi_contracts_from_exchange(exchange=EXCHANGE_CZCE)
# 获取主力合约(从各个板块组合获取)
# result = api_01.get_mi_contracts2()
# print(u'一共{}个记录:{}'.format(len(result), [c.get('code') for c in result]))
# str_result = json.dumps(result,indent=1, ensure_ascii=False)
# print(str_result)
# all_99_ticks= api_01.get_99_contracts()
# str_99_ticks = json.dumps(all_99_ticks, indent=1, ensure_ascii=False)
# print(u'{}'.format(str_99_ticks))
# 获取历史分钟线
"""
ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0))
line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars]
import pandas as pd
df = pd.DataFrame(line_close_oi)
corr = df.corr()
print(corr)
corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2)
"""
# api.get_bars(symbol, period='5min', callback=display_bar)
# api_01.get_bars('IF99', period='1day', callback=t1.display_bar)
# result,datas = api_01.get_transaction_data(symbol='ni1905')
# api_02 = TdxFutureData(t2)
# api_02.get_bars('IF99', period='1min', callback=t1.display_bar)
# 获取当前交易日分时数据
# ret,result = api_01.get_transaction_data('RB99')
# for r in result[0:10] + result[-10:]:
# print(r)
# 获取历史分时数据
ret, result = api_01.get_history_transaction_data('rb1905', '20190109')
for r in result[0:10] + result[-10:]:
print(r)

View File

@ -41,7 +41,6 @@ NUM_MINUTE_MAPPING['1day'] = 60 * 5.5 # 股票收盘时间是1500
# 常量 # 常量
QSIZE = 800 QSIZE = 800
STOCK_CONFIG_FILE = 'tdx_stock_config.pkb2' STOCK_CONFIG_FILE = 'tdx_stock_config.pkb2'
# 通达信 <=> 交易所代码 映射 # 通达信 <=> 交易所代码 映射
@ -61,8 +60,6 @@ RQ_TDX_STOCK_MARKET_MAP = {v: k for k, v in TDX_RQ_STOCK_MARKET_MAP.items()}
class TdxStockData(object): class TdxStockData(object):
# ----------------------------------------------------------------------
def __init__(self, strategy=None): def __init__(self, strategy=None):
""" """
构造函数 构造函数
@ -172,6 +169,7 @@ class TdxStockData(object):
results.extend(result) results.extend(result)
return results return results
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
def get_bars(self, def get_bars(self,
symbol: str, symbol: str,
@ -184,7 +182,7 @@ class TdxStockData(object):
symbol股票 000001.XG symbol股票 000001.XG
period: 周期: 1min,5min,15min,30min,1hour,1day, period: 周期: 1min,5min,15min,30min,1hour,1day,
""" """
if self.api is None: if not self.api:
self.connect() self.connect()
ret_bars = [] ret_bars = []
if self.api is None: if self.api is None:
@ -375,7 +373,11 @@ class TdxStockData(object):
:param cache_folder: :param cache_folder:
:return: :return:
""" """
if not self.api:
self.connect()
ret_datas = [] ret_datas = []
# trading_date ,转为为查询数字类型 # trading_date ,转为为查询数字类型
if isinstance(trading_date, datetime): if isinstance(trading_date, datetime):
trading_date = trading_date.strftime('%Y%m%d') trading_date = trading_date.strftime('%Y%m%d')
@ -429,7 +431,7 @@ class TdxStockData(object):
# 获取历史交易记录 # 获取历史交易记录
_res = self.api.get_history_transaction_data( _res = self.api.get_history_transaction_data(
market=self.symbol_market_dict[symbol], market=self.symbol_market_dict[symbol],
trading_date=trading_date, date=trading_date,
code=symbol, code=symbol,
start=_pos, start=_pos,
count=q_size) count=q_size)
@ -474,45 +476,3 @@ class TdxStockData(object):
self.write_error( self.write_error(
'exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc())) 'exception in get_transaction_data:{},{},{}'.format(symbol, str(ex), traceback.format_exc()))
return False, ret_datas return False, ret_datas
if __name__ == "__main__":
from tdx_common import FakeStrategy
import json
t1 = FakeStrategy()
t2 = FakeStrategy()
# 创建API对象
api_01 = TdxStockData(t1)
# 获取市场下股票
for market_id in range(2):
print('get market_id:{}'.format(market_id))
security_list = api_01.get_security_list(market_id)
if len(security_list) == 0:
continue
for security in security_list:
if security.get('code', '').startswith('12') or u'转债' in security.get('name', ''):
str_security = json.dumps(security, indent=1, ensure_ascii=False)
print('market_id:{},{}'.format(market_id, str_security))
# str_markets = json.dumps(security_list, indent=1, ensure_ascii=False)
# print(u'{}'.format(str_markets))
# 获取历史分钟线
# api_01.get_bars('002024', period='1hour', callback=t1.display_bar)
# api.get_bars(symbol, period='5min', callback=display_bar)
# api.get_bars(symbol, period='1day', callback=display_bar)
# api_02 = TdxData(t2)
# api_02.get_bars('601390', period='1day', callback=t1.display_bar)
# 获取历史分时数据
# ret,result = api_01.get_history_transaction_data('RB99', '20190909')
# for r in result[0:10] + result[-10:]:
# print(r)
# 获取历史分时数据
ret, result = api_01.get_history_transaction_data('600410', '20190925')
for r in result[0:10] + result[-10:]:
print(r)

View File

@ -0,0 +1,79 @@
# flake8: noqa
import os
import sys
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.tdx.tdx_common import FakeStrategy
from vnpy.data.tdx.tdx_future_data import *
t1 = FakeStrategy()
t2 = FakeStrategy()
# 创建API对象
api_01 = TdxFutureData(t1)
# 获取所有市场信息
markets = api_01.get_markets()
str_markets = json.dumps(markets, indent=1, ensure_ascii=False)
print(u'{}'.format(str_markets))
# 获取所有的期货合约明细
api_01.qryInstrument()
# 获取某个合约得最新价
# price = api_01.get_price('rb2005')
# print('price={}'.format(price))
# 获取主力合约
# result = api_01.get_mi_contracts()
# str_result = json.dumps(result,indent=1, ensure_ascii=False)
# print(str_result)
# 获取某个板块的合约
# result = api_01.get_contracts(exchange=Exchange.CZCE)
# 获取某个板块的主力合约
# result = api_01.get_mi_contracts_from_exchange(exchange=Exchange.CZCE)
# 获取主力合约(从各个板块组合获取)
# result = api_01.get_mi_contracts2()
# print(u'一共{}个记录:{}'.format(len(result), [c.get('code') for c in result]))
# str_result = json.dumps(result,indent=1, ensure_ascii=False)
# print(str_result)
# all_99_ticks= api_01.get_99_contracts()
# str_99_ticks = json.dumps(all_99_ticks, indent=1, ensure_ascii=False)
# print(u'{}'.format(str_99_ticks))
# 获取历史分钟线
"""
ret,bars = api_01.get_bars('I2001', period='1min', callback=t1.display_bar, start_dt=datetime.now().replace(hour=0,minute=0,second=0,microsecond=0))
line_close_oi = [{'close':x.close,'oi':x.openInterest} for x in bars]
import pandas as pd
df = pd.DataFrame(line_close_oi)
corr = df.corr()
print(corr)
corr_rate = round(abs(corr.iloc[0, 1]) * 100, 2)
"""
# api.get_bars(symbol, period='5min', callback=display_bar)
# api_01.get_bars('IF99', period='1day', callback=t1.display_bar)
# result,datas = api_01.get_transaction_data(symbol='ni1905')
# api_02 = TdxFutureData(t2)
# api_02.get_bars('IF99', period='1min', callback=t1.display_bar)
# 获取当前交易日分时数据
# ret,result = api_01.get_transaction_data('RB99')
# for r in result[0:10] + result[-10:]:
# print(r)
# 获取历史分时数据
# ret, result = api_01.get_history_transaction_data('RB99', '20190109')
# for r in result[0:10] + result[-10:]:
# print(r)
# 更新本地合约缓存信息
api_01.update_mi_contracts()

View File

@ -0,0 +1,53 @@
# flake8: noqa
import os
import sys
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.tdx.tdx_common import FakeStrategy
from vnpy.data.tdx.tdx_stock_data import *
os.environ["VNPY_TESTING"] = "1"
import json
t1 = FakeStrategy()
t2 = FakeStrategy()
# 创建API对象
api_01 = TdxStockData(t1)
# 获取市场下股票
for market_id in range(2):
print('get market_id:{}'.format(market_id))
security_list = api_01.get_security_list(market_id)
if len(security_list) == 0:
continue
for security in security_list:
if security.get('code', '').startswith('12') or u'转债' in security.get('name', ''):
str_security = json.dumps(security, indent=1, ensure_ascii=False)
print('market_id:{},{}'.format(market_id, str_security))
# str_markets = json.dumps(security_list, indent=1, ensure_ascii=False)
# print(u'{}'.format(str_markets))
# 获取历史分钟线
# api_01.get_bars('002024', period='1hour', callback=t1.display_bar)
# api.get_bars(symbol, period='5min', callback=display_bar)
# api.get_bars(symbol, period='1day', callback=display_bar)
# api_02 = TdxData(t2)
# api_02.get_bars('601390', period='1day', callback=t1.display_bar)
# 获取历史分时数据
# ret,result = api_01.get_history_transaction_data('RB99', '20190909')
# for r in result[0:10] + result[-10:]:
# print(r)
# 获取历史分时数据
ret, result = api_01.get_history_transaction_data('600410', '20190925')
for r in result[0:10] + result[-10:]:
print(r)