[新App] 数字货币合约交易引擎

This commit is contained in:
msincenselee 2020-03-15 09:50:00 +08:00
parent a0cf676ac7
commit 621797c743
14 changed files with 5465 additions and 20 deletions

View File

@ -6,25 +6,29 @@
# “当你想放弃时,想想你为什么开始。埃隆·马斯克”
###Fork版本主要改进如下
1. 事件引擎,增加运行效率调试功能
2. 增加rabbitMQ通信组件
3. 增加tdx 免费数据源,包括
1、 事件引擎,增加运行效率调试功能
2、 增加rabbitMQ通信组件
3、 增加tdx 免费数据源,包括
- 提供主力合约/指数合约的信息获取
- 提供期货/股票数据bar 和分笔成交数据下载
- 提供每日增量更新期货数据=> csv文件可配合NFS+Celery实现分布式回测
4. 增加App: tick_recorder, 直接异步写入csv文件
5. 增加App: index_tick_publisher, 订阅通达信指数行情=》rabbit_mq 推送
6. 增强ctp_gateway包括:
4、 增加App: tick_recorder, 直接异步写入csv文件
5、 增加App: index_tick_publisher, 订阅通达信指数行情=》rabbit_mq 推送
6、 增强ctp_gateway包括:
- 提供指数行情订阅
- 使用RabbitMQ指数源或tdx单一数据源
- 提供自定义合约功能实时提供其合成后的tick行情
7. 增加component组件包括:
7 增加component组件包括:
- 提供cta_line_bar k线组件支持国内文华/交易师/TB等分钟/小时的计算模式,支持任意秒/分钟/小时/天/周等周期支持k线数据实时生成。
@ -35,7 +39,7 @@
- 提供cta_period 组件,支持策略中‘周期’的逻辑
- 提供cta_grid_trade组件支持网格交易、复杂的策略持仓逻辑、持久化
8. 增加App: cta_strategy_pro包括
8 增加App: cta_strategy_pro包括
- 提供策略实例的单独日志记录文件
@ -52,7 +56,7 @@
- 增加CtaSpread模板支持FAK正套/反套
- 增加Spread组合引擎tick级别回测支持多策略实例得套利共享账号回测。
9、增强主引擎包括
9、 增强主引擎,包括:
- 支持同一类gateway多个接入配置
- 增加获取当前价格接口
@ -71,6 +75,14 @@
- 支持自定义套利合约得算法,及算法下单。
- 可通过vnpy界面/cta_strategy_pro策略直接发出套利单由算法引擎执行
12、 增加App: cta_crypto包括
- 增加币安合约交易vnpy.gateway.binancef支持每个合约独立杠杆比率
- 增肌币安合约数据接口 vnpy.data.binance.binance_future_data
- 独立的CTA引擎 cta_crypto运行数字货币时替代原版cta_strategy引擎。
- 支持bar方式回测/组合回测
- 增强期货交易模板
大佳
QQ/Wechat28888502

View File

@ -0,0 +1,29 @@
from pathlib import Path
from vnpy.trader.app import BaseApp
from .base import APP_NAME, StopOrder
from .engine import CtaEngine
from .template import (
Direction,
Offset,
Status,
Interval,
TickData,
BarData,
TradeData,
OrderData,
CtaTemplate, CtaFutureTemplate) # noqa
from vnpy.trader.utility import BarGenerator, ArrayManager # noqa
class CtaCryptoApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略CRYPTO"
engine_class = CtaEngine
widget_name = "CtaManager"
icon_name = "cta.ico"

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,53 @@
"""
Defines constants and objects used in CtaCrypto App.
"""
from dataclasses import dataclass, field
from enum import Enum
from datetime import timedelta
from vnpy.trader.constant import Direction, Offset, Interval
APP_NAME = "CtaCrypto"
STOPORDER_PREFIX = "STOP"
class StopOrderStatus(Enum):
WAITING = "等待中"
CANCELLED = "已撤销"
TRIGGERED = "已触发"
class EngineType(Enum):
LIVE = "实盘"
BACKTESTING = "回测"
class BacktestingMode(Enum):
BAR = 1
TICK = 2
@dataclass
class StopOrder:
vt_symbol: str
direction: Direction
offset: Offset
price: float
volume: float
stop_orderid: str
strategy_name: str
lock: bool = False
vt_orderids: list = field(default_factory=list)
status: StopOrderStatus = StopOrderStatus.WAITING
gateway_name: str = None
EVENT_CTA_LOG = "eCtaLog"
EVENT_CTA_STRATEGY = "eCtaStrategy"
EVENT_CTA_STOPORDER = "eCtaStopOrder"
INTERVAL_DELTA_MAP = {
Interval.MINUTE: timedelta(minutes=1),
Interval.HOUR: timedelta(hours=1),
Interval.DAILY: timedelta(days=1),
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,330 @@
# encoding: UTF-8
'''
本文件中包含的是CTA模块的组合回测引擎回测引擎的API和CTA引擎一致
可以使用和实盘相同的代码进行回测
华富资产 李来佳
'''
from __future__ import division
import sys
import os
import gc
import pandas as pd
import traceback
import random
import bz2
import pickle
from datetime import datetime, timedelta
from time import sleep
from vnpy.trader.object import (
TickData,
BarData,
RenkoBarData,
)
from vnpy.trader.constant import (
Exchange,
)
from vnpy.trader.utility import (
extract_vt_symbol,
)
from .back_testing import BackTestingEngine
class PortfolioTestingEngine(BackTestingEngine):
"""
CTA组合回测引擎, 使用回测引擎作为父类
函数接口和策略引擎保持一样
从而实现同一套代码从回测到实盘
针对1分钟bar的回测 或者tick回测
导入CTA_Settings
"""
def __init__(self, event_engine=None):
"""Constructor"""
super().__init__(event_engine)
self.bar_csv_file = {}
self.bar_df_dict = {} # 历史数据的df回测用
self.bar_df = None # 历史数据的df时间+symbol作为组合索引
self.bar_interval_seconds = 60 # bar csv文件属于K线类型K线的周期秒数,缺省是1分钟
self.tick_path = None # tick级别回测 路径
def load_bar_csv_to_df(self, vt_symbol, bar_file, data_start_date=None, data_end_date=None):
"""加载回测bar数据到DataFrame"""
self.output(u'loading {} from {}'.format(vt_symbol, bar_file))
if vt_symbol in self.bar_df_dict:
return True
if not os.path.exists(bar_file):
self.write_error(u'回测时,{}对应的csv bar文件{}不存在'.format(vt_symbol, bar_file))
return False
try:
data_types = {
"datetime": str,
"open": float,
"high": float,
"low": float,
"close": float,
"open_interest": float,
"volume": float,
"instrument_id": str,
"symbol": str,
"total_turnover": float,
"limit_down": float,
"limit_up": float,
"trading_day": str,
"date": str,
"time": str
}
# 加载csv文件 =》 dateframe
symbol_df = pd.read_csv(bar_file, dtype=data_types)
# 转换时间str =》 datetime
symbol_df["datetime"] = pd.to_datetime(symbol_df["datetime"], format="%Y-%m-%d %H:%M:%S")
# 设置时间为索引
symbol_df = symbol_df.set_index("datetime")
# 裁剪数据
symbol_df = symbol_df.loc[self.test_start_date:self.test_end_date]
self.bar_df_dict.update({vt_symbol: symbol_df})
except Exception as ex:
self.write_error(u'回测时读取{} csv文件{}失败:{}'.format(vt_symbol, bar_file, ex))
self.output(u'回测时读取{} csv文件{}失败:{}'.format(vt_symbol, bar_file, ex))
return False
return True
def comine_bar_df(self):
"""
合并所有回测合约的bar DataFrame =集中的DataFrame
把bar_df_dict =bar_df
:return:
"""
self.output('comine_df')
self.bar_df = pd.concat(self.bar_df_dict, axis=0).swaplevel(0, 1).sort_index()
self.bar_df_dict.clear()
def prepare_env(self, test_settings):
self.output('portfolio prepare_env')
super().prepare_env(test_settings)
def prepare_data(self, data_dict):
"""
准备组合数据
:param data_dict: 合约得配置参数
:return:
"""
# 调用回测引擎,跟新合约得数据
super().prepare_data(data_dict)
if len(data_dict) == 0:
self.write_log(u'请指定回测数据和文件')
return
if self.mode == 'tick':
return
# 检查/更新bar文件
for symbol, symbol_data in data_dict.items():
self.write_log(u'配置{}数据:{}'.format(symbol, symbol_data))
bar_file = symbol_data.get('bar_file', None)
if bar_file is None:
self.write_error(u'{}没有配置数据文件')
continue
if not os.path.isfile(bar_file):
self.write_log(u'{0}文件不存在'.format(bar_file))
continue
self.bar_csv_file.update({symbol: bar_file})
def run_portfolio_test(self, strategy_settings: dict = {}):
"""
运行组合回测
"""
if not self.strategy_start_date:
self.write_error(u'回测开始日期未设置。')
return
if len(strategy_settings) == 0:
self.write_error('未提供有效配置策略实例')
return
self.cur_capital = self.init_capital # 更新设置期初资金
if not self.data_end_date:
self.data_end_date = datetime.today()
self.write_log(u'开始组合回测')
for strategy_name, strategy_setting in strategy_settings.items():
self.load_strategy(strategy_name, strategy_setting)
self.write_log(u'策略初始化完成')
self.write_log(u'开始回放数据')
self.write_log(u'开始回测:{} ~ {}'.format(self.data_start_date, self.data_end_date))
if self.mode == 'bar':
self.run_bar_test()
else:
self.write_error('目前仅实现bar回测')
def run_bar_test(self):
"""使用bar进行组合回测"""
testdays = (self.data_end_date - self.data_start_date).days
if testdays < 1:
self.write_log(u'回测时间不足')
return
# 加载数据
for vt_symbol in self.symbol_strategy_map.keys():
symbol, exchange = extract_vt_symbol(vt_symbol)
self.load_bar_csv_to_df(vt_symbol, self.bar_csv_file.get(symbol))
# 合并数据
self.comine_bar_df()
last_trading_day = None
bars_dt = None
bars_same_dt = []
gc_collect_days = 0
try:
for (dt, vt_symbol), bar_data in self.bar_df.iterrows():
symbol, exchange = extract_vt_symbol(vt_symbol)
if symbol.startswith('future_renko'):
bar_datetime = dt
bar = RenkoBarData(
gateway_name='backtesting',
symbol=symbol,
exchange=exchange,
datetime=bar_datetime
)
bar.seconds = float(bar_data.get('seconds', 0))
bar.high_seconds = float(bar_data.get('high_seconds', 0)) # 当前Bar的上限秒数
bar.low_seconds = float(bar_data.get('low_seconds', 0)) # 当前bar的下限秒数
bar.height = float(bar_data.get('height', 0)) # 当前Bar的高度限制
bar.up_band = float(bar_data.get('up_band', 0)) # 高位区域的基线
bar.down_band = float(bar_data.get('down_band', 0)) # 低位区域的基线
bar.low_time = bar_data.get('low_time', None) # 最后一次进入低位区域的时间
bar.high_time = bar_data.get('high_time', None) # 最后一次进入高位区域的时间
else:
bar_datetime = dt - timedelta(seconds=self.bar_interval_seconds)
bar = BarData(
gateway_name='backtesting',
symbol=symbol,
exchange=exchange,
datetime=bar_datetime
)
if 'open' in bar_data:
bar.open_price = float(bar_data['open'])
bar.close_price = float(bar_data['close'])
bar.high_price = float(bar_data['high'])
bar.low_price = float(bar_data['low'])
else:
bar.open_price = float(bar_data['open_price'])
bar.close_price = float(bar_data['close_price'])
bar.high_price = float(bar_data['high_price'])
bar.low_price = float(bar_data['low_price'])
bar.volume = int(bar_data['volume'])
bar.date = dt.strftime('%Y-%m-%d')
bar.time = dt.strftime('%H:%M:%S')
str_td = str(bar_data.get('trading_day', ''))
if len(str_td) == 8:
bar.trading_day = str_td[0:4] + '-' + str_td[4:6] + '-' + str_td[6:8]
else:
bar.trading_day = bar.date
if last_trading_day != bar.trading_day:
self.output(u'回测数据日期:{},资金:{}'.format(bar.trading_day, self.net_capital))
if self.strategy_start_date > bar.datetime:
last_trading_day = bar.trading_day
# bar时间与队列时间一致添加到队列中
if dt == bars_dt:
bars_same_dt.append(bar)
continue
else:
# bar时间与队列时间不一致先推送队列的bars
random.shuffle(bars_same_dt)
for _bar_ in bars_same_dt:
self.new_bar(_bar_)
# 创建新的队列
bars_same_dt = [bar]
bars_dt = dt
# 更新每日净值
if self.strategy_start_date <= dt <= self.data_end_date:
if last_trading_day != bar.trading_day:
if last_trading_day is not None:
self.saving_daily_data(datetime.strptime(last_trading_day, '%Y-%m-%d'), self.cur_capital,
self.max_net_capital, self.total_commission)
last_trading_day = bar.trading_day
# 第二个交易日,撤单
self.cancel_orders()
# 更新持仓缓存
self.update_pos_buffer()
gc_collect_days += 1
if gc_collect_days >= 10:
# 执行内存回收
gc.collect()
sleep(1)
gc_collect_days = 0
if self.net_capital < 0:
self.write_error(u'净值低于0回测停止')
self.output(u'净值低于0回测停止')
return
self.write_log(u'bar数据回放完成')
if last_trading_day is not None:
self.saving_daily_data(datetime.strptime(last_trading_day, '%Y-%m-%d'), self.cur_capital,
self.max_net_capital, self.total_commission)
except Exception as ex:
self.write_error(u'回测异常导致停止:{}'.format(str(ex)))
self.write_error(u'{},{}'.format(str(ex), traceback.format_exc()))
print(str(ex), file=sys.stderr)
traceback.print_exc()
return
def single_test(test_setting: dict, strategy_setting: dict):
"""
单一回测
: test_setting, 组合回测所需的配置包括合约信息数据bar信息回测时间资金等
strategy_setting, dict, 一个或多个策略配置
"""
# 创建组合回测引擎
engine = PortfolioTestingEngine()
engine.prepare_env(test_setting)
try:
engine.run_portfolio_test(strategy_setting)
# 回测结果,保存
engine.show_backtesting_result()
except Exception as ex:
print('组合回测异常{}'.format(str(ex)))
traceback.print_exc()
return False
print('测试结束')
return True

View File

@ -0,0 +1,758 @@
""""""
import os
import sys
import uuid
import bz2
import pickle
import traceback
import zlib
import json
from abc import ABC
from copy import copy
from typing import Any, Callable
from logging import INFO, ERROR
from datetime import datetime
from vnpy.trader.constant import Interval, Direction, Offset, Status, OrderType
from vnpy.trader.object import BarData, TickData, OrderData, TradeData
from vnpy.trader.utility import virtual, append_data, extract_vt_symbol, get_underlying_symbol
from .base import StopOrder, EngineType
from vnpy.component.cta_grid_trade import CtaGrid, CtaGridTrade, LOCK_GRID
from vnpy.component.cta_position import CtaPosition
from vnpy.component.cta_policy import CtaPolicy # noqa
class CtaTemplate(ABC):
"""CTA策略模板"""
author = ""
parameters = []
variables = []
# 保存委托单编号和相关委托单的字典
# key为委托单编号
# value为该合约相关的委托单
active_orders = {}
def __init__(
self,
cta_engine: Any,
strategy_name: str,
vt_symbol: str,
setting: dict,
):
""""""
self.cta_engine = cta_engine
self.strategy_name = strategy_name
self.vt_symbol = vt_symbol
self.inited = False # 是否初始化完毕
self.trading = False # 是否开始交易
self.pos = 0 # 持仓/仓差
self.entrust = 0 # 是否正在委托, 0, 无委托 , 1, 委托方向是LONG -1, 委托方向是SHORT
self.tick_dict = {} # 记录所有on_tick传入最新tick
# Copy a new variables list here to avoid duplicate insert when multiple
# strategy instances are created with the same strategy class.
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.variables.insert(2, "pos")
self.variables.insert(3, "entrust")
def update_setting(self, setting: dict):
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
@classmethod
def get_class_parameters(cls):
"""
Get default parameters dict of strategy class.
"""
class_parameters = {}
for name in cls.parameters:
class_parameters[name] = getattr(cls, name)
return class_parameters
def get_parameters(self):
"""
Get strategy parameters dict.
"""
strategy_parameters = {}
for name in self.parameters:
strategy_parameters[name] = getattr(self, name)
return strategy_parameters
def get_variables(self):
"""
Get strategy variables dict.
"""
strategy_variables = {}
for name in self.variables:
strategy_variables[name] = getattr(self, name)
return strategy_variables
def get_data(self):
"""
Get strategy data.
"""
strategy_data = {
"strategy_name": self.strategy_name,
"vt_symbol": self.vt_symbol,
"class_name": self.__class__.__name__,
"author": self.author,
"parameters": self.get_parameters(),
"variables": self.get_variables(),
}
return strategy_data
def get_positions(self):
""" 返回持仓数量"""
pos_list = []
if self.pos > 0:
pos_list.append({
"vt_symbol": self.vt_symbol,
"direction": "long",
"volume": self.pos
})
elif self.pos < 0:
pos_list.append({
"vt_symbol": self.vt_symbol,
"direction": "short",
"volume": abs(self.pos)
})
return pos_list
@virtual
def on_timer(self):
pass
@virtual
def on_init(self):
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_start(self):
"""
Callback when strategy is started.
"""
pass
@virtual
def on_stop(self):
"""
Callback when strategy is stopped.
"""
pass
@virtual
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
pass
@virtual
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
pass
@virtual
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
pass
@virtual
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
pass
@virtual
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass
def buy(self, price: float, volume: float, stop: bool = False,
vt_symbol: str = '', order_type: OrderType = OrderType.LIMIT,
order_time: datetime = None, grid: CtaGrid = None):
"""
Send buy order to open a long position.
"""
if OrderType in [OrderType.FAK, OrderType.FOK]:
if self.is_upper_limit(vt_symbol):
self.write_error(u'涨停价不做FAK/FOK委托')
return []
return self.send_order(vt_symbol=vt_symbol,
direction=Direction.LONG,
offset=Offset.OPEN,
price=price,
volume=volume,
stop=stop,
order_type=order_type,
order_time=order_time,
grid=grid)
def sell(self, price: float, volume: float, stop: bool = False,
vt_symbol: str = '', order_type: OrderType = OrderType.LIMIT,
order_time: datetime = None, grid: CtaGrid = None):
"""
Send sell order to close a long position.
"""
if OrderType in [OrderType.FAK, OrderType.FOK]:
if self.is_lower_limit(vt_symbol):
self.write_error(u'跌停价不做FAK/FOK sell委托')
return []
return self.send_order(vt_symbol=vt_symbol,
direction=Direction.SHORT,
offset=Offset.CLOSE,
price=price,
volume=volume,
stop=stop,
order_type=order_type,
order_time=order_time,
grid=grid)
def short(self, price: float, volume: float, stop: bool = False,
vt_symbol: str = '', order_type: OrderType = OrderType.LIMIT,
order_time: datetime = None, grid: CtaGrid = None):
"""
Send short order to open as short position.
"""
if OrderType in [OrderType.FAK, OrderType.FOK]:
if self.is_lower_limit(vt_symbol):
self.write_error(u'跌停价不做FAK/FOK short委托')
return []
return self.send_order(vt_symbol=vt_symbol,
direction=Direction.SHORT,
offset=Offset.OPEN,
price=price,
volume=volume,
stop=stop,
order_type=order_type,
order_time=order_time,
grid=grid)
def cover(self, price: float, volume: float, stop: bool = False,
vt_symbol: str = '', order_type: OrderType = OrderType.LIMIT,
order_time: datetime = None, grid: CtaGrid = None):
"""
Send cover order to close a short position.
"""
if OrderType in [OrderType.FAK, OrderType.FOK]:
if self.is_upper_limit(vt_symbol):
self.write_error(u'涨停价不做FAK/FOK cover委托')
return []
return self.send_order(vt_symbol=vt_symbol,
direction=Direction.LONG,
offset=Offset.CLOSE,
price=price,
volume=volume,
stop=stop,
order_type=order_type,
order_time=order_time,
grid=grid)
def send_order(
self,
vt_symbol: str,
direction: Direction,
offset: Offset,
price: float,
volume: float,
stop: bool = False,
order_type: OrderType = OrderType.LIMIT,
order_time: datetime = None,
grid: CtaGrid = None
):
"""
Send a new order.
"""
# 兼容cta_strategy的模板缺省不指定vt_symbol时使用策略配置的vt_symbol
if vt_symbol == '':
vt_symbol = self.vt_symbol
if not self.trading:
return []
vt_orderids = self.cta_engine.send_order(
strategy=self,
vt_symbol=vt_symbol,
direction=direction,
offset=offset,
price=price,
volume=volume,
stop=stop,
order_type=order_type
)
if order_time is None:
order_time = datetime.now()
for vt_orderid in vt_orderids:
d = {
'direction': direction,
'offset': offset,
'vt_symbol': vt_symbol,
'price': price,
'volume': volume,
'order_type': order_type,
'traded': 0,
'order_time': order_time,
'status': Status.SUBMITTING
}
if grid:
d.update({'grid': grid})
grid.order_ids.append(vt_orderid)
self.active_orders.update({vt_orderid: d})
if direction == Direction.LONG:
self.entrust = 1
elif direction == Direction.SHORT:
self.entrust = -1
return vt_orderids
def cancel_order(self, vt_orderid: str):
"""
Cancel an existing order.
"""
if self.trading:
return self.cta_engine.cancel_order(self, vt_orderid)
return False
def cancel_all(self):
"""
Cancel all orders sent by strategy.
"""
if self.trading:
self.cta_engine.cancel_all(self)
def is_upper_limit(self, symbol):
"""是否涨停"""
tick = self.tick_dict.get(symbol, None)
if tick is None or tick.limit_up is None or tick.limit_up == 0:
return False
if tick.bid_price_1 == tick.limit_up:
return True
def is_lower_limit(self, symbol):
"""是否跌停"""
tick = self.tick_dict.get(symbol, None)
if tick is None or tick.limit_down is None or tick.limit_down == 0:
return False
if tick.ask_price_1 == tick.limit_down:
return True
def write_log(self, msg: str, level: int = INFO):
"""
Write a log message.
"""
self.cta_engine.write_log(msg=msg, strategy_name=self.strategy_name, level=level)
def write_error(self, msg: str):
"""write error log message"""
self.write_log(msg=msg, level=ERROR)
def get_engine_type(self):
"""
Return whether the cta_engine is backtesting or live trading.
"""
return self.cta_engine.get_engine_type()
def load_bar(
self,
days: int,
interval: Interval = Interval.MINUTE,
callback: Callable = None,
):
"""
Load historical bar data for initializing strategy.
"""
if not callback:
callback = self.on_bar
self.cta_engine.load_bar(self.vt_symbol, days, interval, callback)
def load_tick(self, days: int):
"""
Load historical tick data for initializing strategy.
"""
self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick)
def put_event(self):
"""
Put an strategy data event for ui update.
"""
if self.inited:
self.cta_engine.put_strategy_event(self)
def send_email(self, msg):
"""
Send email to default receiver.
"""
if self.inited:
self.cta_engine.send_email(msg, self)
def sync_data(self):
"""
Sync strategy variables value into disk storage.
"""
if self.trading:
self.cta_engine.sync_strategy_data(self)
class CtaFutureTemplate(CtaTemplate):
"""
合约期货模板
"""
price_tick = 1 # 商品的最小价格跳动
symbol_size = 10 # 商品得合约乘数
margin_rate = 0.1 # 商品的保证金
volumn_tick = 1 # 商品最小成交数量
# 委托类型
order_type = OrderType.LIMIT
cancel_seconds = 120 # 撤单时间(秒)
# 资金相关
max_invest_rate = 0.1 # 最大仓位(0~1)
max_invest_margin = 0 # 资金上限 0不限制
max_invest_pos = 0 # 单向头寸数量上限 0不限制
# 是否回测状态
backtesting = False
# 逻辑过程日志
dist_fieldnames = ['datetime', 'symbol', 'volume', 'price',
'operation', 'signal', 'stop_price', 'target_price',
'long_pos', 'short_pos']
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""""""
self.position = None # 仓位组件
self.policy = None # 事务执行组件
self.gt = None # 网格交易组件
self.klines = {} # K线组件字典: kline_name: kline
self.cur_datetime = None # 当前Tick时间
self.cur_tick = None # 最新的合约tick( vt_symbol)
self.cur_price = None # 当前价(主力合约 vt_symbol)
self.last_minute = None # 最后的分钟,用于on_tick内每分钟处理的逻辑
super().__init__(
cta_engine, strategy_name, vt_symbol, setting
)
# 增加仓位管理模块
self.position = CtaPosition(strategy=self)
self.position.maxPos = sys.maxsize
# 增加网格持久化模块
self.gt = CtaGridTrade(strategy=self)
if 'backtesting' not in self.parameters:
self.parameters.append('backtesting')
def update_setting(self, setting: dict):
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
self.price_tick = self.cta_engine.get_price_tick(self.vt_symbol)
self.symbol_size = self.cta_engine.get_size(self.vt_symbol)
self.margin_rate = self.cta_engine.get_margin_rate(self.vt_symbol)
self.volumn_tick = self.cta_engine.get_volume_tick(self.vt_symbol)
def save_klines_to_cache(self, kline_names: list = []):
"""
保存K线数据到缓存
:param kline_names: 一般为self.klines的keys
:return:
"""
if len(kline_names) == 0:
kline_names = list(self.klines.keys())
# 获取保存路径
save_path = self.cta_engine.get_data_path()
# 保存缓存的文件名
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2'))
with bz2.BZ2File(file_name, 'wb') as f:
klines = {}
for kline_name in kline_names:
kline = self.klines.get(kline_name, None)
# if kline:
# kline.strategy = None
# kline.cb_on_bar = None
klines.update({kline_name: kline})
pickle.dump(klines, f)
def load_klines_from_cache(self, kline_names: list = []):
"""
从缓存加载K线数据
:param kline_names:
:return:
"""
if len(kline_names) == 0:
kline_names = list(self.klines.keys())
save_path = self.cta_engine.get_data_path()
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_klines.pkb2'))
try:
last_bar_dt = None
with bz2.BZ2File(file_name, 'rb') as f:
klines = pickle.load(f)
# 逐一恢复K线
for kline_name in kline_names:
# 缓存的k线实例
cache_kline = klines.get(kline_name, None)
# 当前策略实例的K线实例
strategy_kline = self.klines.get(kline_name, None)
if cache_kline and strategy_kline:
# 临时保存当前的回调函数
cb_on_bar = strategy_kline.cb_on_bar
# 缓存实例数据 =》 当前实例数据
strategy_kline.__dict__.update(cache_kline.__dict__)
# 所有K线的最后时间
if last_bar_dt and strategy_kline.cur_datetime:
last_bar_dt = max(last_bar_dt, strategy_kline.cur_datetime)
else:
last_bar_dt = strategy_kline.cur_datetime
# 重新绑定k线策略与on_bar回调函数
strategy_kline.strategy = self
strategy_kline.cb_on_bar = cb_on_bar
self.write_log(f'恢复{kline_name}缓存数据,最新bar结束时间:{last_bar_dt}')
self.write_log(u'加载缓存k线数据完毕')
return last_bar_dt
except Exception as ex:
self.write_error(f'加载缓存K线数据失败:{str(ex)}')
return None
def get_klines_snapshot(self):
"""返回当前klines的切片数据"""
try:
d = {
'strategy': self.strategy_name,
'datetime': datetime.now()}
klines = {}
for kline_name in sorted(self.klines.keys()):
klines.update({kline_name: self.klines.get(kline_name).get_data()})
kline_names = list(klines.keys())
binary_data = zlib.compress(pickle.dumps(klines))
d.update({'kline_names': kline_names, 'klines': binary_data, 'zlib': True})
return d
except Exception as ex:
self.write_error(f'获取klines切片数据失败:{str(ex)}')
return {}
def init_position(self):
"""
初始化Positin
使用网格的持久化获取开仓状态的多空单更新
:return:
"""
self.write_log(u'init_position(),初始化持仓')
if len(self.gt.up_grids) <= 0:
self.position.short_pos = 0
# 加载已开仓的空单数据网格JSON
short_grids = self.gt.load(direction=Direction.SHORT, open_status_filter=[True])
if len(short_grids) == 0:
self.write_log(u'没有持久化的空单数据')
self.gt.up_grids = []
else:
self.gt.up_grids = short_grids
for sg in short_grids:
if len(sg.order_ids) > 0 or sg.order_status:
self.write_log(f'重置委托状态:{sg.order_status},清除委托单:{sg.order_ids}')
sg.order_status = False
sg.order_ids = []
self.write_log(u'加载持仓空单[{},价格:{},数量:{}手,开仓时间:{}'
.format(self.vt_symbol, sg.open_price,
sg.volume, sg.open_time))
self.position.short_pos -= sg.volume
self.write_log(u'持久化空单,共持仓:{}'.format(abs(self.position.short_pos)))
if len(self.gt.dn_grids) <= 0:
# 加载已开仓的多数据网格JSON
self.position.long_pos = 0
long_grids = self.gt.load(direction=Direction.LONG, open_status_filter=[True])
if len(long_grids) == 0:
self.write_log(u'没有持久化的多单数据')
self.gt.dn_grids = []
else:
self.gt.dn_grids = long_grids
for lg in long_grids:
if len(lg.order_ids) > 0 or lg.order_status:
self.write_log(f'重置委托状态:{lg.order_status},清除委托单:{lg.order_ids}')
lg.order_status = False
lg.order_ids = []
self.write_log(u'加载持仓多单[{},价格:{},数量:{}手, 开仓时间:{}'
.format(self.vt_symbol, lg.open_price, lg.volume, lg.open_time))
self.position.long_pos += lg.volume
self.write_log(f'持久化多单,共持仓:{self.position.long_pos}')
self.position.pos = self.position.long_pos + self.position.short_pos
self.write_log(u'{}加载持久化数据完成,多单:{},空单:{},共:{}'
.format(self.strategy_name,
self.position.long_pos,
abs(self.position.short_pos),
self.position.pos))
self.pos = self.position.pos
self.gt.save()
self.display_grids()
def get_positions(self):
"""
获取策略当前持仓(重构使用主力合约
:return: [{'vt_symbol':symbol,'direction':direction,'volume':volume]
"""
if not self.position:
return []
pos_list = []
if self.position.long_pos > 0:
for g in self.gt.get_opened_grids(direction=Direction.LONG):
pos_list.append({'vt_symbol': self.vt_symbol,
'direction': 'long',
'volume': g.volume - g.traded_volume,
'price': g.open_price})
if abs(self.position.short_pos) > 0:
for g in self.gt.get_opened_grids(direction=Direction.SHORT):
pos_list.append({'vt_symbol': self.vt_symbol,
'direction': 'short',
'volume': abs(g.volume - g.traded_volume),
'price': g.open_price})
if self.cur_datetime and (datetime.now() - self.cur_datetime).total_seconds() < 10:
self.write_log(u'当前持仓:{}'.format(pos_list))
return pos_list
def tns_cancel_logic(self, dt, force=False):
"撤单逻辑"""
if len(self.active_orders) < 1:
self.entrust = 0
return
for vt_orderid in list(self.active_orders.keys()):
order_info = self.active_orders.get(vt_orderid)
if order_info.get('status', None) in [Status.CANCELLED, Status.REJECTED]:
self.active_orders.pop(vt_orderid, None)
continue
order_time = order_info.get('order_time')
over_ms = (dt - order_time).total_seconds()
if (over_ms > self.cancel_seconds) \
or force: # 超过设置的时间还未成交
self.write_log(f'{dt}, 超时{over_ms}秒未成交,取消委托单:{order_info}')
if self.cancel_order(vt_orderid):
order_info.update({'status': Status.CANCELLING})
else:
order_info.update({'status': Status.CANCELLED})
if len(self.active_orders) < 1:
self.entrust = 0
def display_grids(self):
"""更新网格显示信息"""
if not self.inited:
return
up_grids_info = self.gt.to_str(direction=Direction.SHORT)
if len(self.gt.up_grids) > 0:
self.write_log(up_grids_info)
dn_grids_info = self.gt.to_str(direction=Direction.LONG)
if len(self.gt.dn_grids) > 0:
self.write_log(dn_grids_info)
def display_tns(self):
"""显示事务的过程记录=》 log"""
if not self.inited:
return
self.write_log(u'{} 当前 {}价格:{}'
.format(self.cur_datetime, self.vt_symbol, self.cur_price))
if hasattr(self, 'policy'):
policy = getattr(self, 'policy')
op = getattr(policy, 'to_json', None)
if callable(op):
self.write_log(u'当前Policy:{}'.format(json.dumps(policy.to_json(), indent=2, ensure_ascii=False)))
def save_dist(self, dist_data):
"""
保存策略逻辑过程记录= csv文件按
:param dist_data:
:return:
"""
if self.backtesting:
save_path = self.cta_engine.get_logs_path()
else:
save_path = self.cta_engine.get_data_path()
try:
if self.position and 'long_pos' not in dist_data:
dist_data.update({'long_pos': self.position.long_pos})
if self.position and 'short_pos' not in dist_data:
dist_data.update({'short_pos': self.position.short_pos})
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_dist.csv'))
append_data(file_name=file_name, dict_data=dist_data, field_names=self.dist_fieldnames)
except Exception as ex:
self.write_error(u'save_dist 异常:{} {}'.format(str(ex), traceback.format_exc()))
def save_tns(self, tns_data):
"""
保存多空事务记录=csv文件,便于后续分析
:param tns_data:
:return:
"""
if self.backtesting:
save_path = self.cta_engine.get_logs_path()
else:
save_path = self.cta_engine.get_data_path()
try:
file_name = os.path.abspath(os.path.join(save_path, f'{self.strategy_name}_tns.csv'))
append_data(file_name=file_name, dict_data=tns_data)
except Exception as ex:
self.write_error(u'save_tns 异常:{} {}'.format(str(ex), traceback.format_exc()))
def send_wechat(self, msg: str):
"""实盘时才发送微信"""
if self.backtesting:
return
self.cta_engine.send_wechat(msg=msg, strategy=self)

View File

@ -0,0 +1 @@
from .widget import CtaManager

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -0,0 +1,464 @@
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtCore, QtGui, QtWidgets
from vnpy.trader.ui.widget import (
BaseCell,
EnumCell,
MsgCell,
TimeCell,
BaseMonitor
)
from ..base import (
APP_NAME,
EVENT_CTA_LOG,
EVENT_CTA_STOPORDER,
EVENT_CTA_STRATEGY
)
from ..engine import CtaEngine
class CtaManager(QtWidgets.QWidget):
""""""
signal_log = QtCore.pyqtSignal(Event)
signal_strategy = QtCore.pyqtSignal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
super(CtaManager, self).__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.cta_engine = main_engine.get_engine(APP_NAME)
self.managers = {}
self.init_ui()
self.register_event()
self.cta_engine.init_engine()
self.update_class_combo()
def init_ui(self):
""""""
self.setWindowTitle("CTA策略")
# Create widgets
self.class_combo = QtWidgets.QComboBox()
add_button = QtWidgets.QPushButton("添加策略")
add_button.clicked.connect(self.add_strategy)
init_button = QtWidgets.QPushButton("全部初始化")
init_button.clicked.connect(self.cta_engine.init_all_strategies)
start_button = QtWidgets.QPushButton("全部启动")
start_button.clicked.connect(self.cta_engine.start_all_strategies)
stop_button = QtWidgets.QPushButton("全部停止")
stop_button.clicked.connect(self.cta_engine.stop_all_strategies)
clear_button = QtWidgets.QPushButton("清空日志")
clear_button.clicked.connect(self.clear_log)
self.scroll_layout = QtWidgets.QVBoxLayout()
self.scroll_layout.addStretch()
scroll_widget = QtWidgets.QWidget()
scroll_widget.setLayout(self.scroll_layout)
scroll_area = QtWidgets.QScrollArea()
scroll_area.setWidgetResizable(True)
scroll_area.setWidget(scroll_widget)
self.log_monitor = LogMonitor(self.main_engine, self.event_engine)
self.stop_order_monitor = StopOrderMonitor(
self.main_engine, self.event_engine
)
# Set layout
hbox1 = QtWidgets.QHBoxLayout()
hbox1.addWidget(self.class_combo)
hbox1.addWidget(add_button)
hbox1.addStretch()
hbox1.addWidget(init_button)
hbox1.addWidget(start_button)
hbox1.addWidget(stop_button)
hbox1.addWidget(clear_button)
grid = QtWidgets.QGridLayout()
grid.addWidget(scroll_area, 0, 0, 2, 1)
grid.addWidget(self.stop_order_monitor, 0, 1)
grid.addWidget(self.log_monitor, 1, 1)
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(hbox1)
vbox.addLayout(grid)
self.setLayout(vbox)
def update_class_combo(self):
""""""
self.class_combo.addItems(
self.cta_engine.get_all_strategy_class_names()
)
def register_event(self):
""""""
self.signal_strategy.connect(self.process_strategy_event)
self.event_engine.register(
EVENT_CTA_STRATEGY, self.signal_strategy.emit
)
def process_strategy_event(self, event):
"""
Update strategy status onto its monitor.
"""
data = event.data
strategy_name = data["strategy_name"]
if strategy_name in self.managers:
manager = self.managers[strategy_name]
manager.update_data(data)
else:
manager = StrategyManager(self, self.cta_engine, data)
self.scroll_layout.insertWidget(0, manager)
self.managers[strategy_name] = manager
def remove_strategy(self, strategy_name):
""""""
manager = self.managers.pop(strategy_name)
manager.deleteLater()
def add_strategy(self):
""""""
class_name = str(self.class_combo.currentText())
if not class_name:
return
parameters = self.cta_engine.get_strategy_class_parameters(class_name)
editor = SettingEditor(parameters, class_name=class_name)
n = editor.exec_()
if n == editor.Accepted:
setting = editor.get_setting()
vt_symbol = setting.pop("vt_symbol")
strategy_name = setting.pop("strategy_name")
auto_init = setting.pop("auto_init", False)
auto_start = setting.pop("auto_start", False)
self.cta_engine.add_strategy(
class_name, strategy_name, vt_symbol, setting, auto_init, auto_start
)
def clear_log(self):
""""""
self.log_monitor.setRowCount(0)
def show(self):
""""""
self.showMaximized()
class StrategyManager(QtWidgets.QFrame):
"""
Manager for a strategy
"""
def __init__(
self, cta_manager: CtaManager, cta_engine: CtaEngine, data: dict
):
""""""
super(StrategyManager, self).__init__()
self.cta_manager = cta_manager
self.cta_engine = cta_engine
self.strategy_name = data["strategy_name"]
self._data = data
self.init_ui()
def init_ui(self):
""""""
self.setFixedHeight(300)
self.setFrameShape(self.Box)
self.setLineWidth(1)
init_button = QtWidgets.QPushButton("初始化")
init_button.clicked.connect(self.init_strategy)
start_button = QtWidgets.QPushButton("启动")
start_button.clicked.connect(self.start_strategy)
stop_button = QtWidgets.QPushButton("停止")
stop_button.clicked.connect(self.stop_strategy)
edit_button = QtWidgets.QPushButton("编辑")
edit_button.clicked.connect(self.edit_strategy)
remove_button = QtWidgets.QPushButton("移除")
remove_button.clicked.connect(self.remove_strategy)
reload_button = QtWidgets.QPushButton("重载")
reload_button.clicked.connect(self.reload_strategy)
save_button = QtWidgets.QPushButton("保存")
save_button.clicked.connect(self.save_strategy)
strategy_name = self._data["strategy_name"]
vt_symbol = self._data["vt_symbol"]
class_name = self._data["class_name"]
author = self._data["author"]
label_text = (
f"{strategy_name} - {vt_symbol} ({class_name} by {author})"
)
label = QtWidgets.QLabel(label_text)
label.setAlignment(QtCore.Qt.AlignCenter)
self.parameters_monitor = DataMonitor(self._data["parameters"])
self.variables_monitor = DataMonitor(self._data["variables"])
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(init_button)
hbox.addWidget(start_button)
hbox.addWidget(stop_button)
hbox.addWidget(edit_button)
hbox.addWidget(remove_button)
hbox.addWidget(reload_button)
hbox.addWidget(save_button)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(label)
vbox.addLayout(hbox)
vbox.addWidget(self.parameters_monitor)
vbox.addWidget(self.variables_monitor)
self.setLayout(vbox)
def update_data(self, data: dict):
""""""
self._data = data
self.parameters_monitor.update_data(data["parameters"])
self.variables_monitor.update_data(data["variables"])
def init_strategy(self):
""""""
self.cta_engine.init_strategy(self.strategy_name)
def start_strategy(self):
""""""
self.cta_engine.start_strategy(self.strategy_name)
def stop_strategy(self):
""""""
self.cta_engine.stop_strategy(self.strategy_name)
def edit_strategy(self):
""""""
strategy_name = self._data["strategy_name"]
parameters = self.cta_engine.get_strategy_parameters(strategy_name)
editor = SettingEditor(parameters, strategy_name=strategy_name)
n = editor.exec_()
if n == editor.Accepted:
setting = editor.get_setting()
self.cta_engine.edit_strategy(strategy_name, setting)
def remove_strategy(self):
""""""
result = self.cta_engine.remove_strategy(self.strategy_name)
# Only remove strategy gui manager if it has been removed from engine
if result:
self.cta_manager.remove_strategy(self.strategy_name)
def reload_strategy(self):
"""重新加载策略"""
self.cta_engine.reload_strategy(self.strategy_name)
def save_strategy(self):
self.cta_engine.save_strategy_data(self.strategy_name)
class DataMonitor(QtWidgets.QTableWidget):
"""
Table monitor for parameters and variables.
"""
def __init__(self, data: dict):
""""""
super(DataMonitor, self).__init__()
self._data = data
self.cells = {}
self.init_ui()
def init_ui(self):
""""""
labels = list(self._data.keys())
self.setColumnCount(len(labels))
self.setHorizontalHeaderLabels(labels)
self.setRowCount(1)
self.verticalHeader().setSectionResizeMode(
QtWidgets.QHeaderView.Stretch
)
self.verticalHeader().setVisible(False)
self.setEditTriggers(self.NoEditTriggers)
for column, name in enumerate(self._data.keys()):
value = self._data[name]
cell = QtWidgets.QTableWidgetItem(str(value))
cell.setTextAlignment(QtCore.Qt.AlignCenter)
self.setItem(0, column, cell)
self.cells[name] = cell
def update_data(self, data: dict):
""""""
for name, value in data.items():
cell = self.cells[name]
cell.setText(str(value))
class StopOrderMonitor(BaseMonitor):
"""
Monitor for local stop order.
"""
event_type = EVENT_CTA_STOPORDER
data_key = "stop_orderid"
sorting = True
headers = {
"stop_orderid": {
"display": "停止委托号",
"cell": BaseCell,
"update": False,
},
"vt_orderids": {"display": "限价委托号", "cell": BaseCell, "update": True},
"vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False},
"direction": {"display": "方向", "cell": EnumCell, "update": False},
"offset": {"display": "开平", "cell": EnumCell, "update": False},
"price": {"display": "价格", "cell": BaseCell, "update": False},
"volume": {"display": "数量", "cell": BaseCell, "update": False},
"status": {"display": "状态", "cell": EnumCell, "update": True},
"lock": {"display": "锁仓", "cell": BaseCell, "update": False},
"strategy_name": {"display": "策略名", "cell": BaseCell, "update": False},
}
class LogMonitor(BaseMonitor):
"""
Monitor for log data.
"""
event_type = EVENT_CTA_LOG
data_key = ""
sorting = False
headers = {
"time": {"display": "时间", "cell": TimeCell, "update": False},
"msg": {"display": "信息", "cell": MsgCell, "update": False},
}
def init_ui(self):
"""
Stretch last column.
"""
super(LogMonitor, self).init_ui()
self.horizontalHeader().setSectionResizeMode(
1, QtWidgets.QHeaderView.Stretch
)
def insert_new_row(self, data):
"""
Insert a new row at the top of table.
"""
super(LogMonitor, self).insert_new_row(data)
self.resizeRowToContents(0)
class SettingEditor(QtWidgets.QDialog):
"""
For creating new strategy and editing strategy parameters.
"""
def __init__(
self, parameters: dict, strategy_name: str = "", class_name: str = ""
):
""""""
super(SettingEditor, self).__init__()
self.parameters = parameters
self.strategy_name = strategy_name
self.class_name = class_name
self.edits = {}
self.init_ui()
def init_ui(self):
""""""
form = QtWidgets.QFormLayout()
# Add vt_symbol and name edit if add new strategy
if self.class_name:
self.setWindowTitle(f"添加策略:{self.class_name}")
button_text = "添加"
parameters = {"strategy_name": "", "vt_symbol": "", "auto_init": True, "auto_start": True}
parameters.update(self.parameters)
else:
self.setWindowTitle(f"参数编辑:{self.strategy_name}")
button_text = "确定"
parameters = self.parameters
for name, value in parameters.items():
type_ = type(value)
edit = QtWidgets.QLineEdit(str(value))
if type_ is int:
validator = QtGui.QIntValidator()
edit.setValidator(validator)
elif type_ is float:
validator = QtGui.QDoubleValidator()
edit.setValidator(validator)
form.addRow(f"{name} {type_}", edit)
self.edits[name] = (edit, type_)
button = QtWidgets.QPushButton(button_text)
button.clicked.connect(self.accept)
form.addRow(button)
self.setLayout(form)
def get_setting(self):
""""""
setting = {}
if self.class_name:
setting["class_name"] = self.class_name
for name, tp in self.edits.items():
edit, type_ = tp
value_text = edit.text()
if type_ == bool:
if value_text == "True":
value = True
else:
value = False
else:
value = type_(value_text)
setting[name] = value
return setting

View File

@ -30,7 +30,7 @@ from .base import (
)
from .template import CtaTemplate
from .cta_fund_kline import FundKline
from vnpy.component.cta_fund_kline import FundKline
from vnpy.trader.object import (
BarData,

View File

@ -1,14 +1,18 @@
# 币安合约数据
import os
import json
from typing import Dict, List, Any
from datetime import datetime, timedelta
from vnpy.api.rest.rest_client import RestClient
from vnpy.trader.object import (
Interval,
Exchange,
Product,
BarData,
HistoryRequest
)
from vnpy.trader.utility import save_json, load_json
BINANCE_INTERVALS = ["1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "8h", "12h", "1d", "3d", "1w", "1M"]
@ -26,6 +30,7 @@ TIMEDELTA_MAP: Dict[Interval, timedelta] = {
REST_HOST: str = "https://fapi.binance.com"
class BinanceFutureData(RestClient):
def __init__(self, parent=None):
@ -51,9 +56,9 @@ class BinanceFutureData(RestClient):
return b_interval
def get_bars(self,
req: HistoryRequest,
return_dict=True,
) -> List[Any]:
req: HistoryRequest,
return_dict=True,
) -> List[Any]:
"""获取历史kline"""
bars = []
limit = 1000
@ -108,10 +113,10 @@ class BinanceFutureData(RestClient):
"vt_symbol": f'{req.symbol}.{req.exchange.value}',
"interval": req.interval.value,
"volume": float(data[5]),
"open_price": float(data[1]),
"high_price": float(data[2]),
"low_price": float(data[3]),
"close_price": float(data[4]),
"open": float(data[1]),
"high": float(data[2]),
"low": float(data[3]),
"close": float(data[4]),
"gateway_name": "",
"open_interest": 0,
"trading_day": dt.strftime('%Y-%m-%d')
@ -159,3 +164,66 @@ class BinanceFutureData(RestClient):
df.index.name = 'datetime'
df.to_csv(file_name, index=True)
self.write_log('保存成功')
def get_contracts(self):
contracts = {}
# Get response from server
resp = self.request(
"GET",
"/fapi/v1/exchangeInfo",
data={}
)
if resp.status_code // 100 != 2:
msg = f"获取交易所失败,状态码:{resp.status_code},信息:{resp.text}"
self.write_log(msg)
else:
data = resp.json()
for d in data["symbols"]:
self.write_log(json.dumps(d, indent=2))
base_currency = d["baseAsset"]
quote_currency = d["quoteAsset"]
name = f"{base_currency.upper()}/{quote_currency.upper()}"
pricetick = 1
min_volume = 1
for f in d["filters"]:
if f["filterType"] == "PRICE_FILTER":
pricetick = float(f["tickSize"])
elif f["filterType"] == "LOT_SIZE":
min_volume = float(f["stepSize"])
contract = {
"symbol": d["symbol"],
"exchange": Exchange.BINANCE.value,
"vt_symbol": d["symbol"] + '.' + Exchange.BINANCE.value,
"name": name,
"price_tick": pricetick,
"symbol_size": 20,
"margin_rate" : round(float(d['requiredMarginPercent']) / 100,5),
"min_volume": min_volume,
"product": Product.FUTURES.value,
"commission_rate": 0.005
}
contracts.update({contract.get('vt_symbol'): contract})
return contracts
@classmethod
def load_contracts(self):
"""读取本地配置文件获取期货合约配置"""
f = os.path.abspath(os.path.join(os.path.dirname(__file__), 'future_contracts.json'))
contracts = load_json(f, auto_save=False)
return contracts
def save_contracts(self):
"""保存合约配置"""
contracts = self.get_contracts()
if len(contracts) > 0:
f = os.path.abspath(os.path.join(os.path.dirname(__file__), 'future_contracts.json'))
save_json(f, contracts)
self.write_log(f'保存合约配置=>{f}')

View File

@ -189,6 +189,8 @@ class BinancefRestApi(RestClient):
self.recv_window: int = 5000
self.time_offset: int = 0
self.contracts = {}
self.order_count: int = 1_000_000
self.order_count_lock: Lock = Lock()
self.connect_time: int = 0
@ -481,6 +483,14 @@ class BinancefRestApi(RestClient):
if account.balance:
self.gateway.on_account(account)
# 临时缓存合约的配置信息
for position in data["positions"]:
symbol = position.get('symbol')
if symbol:
if symbol not in self.contracts:
self.gateway.write_log(json.dumps(position, indent=2))
self.contracts.update({symbol: position})
self.gateway.write_log("账户资金查询成功")
def on_query_position(self, data: dict, request: Request) -> None:
@ -606,10 +616,11 @@ class BinancefRestApi(RestClient):
self.gateway.write_log(f'速率限制:{rate_limits}')
for d in data["symbols"]:
self.gateway.write_log(json.dumps(d, indent=2))
base_currency = d["baseAsset"]
quote_currency = d["quoteAsset"]
name = f"{base_currency.upper()}/{quote_currency.upper()}"
symbol = d["symbol"]
pricetick = 1
min_volume = 1
@ -619,12 +630,19 @@ class BinancefRestApi(RestClient):
elif f["filterType"] == "LOT_SIZE":
min_volume = float(f["stepSize"])
# 合约乘数
symbol_size = 20 # 缺省为20倍的杠杆
contract_info = self.contracts.get(symbol, {})
if contract_info:
symbol_size = int(contract_info.get('leverage', symbol_size))
contract = ContractData(
symbol=d["symbol"],
symbol=symbol,
exchange=Exchange.BINANCE,
name=name,
pricetick=pricetick,
size=1,
size=symbol_size,
margin_rate= round(float(d['requiredMarginPercent'])/100, 5),
min_volume=min_volume,
product=Product.FUTURES,
history_data=True,