[增强功能] 天勤行情支持

This commit is contained in:
msincenselee 2020-07-20 10:25:37 +08:00
parent 338db92c03
commit 764f70ca16
7 changed files with 729 additions and 81 deletions

132
README.md
View File

@ -10,38 +10,60 @@ github 链接: https://github.com/msincenselee/vnpy
gitee 链接: https://gitee.com/vnpy2/vnpy gitee 链接: https://gitee.com/vnpy2/vnpy
###Fork版本主要改进如下 ###Fork版本主要改进如下
1、 事件引擎,增加运行效率调试功能 15、天勤行情接入
2、 增加rabbitMQ通信组件 - vnpy.data.tq 定制downloder扩展下载字段
- prod.jobs.refill_tq_future.ticks 下载tick
- vnpy.gateway.ctp.ctp_gateway 扩展支持上期所的五档行情和指数行情。在配置文件中增加'tq':{} 即可。
3、 增加tdx 免费数据源,包括 14、GUI界面增强
- 交易界面恢复部分v1版本的快捷功能如快速平仓
- 策略运行界面,增加'保存’,'K线' 按钮保存策略内部数据保存切片查看最新切片K线。
- K线切片,支持同一策略内多周期、多品种K线。
- 修改接口连接配置,采用更新方式,替代覆盖。
- 提供主力合约/指数合约的信息获取 13、 增加App: cta_stock, 包括:
- 提供期货/股票数据bar 和分笔成交数据下载
- 提供每日增量更新期货数据=> csv文件可配合NFS+Celery实现分布式回测
4、 增加App: tick_recorder, 直接异步写入csv文件 - 增加baostock数据源可下载股票基本信息复权因子非复权5Min数据k线满足大部分Cta策略的回测了。
- 使用tdx的历史逐笔成交数据可缓存每日数据=>pkb2文件支持tick回测。
- 独立的CTA引擎 cta_stock运行股票CTA策略时替代原版cta_strategy引擎
- 提供股票专用模板,支持目标股票买入卖出,市场盘面算法交易,支持策略多股票持久化
- 支持策略中获取账号资金/可用余额/当前仓位/风控仓位
- 支持策略中获取账号所有股票持仓
- 支持bar/tick方式回测/组合回测
- 支持可转债日内交易回测,支持动态前复权。
- 支持盘前复权信息事件【待更新】
5、 增加App: index_tick_publisher, 订阅通达信指数行情=》rabbit_mq 推送 12、 增加App: cta_crypto包括
6、 增强ctp_gateway包括: - 增加币安合约交易vnpy.gateway.binancef支持每个合约独立杠杆比率
- 增肌币安合约数据接口 vnpy.data.binance.binance_future_data
- 独立的CTA引擎 cta_crypto运行数字货币时替代原版cta_strategy引擎。
- 支持bar方式回测/组合回测
- 增强期货交易模板
- 修正vnpy.gateway.binance现货网关恢复position
- 提供指数行情订阅 11、算法引擎
- 使用RabbitMQ指数源或tdx单一数据源
- 提供自定义合约功能实时提供其合成后的tick行情
7、 增加component组件包括: - 支持自定义套利合约得算法,及算法下单。
- 可通过vnpy界面/cta_strategy_pro策略直接发出套利单由算法引擎执行
10、增加App: account_recorder 包括:
- 异步更新账号资金/委托/成交信息至Mongo数据库
- 异步更新策略持仓数据至Mongo数据库
- 异步查询股票历史委托/历史成交至Mongo数据库
9、 增强主引擎,包括:
- 支持同一类gateway多个接入配置
- 增加获取当前价格接口
- 增加风控引擎入口 self.rm_engine
- 增加算法引擎入口,支持自定义套利合约得手工/程序化交易转移至算法引擎实现
- 提供cta_line_bar k线组件支持国内文华/交易师/TB等分钟/小时的计算模式,支持任意秒/分钟/小时/天/周等周期支持k线数据实时生成。
- 提供cta_renko_bar k线组件支持x跳动/千分比跳动
- 提供cta_fund_kline 资金曲线组件,策略实例/账号基本的实时资金曲线
- 提供cta_position 组件,支持多/空/净仓位记录,支持套利
- 提供cta_policy 组件,持久化复杂的策略执行逻辑
- 提供cta_period 组件,支持策略中‘周期’的逻辑
- 提供cta_grid_trade组件支持网格交易、复杂的策略持仓逻辑、持久化
8、 增加App: cta_strategy_pro包括 8、 增加App: cta_strategy_pro包括
@ -60,56 +82,52 @@ gitee 链接: https://gitee.com/vnpy2/vnpy
- 增加CtaSpread模板支持FAK正套/反套 - 增加CtaSpread模板支持FAK正套/反套
- 增加Spread组合引擎tick级别回测支持多策略实例得套利共享账号回测。 - 增加Spread组合引擎tick级别回测支持多策略实例得套利共享账号回测。
9、 增强主引擎,包括:
- 支持同一类gateway多个接入配置
- 增加获取当前价格接口
- 增加风控引擎入口 self.rm_engine
- 增加算法引擎入口,支持自定义套利合约得手工/程序化交易转移至算法引擎实现
10、增加App: account_recorder 包括:
- 异步更新账号资金/委托/成交信息至Mongo数据库
- 异步更新策略持仓数据至Mongo数据库
- 异步查询股票历史委托/历史成交至Mongo数据库
11、算法引擎
- 支持自定义套利合约得算法,及算法下单。 7、 增加component组件包括:
- 可通过vnpy界面/cta_strategy_pro策略直接发出套利单由算法引擎执行
12、 增加App: cta_crypto包括
- 增加币安合约交易vnpy.gateway.binancef支持每个合约独立杠杆比率 - 提供cta_line_bar k线组件支持国内文华/交易师/TB等分钟/小时的计算模式,支持任意秒/分钟/小时/天/周等周期支持k线数据实时生成。
- 增肌币安合约数据接口 vnpy.data.binance.binance_future_data - 提供cta_renko_bar k线组件支持x跳动/千分比跳动
- 独立的CTA引擎 cta_crypto运行数字货币时替代原版cta_strategy引擎。 - 提供cta_fund_kline 资金曲线组件,策略实例/账号基本的实时资金曲线
- 支持bar方式回测/组合回测 - 提供cta_position 组件,支持多/空/净仓位记录,支持套利
- 增强期货交易模板 - 提供cta_policy 组件,持久化复杂的策略执行逻辑
- 修正vnpy.gateway.binance现货网关恢复position - 提供cta_period 组件,支持策略中‘周期’的逻辑
- 提供cta_grid_trade组件支持网格交易、复杂的策略持仓逻辑、持久化
13、 增加App: cta_stock, 包括:
- 增加baostock数据源可下载股票基本信息复权因子非复权5Min数据k线满足大部分Cta策略的回测了。 6、 增强ctp_gateway包括:
- 使用tdx的历史逐笔成交数据可缓存每日数据=>pkb2文件支持tick回测。
- 独立的CTA引擎 cta_stock运行股票CTA策略时替代原版cta_strategy引擎
- 提供股票专用模板,支持目标股票买入卖出,市场盘面算法交易,支持策略多股票持久化
- 支持策略中获取账号资金/可用余额/当前仓位/风控仓位
- 支持策略中获取账号所有股票持仓
- 支持bar/tick方式回测/组合回测
- 支持可转债日内交易回测,支持动态前复权。
- 支持盘前复权信息事件【待更新】
14、GUI界面增强
- 交易界面恢复部分v1版本的快捷功能如快速平仓 - 提供指数行情订阅
- 策略运行界面,增加'保存’,'K线' 按钮保存策略内部数据保存切片查看最新切片K线。 - 使用RabbitMQ指数源或tdx单一数据源
- K线切片,支持同一策略内多周期、多品种K线。 - 提供自定义合约功能实时提供其合成后的tick行情
- 增加天勤行情实现上期所5档行情和指数行情
5、 增加App: index_tick_publisher, 订阅通达信指数行情=》rabbit_mq 推送
4、 增加App: tick_recorder, 直接异步写入csv文件
3、 增加tdx 免费数据源,包括
- 提供主力合约/指数合约的信息获取
- 提供期货/股票数据bar 和分笔成交数据下载
- 提供每日增量更新期货数据=> csv文件可配合NFS+Celery实现分布式回测
2、 增加rabbitMQ通信组件
1、 事件引擎,增加运行效率调试功能
大佳 大佳
QQ/Wechat28888502 QQ/Wechat28888502
2020最新套利课程http://www.uquant.org/course/43
-------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------
# 原版 vn.py - 基于python的开源交易平台开发框架 # 原版 vn.py - 基于python的开源交易平台开发框架

View File

@ -0,0 +1,91 @@
# flake8: noqa
"""
下载天勤期货历史tick数据 => vnpy项目目录/bar_data/tq/
"""
import os
import sys
import json
import csv
from collections import OrderedDict
import pandas as pd
from contextlib import closing
from datetime import datetime, timedelta
import argparse
from tqsdk import TqApi, TqSim
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
if vnpy_root not in sys.path:
sys.path.append(vnpy_root)
os.environ["VNPY_TESTING"] = "1"
from vnpy.data.tdx.tdx_future_data import get_future_contracts, Exchange
from vnpy.trader.utility import get_csv_last_dt, get_underlying_symbol, extract_vt_symbol
from vnpy.data.tq.downloader import DataDownloader
if __name__ == "__main__":
if len(sys.argv) <= 1:
print('请使用 --help 查看说明')
# 参数分析
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--symbol', type=str, default='', help='下载合约,格式: rb2010 或者 SHFE.rb2010')
parser.add_argument('-b', '--begin', type=str, default='20160101', help='开始日期格式20160101')
parser.add_argument('-e', '--end', type=str, default=datetime.now().strftime('%Y%m%d'),
help='结束日期,格式:{}'.format(datetime.now().strftime('%Y%m%d')))
args = parser.parse_args()
if len(args.symbol) == 0:
print('下载合约未设定 参数 -s rb2010')
os._exit(0)
# 开始下载
api = TqApi(TqSim())
download_tasks = {}
begin_date = datetime.strptime(args.begin, '%Y%m%d')
end_date = datetime.strptime(args.end, '%Y%m%d')
n_days = (end_date - begin_date).days
future_contracts = get_future_contracts()
if '.' not in args.symbol:
underly_symbol = get_underlying_symbol(args.symbol).upper()
contract_info = future_contracts.get(underly_symbol)
symbol = args.symbol
exchange = Exchange(contract_info.get('exchange'))
else:
symbol, exchange = extract_vt_symbol(args.symbol)
if n_days <= 0:
n_days = 1
for n in range(n_days):
download_date = begin_date + timedelta(days=n)
if download_date.isoweekday() in [6, 7]:
continue
save_folder = os.path.abspath(os.path.join(
vnpy_root, 'tick_data', 'tq', 'future',
download_date.strftime('%Y%m')))
if not os.path.exists(save_folder):
os.makedirs(save_folder)
save_file = os.path.abspath(os.path.join(save_folder,
"{}_{}.csv".format(symbol, download_date.strftime('%Y%m%d'))))
zip_file = os.path.abspath(os.path.join(save_folder,
"{}_{}.pkb2".format(symbol, download_date.strftime('%Y%m%d'))))
if os.path.exists(save_file):
continue
if os.path.exists(zip_file):
continue
# 下载从 2018-05-01凌晨0点 到 2018-06-01凌晨0点 的 T1809 盘口Tick数据
download_tasks["{}_{}_tick".format(symbol, download_date.strftime('%Y%m%d'))] = DataDownloader(
api,
symbol_list=f"{exchange.value}.{symbol}",
dur_sec=0,
start_dt=download_date.date(),
end_dt=download_date.replace(hour=16), csv_file_name=save_file)
# 使用with closing机制确保下载完成后释放对应的资源
with closing(api):
while not all([v.is_finished() for v in download_tasks.values()]):
api.wait_update()
print("progress: ", {k: ("%.2f%%" % v.get_progress()) for k, v in download_tasks.items()})

View File

@ -1,3 +1,4 @@
six==1.13.0
PyQt5 PyQt5
pyqtgraph pyqtgraph
dataclasses; python_version<="3.6" dataclasses; python_version<="3.6"
@ -7,7 +8,7 @@ websocket-client
peewee peewee
mongoengine mongoengine
numpy numpy
pandas pandas==0.25.2
matplotlib matplotlib
seaborn seaborn
futu-api futu-api
@ -18,6 +19,7 @@ ibapi
deap deap
pyzmq pyzmq
QScintilla QScintilla
pytdx PySocks
pykalman pykalman
cython cython
tqsdk

217
vnpy/data/tq/downloader.py Normal file
View File

@ -0,0 +1,217 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#__author__ = 'yangyang'
# 修改:
# 1 输入单个合约时,标题不再扩展为 合约.标题
# 2. 下载tick时5当行情都下载
import csv
from datetime import date, datetime
from typing import Union, List
from tqsdk.api import TqApi
from tqsdk.datetime import _get_trading_day_start_time, _get_trading_day_end_time
from tqsdk.diff import _get_obj
from tqsdk.utils import _generate_uuid
class DataDownloader:
"""
历史数据下载器, 输出到csv文件
多合约按时间横向对齐
"""
def __init__(self, api: TqApi, symbol_list: Union[str, List[str]], dur_sec: int, start_dt: Union[date, datetime],
end_dt: Union[date, datetime], csv_file_name: str) -> None:
"""
创建历史数据下载器实例
Args:
api (TqApi): TqApi实例该下载器将使用指定的api下载数据
symbol_list (str/list of str): 需要下载数据的合约代码当指定多个合约代码时将其他合约按第一个合约的交易时间对齐
dur_sec (int): 数据周期以秒为单位例如: 1分钟线为60,1小时线为3600,日线为86400,Tick数据为0
start_dt (date/datetime): 起始时间, 如果类型为 date 则指的是交易日, 如果为 datetime 则指的是具体时间点
end_dt (date/datetime): 结束时间, 如果类型为 date 则指的是交易日, 如果为 datetime 则指的是具体时间点
csv_file_name (str): 输出csv的文件名
Example::
from datetime import datetime, date
from contextlib import closing
from tqsdk import TqApi, TqSim
from tqsdk.tools import DataDownloader
api = TqApi(TqSim())
download_tasks = {}
# 下载从 2018-01-01 到 2018-09-01 的 SR901 日线数据
download_tasks["SR_daily"] = DataDownloader(api, symbol_list="CZCE.SR901", dur_sec=24*60*60,
start_dt=date(2018, 1, 1), end_dt=date(2018, 9, 1), csv_file_name="SR901_daily.csv")
# 下载从 2017-01-01 到 2018-09-01 的 rb主连 5分钟线数据
download_tasks["rb_5min"] = DataDownloader(api, symbol_list="KQ.m@SHFE.rb", dur_sec=5*60,
start_dt=date(2017, 1, 1), end_dt=date(2018, 9, 1), csv_file_name="rb_5min.csv")
# 下载从 2018-01-01凌晨6点 到 2018-06-01下午4点 的 cu1805,cu1807,IC1803 分钟线数据,所有数据按 cu1805 的时间对齐
# 例如 cu1805 夜盘交易时段, IC1803 的各项数据为 N/A
# 例如 cu1805 13:00-13:30 不交易, 因此 IC1803 在 13:00-13:30 之间的K线数据会被跳过
download_tasks["cu_min"] = DataDownloader(api, symbol_list=["SHFE.cu1805", "SHFE.cu1807", "CFFEX.IC1803"], dur_sec=60,
start_dt=datetime(2018, 1, 1, 6, 0 ,0), end_dt=datetime(2018, 6, 1, 16, 0, 0), csv_file_name="cu_min.csv")
# 下载从 2018-05-01凌晨0点 到 2018-06-01凌晨0点 的 T1809 盘口Tick数据
download_tasks["T_tick"] = DataDownloader(api, symbol_list=["CFFEX.T1809"], dur_sec=0,
start_dt=datetime(2018, 5, 1), end_dt=datetime(2018, 6, 1), csv_file_name="T1809_tick.csv")
# 使用with closing机制确保下载完成后释放对应的资源
with closing(api):
while not all([v.is_finished() for v in download_tasks.values()]):
api.wait_update()
print("progress: ", { k:("%.2f%%" % v.get_progress()) for k,v in download_tasks.items() })
"""
self._api = api
if isinstance(start_dt, datetime):
self._start_dt_nano = int(start_dt.timestamp() * 1e9)
else:
self._start_dt_nano = _get_trading_day_start_time(int(datetime(start_dt.year, start_dt.month, start_dt.day).timestamp()) * 1000000000)
if isinstance(end_dt, datetime):
self._end_dt_nano = int(end_dt.timestamp() * 1e9)
else:
self._end_dt_nano = _get_trading_day_end_time(int(datetime(end_dt.year, end_dt.month, end_dt.day).timestamp()) * 1000000000)
self._current_dt_nano = self._start_dt_nano
self._symbol_list = symbol_list if isinstance(symbol_list, list) else [symbol_list]
# 检查合约代码是否存在
for symbol in self._symbol_list:
if (not self._api._stock) and symbol not in self._api._data.get("quotes", {}):
raise Exception("代码 %s 不存在, 请检查合约代码是否填写正确" % (symbol))
self._dur_nano = dur_sec * 1000000000
if self._dur_nano == 0 and len(self._symbol_list) != 1:
raise Exception("Tick序列不支持多合约")
self._csv_file_name = csv_file_name
self._task = self._api.create_task(self._download_data())
def is_finished(self) -> bool:
"""
判断是否下载完成
Returns:
bool: 如果数据下载完成则返回 True, 否则返回 False.
"""
return self._task.done()
def get_progress(self) -> float:
"""
获得下载进度百分比
Returns:
float: 下载进度,100表示下载完成
"""
return 100.0 if self._task.done() else (self._current_dt_nano - self._start_dt_nano) / (
self._end_dt_nano - self._start_dt_nano) * 100
async def _download_data(self):
"""下载数据, 多合约横向按时间对齐"""
chart_info = {
"aid": "set_chart",
"chart_id": _generate_uuid("PYSDK_downloader"),
"ins_list": ",".join(self._symbol_list),
"duration": self._dur_nano,
"view_width": 2000,
"focus_datetime": self._start_dt_nano,
"focus_position": 0,
}
# 还没有发送过任何请求, 先请求定位左端点
await self._api._send_chan.send(chart_info)
chart = _get_obj(self._api._data, ["charts", chart_info["chart_id"]])
current_id = None # 当前数据指针
csv_header = []
data_cols = ["open", "high", "low", "close", "volume", "open_oi", "close_oi"] if self._dur_nano != 0 else \
["last_price", "highest", "lowest", "volume",
"amount", "open_interest","upper_limit","lower_limit",
"bid_price1", "bid_volume1", "ask_price1", "ask_volume1",
"bid_price2", "bid_volume2", "ask_price2", "ask_volume2",
"bid_price3", "bid_volume3", "ask_price3", "ask_volume3",
"bid_price4", "bid_volume4", "ask_price4", "ask_volume4",
"bid_price5", "bid_volume5", "ask_price5", "ask_volume5"
]
serials = []
for symbol in self._symbol_list:
path = ["klines", symbol, str(self._dur_nano)] if self._dur_nano != 0 else ["ticks", symbol]
serial = _get_obj(self._api._data, path)
serials.append(serial)
try:
with open(self._csv_file_name, 'w', newline='') as csvfile:
csv_writer = csv.writer(csvfile, dialect='excel')
async with self._api.register_update_notify() as update_chan:
async for _ in update_chan:
if not (chart_info.items() <= _get_obj(chart, ["state"]).items()):
# 当前请求还没收齐回应, 不应继续处理
continue
left_id = chart.get("left_id", -1)
right_id = chart.get("right_id", -1)
if (left_id == -1 and right_id == -1) or self._api._data.get("mdhis_more_data", True):
# 定位信息还没收到, 或数据序列还没收到
continue
for serial in serials:
# 检查合约的数据是否收到
if serial.get("last_id", -1) == -1:
continue
if current_id is None:
current_id = max(left_id, 0)
while current_id <= right_id:
item = serials[0]["data"].get(str(current_id), {})
if item.get("datetime", 0) == 0 or item["datetime"] > self._end_dt_nano:
# 当前 id 已超出 last_id 或k线数据的时间已经超过用户限定的右端
return
if len(csv_header) == 0:
# 写入文件头
csv_header = ["datetime"]
for symbol in self._symbol_list:
for col in data_cols:
if len(self._symbol_list) > 2:
csv_header.append(symbol + "." + col)
else:
csv_header.append(col)
csv_writer.writerow(csv_header)
row = [self._nano_to_str(item["datetime"])]
for col in data_cols:
row.append(self._get_value(item, col))
for i in range(1, len(self._symbol_list)):
symbol = self._symbol_list[i]
tid = serials[0].get("binding", {}).get(symbol, {}).get(str(current_id), -1)
k = {} if tid == -1 else serials[i]["data"].get(str(tid), {})
for col in data_cols:
row.append(self._get_value(k, col))
csv_writer.writerow(row)
current_id += 1
self._current_dt_nano = item["datetime"]
# 当前 id 已超出订阅范围, 需重新订阅后续数据
chart_info.pop("focus_datetime", None)
chart_info.pop("focus_position", None)
chart_info["left_kline_id"] = current_id
await self._api._send_chan.send(chart_info)
finally:
# 释放chart资源
await self._api._send_chan.send({
"aid": "set_chart",
"chart_id": chart_info["chart_id"],
"ins_list": "",
"duration": self._dur_nano,
"view_width": 2000,
})
@staticmethod
def _get_value(obj, key):
if key not in obj:
return "#N/A"
if isinstance(obj[key], str):
return float("nan")
return obj[key]
@staticmethod
def _nano_to_str(nano):
dt = datetime.fromtimestamp(nano // 1000000000)
s = dt.strftime('%Y-%m-%d %H:%M:%S')
s += '.' + str(int(nano % 1000000000)).zfill(9)
return s

View File

@ -5,6 +5,9 @@ import traceback
import json import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from copy import copy, deepcopy from copy import copy, deepcopy
from functools import lru_cache
from typing import List
import pandas as pd
from vnpy.api.ctp import ( from vnpy.api.ctp import (
MdApi, MdApi,
@ -48,11 +51,13 @@ from vnpy.trader.constant import (
OrderType, OrderType,
Product, Product,
Status, Status,
OptionType OptionType,
Interval
) )
from vnpy.trader.gateway import BaseGateway from vnpy.trader.gateway import BaseGateway
from vnpy.trader.object import ( from vnpy.trader.object import (
TickData, TickData,
BarData,
OrderData, OrderData,
TradeData, TradeData,
PositionData, PositionData,
@ -61,6 +66,7 @@ from vnpy.trader.object import (
OrderRequest, OrderRequest,
CancelRequest, CancelRequest,
SubscribeRequest, SubscribeRequest,
HistoryRequest
) )
from vnpy.trader.utility import ( from vnpy.trader.utility import (
extract_vt_symbol, extract_vt_symbol,
@ -73,6 +79,8 @@ from vnpy.trader.utility import (
) )
from vnpy.trader.event import EVENT_TIMER from vnpy.trader.event import EVENT_TIMER
from vnpy.api.websocket import WebsocketClient
# 增加通达信指数接口行情 # 增加通达信指数接口行情
from time import sleep from time import sleep
from threading import Thread from threading import Thread
@ -152,6 +160,64 @@ index_contracts = {}
# tdx 期货配置本地缓存 # tdx 期货配置本地缓存
future_contracts = get_future_contracts() future_contracts = get_future_contracts()
# 时间戳对齐
TIME_GAP = 8 * 60 * 60 * 1000000000
INTERVAL_VT2TQ = {
Interval.MINUTE: 60,
Interval.HOUR: 60 * 60,
Interval.DAILY: 60 * 60 * 24,
}
TQ2VT_TYPE = {
"FUTURE_OPTION": Product.OPTION,
"INDEX": Product.INDEX,
"FUTURE_COMBINE": Product.SPREAD,
"SPOT": Product.SPOT,
"FUTURE_CONT": Product.INDEX,
"FUTURE": Product.FUTURES,
"FUTURE_INDEX": Product.INDEX,
"OPTION": Product.OPTION,
}
@lru_cache(maxsize=9999)
def vt_to_tq_symbol(symbol: str, exchange: Exchange) -> str:
"""
TQSdk exchange first
"""
for count, word in enumerate(symbol):
if word.isdigit():
break
fix_symbol = symbol
if exchange in [Exchange.INE, Exchange.SHFE, Exchange.DCE]:
fix_symbol = symbol.lower()
# Check for index symbol
time_str = symbol[count:]
if time_str in ["88"]:
return f"KQ.m@{exchange.value}.{fix_symbol[:count]}"
if time_str in ["99"]:
return f"KQ.i@{exchange.value}.{fix_symbol[:count]}"
return f"{exchange.value}.{fix_symbol}"
@lru_cache(maxsize=9999)
def tq_to_vt_symbol(tq_symbol: str) -> str:
""""""
if "KQ.m" in tq_symbol:
ins_type, instrument = tq_symbol.split("@")
exchange, symbol = instrument.split(".")
return f"{symbol}88.{exchange}"
elif "KQ.i" in tq_symbol:
ins_type, instrument = tq_symbol.split("@")
exchange, symbol = instrument.split(".")
return f"{symbol}99.{exchange}"
else:
exchange, symbol = tq_symbol.split(".")
return f"{symbol}.{exchange}"
class CtpGateway(BaseGateway): class CtpGateway(BaseGateway):
""" """
@ -185,6 +251,7 @@ class CtpGateway(BaseGateway):
self.md_api = None self.md_api = None
self.tdx_api = None self.tdx_api = None
self.rabbit_api = None self.rabbit_api = None
self.tq_api = None
self.subscribed_symbols = set() # 已订阅合约代码 self.subscribed_symbols = set() # 已订阅合约代码
@ -204,6 +271,7 @@ class CtpGateway(BaseGateway):
auth_code = setting["授权编码"] auth_code = setting["授权编码"]
product_info = setting["产品信息"] product_info = setting["产品信息"]
rabbit_dict = setting.get('rabbit', None) rabbit_dict = setting.get('rabbit', None)
tq_dict = setting.get('tq', None)
if ( if (
(not td_address.startswith("tcp://")) (not td_address.startswith("tcp://"))
and (not td_address.startswith("ssl://")) and (not td_address.startswith("ssl://"))
@ -239,22 +307,28 @@ class CtpGateway(BaseGateway):
self.md_api.connect(md_address, userid, password, brokerid) self.md_api.connect(md_address, userid, password, brokerid)
if rabbit_dict: if rabbit_dict:
self.write_log(f'激活RabbitMQ行情接口')
self.rabbit_api = SubMdApi(gateway=self) self.rabbit_api = SubMdApi(gateway=self)
self.rabbit_api.connect(rabbit_dict) self.rabbit_api.connect(rabbit_dict)
elif tq_dict is not None:
self.write_log(f'激活天勤行情接口')
self.tq_api = TqMdApi(gateway=self)
self.tq_api.connect(tq_dict)
else: else:
self.write_log(f'激活通达信行情接口')
self.tdx_api = TdxMdApi(gateway=self) self.tdx_api = TdxMdApi(gateway=self)
self.tdx_api.connect() self.tdx_api.connect()
self.init_query() self.init_query()
for (vt_symbol, is_bar) in self.subscribed_symbols: for (vt_symbol, is_bar) in list(self.subscribed_symbols):
symbol, exchange = extract_vt_symbol(vt_symbol) symbol, exchange = extract_vt_symbol(vt_symbol)
req = SubscribeRequest( req = SubscribeRequest(
symbol=symbol, symbol=symbol,
exchange=exchange, exchange=exchange,
is_bar=is_bar is_bar=is_bar
) )
# 指数合约从tdx行情订阅 # 指数合约从tdx行情、天勤订阅
if req.symbol[-2:] in ['99']: if req.symbol[-2:] in ['99']:
req.symbol = req.symbol.upper() req.symbol = req.symbol.upper()
if self.tdx_api is not None: if self.tdx_api is not None:
@ -262,7 +336,16 @@ class CtpGateway(BaseGateway):
self.tdx_api.connect() self.tdx_api.connect()
self.tdx_api.subscribe(req) self.tdx_api.subscribe(req)
elif self.rabbit_api is not None: elif self.rabbit_api is not None:
# 使用rabbitmq获取
self.rabbit_api.subscribe(req) self.rabbit_api.subscribe(req)
elif self.tq_api:
# 使用天勤行情获取
self.tq_api.subscribe(req)
else:
# 上期所、上能源支持五档行情,使用天勤接口
if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
self.write_log(f'使用天勤接口订阅')
self.tq_api.subscribe(req)
else: else:
self.md_api.subscribe(req) self.md_api.subscribe(req)
@ -339,10 +422,21 @@ class CtpGateway(BaseGateway):
if req.symbol[-2:] in ['99']: if req.symbol[-2:] in ['99']:
req.symbol = req.symbol.upper() req.symbol = req.symbol.upper()
if self.tdx_api: if self.tdx_api:
self.write_log(f'使用通达信接口订阅{req.symbol}')
self.tdx_api.subscribe(req) self.tdx_api.subscribe(req)
elif self.rabbit_api: elif self.rabbit_api:
self.write_log(f'使用RabbitMQ接口订阅{req.symbol}')
self.rabbit_api.subscribe(req) self.rabbit_api.subscribe(req)
elif self.tq_api:
self.write_log(f'使用天勤接口订阅{ req.symbol}')
self.tq_api.subscribe(req)
else: else:
# 上期所、上能源支持五档行情,使用天勤接口
if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]:
self.write_log(f'使用天勤接口订阅{ req.symbol}')
self.tq_api.subscribe(req)
else:
self.write_log(f'使用CTP接口订阅{req.symbol}')
self.md_api.subscribe(req) self.md_api.subscribe(req)
# Allow the strategies to start before the connection # Allow the strategies to start before the connection
@ -408,6 +502,12 @@ class CtpGateway(BaseGateway):
self.rabbit_api = None self.rabbit_api = None
tmp4.close() tmp4.close()
if self.tq_api:
self.write_log(u'天勤行情API')
tmp5 = self.tq_api
self.tq_api = None
tmp5.close()
def process_timer_event(self, event): def process_timer_event(self, event):
"""""" """"""
self.count += 1 self.count += 1
@ -433,7 +533,6 @@ class CtpGateway(BaseGateway):
tick = copy(tick) tick = copy(tick)
combiner.on_tick(tick) combiner.on_tick(tick)
class CtpMdApi(MdApi): class CtpMdApi(MdApi):
"""""" """"""
@ -1710,6 +1809,222 @@ class SubMdApi():
self.gateway.write_log(u'RabbitMQ行情订阅 {}'.format(str(vn_symbol))) self.gateway.write_log(u'RabbitMQ行情订阅 {}'.format(str(vn_symbol)))
class TqMdApi():
"""天勤行情API"""
def __init__(self, gateway):
""""""
super().__init__()
self.gateway = gateway
self.gateway_name = gateway.gateway_name
self.api = None
self.is_connected = False
self.subscribe_array = []
# 行情对象列表
self.quote_objs = []
# 数据更新线程
self.update_thread = None
# 所有的合约
self.all_instruments = []
self.ticks = {}
def connect(self, setting):
""""""
try:
from tqsdk import TqApi
self.api = TqApi()
except Exception as e:
self.gateway.write_log(f'天勤行情API接入异常'.format(str(e)))
if self.api:
self.is_connected = True
self.gateway.write_log(f'天勤行情API已连接')
self.update_thread = Thread(target=self.update)
self.update_thread.start()
def generate_tick_from_quote(self, vt_symbol, quote) -> TickData:
"""
生成TickData
"""
# 清洗 nan
quote = {k: 0 if v != v else v for k, v in quote.items()}
symbol, exchange = extract_vt_symbol(vt_symbol)
tick = TickData(
symbol=symbol,
exchange=exchange,
datetime=datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"),
name=symbol,
volume=quote["volume"],
open_interest=quote["open_interest"],
last_price=quote["last_price"],
limit_up=quote["upper_limit"],
limit_down=quote["lower_limit"],
open_price=quote["open"],
high_price=quote["highest"],
low_price=quote["lowest"],
pre_close=quote["pre_close"],
bid_price_1=quote["bid_price1"],
bid_price_2=quote["bid_price2"],
bid_price_3=quote["bid_price3"],
bid_price_4=quote["bid_price4"],
bid_price_5=quote["bid_price5"],
ask_price_1=quote["ask_price1"],
ask_price_2=quote["ask_price2"],
ask_price_3=quote["ask_price3"],
ask_price_4=quote["ask_price4"],
ask_price_5=quote["ask_price5"],
bid_volume_1=quote["bid_volume1"],
bid_volume_2=quote["bid_volume2"],
bid_volume_3=quote["bid_volume3"],
bid_volume_4=quote["bid_volume4"],
bid_volume_5=quote["bid_volume5"],
ask_volume_1=quote["ask_volume1"],
ask_volume_2=quote["ask_volume2"],
ask_volume_3=quote["ask_volume3"],
ask_volume_4=quote["ask_volume4"],
ask_volume_5=quote["ask_volume5"],
gateway_name=self.gateway_name
)
if symbol.endswith('99') and tick.ask_price_1 == 0.0 and tick.bid_price_1 == 0.0:
price_tick = quote['price_tick']
if isinstance(price_tick, float) or isinstance(price_tick,int):
tick.ask_price_1 = tick.last_price + price_tick
tick.ask_volume_1 = 1
tick.bid_price_1 = tick.last_price - price_tick
tick.bid_volume_1 = 1
return tick
def update(self) -> None:
"""
更新行情/委托/账户/持仓
"""
while self.api.wait_update():
# 更新行情信息
for vt_symbol, quote in self.quote_objs:
if self.api.is_changing(quote):
tick = self.generate_tick_from_quote(vt_symbol, quote)
tick and self.gateway.on_tick(tick) and self.gateway.on_custom_tick(tick)
def subscribe(self, req: SubscribeRequest) -> None:
"""
订阅行情
"""
if req.vt_symbol not in self.subscribe_array:
symbol, exchange = extract_vt_symbol(req.vt_symbol)
try:
quote = self.api.get_quote(vt_to_tq_symbol(symbol, exchange))
self.quote_objs.append((req.vt_symbol, quote))
self.subscribe_array.append(req.vt_symbol)
except Exception as ex:
self.gateway.write_log('订阅天勤行情异常:{}'.format(str(ex)))
def query_contracts(self) -> None:
""""""
self.all_instruments = [
v for k, v in self.api._data["quotes"].items() if v["expired"] == False
]
for contract in self.all_instruments:
if (
"SSWE" in contract["instrument_id"]
or "CSI" in contract["instrument_id"]
):
# vnpy没有这两个交易所需要可以自行修改vnpy代码
continue
vt_symbol = tq_to_vt_symbol(contract["instrument_id"])
symbol, exchange = extract_vt_symbol(vt_symbol)
if TQ2VT_TYPE[contract["ins_class"]] == Product.OPTION:
contract_data = ContractData(
symbol=symbol,
exchange=exchange,
name=symbol,
product=TQ2VT_TYPE[contract["ins_class"]],
size=contract["volume_multiple"],
pricetick=contract["price_tick"],
history_data=True,
option_strike=contract["strike_price"],
option_underlying=tq_to_vt_symbol(contract["underlying_symbol"]),
option_type=OptionType[contract["option_class"]],
option_expiry=datetime.fromtimestamp(contract["expire_datetime"]),
option_index=tq_to_vt_symbol(contract["underlying_symbol"]),
gateway_name=self.gateway_name,
)
else:
contract_data = ContractData(
symbol=symbol,
exchange=exchange,
name=symbol,
product=TQ2VT_TYPE[contract["ins_class"]],
size=contract["volume_multiple"],
pricetick=contract["price_tick"],
history_data=True,
gateway_name=self.gateway_name,
)
self.gateway.on_contract(contract_data)
def query_history(self, req: HistoryRequest) -> List[BarData]:
"""
获取历史数据
"""
symbol = req.symbol
exchange = req.exchange
interval = req.interval
start = req.start
end = req.end
# 天勤需要的数据
tq_symbol = vt_to_tq_symbol(symbol, exchange)
tq_interval = INTERVAL_VT2TQ.get(interval)
end += timedelta(1)
total_days = end - start
# 一次最多只能下载 8964 根Bar
min_length = min(8964, total_days.days * 500)
df = self.api.get_kline_serial(tq_symbol, tq_interval, min_length).sort_values(
by=["datetime"]
)
# 时间戳对齐
df["datetime"] = pd.to_datetime(df["datetime"] + TIME_GAP)
# 过滤开始结束时间
df = df[(df["datetime"] >= start - timedelta(days=1)) & (df["datetime"] < end)]
data: List[BarData] = []
if df is not None:
for ix, row in df.iterrows():
bar = BarData(
symbol=symbol,
exchange=exchange,
interval=interval,
datetime=row["datetime"].to_pydatetime(),
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
open_interest=row.get("close_oi", 0),
gateway_name=self.gateway_name,
)
data.append(bar)
return data
def close(self) -> None:
""""""
try:
if self.api:
self.api.close()
self.is_connected = False
if self.update_thread:
self.update_thread.join()
except Exception as e:
self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e)))
class TickCombiner(object): class TickCombiner(object):
""" """
Tick合成类 Tick合成类

View File

@ -330,6 +330,7 @@ class ContractData(BaseData):
option_underlying: str = "" # vt_symbol of underlying contract option_underlying: str = "" # vt_symbol of underlying contract
option_type: OptionType = None option_type: OptionType = None
option_expiry: datetime = None option_expiry: datetime = None
option_index: str = "" # vt_symbol mapping cur option
def __post_init__(self): def __post_init__(self):
"""""" """"""

View File

@ -504,7 +504,7 @@ class ConnectDialog(QtWidgets.QDialog):
def __init__(self, main_engine: MainEngine, gateway_name: str): def __init__(self, main_engine: MainEngine, gateway_name: str):
"""""" """"""
super().__init__() super().__init__()
self.setting = {}
self.main_engine: MainEngine = main_engine self.main_engine: MainEngine = main_engine
self.gateway_name: str = gateway_name self.gateway_name: str = gateway_name
self.filename: str = f"connect_{gateway_name.lower()}.json" self.filename: str = f"connect_{gateway_name.lower()}.json"
@ -524,6 +524,8 @@ class ConnectDialog(QtWidgets.QDialog):
# Saved setting provides field data used last time. # Saved setting provides field data used last time.
loaded_setting = load_json(self.filename) loaded_setting = load_json(self.filename)
self.setting.update(loaded_setting)
# Initialize line edits and form layout based on setting. # Initialize line edits and form layout based on setting.
form = QtWidgets.QFormLayout() form = QtWidgets.QFormLayout()
@ -570,9 +572,11 @@ class ConnectDialog(QtWidgets.QDialog):
field_value = field_type(widget.text()) field_value = field_type(widget.text())
setting[field_name] = field_value setting[field_name] = field_value
save_json(self.filename, setting) self.setting.update(setting)
self.main_engine.connect(setting, self.gateway_name) save_json(self.filename, self.setting)
self.main_engine.connect(self.setting, self.gateway_name)
self.accept() self.accept()