From 764f70ca1613283866acb01da55d675ad83d5f40 Mon Sep 17 00:00:00 2001 From: msincenselee Date: Mon, 20 Jul 2020 10:25:37 +0800 Subject: [PATCH] =?UTF-8?q?[=E5=A2=9E=E5=BC=BA=E5=8A=9F=E8=83=BD]=20?= =?UTF-8?q?=E5=A4=A9=E5=8B=A4=E8=A1=8C=E6=83=85=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 158 ++++++++------ prod/jobs/refill_tq_future_ticks.py | 91 ++++++++ requirements.txt | 6 +- vnpy/data/tq/downloader.py | 217 ++++++++++++++++++ vnpy/gateway/ctp/ctp_gateway.py | 327 +++++++++++++++++++++++++++- vnpy/trader/object.py | 1 + vnpy/trader/ui/widget.py | 10 +- 7 files changed, 729 insertions(+), 81 deletions(-) create mode 100644 prod/jobs/refill_tq_future_ticks.py create mode 100644 vnpy/data/tq/downloader.py diff --git a/README.md b/README.md index 8c96dba8..982a8a7a 100644 --- a/README.md +++ b/README.md @@ -10,38 +10,60 @@ github 链接: https://github.com/msincenselee/vnpy gitee 链接: https://gitee.com/vnpy2/vnpy ###Fork版本主要改进如下 -1、 事件引擎,增加运行效率调试功能 - -2、 增加rabbitMQ通信组件 - -3、 增加tdx 免费数据源,包括 - +15、天勤行情接入 - - 提供主力合约/指数合约的信息获取 - - 提供期货/股票数据bar 和分笔成交数据下载 - - 提供每日增量更新期货数据=> csv文件,可配合NFS+Celery,实现分布式回测 - -4、 增加App: tick_recorder, 直接异步写入csv文件 - -5、 增加App: index_tick_publisher, 订阅通达信指数行情=》rabbit_mq 推送 - -6、 增强ctp_gateway,包括: - + - vnpy.data.tq 定制downloder,扩展下载字段 + - prod.jobs.refill_tq_future.ticks, 下载tick + - vnpy.gateway.ctp.ctp_gateway 扩展支持上期所的五档行情和指数行情。在配置文件中增加'tq':{} 即可。 - - 提供指数行情订阅 - - 使用RabbitMQ指数源,或tdx单一数据源 - - 提供自定义合约功能,实时提供其合成后的tick行情 - -7、 增加component组件,包括: +14、GUI界面增强 + + - 交易界面,恢复部分v1版本的快捷功能,如快速平仓 + - 策略运行界面,增加'保存’,'K线' 按钮,保存策略内部数据,保存切片,查看最新切片K线。 + - K线切片,支持同一策略内多周期、多品种K线。 + - 修改接口连接配置,采用更新方式,替代覆盖。 - - 提供cta_line_bar k线组件,支持国内文华/交易师/TB等分钟/小时的计算模式,支持任意秒/分钟/小时/天/周等周期,支持k线数据实时生成。 - - 提供cta_renko_bar k线组件,支持x跳动/千分比跳动 - - 提供cta_fund_kline 资金曲线组件,策略实例/账号基本的实时资金曲线 - - 提供cta_position 组件,支持多/空/净仓位记录,支持套利 - - 提供cta_policy 组件,持久化复杂的策略执行逻辑 - - 提供cta_period 组件,支持策略中‘周期’的逻辑 - - 提供cta_grid_trade组件,支持网格交易、复杂的策略持仓逻辑、持久化 +13、 增加App: cta_stock, 包括: + + - 增加baostock数据源,可下载股票基本信息,复权因子,非复权5Min数据k线,满足大部分Cta策略的回测了。 + - 使用tdx的历史逐笔成交数据,可缓存每日数据=>pkb2文件,支持tick回测。 + - 独立的CTA引擎 cta_stock,运行股票CTA策略时,替代原版cta_strategy引擎 + - 提供股票专用模板,支持目标股票买入卖出,市场盘面算法交易,支持策略多股票持久化 + - 支持策略中获取账号资金/可用余额/当前仓位/风控仓位 + - 支持策略中获取账号所有股票持仓 + - 支持bar/tick方式回测/组合回测 + - 支持可转债日内交易回测,支持动态前复权。 + - 支持盘前复权信息事件【待更新】 + +12、 增加App: cta_crypto,包括: + + - 增加币安合约交易vnpy.gateway.binancef,支持每个合约独立杠杆比率 + - 增肌币安合约数据接口 vnpy.data.binance.binance_future_data + - 独立的CTA引擎 cta_crypto,运行数字货币时,替代原版cta_strategy引擎。 + - 支持bar方式回测/组合回测 + - 增强期货交易模板 + - 修正vnpy.gateway.binance现货网关,恢复position + + +11、算法引擎 + + - 支持自定义套利合约得算法,及算法下单。 + - 可通过vnpy界面/cta_strategy_pro策略,直接发出套利单,由算法引擎执行 + +10、增加App: account_recorder, 包括: + + - 异步更新账号资金/委托/成交信息至Mongo数据库 + - 异步更新策略持仓数据至Mongo数据库 + - 异步查询股票历史委托/历史成交至Mongo数据库 + +9、 增强主引擎,包括: + + - 支持同一类gateway,多个接入配置 + - 增加获取当前价格接口 + - 增加风控引擎入口 self.rm_engine + - 增加算法引擎入口,支持自定义套利合约得手工/程序化交易转移至算法引擎实现 + 8、 增加App: cta_strategy_pro,包括: @@ -60,56 +82,52 @@ gitee 链接: https://gitee.com/vnpy2/vnpy - 增加CtaSpread模板,支持FAK正套/反套 - 增加Spread组合引擎tick级别回测,支持多策略实例得套利共享账号回测。 -9、 增强主引擎,包括: - - - 支持同一类gateway,多个接入配置 - - 增加获取当前价格接口 - - 增加风控引擎入口 self.rm_engine - - 增加算法引擎入口,支持自定义套利合约得手工/程序化交易转移至算法引擎实现 - -10、增加App: account_recorder, 包括: - - - 异步更新账号资金/委托/成交信息至Mongo数据库 - - 异步更新策略持仓数据至Mongo数据库 - - 异步查询股票历史委托/历史成交至Mongo数据库 - -11、算法引擎 - - 支持自定义套利合约得算法,及算法下单。 - - 可通过vnpy界面/cta_strategy_pro策略,直接发出套利单,由算法引擎执行 +7、 增加component组件,包括: + + + - 提供cta_line_bar k线组件,支持国内文华/交易师/TB等分钟/小时的计算模式,支持任意秒/分钟/小时/天/周等周期,支持k线数据实时生成。 + - 提供cta_renko_bar k线组件,支持x跳动/千分比跳动 + - 提供cta_fund_kline 资金曲线组件,策略实例/账号基本的实时资金曲线 + - 提供cta_position 组件,支持多/空/净仓位记录,支持套利 + - 提供cta_policy 组件,持久化复杂的策略执行逻辑 + - 提供cta_period 组件,支持策略中‘周期’的逻辑 + - 提供cta_grid_trade组件,支持网格交易、复杂的策略持仓逻辑、持久化 -12、 增加App: cta_crypto,包括: - - - 增加币安合约交易vnpy.gateway.binancef,支持每个合约独立杠杆比率 - - 增肌币安合约数据接口 vnpy.data.binance.binance_future_data - - 独立的CTA引擎 cta_crypto,运行数字货币时,替代原版cta_strategy引擎。 - - 支持bar方式回测/组合回测 - - 增强期货交易模板 - - 修正vnpy.gateway.binance现货网关,恢复position - -13、 增加App: cta_stock, 包括: - - - 增加baostock数据源,可下载股票基本信息,复权因子,非复权5Min数据k线,满足大部分Cta策略的回测了。 - - 使用tdx的历史逐笔成交数据,可缓存每日数据=>pkb2文件,支持tick回测。 - - 独立的CTA引擎 cta_stock,运行股票CTA策略时,替代原版cta_strategy引擎 - - 提供股票专用模板,支持目标股票买入卖出,市场盘面算法交易,支持策略多股票持久化 - - 支持策略中获取账号资金/可用余额/当前仓位/风控仓位 - - 支持策略中获取账号所有股票持仓 - - 支持bar/tick方式回测/组合回测 - - 支持可转债日内交易回测,支持动态前复权。 - - 支持盘前复权信息事件【待更新】 - -14、GUI界面增强 - - - 交易界面,恢复部分v1版本的快捷功能,如快速平仓 - - 策略运行界面,增加'保存’,'K线' 按钮,保存策略内部数据,保存切片,查看最新切片K线。 - - K线切片,支持同一策略内多周期、多品种K线。 + +6、 增强ctp_gateway,包括: + + - 提供指数行情订阅 + - 使用RabbitMQ指数源,或tdx单一数据源 + - 提供自定义合约功能,实时提供其合成后的tick行情 + - 增加天勤行情,实现上期所5档行情和指数行情 + + +5、 增加App: index_tick_publisher, 订阅通达信指数行情=》rabbit_mq 推送 + + +4、 增加App: tick_recorder, 直接异步写入csv文件 + + +3、 增加tdx 免费数据源,包括 + + - 提供主力合约/指数合约的信息获取 + - 提供期货/股票数据bar 和分笔成交数据下载 + - 提供每日增量更新期货数据=> csv文件,可配合NFS+Celery,实现分布式回测 + +2、 增加rabbitMQ通信组件 + +1、 事件引擎,增加运行效率调试功能 + + + 大佳 QQ/Wechat:28888502 +2020最新套利课程:http://www.uquant.org/course/43 -------------------------------------------------------------------------------------------- # 原版 vn.py - 基于python的开源交易平台开发框架 diff --git a/prod/jobs/refill_tq_future_ticks.py b/prod/jobs/refill_tq_future_ticks.py new file mode 100644 index 00000000..35b9fbd0 --- /dev/null +++ b/prod/jobs/refill_tq_future_ticks.py @@ -0,0 +1,91 @@ +# flake8: noqa +""" +下载天勤期货历史tick数据 => vnpy项目目录/bar_data/tq/ +""" +import os +import sys +import json +import csv +from collections import OrderedDict +import pandas as pd +from contextlib import closing +from datetime import datetime, timedelta +import argparse +from tqsdk import TqApi, TqSim + +vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +if vnpy_root not in sys.path: + sys.path.append(vnpy_root) + +os.environ["VNPY_TESTING"] = "1" + +from vnpy.data.tdx.tdx_future_data import get_future_contracts, Exchange +from vnpy.trader.utility import get_csv_last_dt, get_underlying_symbol, extract_vt_symbol +from vnpy.data.tq.downloader import DataDownloader + +if __name__ == "__main__": + if len(sys.argv) <= 1: + print('请使用 --help 查看说明') + # 参数分析 + parser = argparse.ArgumentParser() + parser.add_argument('-s', '--symbol', type=str, default='', help='下载合约,格式: rb2010 或者 SHFE.rb2010') + parser.add_argument('-b', '--begin', type=str, default='20160101', help='开始日期,格式:20160101') + parser.add_argument('-e', '--end', type=str, default=datetime.now().strftime('%Y%m%d'), + help='结束日期,格式:{}'.format(datetime.now().strftime('%Y%m%d'))) + args = parser.parse_args() + if len(args.symbol) == 0: + print('下载合约未设定 参数 -s rb2010') + os._exit(0) + + # 开始下载 + api = TqApi(TqSim()) + download_tasks = {} + begin_date = datetime.strptime(args.begin, '%Y%m%d') + end_date = datetime.strptime(args.end, '%Y%m%d') + n_days = (end_date - begin_date).days + + future_contracts = get_future_contracts() + if '.' not in args.symbol: + underly_symbol = get_underlying_symbol(args.symbol).upper() + contract_info = future_contracts.get(underly_symbol) + symbol = args.symbol + exchange = Exchange(contract_info.get('exchange')) + else: + symbol, exchange = extract_vt_symbol(args.symbol) + + if n_days <= 0: + n_days = 1 + + for n in range(n_days): + download_date = begin_date + timedelta(days=n) + if download_date.isoweekday() in [6, 7]: + continue + + save_folder = os.path.abspath(os.path.join( + vnpy_root, 'tick_data', 'tq', 'future', + download_date.strftime('%Y%m'))) + if not os.path.exists(save_folder): + os.makedirs(save_folder) + + save_file = os.path.abspath(os.path.join(save_folder, + "{}_{}.csv".format(symbol, download_date.strftime('%Y%m%d')))) + zip_file = os.path.abspath(os.path.join(save_folder, + "{}_{}.pkb2".format(symbol, download_date.strftime('%Y%m%d')))) + if os.path.exists(save_file): + continue + if os.path.exists(zip_file): + continue + + # 下载从 2018-05-01凌晨0点 到 2018-06-01凌晨0点 的 T1809 盘口Tick数据 + download_tasks["{}_{}_tick".format(symbol, download_date.strftime('%Y%m%d'))] = DataDownloader( + api, + symbol_list=f"{exchange.value}.{symbol}", + dur_sec=0, + start_dt=download_date.date(), + end_dt=download_date.replace(hour=16), csv_file_name=save_file) + + # 使用with closing机制确保下载完成后释放对应的资源 + with closing(api): + while not all([v.is_finished() for v in download_tasks.values()]): + api.wait_update() + print("progress: ", {k: ("%.2f%%" % v.get_progress()) for k, v in download_tasks.items()}) diff --git a/requirements.txt b/requirements.txt index ee7b2c78..28759e38 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +six==1.13.0 PyQt5 pyqtgraph dataclasses; python_version<="3.6" @@ -7,7 +8,7 @@ websocket-client peewee mongoengine numpy -pandas +pandas==0.25.2 matplotlib seaborn futu-api @@ -18,6 +19,7 @@ ibapi deap pyzmq QScintilla -pytdx +PySocks pykalman cython +tqsdk diff --git a/vnpy/data/tq/downloader.py b/vnpy/data/tq/downloader.py new file mode 100644 index 00000000..16b0a528 --- /dev/null +++ b/vnpy/data/tq/downloader.py @@ -0,0 +1,217 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +#__author__ = 'yangyang' +# 修改: +# 1, 输入单个合约时,标题不再扩展为 合约.标题 +# 2. 下载tick时,5当行情都下载 + +import csv +from datetime import date, datetime +from typing import Union, List + +from tqsdk.api import TqApi +from tqsdk.datetime import _get_trading_day_start_time, _get_trading_day_end_time +from tqsdk.diff import _get_obj +from tqsdk.utils import _generate_uuid + + +class DataDownloader: + """ + 历史数据下载器, 输出到csv文件 + + 多合约按时间横向对齐 + """ + + def __init__(self, api: TqApi, symbol_list: Union[str, List[str]], dur_sec: int, start_dt: Union[date, datetime], + end_dt: Union[date, datetime], csv_file_name: str) -> None: + """ + 创建历史数据下载器实例 + + Args: + api (TqApi): TqApi实例,该下载器将使用指定的api下载数据 + + symbol_list (str/list of str): 需要下载数据的合约代码,当指定多个合约代码时将其他合约按第一个合约的交易时间对齐 + + dur_sec (int): 数据周期,以秒为单位。例如: 1分钟线为60,1小时线为3600,日线为86400,Tick数据为0 + + start_dt (date/datetime): 起始时间, 如果类型为 date 则指的是交易日, 如果为 datetime 则指的是具体时间点 + + end_dt (date/datetime): 结束时间, 如果类型为 date 则指的是交易日, 如果为 datetime 则指的是具体时间点 + + csv_file_name (str): 输出csv的文件名 + + Example:: + + from datetime import datetime, date + from contextlib import closing + from tqsdk import TqApi, TqSim + from tqsdk.tools import DataDownloader + + api = TqApi(TqSim()) + download_tasks = {} + # 下载从 2018-01-01 到 2018-09-01 的 SR901 日线数据 + download_tasks["SR_daily"] = DataDownloader(api, symbol_list="CZCE.SR901", dur_sec=24*60*60, + start_dt=date(2018, 1, 1), end_dt=date(2018, 9, 1), csv_file_name="SR901_daily.csv") + # 下载从 2017-01-01 到 2018-09-01 的 rb主连 5分钟线数据 + download_tasks["rb_5min"] = DataDownloader(api, symbol_list="KQ.m@SHFE.rb", dur_sec=5*60, + start_dt=date(2017, 1, 1), end_dt=date(2018, 9, 1), csv_file_name="rb_5min.csv") + # 下载从 2018-01-01凌晨6点 到 2018-06-01下午4点 的 cu1805,cu1807,IC1803 分钟线数据,所有数据按 cu1805 的时间对齐 + # 例如 cu1805 夜盘交易时段, IC1803 的各项数据为 N/A + # 例如 cu1805 13:00-13:30 不交易, 因此 IC1803 在 13:00-13:30 之间的K线数据会被跳过 + download_tasks["cu_min"] = DataDownloader(api, symbol_list=["SHFE.cu1805", "SHFE.cu1807", "CFFEX.IC1803"], dur_sec=60, + start_dt=datetime(2018, 1, 1, 6, 0 ,0), end_dt=datetime(2018, 6, 1, 16, 0, 0), csv_file_name="cu_min.csv") + # 下载从 2018-05-01凌晨0点 到 2018-06-01凌晨0点 的 T1809 盘口Tick数据 + download_tasks["T_tick"] = DataDownloader(api, symbol_list=["CFFEX.T1809"], dur_sec=0, + start_dt=datetime(2018, 5, 1), end_dt=datetime(2018, 6, 1), csv_file_name="T1809_tick.csv") + # 使用with closing机制确保下载完成后释放对应的资源 + with closing(api): + while not all([v.is_finished() for v in download_tasks.values()]): + api.wait_update() + print("progress: ", { k:("%.2f%%" % v.get_progress()) for k,v in download_tasks.items() }) + """ + self._api = api + if isinstance(start_dt, datetime): + self._start_dt_nano = int(start_dt.timestamp() * 1e9) + else: + self._start_dt_nano = _get_trading_day_start_time(int(datetime(start_dt.year, start_dt.month, start_dt.day).timestamp()) * 1000000000) + if isinstance(end_dt, datetime): + self._end_dt_nano = int(end_dt.timestamp() * 1e9) + else: + self._end_dt_nano = _get_trading_day_end_time(int(datetime(end_dt.year, end_dt.month, end_dt.day).timestamp()) * 1000000000) + self._current_dt_nano = self._start_dt_nano + self._symbol_list = symbol_list if isinstance(symbol_list, list) else [symbol_list] + # 检查合约代码是否存在 + for symbol in self._symbol_list: + if (not self._api._stock) and symbol not in self._api._data.get("quotes", {}): + raise Exception("代码 %s 不存在, 请检查合约代码是否填写正确" % (symbol)) + self._dur_nano = dur_sec * 1000000000 + if self._dur_nano == 0 and len(self._symbol_list) != 1: + raise Exception("Tick序列不支持多合约") + self._csv_file_name = csv_file_name + self._task = self._api.create_task(self._download_data()) + + def is_finished(self) -> bool: + """ + 判断是否下载完成 + + Returns: + bool: 如果数据下载完成则返回 True, 否则返回 False. + """ + return self._task.done() + + def get_progress(self) -> float: + """ + 获得下载进度百分比 + + Returns: + float: 下载进度,100表示下载完成 + """ + return 100.0 if self._task.done() else (self._current_dt_nano - self._start_dt_nano) / ( + self._end_dt_nano - self._start_dt_nano) * 100 + + async def _download_data(self): + """下载数据, 多合约横向按时间对齐""" + chart_info = { + "aid": "set_chart", + "chart_id": _generate_uuid("PYSDK_downloader"), + "ins_list": ",".join(self._symbol_list), + "duration": self._dur_nano, + "view_width": 2000, + "focus_datetime": self._start_dt_nano, + "focus_position": 0, + } + # 还没有发送过任何请求, 先请求定位左端点 + await self._api._send_chan.send(chart_info) + chart = _get_obj(self._api._data, ["charts", chart_info["chart_id"]]) + current_id = None # 当前数据指针 + csv_header = [] + data_cols = ["open", "high", "low", "close", "volume", "open_oi", "close_oi"] if self._dur_nano != 0 else \ + ["last_price", "highest", "lowest", "volume", + "amount", "open_interest","upper_limit","lower_limit", + "bid_price1", "bid_volume1", "ask_price1", "ask_volume1", + "bid_price2", "bid_volume2", "ask_price2", "ask_volume2", + "bid_price3", "bid_volume3", "ask_price3", "ask_volume3", + "bid_price4", "bid_volume4", "ask_price4", "ask_volume4", + "bid_price5", "bid_volume5", "ask_price5", "ask_volume5" + ] + serials = [] + for symbol in self._symbol_list: + path = ["klines", symbol, str(self._dur_nano)] if self._dur_nano != 0 else ["ticks", symbol] + serial = _get_obj(self._api._data, path) + serials.append(serial) + try: + with open(self._csv_file_name, 'w', newline='') as csvfile: + csv_writer = csv.writer(csvfile, dialect='excel') + async with self._api.register_update_notify() as update_chan: + async for _ in update_chan: + if not (chart_info.items() <= _get_obj(chart, ["state"]).items()): + # 当前请求还没收齐回应, 不应继续处理 + continue + left_id = chart.get("left_id", -1) + right_id = chart.get("right_id", -1) + if (left_id == -1 and right_id == -1) or self._api._data.get("mdhis_more_data", True): + # 定位信息还没收到, 或数据序列还没收到 + continue + for serial in serials: + # 检查合约的数据是否收到 + if serial.get("last_id", -1) == -1: + continue + if current_id is None: + current_id = max(left_id, 0) + while current_id <= right_id: + item = serials[0]["data"].get(str(current_id), {}) + if item.get("datetime", 0) == 0 or item["datetime"] > self._end_dt_nano: + # 当前 id 已超出 last_id 或k线数据的时间已经超过用户限定的右端 + return + if len(csv_header) == 0: + # 写入文件头 + csv_header = ["datetime"] + for symbol in self._symbol_list: + for col in data_cols: + if len(self._symbol_list) > 2: + csv_header.append(symbol + "." + col) + else: + csv_header.append(col) + + csv_writer.writerow(csv_header) + row = [self._nano_to_str(item["datetime"])] + for col in data_cols: + row.append(self._get_value(item, col)) + for i in range(1, len(self._symbol_list)): + symbol = self._symbol_list[i] + tid = serials[0].get("binding", {}).get(symbol, {}).get(str(current_id), -1) + k = {} if tid == -1 else serials[i]["data"].get(str(tid), {}) + for col in data_cols: + row.append(self._get_value(k, col)) + csv_writer.writerow(row) + current_id += 1 + self._current_dt_nano = item["datetime"] + # 当前 id 已超出订阅范围, 需重新订阅后续数据 + chart_info.pop("focus_datetime", None) + chart_info.pop("focus_position", None) + chart_info["left_kline_id"] = current_id + await self._api._send_chan.send(chart_info) + finally: + # 释放chart资源 + await self._api._send_chan.send({ + "aid": "set_chart", + "chart_id": chart_info["chart_id"], + "ins_list": "", + "duration": self._dur_nano, + "view_width": 2000, + }) + + @staticmethod + def _get_value(obj, key): + if key not in obj: + return "#N/A" + if isinstance(obj[key], str): + return float("nan") + return obj[key] + + @staticmethod + def _nano_to_str(nano): + dt = datetime.fromtimestamp(nano // 1000000000) + s = dt.strftime('%Y-%m-%d %H:%M:%S') + s += '.' + str(int(nano % 1000000000)).zfill(9) + return s diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index ad251513..eb3da627 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -5,6 +5,9 @@ import traceback import json from datetime import datetime, timedelta from copy import copy, deepcopy +from functools import lru_cache +from typing import List +import pandas as pd from vnpy.api.ctp import ( MdApi, @@ -48,11 +51,13 @@ from vnpy.trader.constant import ( OrderType, Product, Status, - OptionType + OptionType, + Interval ) from vnpy.trader.gateway import BaseGateway from vnpy.trader.object import ( TickData, + BarData, OrderData, TradeData, PositionData, @@ -61,6 +66,7 @@ from vnpy.trader.object import ( OrderRequest, CancelRequest, SubscribeRequest, + HistoryRequest ) from vnpy.trader.utility import ( extract_vt_symbol, @@ -73,6 +79,8 @@ from vnpy.trader.utility import ( ) from vnpy.trader.event import EVENT_TIMER +from vnpy.api.websocket import WebsocketClient + # 增加通达信指数接口行情 from time import sleep from threading import Thread @@ -152,6 +160,64 @@ index_contracts = {} # tdx 期货配置本地缓存 future_contracts = get_future_contracts() +# 时间戳对齐 +TIME_GAP = 8 * 60 * 60 * 1000000000 +INTERVAL_VT2TQ = { + Interval.MINUTE: 60, + Interval.HOUR: 60 * 60, + Interval.DAILY: 60 * 60 * 24, +} + +TQ2VT_TYPE = { + "FUTURE_OPTION": Product.OPTION, + "INDEX": Product.INDEX, + "FUTURE_COMBINE": Product.SPREAD, + "SPOT": Product.SPOT, + "FUTURE_CONT": Product.INDEX, + "FUTURE": Product.FUTURES, + "FUTURE_INDEX": Product.INDEX, + "OPTION": Product.OPTION, +} + +@lru_cache(maxsize=9999) +def vt_to_tq_symbol(symbol: str, exchange: Exchange) -> str: + """ + TQSdk exchange first + """ + for count, word in enumerate(symbol): + if word.isdigit(): + break + + fix_symbol = symbol + if exchange in [Exchange.INE, Exchange.SHFE, Exchange.DCE]: + fix_symbol = symbol.lower() + + # Check for index symbol + time_str = symbol[count:] + + if time_str in ["88"]: + return f"KQ.m@{exchange.value}.{fix_symbol[:count]}" + if time_str in ["99"]: + return f"KQ.i@{exchange.value}.{fix_symbol[:count]}" + + return f"{exchange.value}.{fix_symbol}" + + +@lru_cache(maxsize=9999) +def tq_to_vt_symbol(tq_symbol: str) -> str: + """""" + if "KQ.m" in tq_symbol: + ins_type, instrument = tq_symbol.split("@") + exchange, symbol = instrument.split(".") + return f"{symbol}88.{exchange}" + elif "KQ.i" in tq_symbol: + ins_type, instrument = tq_symbol.split("@") + exchange, symbol = instrument.split(".") + return f"{symbol}99.{exchange}" + else: + exchange, symbol = tq_symbol.split(".") + return f"{symbol}.{exchange}" + class CtpGateway(BaseGateway): """ @@ -185,6 +251,7 @@ class CtpGateway(BaseGateway): self.md_api = None self.tdx_api = None self.rabbit_api = None + self.tq_api = None self.subscribed_symbols = set() # 已订阅合约代码 @@ -204,6 +271,7 @@ class CtpGateway(BaseGateway): auth_code = setting["授权编码"] product_info = setting["产品信息"] rabbit_dict = setting.get('rabbit', None) + tq_dict = setting.get('tq', None) if ( (not td_address.startswith("tcp://")) and (not td_address.startswith("ssl://")) @@ -239,22 +307,28 @@ class CtpGateway(BaseGateway): self.md_api.connect(md_address, userid, password, brokerid) if rabbit_dict: + self.write_log(f'激活RabbitMQ行情接口') self.rabbit_api = SubMdApi(gateway=self) self.rabbit_api.connect(rabbit_dict) + elif tq_dict is not None: + self.write_log(f'激活天勤行情接口') + self.tq_api = TqMdApi(gateway=self) + self.tq_api.connect(tq_dict) else: + self.write_log(f'激活通达信行情接口') self.tdx_api = TdxMdApi(gateway=self) self.tdx_api.connect() self.init_query() - for (vt_symbol, is_bar) in self.subscribed_symbols: + for (vt_symbol, is_bar) in list(self.subscribed_symbols): symbol, exchange = extract_vt_symbol(vt_symbol) req = SubscribeRequest( symbol=symbol, exchange=exchange, is_bar=is_bar ) - # 指数合约,从tdx行情订阅 + # 指数合约,从tdx行情、天勤订阅 if req.symbol[-2:] in ['99']: req.symbol = req.symbol.upper() if self.tdx_api is not None: @@ -262,9 +336,18 @@ class CtpGateway(BaseGateway): self.tdx_api.connect() self.tdx_api.subscribe(req) elif self.rabbit_api is not None: + # 使用rabbitmq获取 self.rabbit_api.subscribe(req) + elif self.tq_api: + # 使用天勤行情获取 + self.tq_api.subscribe(req) else: - self.md_api.subscribe(req) + # 上期所、上能源支持五档行情,使用天勤接口 + if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]: + self.write_log(f'使用天勤接口订阅') + self.tq_api.subscribe(req) + else: + self.md_api.subscribe(req) def check_status(self): """检查状态""" @@ -339,11 +422,22 @@ class CtpGateway(BaseGateway): if req.symbol[-2:] in ['99']: req.symbol = req.symbol.upper() if self.tdx_api: + self.write_log(f'使用通达信接口订阅{req.symbol}') self.tdx_api.subscribe(req) elif self.rabbit_api: + self.write_log(f'使用RabbitMQ接口订阅{req.symbol}') self.rabbit_api.subscribe(req) + elif self.tq_api: + self.write_log(f'使用天勤接口订阅{ req.symbol}') + self.tq_api.subscribe(req) else: - self.md_api.subscribe(req) + # 上期所、上能源支持五档行情,使用天勤接口 + if self.tq_api and req.exchange in [Exchange.SHFE, Exchange.INE]: + self.write_log(f'使用天勤接口订阅{ req.symbol}') + self.tq_api.subscribe(req) + else: + self.write_log(f'使用CTP接口订阅{req.symbol}') + self.md_api.subscribe(req) # Allow the strategies to start before the connection self.subscribed_symbols.add((req.vt_symbol, req.is_bar)) @@ -408,6 +502,12 @@ class CtpGateway(BaseGateway): self.rabbit_api = None tmp4.close() + if self.tq_api: + self.write_log(u'天勤行情API') + tmp5 = self.tq_api + self.tq_api = None + tmp5.close() + def process_timer_event(self, event): """""" self.count += 1 @@ -433,7 +533,6 @@ class CtpGateway(BaseGateway): tick = copy(tick) combiner.on_tick(tick) - class CtpMdApi(MdApi): """""" @@ -1710,6 +1809,222 @@ class SubMdApi(): self.gateway.write_log(u'RabbitMQ行情订阅 {}'.format(str(vn_symbol))) +class TqMdApi(): + """天勤行情API""" + + def __init__(self, gateway): + """""" + super().__init__() + + self.gateway = gateway + self.gateway_name = gateway.gateway_name + + self.api = None + self.is_connected = False + self.subscribe_array = [] + # 行情对象列表 + self.quote_objs = [] + + # 数据更新线程 + self.update_thread = None + # 所有的合约 + self.all_instruments = [] + + self.ticks = {} + + def connect(self, setting): + """""" + try: + from tqsdk import TqApi + self.api = TqApi() + except Exception as e: + self.gateway.write_log(f'天勤行情API接入异常'.format(str(e))) + if self.api: + self.is_connected = True + self.gateway.write_log(f'天勤行情API已连接') + self.update_thread = Thread(target=self.update) + self.update_thread.start() + + def generate_tick_from_quote(self, vt_symbol, quote) -> TickData: + """ + 生成TickData + """ + # 清洗 nan + quote = {k: 0 if v != v else v for k, v in quote.items()} + symbol, exchange = extract_vt_symbol(vt_symbol) + tick = TickData( + symbol=symbol, + exchange=exchange, + datetime=datetime.strptime(quote["datetime"], "%Y-%m-%d %H:%M:%S.%f"), + name=symbol, + volume=quote["volume"], + open_interest=quote["open_interest"], + last_price=quote["last_price"], + limit_up=quote["upper_limit"], + limit_down=quote["lower_limit"], + open_price=quote["open"], + high_price=quote["highest"], + low_price=quote["lowest"], + pre_close=quote["pre_close"], + bid_price_1=quote["bid_price1"], + bid_price_2=quote["bid_price2"], + bid_price_3=quote["bid_price3"], + bid_price_4=quote["bid_price4"], + bid_price_5=quote["bid_price5"], + ask_price_1=quote["ask_price1"], + ask_price_2=quote["ask_price2"], + ask_price_3=quote["ask_price3"], + ask_price_4=quote["ask_price4"], + ask_price_5=quote["ask_price5"], + bid_volume_1=quote["bid_volume1"], + bid_volume_2=quote["bid_volume2"], + bid_volume_3=quote["bid_volume3"], + bid_volume_4=quote["bid_volume4"], + bid_volume_5=quote["bid_volume5"], + ask_volume_1=quote["ask_volume1"], + ask_volume_2=quote["ask_volume2"], + ask_volume_3=quote["ask_volume3"], + ask_volume_4=quote["ask_volume4"], + ask_volume_5=quote["ask_volume5"], + gateway_name=self.gateway_name + ) + if symbol.endswith('99') and tick.ask_price_1 == 0.0 and tick.bid_price_1 == 0.0: + price_tick = quote['price_tick'] + if isinstance(price_tick, float) or isinstance(price_tick,int): + tick.ask_price_1 = tick.last_price + price_tick + tick.ask_volume_1 = 1 + tick.bid_price_1 = tick.last_price - price_tick + tick.bid_volume_1 = 1 + + return tick + + def update(self) -> None: + """ + 更新行情/委托/账户/持仓 + """ + while self.api.wait_update(): + + # 更新行情信息 + for vt_symbol, quote in self.quote_objs: + if self.api.is_changing(quote): + tick = self.generate_tick_from_quote(vt_symbol, quote) + tick and self.gateway.on_tick(tick) and self.gateway.on_custom_tick(tick) + + def subscribe(self, req: SubscribeRequest) -> None: + """ + 订阅行情 + """ + if req.vt_symbol not in self.subscribe_array: + symbol, exchange = extract_vt_symbol(req.vt_symbol) + try: + quote = self.api.get_quote(vt_to_tq_symbol(symbol, exchange)) + self.quote_objs.append((req.vt_symbol, quote)) + self.subscribe_array.append(req.vt_symbol) + except Exception as ex: + self.gateway.write_log('订阅天勤行情异常:{}'.format(str(ex))) + + def query_contracts(self) -> None: + """""" + self.all_instruments = [ + v for k, v in self.api._data["quotes"].items() if v["expired"] == False + ] + for contract in self.all_instruments: + if ( + "SSWE" in contract["instrument_id"] + or "CSI" in contract["instrument_id"] + ): + # vnpy没有这两个交易所,需要可以自行修改vnpy代码 + continue + + vt_symbol = tq_to_vt_symbol(contract["instrument_id"]) + symbol, exchange = extract_vt_symbol(vt_symbol) + + if TQ2VT_TYPE[contract["ins_class"]] == Product.OPTION: + contract_data = ContractData( + symbol=symbol, + exchange=exchange, + name=symbol, + product=TQ2VT_TYPE[contract["ins_class"]], + size=contract["volume_multiple"], + pricetick=contract["price_tick"], + history_data=True, + option_strike=contract["strike_price"], + option_underlying=tq_to_vt_symbol(contract["underlying_symbol"]), + option_type=OptionType[contract["option_class"]], + option_expiry=datetime.fromtimestamp(contract["expire_datetime"]), + option_index=tq_to_vt_symbol(contract["underlying_symbol"]), + gateway_name=self.gateway_name, + ) + else: + contract_data = ContractData( + symbol=symbol, + exchange=exchange, + name=symbol, + product=TQ2VT_TYPE[contract["ins_class"]], + size=contract["volume_multiple"], + pricetick=contract["price_tick"], + history_data=True, + gateway_name=self.gateway_name, + ) + self.gateway.on_contract(contract_data) + + def query_history(self, req: HistoryRequest) -> List[BarData]: + """ + 获取历史数据 + """ + symbol = req.symbol + exchange = req.exchange + interval = req.interval + start = req.start + end = req.end + # 天勤需要的数据 + tq_symbol = vt_to_tq_symbol(symbol, exchange) + tq_interval = INTERVAL_VT2TQ.get(interval) + end += timedelta(1) + total_days = end - start + # 一次最多只能下载 8964 根Bar + min_length = min(8964, total_days.days * 500) + df = self.api.get_kline_serial(tq_symbol, tq_interval, min_length).sort_values( + by=["datetime"] + ) + + # 时间戳对齐 + df["datetime"] = pd.to_datetime(df["datetime"] + TIME_GAP) + + # 过滤开始结束时间 + df = df[(df["datetime"] >= start - timedelta(days=1)) & (df["datetime"] < end)] + + data: List[BarData] = [] + if df is not None: + for ix, row in df.iterrows(): + bar = BarData( + symbol=symbol, + exchange=exchange, + interval=interval, + datetime=row["datetime"].to_pydatetime(), + open_price=row["open"], + high_price=row["high"], + low_price=row["low"], + close_price=row["close"], + volume=row["volume"], + open_interest=row.get("close_oi", 0), + gateway_name=self.gateway_name, + ) + data.append(bar) + return data + + def close(self) -> None: + """""" + try: + if self.api: + self.api.close() + self.is_connected = False + if self.update_thread: + self.update_thread.join() + except Exception as e: + self.gateway.write_log('退出天勤行情api异常:{}'.format(str(e))) + + class TickCombiner(object): """ Tick合成类 diff --git a/vnpy/trader/object.py b/vnpy/trader/object.py index a94016da..25da3f00 100644 --- a/vnpy/trader/object.py +++ b/vnpy/trader/object.py @@ -330,6 +330,7 @@ class ContractData(BaseData): option_underlying: str = "" # vt_symbol of underlying contract option_type: OptionType = None option_expiry: datetime = None + option_index: str = "" # vt_symbol mapping cur option def __post_init__(self): """""" diff --git a/vnpy/trader/ui/widget.py b/vnpy/trader/ui/widget.py index 4f0423d5..56911b34 100644 --- a/vnpy/trader/ui/widget.py +++ b/vnpy/trader/ui/widget.py @@ -504,7 +504,7 @@ class ConnectDialog(QtWidgets.QDialog): def __init__(self, main_engine: MainEngine, gateway_name: str): """""" super().__init__() - + self.setting = {} self.main_engine: MainEngine = main_engine self.gateway_name: str = gateway_name self.filename: str = f"connect_{gateway_name.lower()}.json" @@ -524,6 +524,8 @@ class ConnectDialog(QtWidgets.QDialog): # Saved setting provides field data used last time. loaded_setting = load_json(self.filename) + self.setting.update(loaded_setting) + # Initialize line edits and form layout based on setting. form = QtWidgets.QFormLayout() @@ -570,9 +572,11 @@ class ConnectDialog(QtWidgets.QDialog): field_value = field_type(widget.text()) setting[field_name] = field_value - save_json(self.filename, setting) + self.setting.update(setting) - self.main_engine.connect(setting, self.gateway_name) + save_json(self.filename, self.setting) + + self.main_engine.connect(self.setting, self.gateway_name) self.accept()