From 2dcbcab0a6f6aa456e7fe30e1d85efb1c124c02c Mon Sep 17 00:00:00 2001 From: msincenselee Date: Sat, 19 Mar 2022 23:19:22 +0800 Subject: [PATCH] =?UTF-8?q?[update]=20=E4=B8=80=E8=88=AC=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/amqp/base.py | 34 +- vnpy/app/account_recorder/README.md | 10 - vnpy/app/account_recorder/engine.py | 6 +- vnpy/app/cta_strategy_pro/engine.py | 159 ++- vnpy/app/cta_strategy_pro/template.py | 31 +- vnpy/app/cta_strategy_pro/template_spread.py | 1030 ++++++++++++++++-- vnpy/app/index_tick_publisher/engine.py | 3 +- vnpy/data/binance/binance_future_data.py | 1 + vnpy/data/mongo/mongo_data.py | 6 +- vnpy/data/tdx/tdx_stock_data.py | 23 +- vnpy/gateway/ctp/ctp_gateway.py | 70 +- vnpy/trader/converter.py | 5 +- vnpy/trader/engine.py | 2 +- vnpy/trader/gateway.py | 10 + vnpy/trader/ui/kline/kline.py | 5 +- vnpy/trader/ui/widget.py | 99 +- vnpy/trader/utility.py | 10 + 17 files changed, 1352 insertions(+), 152 deletions(-) diff --git a/vnpy/amqp/base.py b/vnpy/amqp/base.py index c3797ac5..19d051e9 100644 --- a/vnpy/amqp/base.py +++ b/vnpy/amqp/base.py @@ -1,18 +1,20 @@ # encoding: UTF-8 import pika - +import sys class base_broker(): def __init__(self, host='localhost', port=5672, user='guest', password='guest', channel_number=1): """ - :param host: 连接rabbitmq的服务器地址(或者群集地址) + :param host: 连接rabbitmq的服务器地址(或者群集地址),或者多台主机地址,使用;分隔开 :param port: 端口 :param user: 用户名 :param password: 密码 :param channel_number: 频道的数字(大于1) """ + self.host = host + self.port = port self.user = user self.password = password @@ -21,16 +23,26 @@ class base_broker(): # 身份鉴权 self.credentials = pika.PlainCredentials(self.user, self.password, erase_on_connect=True) + if ';' in self.host: + hosts = self.host.split(';') + else: + hosts = [self.host] - # 创建连接 - self.connection = pika.BlockingConnection( - pika.ConnectionParameters(host=self.host, port=self.port, - credentials=self.credentials, - heartbeat=0, socket_timeout=5)) - - # 创建一个频道,或者指定频段数字编号 - self.channel = self.connection.channel( - channel_number=self.channel_number) + # 多个连接服务器时,使用 + for _host_ in hosts: + try: + # 创建连接 + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=_host_, port=self.port, + credentials=self.credentials, + heartbeat=0, socket_timeout=5)) + # 创建一个频道,或者指定频段数字编号 + self.channel = self.connection.channel( + channel_number=self.channel_number) + except: + print(f'pika rabbit connect to {_host_} {self.port} fail', file=sys.stderr) + else: + break def reconnect(self): """ diff --git a/vnpy/app/account_recorder/README.md b/vnpy/app/account_recorder/README.md index ff21b14d..ef5254a2 100644 --- a/vnpy/app/account_recorder/README.md +++ b/vnpy/app/account_recorder/README.md @@ -31,13 +31,3 @@ } } } - -# 创建mongodb 索引,提高性能 - -db.today_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'accountid_vtsymbol_sysorderid_order_date_holder_id','unique':true}) -db.history_orders.createIndex({'account_id':1,'vt_symbol':1,'sys_orderid':1,'order_date':1,'holder_id':1},{'name':'history_accountid_vtsymbol_sysorderid_order_date_holder_id'}) -db.today_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id','unique':true}) -db.history_trades.createIndex({'account_id':1,'vt_symbol':1,'vt_tradeid':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtSymbol_vt_tradeid_trade_date_holder_id'}) -db.today_positions.createIndex({'account_id':1,'vt_symbol':1,'direction':1,'trade_date':1,'holder_id':1},{'name':'accountid_vtsymbol_direction_trade_date_holder_id'}) -db.today_strategy_pos.createIndex({'account_id':1,'strategy_group':1,'strategy_name':1,'date':1},{'name':'accountid_strategy_group_strategy_name_date'}) -db.strategy_snapshot.createIndex({'account_id':1,'strategy_group':1,'strategy':1,'guid':1,'datetime':1},{'name':'accountid_strategy_name_guid'}) diff --git a/vnpy/app/account_recorder/engine.py b/vnpy/app/account_recorder/engine.py index b77cf5e4..d47619a7 100644 --- a/vnpy/app/account_recorder/engine.py +++ b/vnpy/app/account_recorder/engine.py @@ -42,7 +42,7 @@ from vnpy.trader.event import ( ) from vnpy.trader.constant import Direction, Exchange, Status from vnpy.trader.engine import BaseEngine, MainEngine -from vnpy.trader.utility import get_trading_date, load_json, save_json +from vnpy.trader.utility import get_trading_date, load_json, save_json,print_dict from vnpy.data.mongo.mongo_data import MongoData # 入库 @@ -487,7 +487,7 @@ class AccountRecorder(BaseEngine): price = self.main_engine.get_price(pos.vt_symbol) if price: data.update({'cur_price': price}) - + # self.write_log('update position:{}'.format(print_dict(data))) self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_POSITION_COL, fld=fld, data=data) def update_strategy_snapshot(self, event: Event): @@ -570,7 +570,7 @@ class AccountRecorder(BaseEngine): pos_data = copy.copy(data) pos_data.update({'account_id': pos_data.get('accountid')}) pos_data.update({'datetime': dt.strftime("%Y-%m-%d %H:%M:%S")}) - + self.write_log(f'stratgy_pos event:{print_dict(pos_data)}') self.update_data(db_name=ACCOUNT_DB_NAME, col_name=TODAY_STRATEGY_POS_COL, fld=fld, data=pos_data) def process_gw_error(self, event: Event): diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py index ee6e2d98..c8734e9c 100644 --- a/vnpy/app/cta_strategy_pro/engine.py +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -63,6 +63,8 @@ from vnpy.trader.utility import ( from vnpy.trader.util_logger import setup_logger, logging from vnpy.trader.util_wechat import send_wx_msg +from vnpy.data.mongo.mongo_data import MongoData +from vnpy.trader.setting import SETTINGS from vnpy.trader.converter import OffsetConverter from .base import ( @@ -120,6 +122,9 @@ class CtaEngine(BaseEngine): # "trade_2_wx": true # 是否交易记录转发至微信通知 # "event_log: false # 是否转发日志到event bus,显示在图形界面 # "snapshot2file": false # 是否保存切片到文件 + # "compare_pos": false # False,强制不进行 账号 <=> 引擎实例 得仓位比对。(一般分布式RPC运行时,其他得实例都不进行比对) + # "get_pos_from_db": false # True,使用数据库得 策略<=>pos 数据作为比较(一般分布式RPC运行时,其中一个使用即可); False,使用当前引擎实例得 策略.pos进行比对 + self.engine_config = {} # 是否激活 write_log写入event bus(比较耗资源) self.event_log = False @@ -171,6 +176,26 @@ class CtaEngine(BaseEngine): self.write_log("CTA策略引擎初始化成功") + if self.engine_config.get('get_pos_from_db', False): + self.write_log(f'激活数据库策略仓位比对模式') + self.init_mongo_data() + + def init_mongo_data(self): + """初始化hams数据库""" + host = SETTINGS.get('hams.host', 'localhost') + port = SETTINGS.get('hams.port', 27017) + self.write_log(f'初始化hams数据库连接:{host}:{port}') + try: + # Mongo数据连接客户端 + self.mongo_data = MongoData(host=host, port=port) + + if self.mongo_data and self.mongo_data.db_has_connected: + self.write_log(f'连接成功') + else: + self.write_error(f'HAMS数据库{host}:{port}连接异常.') + except Exception as ex: + self.write_error(f'HAMS数据库{host}:{port}连接异常.{str(ex)}') + def close(self): """停止所属有的策略""" self.stop_all_strategies() @@ -1713,7 +1738,8 @@ class CtaEngine(BaseEngine): pos_list.append(leg1_pos) pos_list.append(leg2_pos) - + else: + pos_list.append(pos) except Exception as ex: self.write_error(f'分解SPD失败:{str(ex)}') @@ -1749,6 +1775,33 @@ class CtaEngine(BaseEngine): return strategy_pos_list + def get_all_strategy_pos_from_hams(self): + """ + 获取hams中该账号下所有策略仓位明细 + """ + strategy_pos_dict = {} + if not self.mongo_data: + self.init_mongo_data() + + if self.mongo_data and self.mongo_data.db_has_connected: + filter = {'account_id': self.engine_config.get('accountid', '-')} + + pos_list = self.mongo_data.db_query( + db_name='Account', + col_name='today_strategy_pos', + filter_dict=filter + ) + for pos in pos_list: + s_name = pos.get('strategy_name', None) + if s_name: + if s_name not in strategy_pos_dict: + strategy_pos_dict[s_name] = pos + continue + if pos.get('datetime', '') > strategy_pos_dict[s_name].get('datetime', ''): + strategy_pos_dict[s_name] = pos + + return list(strategy_pos_dict.values()) + def get_strategy_class_parameters(self, class_name: str): """ Get default parameters of a strategy class. @@ -1810,9 +1863,14 @@ class CtaEngine(BaseEngine): self.write_log(u'开始对比账号&策略的持仓') - # 获取当前策略得持仓 - if len(strategy_pos_list) == 0: - strategy_pos_list = self.get_all_strategy_pos() + # 获取hams数据库中所有运行实例得策略 + if self.engine_config.get("get_pos_from_db", False): + strategy_pos_list = self.get_all_strategy_pos_from_hams() + else: + # 获取当前实例运行策略得持仓 + if len(strategy_pos_list) == 0: + strategy_pos_list = self.get_all_strategy_pos() + self.write_log(u'策略持仓清单:{}'.format(strategy_pos_list)) none_strategy_pos = self.get_none_strategy_pos_list() @@ -1888,53 +1946,44 @@ class CtaEngine(BaseEngine): pos_compare_result = '' # 精简输出 compare_info = '' - diff_pos_dict = {} + diff_pos_dict = {} # 短合约: {'long_diff':xx, 'short_diff',xx) + for vt_symbol in sorted(vt_symbols): # 发送不一致得结果 symbol_pos = compare_pos.pop(vt_symbol, {}) + # + # # 股指期货: 帐号多/空轧差, vs 策略多空轧差 是否一致; + # # 其他期货:帐号多单 vs 除了多单, 空单 vs 空单 + # if vt_symbol.endswith(".CFFEX"): + # diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == ( + # symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0)) + # pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \ + # symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0) + # match = diff_match + # # 轧差一致,帐号/策略持仓不一致 + # if diff_match and not pos_match: + # if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0): + # self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format( + # vt_symbol, + # symbol_pos.get('账号多单', 0), + # symbol_pos.get('账号空单', 0), + # symbol_pos.get('策略多单', 0), + # symbol_pos.get('策略空单', 0) + # )) + # diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0), + # "short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单', + # 0)}}) + # else: + match = round(symbol_pos.get('账号空单', 0), 7) == round(symbol_pos.get('策略空单', 0), 7) and \ + round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7) - d_long = { - 'account_id': self.engine_config.get('accountid', '-'), - 'vt_symbol': vt_symbol, - 'direction': Direction.LONG.value, - 'strategy_list': symbol_pos.get('多单策略', [])} - - d_short = { - 'account_id': self.engine_config.get('accountid', '-'), - 'vt_symbol': vt_symbol, - 'direction': Direction.SHORT.value, - 'strategy_list': symbol_pos.get('空单策略', [])} - - # 股指期货: 帐号多/空轧差, vs 策略多空轧差 是否一致; - # 其他期货:帐号多单 vs 除了多单, 空单 vs 空单 - if vt_symbol.endswith(".CFFEX"): - diff_match = (symbol_pos.get('账号多单', 0) - symbol_pos.get('账号空单', 0)) == ( - symbol_pos.get('策略多单', 0) - symbol_pos.get('策略空单', 0)) - pos_match = symbol_pos.get('账号空单', 0) == symbol_pos.get('策略空单', 0) and \ - symbol_pos.get('账号多单', 0) == symbol_pos.get('策略多单', 0) - match = diff_match - # 轧差一致,帐号/策略持仓不一致 - if diff_match and not pos_match: - if symbol_pos.get('账号多单', 0) > symbol_pos.get('策略多单', 0): - self.write_log('{}轧差持仓:多:{},空:{} 大于 策略持仓 多:{},空:{}'.format( - vt_symbol, - symbol_pos.get('账号多单', 0), - symbol_pos.get('账号空单', 0), - symbol_pos.get('策略多单', 0), - symbol_pos.get('策略空单', 0) - )) - diff_pos_dict.update({vt_symbol: {"long": symbol_pos.get('账号多单', 0) - symbol_pos.get('策略多单', 0), - "short": symbol_pos.get('账号空单', 0) - symbol_pos.get('策略空单', - 0)}}) - else: - match = round(symbol_pos.get('账号空单', 0), 7) == round(symbol_pos.get('策略空单', 0), 7) and \ - round(symbol_pos.get('账号多单', 0), 7) == round(symbol_pos.get('策略多单', 0), 7) # 多空都一致 if match: msg = u'{}多空都一致.{}\n'.format(vt_symbol, json.dumps(symbol_pos, indent=2, ensure_ascii=False)) self.write_log(msg) compare_info += msg else: + # 多空不一致 pos_compare_result += '\n{}: '.format(vt_symbol) # 判断是多单不一致? diff_long_volume = round(symbol_pos.get('账号多单', 0), 7) - round(symbol_pos.get('策略多单', 0), 7) @@ -1946,8 +1995,16 @@ class CtaEngine(BaseEngine): symbol_pos.get('策略多单')) pos_compare_result += msg - self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) + self.write_log(u'{}不一致:{}'.format(vt_symbol, msg)) compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) + + # 登记短合约得差别 + underlying_symbol = get_underlying_symbol(vt_symbol.split('.')[0]) + diff_pos = diff_pos_dict.get(underlying_symbol, {}) + diff_pos.update({'多单': diff_long_volume + diff_pos.get('多单', 0)}) + diff_pos_dict[underlying_symbol] = diff_pos + + # 自动平衡 if auto_balance: self.balance_pos(vt_symbol, Direction.LONG, diff_long_volume) @@ -1961,13 +2018,24 @@ class CtaEngine(BaseEngine): symbol_pos.get('空单策略'), symbol_pos.get('策略空单')) pos_compare_result += msg - self.write_error(u'{}不一致:{}'.format(vt_symbol, msg)) + self.write_log(u'{}不一致:{}'.format(vt_symbol, msg)) compare_info += u'{}不一致:{}\n'.format(vt_symbol, msg) + + # 登记短合约得差别 + underlying_symbol = get_underlying_symbol(vt_symbol.split('.')[0]) + diff_pos = diff_pos_dict.get(underlying_symbol, {}) + diff_pos.update({'空单': diff_short_volume + diff_pos.get('空单', 0)}) + diff_pos_dict[underlying_symbol] = diff_pos + + # 自动平衡仓位 if auto_balance: self.balance_pos(vt_symbol, Direction.SHORT, diff_short_volume) + # 统计所有轧差偏差得 + diff_underlying = sum([d.get('多单', 0) - d.get('空单', 0) for d in list(diff_pos_dict.values())]) + # 不匹配,输入到stdErr通道 - if pos_compare_result != '': + if pos_compare_result != '' and diff_underlying != 0: msg = u'账户{}持仓不匹配: {}' \ .format(self.engine_config.get('accountid', '-'), pos_compare_result) @@ -1982,9 +2050,6 @@ class CtaEngine(BaseEngine): return True, compare_info + ret_msg else: self.write_log(u'账户持仓与策略一致') - if len(diff_pos_dict) > 0: - for k, v in diff_pos_dict.items(): - self.write_log(f'{k} 存在大于策略的轧差持仓:{v}') return True, compare_info def balance_pos(self, vt_symbol, direction, volume): diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py index c2a38490..499be6c8 100644 --- a/vnpy/app/cta_strategy_pro/template.py +++ b/vnpy/app/cta_strategy_pro/template.py @@ -13,7 +13,7 @@ from logging import INFO, ERROR from datetime import datetime from vnpy.trader.constant import Interval, Direction, Offset, Status, OrderType, Color, Exchange from vnpy.trader.object import BarData, TickData, OrderData, TradeData -from vnpy.trader.utility import virtual, append_data, extract_vt_symbol, get_underlying_symbol +from vnpy.trader.utility import virtual, append_data, extract_vt_symbol, get_underlying_symbol,print_dict from .base import StopOrder, EngineType from vnpy.component.cta_grid_trade import CtaGrid, CtaGridTrade, LOCK_GRID @@ -75,7 +75,7 @@ class CtaTemplate(ABC): """ class_parameters = {} for name in cls.parameters: - class_parameters[name] = getattr(cls, name) + class_parameters[name] = getattr(cls, name,"") return class_parameters def get_parameters(self): @@ -84,7 +84,7 @@ class CtaTemplate(ABC): """ strategy_parameters = {} for name in self.parameters: - strategy_parameters[name] = getattr(self, name) + strategy_parameters[name] = getattr(self, name, "") return strategy_parameters def get_variables(self): @@ -187,6 +187,31 @@ class CtaTemplate(ABC): """ pass + def exist_order(self, vt_symbol, direction, offset): + """ + 是否存在相同得委托 + :param vt_symbol: + :param direction: + :param offset: + :return: + """ + if len(self.active_orders) == 0: + self.write_log(f'当前活动订单数量为零。查询条件:{vt_symbol},方向:{direction.value}, 开平:{offset.value}') + return False + + for orderid, order in self.active_orders.items(): + # self.write_log(f'当前活动订单:\n{print_dict(order)}') + if offset != Offset.OPEN: # 平昨、平今、平仓 + offset_cond = order['offset'] != Offset.OPEN + else: # 开仓 + offset_cond = order['offset'] == offset + + if order['vt_symbol'] == vt_symbol and order['direction'] == direction and offset_cond: + self.write_log(f'存在相同活动订单。查询条件:{vt_symbol},方向:{direction.value}, 开平:{offset.value}') + return True + + return False + def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False, vt_symbol: str = '', order_type: OrderType = OrderType.LIMIT, order_time: datetime = None, grid: CtaGrid = None): diff --git a/vnpy/app/cta_strategy_pro/template_spread.py b/vnpy/app/cta_strategy_pro/template_spread.py index 76b292c9..575a32aa 100644 --- a/vnpy/app/cta_strategy_pro/template_spread.py +++ b/vnpy/app/cta_strategy_pro/template_spread.py @@ -7,7 +7,7 @@ from copy import copy import bz2 import pickle import zlib -from vnpy.trader.utility import append_data, extract_vt_symbol +from vnpy.trader.utility import append_data, extract_vt_symbol, get_months_diff from .template import ( CtaPosition, CtaGridTrade, @@ -17,6 +17,7 @@ from .template import ( datetime, Offset, Exchange, + TickData, OrderType, OrderData, TradeData, @@ -28,8 +29,9 @@ from .template import ( class CtaSpreadTemplate(CtaTemplate): """CTA套利模板""" - activate_fak = False - order_type = OrderType.LIMIT + activate_fak = False # 是否使用FAK得下单、追单方式 + order_type = OrderType.LIMIT # 缺省下单方式,是使用限价单 + activate_lock = False # 对某些日内平今手续费较高得合约,采用锁仓方式 act_vt_symbol = "" # 主动腿合约 pas_vt_symbol = "" # 被动腿合约 act_symbol = "" @@ -45,7 +47,6 @@ class CtaSpreadTemplate(CtaTemplate): force_trading_close = False # 强制平仓 history_orders = {} - # 逻辑过程日志 dist_fieldnames = ['datetime', 'symbol', 'volume', 'price', 'operation', 'signal', 'stop_price', 'target_price', @@ -55,7 +56,12 @@ class CtaSpreadTemplate(CtaTemplate): """""" super().__init__(cta_engine, strategy_name, vt_symbol, setting) - self.parameters.append('activate_fak') + if 'activate_fak' not in self.parameters: + self.parameters.append('activate_fak') + if 'activate_lock' not in self.parameters: + self.parameters.append('activate_lock') + if 'cancel_seconds' not in self.parameters: + self.parameters.append('cancel_seconds') # 基础组件 self.position = CtaPosition(strategy=self) @@ -84,6 +90,9 @@ class CtaSpreadTemplate(CtaTemplate): self.act_margin_rate = None self.pas_margin_rate = None + self.diff_months = 0 # 主动腿和被动腿相隔月数量 + + self.spd_pos = None # 套利合约的holding pos self.act_pos = None # 主动合约得holding pos self.pas_pos = None # 被动合约得holding pos @@ -107,6 +116,9 @@ class CtaSpreadTemplate(CtaTemplate): self.act_margin_rate = self.cta_engine.get_margin_rate(self.act_vt_symbol) self.pas_margin_rate = self.cta_engine.get_margin_rate(self.pas_vt_symbol) + # 计算主动腿与被动腿得相隔月 + self.diff_months = get_months_diff(self.act_symbol, self.pas_symbol) + # 实盘采用FAK if not self.backtesting and self.activate_fak: self.order_type = OrderType.FAK @@ -378,7 +390,6 @@ class CtaSpreadTemplate(CtaTemplate): self.write_log(u'当前持仓:{}'.format(pos_list)) return pos_list - def on_start(self): """启动策略(必须由用户继承实现)""" # 订阅主动腿/被动腿合约 @@ -588,7 +599,7 @@ class CtaSpreadTemplate(CtaTemplate): # 更新开仓均价/数量 if trade.vt_symbol == self.act_vt_symbol: opened_price = grid.snapshot.get('act_open_price', 0) - opened_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) + opened_volume = grid.snapshot.get('act_open_volume') act_open_volume = opened_volume + trade.volume act_open_price = (opened_price * opened_volume + trade.price * trade.volume) / act_open_volume @@ -600,7 +611,7 @@ class CtaSpreadTemplate(CtaTemplate): elif trade.vt_symbol == self.pas_vt_symbol: opened_price = grid.snapshot.get('pas_open_price', 0) - opened_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) + opened_volume = grid.snapshot.get('pas_open_volume', 0) pas_open_volume = opened_volume + trade.volume pas_open_price = (opened_price * opened_volume + trade.price * trade.volume) / pas_open_volume @@ -614,27 +625,38 @@ class CtaSpreadTemplate(CtaTemplate): # 更新平仓均价/数量 if trade.vt_symbol == self.act_vt_symbol: closed_price = grid.snapshot.get('act_close_price', 0) - closed_volume = grid.snapshot.get('act_close_volume', grid.volume * self.act_vol_ratio) + closed_volume = grid.snapshot.get('act_close_volume',0) + opened_volume = grid.snapshot.get('act_open_volume') + act_close_volume = closed_volume + trade.volume + opened_volume = opened_volume - trade.volume + if opened_volume < 0: + debug =1 act_close_price = (closed_price * closed_volume + trade.price * trade.volume) / act_close_volume self.write_log(f'{trade.vt_symbol} 平仓均价{closed_price} => {act_close_price},' f' 平仓手数:{closed_volume}=>{act_close_volume}') grid.snapshot.update({'act_close_price': act_close_price, 'act_close_volume': act_close_volume, - 'act_vt_symbol': self.act_vt_symbol}) + 'act_vt_symbol': self.act_vt_symbol, + 'act_open_volume':opened_volume}) elif trade.vt_symbol == self.pas_vt_symbol: closed_price = grid.snapshot.get('pas_close_price', 0) - closed_volume = grid.snapshot.get('pas_close_volume', grid.volume * self.pas_vol_ratio) - pas_open_volume = closed_volume + trade.volume - pas_open_price = (closed_price * closed_volume + trade.price * trade.volume) / pas_open_volume + closed_volume = grid.snapshot.get('pas_close_volume', 0) + opened_volume = grid.snapshot.get('pas_open_volume') + pas_closed_volume = closed_volume + trade.volume + opened_volume = opened_volume - trade.volume + if opened_volume < 0: + debug = 1 + pas_open_price = (closed_price * closed_volume + trade.price * trade.volume) / pas_closed_volume self.write_log(f'{trade.vt_symbol} 平仓均价{closed_price} => {pas_open_price},' - f' 平仓手数:{closed_volume}=>{pas_open_volume}') + f' 平仓手数:{closed_volume}=>{pas_closed_volume}') grid.snapshot.update({'pas_close_price': pas_open_price, - 'pas_close_volume': pas_open_volume, - 'pas_vt_symbol': self.pas_vt_symbol}) + 'pas_close_volume': pas_closed_volume, + 'pas_vt_symbol': self.pas_vt_symbol, + 'pas_open_volume': opened_volume}) self.gt.save() @@ -662,7 +684,6 @@ class CtaSpreadTemplate(CtaTemplate): # 平仓完毕(cover, sell) if order_info.get("offset", None) != Offset.OPEN: grid.open_status = False - grid.close_status = True self.write_log(f'{grid.direction.value}单已平仓完毕,手数:{grid.volume}, 详细:{grid.snapshot}') @@ -785,9 +806,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.buy(price=buy_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange==Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=OrderType.FAK, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not vt_orderids: self.write_error(u'重新提交{} {}手开多单,价格:{},失败'. @@ -827,9 +848,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.short(price=short_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange==Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=OrderType.FAK, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not vt_orderids: @@ -940,9 +961,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.cover(price=cover_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange==Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=OrderType.FAK, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not vt_orderids: self.write_error(u'重新提交{} {}手平空单{}失败'.format(order_vt_symbol, order_volume, cover_price)) @@ -977,9 +998,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.sell(price=sell_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange==Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=OrderType.FAK, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not vt_orderids: @@ -1033,7 +1054,9 @@ class CtaSpreadTemplate(CtaTemplate): canceled_ids = [] for vt_orderid in list(self.active_orders.keys()): - order_info = self.active_orders[vt_orderid] + order_info = self.active_orders.get(vt_orderid, None) + if order_info is None: + continue order_vt_symbol = order_info.get('vt_symbol', self.vt_symbol) order_symbol, order_exchange = extract_vt_symbol(order_vt_symbol) order_time = order_info['order_time'] @@ -1084,9 +1107,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.short(price=short_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange == Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=order_grid) if len(vt_orderids) > 0: @@ -1103,9 +1126,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.buy(price=buy_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange == Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=order_grid) if len(vt_orderids) > 0: @@ -1123,9 +1146,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.sell(price=sell_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange == Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=order_grid) if len(vt_orderids) > 0: self.write_log(u'委托成功,orderids:{}'.format(vt_orderids)) @@ -1140,9 +1163,9 @@ class CtaSpreadTemplate(CtaTemplate): vt_orderids = self.cover(price=cover_price, volume=order_volume, vt_symbol=order_vt_symbol, - lock=order_exchange == Exchange.CFFEX, + lock=order_exchange == Exchange.CFFEX or self.activate_lock, order_type=order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=order_grid) if len(vt_orderids) > 0: self.write_log(u'委托成功,orderids:{}'.format(vt_orderids)) @@ -1262,11 +1285,11 @@ class CtaSpreadTemplate(CtaTemplate): # 开空主动腿 act_vt_orderids = self.short(vt_symbol=self.act_vt_symbol, - lock=self.act_exchange == Exchange.CFFEX, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_act_tick.bid_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not act_vt_orderids: self.write_error(f'spd_short,{self.act_vt_symbol}开空仓{grid.volume * self.act_vol_ratio}手失败,' @@ -1275,11 +1298,11 @@ class CtaSpreadTemplate(CtaTemplate): # 开多被动腿(FAK或者限价单) pas_vt_orderids = self.buy(vt_symbol=self.pas_vt_symbol, - lock=self.pas_exchange==Exchange.CFFEX, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_pas_tick.ask_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not pas_vt_orderids: self.write_error(f'spd_short,{self.pas_vt_symbol}开多仓{grid.volume * self.pas_vol_ratio}手失败,' @@ -1292,7 +1315,7 @@ class CtaSpreadTemplate(CtaTemplate): grid.snapshot.update({"act_vt_symbol": self.act_vt_symbol, "act_open_volume": 0, "pas_vt_symbol": self.pas_vt_symbol, "pas_open_volume": 0}) grid.order_status = True - grid.order_datetime = self.cur_datetime + grid.order_time = self.cur_datetime vt_orderids = act_vt_orderids + pas_vt_orderids # 不能用act_vt_orderids.extend(pas_vt_orderids),它的返回值为 None,会导致没有vt_orderids self.write_log(u'spd short vt_order_ids:{0}'.format(vt_orderids)) @@ -1333,33 +1356,33 @@ class CtaSpreadTemplate(CtaTemplate): # 开多主动腿(FAK 或者限价单) act_vt_orderids = self.buy(vt_symbol=self.act_vt_symbol, - lock=self.act_exchange==Exchange.CFFEX, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_act_tick.ask_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not act_vt_orderids: - self.write_error(f'spd_short,{self.act_vt_symbol}开多仓{grid.volume * self.act_vol_ratio}手失败,' + self.write_error(f'spd_buy,{self.act_vt_symbol}开多仓{grid.volume * self.act_vol_ratio}手失败,' f'委托价:{self.cur_act_tick.ask_price_1}') return [] # 开空被动腿 pas_vt_orderids = self.short(vt_symbol=self.pas_vt_symbol, - lock=self.pas_exchange==Exchange.CFFEX, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_pas_tick.bid_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not pas_vt_orderids: - self.write_error(f'spd_short,{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,' + self.write_error(f'spd_buy,{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,' f'委托价:{self.cur_pas_tick.bid_price_1}') return [] grid.snapshot.update({"act_vt_symbol": self.act_vt_symbol, "act_open_volume": 0, "pas_vt_symbol": self.pas_vt_symbol, "pas_open_volume": 0}) grid.order_status = True - grid.order_datetime = self.cur_datetime + grid.order_time = self.cur_datetime vt_orderids = act_vt_orderids + pas_vt_orderids self.write_log(u'spd buy vt_ordderids:{}'.format(vt_orderids)) return vt_orderids @@ -1401,22 +1424,23 @@ class CtaSpreadTemplate(CtaTemplate): pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) # 检查账号持仓是否满足平仓目标 - if self.act_pos.long_pos < act_close_volume: - self.write_error(f'账号 {self.act_vt_symbol} 多单持仓{self.act_pos.long_pos}' + if self.act_pos.long_pos < act_close_volume and not (self.act_exchange == Exchange.CFFEX or self.activate_lock): + self.write_error(f'[正套]平仓,账号主动腿 {self.act_vt_symbol} 多单持仓{self.act_pos.long_pos}' f'今仓{self.act_pos.long_td}/昨{self.act_pos.long_yd}, 不满足{act_close_volume}') return [] - if self.pas_pos.short_pos < pas_close_volume: - self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.short_pos}' + if self.pas_pos.short_pos < pas_close_volume and not ( + self.pas_exchange == Exchange.CFFEX or self.activate_lock): + self.write_error(f'[正套]平仓,账号被动腿 {self.pas_vt_symbol} 多单持仓{self.pas_pos.short_pos}' f'今仓{self.pas_pos.short_td}/昨{self.pas_pos.short_yd}, 不满足{act_close_volume}') return [] # 被动腿空单平仓 pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol, - lock=self.pas_exchange == Exchange.CFFEX, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_pas_tick.ask_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not pas_vt_orderids: self.write_error(f'spd_sell,{self.pas_vt_symbol}空单平仓{grid.volume * self.pas_vol_ratio}手失败,' @@ -1425,11 +1449,11 @@ class CtaSpreadTemplate(CtaTemplate): # 主动腿多单平仓 act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol, - lock=self.act_exchange==Exchange.CFFEX, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_act_tick.bid_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not act_vt_orderids: self.write_error(f'spd_sell,{self.act_vt_symbol}多单平仓{grid.volume * self.act_vol_ratio}手失败,' @@ -1437,12 +1461,12 @@ class CtaSpreadTemplate(CtaTemplate): return [] grid.order_status = True - grid.order_datetime = self.cur_datetime + grid.order_time = self.cur_datetime vt_orderids = act_vt_orderids + pas_vt_orderids self.write_log(f'spd sell vt_orderids:{vt_orderids}') return vt_orderids - # ---------------------------------------------------------------------- + def spd_cover(self, grid: CtaGrid, force: bool = False): """非标准合约的套利平反套指令""" self.write_log(u'套利价差反套单平仓,price={},volume={}'.format(grid.close_price, grid.volume)) @@ -1478,22 +1502,23 @@ class CtaSpreadTemplate(CtaTemplate): # 检查主动腿、被动腿,是否满足 act_close_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) - if self.act_pos.short_pos < act_close_volume: + if self.act_pos.short_pos < act_close_volume and not ( + self.act_exchange == Exchange.CFFEX or self.activate_lock): self.write_error(f'账号 {self.act_vt_symbol} 空单持仓{self.act_pos.short_pos}' f'今仓{self.act_pos.short_td}/昨{self.act_pos.short_yd}, 不满足{act_close_volume}') return [] - if self.pas_pos.long_pos < pas_close_volume: + if self.pas_pos.long_pos < pas_close_volume and not (self.pas_exchange == Exchange.CFFEX or self.activate_lock): self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.long_pos}' f'今仓{self.pas_pos.long_td}/昨{self.pas_pos.long_yd}, 不满足{act_close_volume}') return [] # 被动腿多单平仓 pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol, - lock=self.pas_exchange == Exchange.CFFEX, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_pas_tick.bid_price_1, volume=grid.volume * self.pas_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not pas_vt_orderids: self.write_error(f'spd_cover,{self.pas_vt_symbol}多单平仓{grid.volume * self.pas_vol_ratio}手失败,' @@ -1502,11 +1527,11 @@ class CtaSpreadTemplate(CtaTemplate): # 主动腿空单平仓 act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol, - lock=self.act_exchange==Exchange.CFFEX, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, price=self.cur_act_tick.ask_price_1, volume=grid.volume * self.act_vol_ratio, order_type=self.order_type, - order_time=self.cur_datetime, + order_time=self.cur_datetime if self.backtesting else datetime.now(), grid=grid) if not act_vt_orderids: self.write_error(f'spd_cover{self.act_vt_symbol}空单平仓{grid.volume * self.act_vol_ratio}手失败,' @@ -1514,7 +1539,886 @@ class CtaSpreadTemplate(CtaTemplate): return [] grid.order_status = True - grid.order_datetime = self.cur_datetime + grid.order_time = self.cur_datetime vt_orderids = act_vt_orderids + pas_vt_orderids self.write_log(f'spd cover vt_orderids:{vt_orderids}') return vt_orderids + + +class CtaSpreadTemplateV2(CtaSpreadTemplate): + """ + 套利模板 v2 + 改进方向: + 采用限价单方式 + 针对非标准套利合约,分别判断两腿得运动动量,根据方向进行选择优先开仓得一腿 + 设置时间撤单和价格运动偏移撤单逻辑 + + 网格组件开仓时, open_volume =0, target_volume = n, 当open_volume == target_volume时,开仓完成。 + 网格组件平仓时,open_volume = n, target_volume = 0, 当open_volume == 0时,平仓完成 + """ + + tick_window_len = 20 # 观测得tick数量 + spd_orders = {} # 套利得委托单,用于跟踪主动腿和被动腿得成交情况 + + def __init__(self, cta_engine, strategy_name, vt_symbol, setting): + """""" + super().__init__(cta_engine, strategy_name, vt_symbol, setting) + + self.spd_ask1_prices = [] # 价差得若干卖1价列表 + self.spd_bid1_prices = [] # 价差得若干买1价列表 + self.spd_last_prices = [] # 价差得最新价列表 + + self.act_ask1_prices = [] # 主动腿若干卖1价列表 + self.act_bid1_prices = [] # 主动腿若干买1价列表 + self.act_last_prices = [] # 主动腿若干最新价列表 + + self.pas_ask1_prices = [] # 被动腿若干卖1价列表 + self.pas_bid1_prices = [] # 被动腿若干买1价列表 + self.pas_last_prices = [] # 被动腿若干最新价列表 + + def on_tick(self, tick: TickData): + """ + 更新tick价格 + :param tick: + :return: + """ + # 更新主动腿的持续买1、卖1价格、最新价 + if tick.symbol == self.act_symbol: + if len(self.act_ask1_prices) > self.tick_window_len: + self.act_ask1_prices.pop(0) + if tick.ask_price_1 is not None and tick.ask_volume_1 > 0: + self.act_ask1_prices.append(tick.ask_price_1) + + if len(self.act_bid1_prices) > self.tick_window_len: + self.act_bid1_prices.pop(0) + if tick.bid_price_1 is not None and tick.bid_volume_1 > 0: + self.act_bid1_prices.append(tick.bid_price_1) + + if len(self.act_last_prices) > self.tick_window_len: + self.act_last_prices.pop(0) + if tick.last_price is not None and tick.volume > 0: + self.act_last_prices.append(tick.last_price) + + # 更新被动腿的持续买1、卖1价格、最新价 + if tick.symbol == self.pas_symbol: + if len(self.pas_ask1_prices) > self.tick_window_len: + self.pas_ask1_prices.pop(0) + if tick.ask_price_1 is not None and tick.ask_volume_1 > 0: + self.pas_ask1_prices.append(tick.ask_price_1) + + if len(self.pas_bid1_prices) > self.tick_window_len: + self.pas_bid1_prices.pop(0) + if tick.bid_price_1 is not None and tick.bid_volume_1 > 0: + self.pas_bid1_prices.append(tick.bid_price_1) + + if len(self.pas_last_prices) > self.tick_window_len: + self.pas_last_prices.pop(0) + if tick.last_price is not None and tick.volume > 0: + self.pas_last_prices.append(tick.last_price) + + # 实时检查委托订单 + self.check_ordering_grids() + + def on_order_all_traded(self, order: OrderData): + """ + 订单全部成交 + :param order: + :return: + """ + self.write_log(u'{},委托单:{}全部完成'.format(order.time, order.vt_orderid)) + order_info = self.active_orders[order.vt_orderid] + + # 通过vt_orderid,找到对应的网格 + grid = order_info.get('grid', None) + if grid is not None: + # 移除当前委托单 + if order.vt_orderid in grid.order_ids: + grid.order_ids.remove(order.vt_orderid) + + # 平仓完毕(cover, sell) + if order_info.get("offset", None) != Offset.OPEN: + act_target_volume = grid.snapshot.get("act_target_volume") + act_open_volume = grid.snapshot.get("act_open_volume") + pas_target_volume = grid.snapshot.get("pas_target_volume") + pas_open_volume = grid.snapshot.get("pas_open_volume") + + # 主动腿和被动腿都平仓完毕 + if pas_target_volume == pas_open_volume == 0 and act_target_volume == act_open_volume == 0: + grid.open_status = False + # grid.close_status = True + + self.write_log(f'{grid.direction.value}单已平仓完毕,手数:{grid.volume}, 详细:{grid.snapshot}') + + self.update_pos(price=grid.close_price, + volume=grid.volume, + operation='cover' if grid.direction == Direction.SHORT else 'sell', + dt=self.cur_datetime) + + self.write_log(f'移除网格:{grid.to_json()}') + self.gt.remove_grids_by_ids(direction=grid.direction, ids=[grid.id]) + + # 开仓完毕( buy, short) + else: + act_target_volume = grid.snapshot.get("act_target_volume") + act_open_volume = grid.snapshot.get("act_open_volume") + pas_target_volume = grid.snapshot.get("pas_target_volume") + pas_open_volume = grid.snapshot.get("pas_open_volume") + act_open_price = grid.snapshot.get('act_open_price') + pas_open_price = grid.snapshot.get('pas_open_price') + + # 主动腿和被动腿都开仓完毕 + if pas_target_volume == pas_open_volume > 0 and act_target_volume == act_open_volume > 0: + grid.order_status = False + grid.traded_volume = 0 + grid.open_status = True + grid.open_time = self.cur_datetime + + # 按照实际开仓开仓价进行更新 + if grid.direction == Direction.LONG: + if act_open_price and pas_open_price and act_open_price - pas_open_price < grid.open_price: + real_open_price = act_open_price - pas_open_price + self.write_log(f'[正套{grid.open_price}=>{grid.close_price}] 调整:{real_open_price}=>{grid.close_price}') + grid.open_price = real_open_price + elif grid.direction == Direction.SHORT: + if act_open_price and pas_open_price and act_open_price - pas_open_price > grid.open_price: + real_open_price = act_open_price - pas_open_price + self.write_log( + f'[反套{grid.open_price}=>{grid.close_price}] 调整:{real_open_price}=>{grid.close_price}') + grid.open_price = real_open_price + + self.write_log(f'{grid.direction.value}单已开仓完毕,,手数:{grid.volume}, 详细:{grid.snapshot}') + self.update_pos(price=grid.open_price, + volume=grid.volume, + operation='short' if grid.direction == Direction.SHORT else 'buy', + dt=self.cur_datetime) + + # 在策略得活动订单中,移除 + self.history_orders[order.vt_orderid] = self.active_orders.pop(order.vt_orderid, None) + self.gt.save() + if len(self.active_orders) < 1: + self.entrust = 0 + return + + def on_order_open_canceled(self, order: OrderData): + """ + 委托开仓单撤销 + :param order: + :return: + """ + self.write_log(u'委托开仓单撤销:{}'.format(order.__dict__)) + + if not self.trading: + if not self.backtesting: + self.write_error(u'当前不允许交易') + return + + if order.vt_orderid not in self.active_orders: + self.write_error(u'{}不在未完成的委托单中{}。'.format(order.vt_orderid, self.active_orders)) + return + + # 直接更新“未完成委托单”,更新volume,retry次数 + old_order = self.active_orders[order.vt_orderid] + self.write_log(u'{} 委托信息:{}'.format(order.vt_orderid, old_order)) + old_order['traded'] = order.traded + order_vt_symbol = copy(old_order['vt_symbol']) + order_symbol, order_exchange = extract_vt_symbol(order_vt_symbol) + + order_volume = old_order['volume'] - old_order['traded'] + if order_volume <= 0: + msg = u'{} {}{}需重新开仓数量为{},不再开仓' \ + .format(self.strategy_name, + order.vt_orderid, + order_vt_symbol, + order_volume) + self.write_error(msg) + + self.write_log(u'移除:{}'.format(order.vt_orderid)) + self.history_orders[order.vt_orderid] = self.active_orders.pop(order.vt_orderid, None) + return + + order_price = old_order['price'] + order_type = old_order.get('order_type', OrderType.LIMIT) + + grid = old_order.get('grid', None) + + pre_status = old_order.get('status', Status.NOTTRADED) + old_order.update({'status': Status.CANCELLED}) + self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) + if grid: + if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') + grid.order_ids.remove(order.vt_orderid) + + # if not grid.order_ids: + # grid.order_status = False + + self.gt.save() + self.active_orders.update({order.vt_orderid: old_order}) + + self.display_grids() + + def on_order_close_canceled(self, order: OrderData): + """委托平仓单撤销""" + self.write_log(u'委托平仓单撤销:{}'.format(order.__dict__)) + + if order.vt_orderid not in self.active_orders: + self.write_error(u'{}不在未完成的委托单中:{}。'.format(order.vt_orderid, self.active_orders)) + return + + if not self.trading: + self.write_error(f'{self.cur_datetime} 当前不允许交易') + return + + # 直接更新“未完成委托单”,更新volume,Retry次数 + old_order = self.active_orders[order.vt_orderid] + self.write_log(u'{} 订单信息:{}'.format(order.vt_orderid, old_order)) + old_order['traded'] = order.traded + # order_time = old_order['order_time'] + order_vt_symbol = copy(old_order['vt_symbol']) + order_symbol, order_exchange = extract_vt_symbol(order_vt_symbol) + + order_volume = old_order['volume'] - old_order['traded'] + if order_volume <= 0: + msg = u'{} {}{}重新平仓数量为{},不再平仓' \ + .format(self.strategy_name, order.vt_orderid, order_vt_symbol, order_volume) + self.write_error(msg) + self.send_wechat(msg) + self.write_log(u'活动订单移除:{}'.format(order.vt_orderid)) + self.history_orders[order.vt_orderid] = self.active_orders.pop(order.vt_orderid, None) + return + + order_price = old_order['price'] + order_type = old_order.get('order_type', OrderType.LIMIT) + + grid = old_order.get('grid', None) + + pre_status = old_order.get('status', Status.NOTTRADED) + old_order.update({'status': Status.CANCELLED}) + self.write_log(u'委托单状态:{}=>{}'.format(pre_status, old_order.get('status'))) + if grid: + if order.vt_orderid in grid.order_ids: + self.write_log(f'移除grid中order_ids:{order.vt_orderid}') + grid.order_ids.remove(order.vt_orderid) + # if len(grid.order_ids) == 0: + # grid.order_status = False + self.gt.save() + self.active_orders.update({order.vt_orderid: old_order}) + + self.display_grids() + + def cancel_logic(self, dt, force=False, reopen=False): + "撤单逻辑""" + if len(self.active_orders) < 1: + self.entrust = 0 + + canceled_ids = [] + + for vt_orderid in list(self.active_orders.keys()): + order_info = self.active_orders.get(vt_orderid, None) + if order_info is None: + continue + order_vt_symbol = order_info.get('vt_symbol', self.vt_symbol) + order_symbol, order_exchange = extract_vt_symbol(order_vt_symbol) + order_time = order_info['order_time'] + order_volume = order_info['volume'] - order_info['traded'] + order_grid = order_info.get('grid', None) + order_status = order_info.get('status', Status.NOTTRADED) + order_type = order_info.get('order_type', OrderType.LIMIT) + over_seconds = (dt - order_time).total_seconds() + + # 只处理未成交的限价委托单 + if order_status in [Status.SUBMITTING, Status.NOTTRADED] and (order_type == OrderType.LIMIT): + if over_seconds > self.cancel_seconds or force: # 超过设置的时间还未成交 + self.write_log(u'超时{}秒未成交,取消委托单:vt_orderid:{},order:{}' + .format(over_seconds, vt_orderid, order_info)) + order_info.update({'status': Status.CANCELLING}) + self.active_orders.update({vt_orderid: order_info}) + ret = self.cancel_order(str(vt_orderid)) + if not ret: + self.write_log(f'{vt_orderid}撤单失败,更新状态为撤单成功') + order_info.update({'status': Status.CANCELLED}) + self.active_orders.update({vt_orderid: order_info}) + else: + self.write_log(f'{vt_orderid}撤单成功') + if order_grid: + if vt_orderid in order_grid.order_ids: + self.write_log(f'{vt_orderid}存在网格委托队列{order_grid.order_ids}中,移除') + order_grid.order_ids.remove(vt_orderid) + + continue + + # 处理状态为‘撤销’的委托单 + elif order_status == Status.CANCELLED: + self.write_log(u'委托单{}已成功撤单,删除{}'.format(vt_orderid, order_info)) + canceled_ids.append(vt_orderid) + + # 删除撤单的订单 + for vt_orderid in canceled_ids: + self.write_log(u'删除orderID:{0}'.format(vt_orderid)) + self.history_orders[vt_orderid] = self.active_orders.pop(vt_orderid, None) + + if len(self.active_orders) == 0: + self.entrust = 0 + + def check_ordering_grids(self): + """ + 检查正在交易得网格 + :return: + """ + + # 扫描反套网格 + for grid in self.gt.up_grids: + + # 不是在委托得网格,不处理 + if not grid.order_status: + continue + act_target_volume = grid.snapshot.get('act_target_volume') + pas_target_volume = grid.snapshot.get('pas_target_volume') + act_open_volume = grid.snapshot.get('act_open_volume') + pas_open_volume = grid.snapshot.get('pas_open_volume') + + # 正在委托反套单开仓状态 + if not grid.open_status and not grid.close_status: + + # 持平、未满足开仓目标 + if act_open_volume == pas_open_volume < act_target_volume == pas_target_volume: + # 价差满足,目前没有委托单 + if self.cur_spd_tick.bid_price_1 >= grid.open_price and len(grid.order_ids) == 0: + # 买入被动腿数量 + buy_pas_volume = pas_target_volume - pas_open_volume + # 开多被动腿(限价单) + pas_vt_orderids = self.buy(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.ask_price_1, + volume=buy_pas_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'spd_short,{self.pas_vt_symbol}开多仓{buy_pas_volume}手失败,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + continue + + # 主动腿缺腿,当前没有委托单 + if act_open_volume < pas_open_volume and len(grid.order_ids) == 0: + short_act_volume = pas_open_volume - act_open_volume + # 开空主动腿 + act_vt_orderids = self.short(vt_symbol=self.act_vt_symbol, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_act_tick.bid_price_1, + volume=short_act_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not act_vt_orderids: + self.write_error(f'spd_short,{self.act_vt_symbol}开空仓{short_act_volume}手失败,' + f'委托价:{self.cur_act_tick.bid_price_1}') + continue + + # 正在委托得反套单平仓状态 + if grid.open_status and grid.close_status : + # 持平、未满足平仓目标 + if act_open_volume == pas_open_volume > act_target_volume == pas_target_volume: + # 价差满足平仓,目前没有委托单 + if self.cur_spd_tick.bid_price_1 <= grid.close_price and len(grid.order_ids) == 0: + pas_close_volume = pas_open_volume + # 被动腿多单平仓 + pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.bid_price_1, + volume=pas_close_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'spd_cover,{self.pas_vt_symbol}多单平仓{pas_close_volume}手失败,' + f'委托价:{self.cur_pas_tick.bid_price_1}') + continue + + # 当主动腿瘸腿时,降低主动腿 + if pas_target_volume <= pas_open_volume < act_open_volume and len(grid.order_ids) == 0: + act_close_volume = act_open_volume - pas_open_volume + # 主动腿空单平仓 + act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_act_tick.ask_price_1, + volume=act_close_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not act_vt_orderids: + self.write_error(f'spd_cover{self.act_vt_symbol}空单平仓{act_close_volume}手失败,' + f'委托价:{self.cur_act_tick.ask_price_1}') + continue + + # 扫描正套网格 + for grid in self.gt.dn_grids: + + # 不是在委托得网格,不处理 + if not grid.order_status: + continue + act_target_volume = grid.snapshot.get('act_target_volume') + pas_target_volume = grid.snapshot.get('pas_target_volume') + act_open_volume = grid.snapshot.get('act_open_volume') + pas_open_volume = grid.snapshot.get('pas_open_volume') + + # 正在委托正套单开仓状态 + if not grid.open_status and not grid.close_status: + # 持平、未满足开仓目标 + if act_open_volume == pas_open_volume < act_target_volume == pas_target_volume: + # 价差满足,目前没有委托单 + if self.cur_spd_tick.ask_price_1 <= grid.open_price and len(grid.order_ids) == 0: + short_pas_volume = pas_target_volume - pas_open_volume + # 开空被动腿 + pas_vt_orderids = self.short(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.bid_price_1, + volume=short_pas_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'[正套{grid.open_price}=>{grid.close_price}],' + f'被动腿{self.pas_vt_symbol}开空仓{short_pas_volume}手失败,' + f'委托价:{self.cur_pas_tick.bid_price_1}') + continue + self.write_log(f'[正套{grid.open_price}=>{grid.close_price}],' + f'被动腿{self.pas_vt_symbol}开空仓{short_pas_volume}手,' + f'委托价:{self.cur_pas_tick.bid_price_1}') + continue + + # 主动腿缺腿,当前没有委托单 + if act_open_volume < pas_open_volume and len(grid.order_ids) == 0: + buy_act_volume = pas_open_volume - act_open_volume + act_vt_orderids = self.buy(vt_symbol=self.act_vt_symbol, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_act_tick.ask_price_1, + volume=buy_act_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not act_vt_orderids: + self.write_error(f'[正套{grid.open_price}=>{grid.close_price}],' + f'主动腿{self.act_vt_symbol}开多仓{buy_act_volume}手,' + f'委托价:{self.cur_act_tick.ask_price_1}') + + # 正在委托得正套单平仓状态 + if grid.open_status and grid.close_status: + # 持平、未满足平仓目标 + if act_open_volume == pas_open_volume > act_target_volume == pas_target_volume: + # 价差满足平仓,目前没有委托单 + if self.cur_spd_tick.bid_price_1 >= grid.close_price and len(grid.order_ids) == 0: + pas_close_volume = pas_open_volume + # 被动腿空单平仓 + pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.ask_price_1, + volume=pas_close_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'[正套{grid.open_price}=>{grid.close_price}],' + f'被动腿{self.pas_vt_symbol}空单平仓{pas_close_volume}手失败,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + continue + self.write_log(f'[正套{grid.open_price}=>{grid.close_price}],' + f'被动腿{self.pas_vt_symbol}空单平仓{pas_close_volume}手,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + continue + + if pas_target_volume <= pas_open_volume < act_open_volume and len(grid.order_ids) == 0: + act_close_volume = act_open_volume - pas_open_volume + # 主动腿多单平仓 + act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol, + lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_act_tick.bid_price_1, + volume=act_close_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not act_vt_orderids: + self.write_error(f'[正套{grid.open_price}=>{grid.close_price}],' + f'主动腿{self.act_vt_symbol}多单平仓{act_close_volume}手失败,' + f'委托价:{self.cur_act_tick.bid_price_1}') + continue + self.write_log(f'[正套{grid.open_price}=>{grid.close_price}],' + f'主动腿{self.act_vt_symbol}多单平仓{act_close_volume}手,' + f'委托价:{self.cur_act_tick.bid_price_1}') + + def spd_buy(self, grid: CtaGrid, force: bool = False): + """非标准合约的套利正套指令""" + self.write_log(u'套利价差正套单,price={},volume={}'.format(grid.open_price, grid.volume)) + + if self.entrust != 0: + self.write_log(u'[正套]正在委托,不开仓') + return [] + if not self.trading: + self.write_log(u'[正套]停止状态,不开仓') + return [] + if not self.allow_trading_open: + self.write_log(f'[正套]{self.cur_datetime}不允许开仓') + return [] + if self.force_trading_close: + self.write_log(f'[正套]{self.cur_datetime}强制平仓日,不开仓') + return [] + + # 检查流动性缺失 + if not self.check_liquidity( + direction=Direction.LONG, + ask_volume=grid.volume * self.act_vol_ratio, + bid_volume=grid.volume * self.pas_vol_ratio + ) \ + and not force: + return [] + + # 检查涨跌停距离 + if self.check_near_up_nor_down(): + return [] + + if self.cur_spd_tick.bid_price_1 > grid.open_price and not force: + self.write_log(u'[正套]价差{}不满足:{}'.format(self.cur_spd_tick.bid_price_1, grid.open_price)) + return [] + + # 判断主动腿、被动腿得动能方向,选择优先下单得合约 + # 主动腿目标、被动腿目标 + act_target_volume = grid.volume * self.act_vol_ratio + pas_target_volume = grid.volume * self.pas_vol_ratio + + # 检查主动腿和被动腿需要得保证金,检查账号是否满足保证金 + # 主动腿保证金/被动腿保证金 + act_margin = act_target_volume * self.cur_act_tick.last_price * self.act_symbol_size * self.act_margin_rate + pas_margin = pas_target_volume * self.cur_pas_tick.last_price * self.pas_symbol_size * self.pas_margin_rate + + # 当前净值,可用资金,资金占用比例,资金上限 + balance, avaliable, occupy_percent, percent_limit = self.cta_engine.get_account() + + # 同一品种套利 + invest_margin = max(act_margin, pas_margin) + + # 计划使用保证金 + target_margin = balance * (occupy_percent / 100) + invest_margin + + if 100 * (target_margin / balance) > percent_limit: + self.write_error(u'[正套]委托后,预计当前资金占用:{},超过限定:{}比例,不能开仓' + .format(100 * (target_margin / balance), percent_limit)) + return [] + + # + # # 开多主动腿(FAK 或者限价单) + # act_vt_orderids = self.buy(vt_symbol=self.act_vt_symbol, + # lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + # price=self.cur_act_tick.ask_price_1, + # volume=grid.volume * self.act_vol_ratio, + # order_type=self.order_type, + # order_time=self.cur_datetime if self.backtesting else datetime.now(), + # grid=grid) + # if not act_vt_orderids: + # self.write_error(f'spd_buy,{self.act_vt_symbol}开多仓{grid.volume * self.act_vol_ratio}手失败,' + # f'委托价:{self.cur_act_tick.ask_price_1}') + # return [] + + # 开空被动腿 + pas_vt_orderids = self.short(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.bid_price_1, + volume=grid.volume * self.pas_vol_ratio, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'[正套-被动腿],{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手失败,' + f'委托价:{self.cur_pas_tick.bid_price_1}') + return [] + self.write_log(f'[正套-被动腿],{self.pas_vt_symbol}开空仓{grid.volume * self.pas_vol_ratio}手,' + f'委托价:{self.cur_pas_tick.bid_price_1}') + + # 利用网格得snapshort进行记录,当前持有仓位,目标仓位 + grid.snapshot.update( + {"act_vt_symbol": self.act_vt_symbol, "act_open_volume": 0, 'act_target_volume': act_target_volume, + "pas_vt_symbol": self.pas_vt_symbol, "pas_open_volume": 0, 'pas_target_volume': pas_target_volume}) + grid.order_status = True + grid.order_time = self.cur_datetime + vt_orderids = pas_vt_orderids + self.write_log(u'[正套][被动腿] vt_ordderids:{}'.format(vt_orderids)) + + # 添加正套得委托单跟踪,对象是网格id,内容是方向 + self.spd_orders.update({grid.id: {'direction': Direction.LONG}, 'offset': Offset.OPEN}) + + return vt_orderids + + def spd_short(self, grid: CtaGrid, force: bool = False): + """非标准合约的套利反套指令""" + self.write_log(u'委托反套单,price={},volume={}'.format(grid.open_price, grid.volume)) + + if grid.order_status: + self.write_log(u'[反套]正在委托,不重复开仓') + return [] + if not self.trading: + self.write_log(u'[反套]停止状态,不开仓') + return [] + if not self.allow_trading_open: + self.write_log(f'[反套]{self.cur_datetime}不允许开仓') + return [] + if self.force_trading_close: + self.write_log(f'[反套]{self.cur_datetime}强制平仓日,不开仓') + return [] + # 检查流动性缺失 + if not self.check_liquidity(direction=Direction.SHORT, + ask_volume=grid.volume * self.pas_vol_ratio, + bid_volume=grid.volume * self.act_vol_ratio + ) and not force: + return [] + # 检查涨跌停距离 + if self.check_near_up_nor_down(): + return [] + + if self.cur_spd_tick.bid_price_1 < grid.open_price and not force: + self.write_log(u'[反套]{}不满足开仓条件:{}'.format(self.cur_spd_tick.bid_price_1, grid.open_price)) + return [] + + # 判断主动腿、被动腿得动能方向,选择优先下单得合约 + # 主动腿目标、被动腿目标 + act_target_volume = grid.volume * self.act_vol_ratio + pas_target_volume = grid.volume * self.pas_vol_ratio + + # 检查主动腿和被动腿需要得保证金,检查账号是否满足保证金 + # 主动腿保证金/被动腿保证金 + act_margin = act_target_volume * self.cur_act_tick.last_price * self.act_symbol_size * self.act_margin_rate + pas_margin = pas_target_volume * self.cur_pas_tick.last_price * self.pas_symbol_size * self.pas_margin_rate + + # 当前净值,可用资金,资金占用比例,资金上限 + balance, avaliable, occupy_percent, percent_limit = self.cta_engine.get_account() + + # 同一品种套利 + invest_margin = max(act_margin, pas_margin) + + # 计划使用保证金 + target_margin = balance * (occupy_percent / 100) + invest_margin + + if 100 * (target_margin / balance) > percent_limit: + self.write_error(u'[反套]委托后,预计当前资金占用:{},超过限定:{}比例,不能开仓' + .format(100 * (target_margin / balance), percent_limit)) + return [] + # # 开空主动腿 + # act_vt_orderids = self.short(vt_symbol=self.act_vt_symbol, + # lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + # price=self.cur_act_tick.bid_price_1, + # volume=grid.volume * self.act_vol_ratio, + # order_type=self.order_type, + # order_time=self.cur_datetime if self.backtesting else datetime.now(), + # grid=grid) + # if not act_vt_orderids: + # self.write_error(f'spd_short,{self.act_vt_symbol}开空仓{grid.volume * self.act_vol_ratio}手失败,' + # f'委托价:{self.cur_act_tick.bid_price_1}') + # return [] + + # 开多被动腿(FAK或者限价单) + pas_vt_orderids = self.buy(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.ask_price_1, + volume=pas_target_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'[反套-被动腿],{self.pas_vt_symbol}开多仓{pas_target_volume}手失败,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + return [] + self.write_log(f'[反套-被动腿],{self.pas_vt_symbol}开多仓{pas_target_volume}手,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + + # 设置实际开仓数量为0,用于不断实现目标 + grid.snapshot.update({"act_vt_symbol": self.act_vt_symbol, + "act_open_volume": 0, + 'act_target_volume': act_target_volume, + "pas_vt_symbol": self.pas_vt_symbol, + "pas_open_volume": 0, + 'pas_target_volume': pas_target_volume}) + + grid.order_status = True + grid.order_time = self.cur_datetime + + vt_orderids = pas_vt_orderids + self.write_log(u'[反套-被动腿] vt_order_ids:{0}'.format(vt_orderids)) + return vt_orderids + + + def spd_sell(self, grid: CtaGrid, force: bool = False): + """非标准合约的套利平正套指令""" + self.write_log(f'[正套:{grid.open_price}=>{grid.close_price}]平仓,volume={grid.volume}') + if grid.order_status: + self.write_log(f'[正套:{grid.open_price}=>{grid.close_price}]正在委托,不平仓') + return [] + if not self.trading: + self.write_log(f'[正套:{grid.open_price}=>{grid.close_price}]策略处于停止状态,不平仓') + return [] + # 检查流动性缺失 + if not self.check_liquidity( + direction=Direction.SHORT, + ask_volume=grid.volume * self.pas_vol_ratio, + bid_volume=grid.volume * self.act_vol_ratio + ) and not force: + return [] + + # 检查涨跌停距离 + if self.check_near_up_nor_down(): + return [] + + if self.cur_spd_tick.bid_price_1 < grid.close_price and not force: + self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.bid_price_1, grid.close_price)) + return [] + + # 获取账号持仓 + self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol) + self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol) + if not all([self.act_pos, self.pas_pos]): + self.write_error(f'[正套:{grid.open_price}=>{grid.close_price}]主动腿/被动腿的账号持仓数据不存在') + return [] + + # 获取需要平仓的主动腿、被动腿volume + act_close_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) + pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) + + # 检查账号持仓是否满足平仓目标 + if self.act_pos.long_pos < act_close_volume and not (self.act_exchange == Exchange.CFFEX or self.activate_lock): + self.write_error(f'[正套]平仓,账号主动腿 {self.act_vt_symbol} 多单持仓{self.act_pos.long_pos}' + f'今仓{self.act_pos.long_td}/昨{self.act_pos.long_yd}, 不满足{act_close_volume}') + return [] + if self.pas_pos.short_pos < pas_close_volume and not ( + self.pas_exchange == Exchange.CFFEX or self.activate_lock): + self.write_error(f'[正套]平仓,账号被动腿 {self.pas_vt_symbol} 多单持仓{self.pas_pos.short_pos}' + f'今仓{self.pas_pos.short_td}/昨{self.pas_pos.short_yd}, 不满足{act_close_volume}') + return [] + + # 更新主动腿和被动腿得目标持仓为0,即平掉仓位 + grid.snapshot.update({"act_target_volume": 0,"pas_target_volume": 0}) + + # 被动腿空单平仓 + pas_vt_orderids = self.cover(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.ask_price_1, + volume=pas_close_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'[正套:{grid.open_price}=>{grid.close_price}],{self.pas_vt_symbol}空单平仓{pas_close_volume}手失败,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + return [] + + self.write_log(f'[正套:{grid.open_price}=>{grid.close_price}] {self.pas_vt_symbol}空单平仓{pas_close_volume}手,' + f'委托价:{self.cur_pas_tick.ask_price_1}') + + # 如果属于强制平仓得话,设置close价格低于当前价差。 + if force: + new_close_price = self.cur_spd_tick.bid_price_1 - 10 * self.act_price_tick + self.write_log(f'[正套:{grid.open_price}=>{grid.close_price}] 调整平仓价:{new_close_price}') + grid.close_price = self.cur_spd_tick.bid_price_1 - 10 * self.act_price_tick + + # + # # 主动腿多单平仓 + # act_vt_orderids = self.sell(vt_symbol=self.act_vt_symbol, + # lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + # price=self.cur_act_tick.bid_price_1, + # volume=grid.volume * self.act_vol_ratio, + # order_type=self.order_type, + # order_time=self.cur_datetime if self.backtesting else datetime.now(), + # grid=grid) + # if not act_vt_orderids: + # self.write_error(f'spd_sell,{self.act_vt_symbol}多单平仓{grid.volume * self.act_vol_ratio}手失败,' + # f'委托价:{self.cur_act_tick.bid_price_1}') + # return [] + + grid.close_status = True + grid.order_status = True + grid.order_time = self.cur_datetime + vt_orderids = pas_vt_orderids + self.write_log(f'[正套:{grid.open_price}=>{grid.close_price}] vt_orderids:{vt_orderids}') + + return vt_orderids + + # ---------------------------------------------------------------------- + def spd_cover(self, grid: CtaGrid, force: bool = False): + """非标准合约的套利平反套指令""" + self.write_log(u'套利价差反套单平仓,price={},volume={}'.format(grid.close_price, grid.volume)) + if grid.order_status: + self.write_log(u'[反套] 正在委托平仓,不重复') + return [] + if not self.trading: + self.write_log(u'停止状态,不平仓') + return [] + # 检查流动性缺失 + if not self.check_liquidity( + direction=Direction.LONG, + ask_volume=grid.volume * self.act_vol_ratio, + bid_volume=grid.volume * self.pas_vol_ratio + ) and not force: + return [] + # 检查涨跌停距离 + if self.check_near_up_nor_down(): + return [] + + if self.cur_spd_tick.ask_price_1 > grid.close_price and not force: + self.write_log(u'实际价差{}不满足:{}'.format(self.cur_spd_tick.ask_price_1, grid.close_price)) + return [] + + # 获取账号内主动腿和被动腿的持仓 + self.act_pos = self.cta_engine.get_position_holding(vt_symbol=self.act_vt_symbol) + self.pas_pos = self.cta_engine.get_position_holding(vt_symbol=self.pas_vt_symbol) + + if not all([self.act_pos, self.pas_pos]): + self.write_error('主动腿/被动退得持仓数据不存在') + return [] + + # 检查主动腿、被动腿,是否满足 + act_close_volume = grid.snapshot.get('act_open_volume', grid.volume * self.act_vol_ratio) + pas_close_volume = grid.snapshot.get('pas_open_volume', grid.volume * self.pas_vol_ratio) + if self.act_pos.short_pos < act_close_volume and not ( + self.act_exchange == Exchange.CFFEX or self.activate_lock): + self.write_error(f'账号 {self.act_vt_symbol} 空单持仓{self.act_pos.short_pos}' + f'今仓{self.act_pos.short_td}/昨{self.act_pos.short_yd}, 不满足{act_close_volume}') + return [] + if self.pas_pos.long_pos < pas_close_volume and not (self.pas_exchange == Exchange.CFFEX or self.activate_lock): + self.write_error(f'账号 {self.pas_vt_symbol} 多单持仓{self.pas_pos.long_pos}' + f'今仓{self.pas_pos.long_td}/昨{self.pas_pos.long_yd}, 不满足{act_close_volume}') + return [] + + # 更新主动腿和被动腿得目标持仓为0,即平掉仓位 + grid.snapshot.update({"act_target_volume": 0, "pas_target_volume": 0}) + + # 被动腿多单平仓 + pas_vt_orderids = self.sell(vt_symbol=self.pas_vt_symbol, + lock=self.pas_exchange == Exchange.CFFEX or self.activate_lock, + price=self.cur_pas_tick.bid_price_1, + volume=pas_close_volume, + order_type=self.order_type, + order_time=self.cur_datetime if self.backtesting else datetime.now(), + grid=grid) + if not pas_vt_orderids: + self.write_error(f'spd_cover,{self.pas_vt_symbol}多单平仓{pas_close_volume}手失败,' + f'委托价:{self.cur_pas_tick.bid_price_1}') + return [] + + # 如果属于强制平仓得话,设置close价格高于于当前价差10跳。 + if force: + grid.close_price = self.cur_spd_tick.ask_price_1 + 10 * self.act_price_tick + + # # 主动腿空单平仓 + # act_vt_orderids = self.cover(vt_symbol=self.act_vt_symbol, + # lock=self.act_exchange == Exchange.CFFEX or self.activate_lock, + # price=self.cur_act_tick.ask_price_1, + # volume=grid.volume * self.act_vol_ratio, + # order_type=self.order_type, + # order_time=self.cur_datetime if self.backtesting else datetime.now(), + # grid=grid) + # if not act_vt_orderids: + # self.write_error(f'spd_cover{self.act_vt_symbol}空单平仓{grid.volume * self.act_vol_ratio}手失败,' + # f'委托价:{self.cur_act_tick.ask_price_1}') + # return [] + grid.close_status = True + grid.order_status = True + grid.order_time = self.cur_datetime + vt_orderids = pas_vt_orderids + self.write_log(f'spd cover vt_orderids:{vt_orderids}') + return vt_orderids diff --git a/vnpy/app/index_tick_publisher/engine.py b/vnpy/app/index_tick_publisher/engine.py index 73cffd04..144768a8 100644 --- a/vnpy/app/index_tick_publisher/engine.py +++ b/vnpy/app/index_tick_publisher/engine.py @@ -100,7 +100,8 @@ class IndexTickPublisherV2(BaseEngine): exchange=conf.get('exchange', 'x_fanout_idx_tick')) self.write_log(f'创建发布器成功') except Exception as ex: - self.write_log(u'创建tick发布器异常:{}'.format(str(ex))) + self.write_error(u'创建tick发布器异常:{}'.format(str(ex))) + self.write_error(traceback.format_exc()) # ---------------------------------------------------------------------- def registerEvent(self): diff --git a/vnpy/data/binance/binance_future_data.py b/vnpy/data/binance/binance_future_data.py index ec2e0a6d..f6c7f8ef 100644 --- a/vnpy/data/binance/binance_future_data.py +++ b/vnpy/data/binance/binance_future_data.py @@ -259,6 +259,7 @@ class BinanceFutureData(RestClient): contracts = load_json(f, auto_save=False) return contracts + def save_contracts(self): """保存合约配置""" contracts = self.get_contracts() diff --git a/vnpy/data/mongo/mongo_data.py b/vnpy/data/mongo/mongo_data.py index 89e18290..5be34339 100644 --- a/vnpy/data/mongo/mongo_data.py +++ b/vnpy/data/mongo/mongo_data.py @@ -181,8 +181,10 @@ class MongoData(object): return [] def db_query_by_sort(self, db_name, col_name, filter_dict, sort_name, sort_type, limitNum=0): - """从MongoDB中读取数据,d是查询要求,sortName是排序的字段,sortType是排序类型 - 返回的是数据库查询的指针""" + """ + 从MongoDB中读取数据,d是查询要求,sortName是排序的字段,sortType是排序类型,1正序,-1倒序 + 返回的是数据库查询的指针 + """ try: if self.db_client: db = self.db_client[db_name] diff --git a/vnpy/data/tdx/tdx_stock_data.py b/vnpy/data/tdx/tdx_stock_data.py index 1c58811c..199a4876 100644 --- a/vnpy/data/tdx/tdx_stock_data.py +++ b/vnpy/data/tdx/tdx_stock_data.py @@ -263,12 +263,25 @@ class TdxStockData(object): return results - def get_name(self, code, market_id): - symbol_info = self.symbol_dict.get(f'{code}_{market_id}') - if symbol_info: - return symbol_info.get('name', code) + def get_name(self, symbol, market_id=None): + """ + 获取名称 + :param symbol: 代码 或者 代码.交易所 + :param market_id: 如果存在代码.交易所时,不使用该值 + :return: + """ + if '.' in symbol: + symbol, exchange = symbol.split('.') + if exchange == Exchange.SSE.value: + market_id = 1 + elif exchange == Exchange.SZSE.value: + market_id = 0 - return code + symbol_info = self.symbol_dict.get(f'{symbol}_{market_id}') + if symbol_info: + return symbol_info.get('name', symbol) + + return symbol # ---------------------------------------------------------------------- def get_bars(self, diff --git a/vnpy/gateway/ctp/ctp_gateway.py b/vnpy/gateway/ctp/ctp_gateway.py index 12955c1b..9988f62a 100644 --- a/vnpy/gateway/ctp/ctp_gateway.py +++ b/vnpy/gateway/ctp/ctp_gateway.py @@ -963,6 +963,47 @@ class CtpTdApi(TdApi): """""" self.gateway.write_error("交易撤单失败", error) + def onRspParkedOrderInsert(self, data: dict, error: dict, reqid: int, last: bool): + """""" + self.gateway.write_log('预埋单回报') + + order_ref = data["OrderRef"] + orderid = f"{self.frontid}_{self.sessionid}_{order_ref}" + + symbol = data["InstrumentID"] + exchange = symbol_exchange_map[symbol] + + order_type = OrderType.LIMIT + if data["OrderPriceType"] == THOST_FTDC_OPT_LimitPrice and data["TimeCondition"] == THOST_FTDC_TC_IOC: + if data["VolumeCondition"] == THOST_FTDC_VC_AV: + order_type = OrderType.FAK + elif data["VolumeCondition"] == THOST_FTDC_VC_CV: + order_type = OrderType.FOK + + if data["OrderPriceType"] == THOST_FTDC_OPT_AnyPrice: + order_type = OrderType.MARKET + + order = OrderData( + symbol=symbol, + exchange=exchange, + accountid=self.accountid, + orderid=orderid, + type=order_type, + direction=DIRECTION_CTP2VT[data["Direction"]], + offset=OFFSET_CTP2VT.get(data["CombOffsetFlag"], Offset.NONE), + price=data["LimitPrice"], + volume=data["VolumeTotalOriginal"], + status=Status.REJECTED, + gateway_name=self.gateway_name + ) + self.gateway.on_order(order) + + # self.gateway.write_error("交易委托失败", error) + + def onRspParkedOrderAction(self, data: dict, error: dict, reqid: int, last: bool): + """""" + self.gateway.write_error("预埋单交易撤单失败", error) + def onRspQueryMaxOrderVolume(self, data: dict, error: dict, reqid: int, last: bool): """""" pass @@ -1424,7 +1465,34 @@ class CtpTdApi(TdApi): ctp_req["VolumeCondition"] = THOST_FTDC_VC_CV self.reqid += 1 - self.reqOrderInsert(ctp_req, self.reqid) + dt = datetime.now() + use_packed = False + if 3 < dt.hour < 8 or 15 < dt.hour < 20: + use_packed = True + else: + if dt.hour in [8,20] and dt.minute <55: + use_packed = True + if req.exchange != Exchange.CFFEX: + if dt.hour == 10 and 15< dt.minute<30: + use_packed = True + if dt.hour == 11 and dt.minute > 30: + use_packed = True + if dt.hour == 12: + use_packed = True + if dt.hour == 13 and dt.minute < 30: + use_packed = True + else: + if dt.hour == 9 and dt.minute < 25: + use_packed = True + if dt.hour == 11 and dt.minute > 30: + use_packed = True + if dt.hour == 12: + use_packed = True + if use_packed: + self.gateway.write_log(f'使用预埋单下单') + self.reqParkedOrderInsert(ctp_req, self.reqid) + else: + self.reqOrderInsert(ctp_req, self.reqid) orderid = f"{self.frontid}_{self.sessionid}_{self.order_ref}" order = req.create_order_data(orderid, self.gateway_name) diff --git a/vnpy/trader/converter.py b/vnpy/trader/converter.py index 8122a913..71648741 100644 --- a/vnpy/trader/converter.py +++ b/vnpy/trader/converter.py @@ -59,7 +59,10 @@ class OffsetConverter: def get_position_holding(self, vt_symbol: str, gateway_name: str = '') -> "PositionHolding": """获取持仓信息""" if gateway_name is None or len(gateway_name) == 0: - contract = self.main_engine.get_contract(vt_symbol) + if len(self.main_engine.gateways.keys()) == 1: + gateway_name = list(self.main_engine.gateways.keys())[0] + else: + contract = self.main_engine.get_contract(vt_symbol) if contract: gateway_name = contract.gateway_name diff --git a/vnpy/trader/engine.py b/vnpy/trader/engine.py index 8247f982..186f5980 100644 --- a/vnpy/trader/engine.py +++ b/vnpy/trader/engine.py @@ -153,7 +153,7 @@ class MainEngine: if not gateway: # 增加兼容得写法,如果没有输入gateway_name,但当前只有一个gateway时,就使用当前gateway if len(self.gateways.keys()) == 1: - return self.gateways.values()[0] + return list(self.gateways.values())[0] self.write_error(f"在{self.gateways.keys()}中找不到底层接口:{gateway_name}") return gateway diff --git a/vnpy/trader/gateway.py b/vnpy/trader/gateway.py index 165632ed..ad007340 100644 --- a/vnpy/trader/gateway.py +++ b/vnpy/trader/gateway.py @@ -447,6 +447,16 @@ class TickCombiner(object): u'leg2:{0}跌停{1},不合成价差Tick'.format(self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1)) return + # 忽略买卖价格差距过大的tick + if self.last_leg1_tick.ask_price_1 > 5 * self.last_leg1_tick.bid_price_1 > 10: + self.gateway.write_log(u'leg1:{0}买卖价格差距过大{1} {2},不合成价差Tick'.format( + self.last_leg1_tick.vt_symbol, self.last_leg1_tick.ask_price_1, self.last_leg1_tick.bid_price_1)) + return + if self.last_leg2_tick.ask_price_1 > 5 * self.last_leg2_tick.bid_price_1 > 10: + self.gateway.write_log(u'leg2:{0}买卖价格差距过大{1} {2},不合成价差Tick'.format( + self.last_leg2_tick.vt_symbol, self.last_leg2_tick.ask_price_1, self.last_leg2_tick.bid_price_1)) + return + if self.trading_day != tick.trading_day: self.trading_day = tick.trading_day self.spread_high = None diff --git a/vnpy/trader/ui/kline/kline.py b/vnpy/trader/ui/kline/kline.py index 82f33338..891e44e6 100644 --- a/vnpy/trader/ui/kline/kline.py +++ b/vnpy/trader/ui/kline/kline.py @@ -1374,7 +1374,10 @@ class KLineWidget(KeyWraper): # 标记时间 t_value = df_markup['datetime'].loc[idx] if not isinstance(t_value, datetime) and isinstance(t_value, str): - t_value = datetime.strptime(t_value, '%Y-%m-%d %H:%M:%S') + if '.' in t_value: + t_value = datetime.strptime(t_value, '%Y-%m-%d %H:%M:%S.%f') + else: + t_value = datetime.strptime(t_value, '%Y-%m-%d %H:%M:%S') price = df_markup['price'].loc[idx] markup_text = df_markup['markup'].loc[idx] diff --git a/vnpy/trader/ui/widget.py b/vnpy/trader/ui/widget.py index dba95511..69e8e881 100644 --- a/vnpy/trader/ui/widget.py +++ b/vnpy/trader/ui/widget.py @@ -20,7 +20,7 @@ from ..event import ( EVENT_ACCOUNT, EVENT_LOG ) -from ..object import OrderRequest, SubscribeRequest +from ..object import OrderRequest, SubscribeRequest,LogData from ..utility import load_json, save_json from ..setting import SETTING_FILENAME, SETTINGS @@ -646,6 +646,30 @@ class TradingWidget(QtWidgets.QWidget): cancel_button = QtWidgets.QPushButton("全撤") cancel_button.clicked.connect(self.cancel_all) + algo_stop_button = QtWidgets.QPushButton("全停算法") + algo_stop_button.clicked.connect(self.stop_algo) + + hbox_nomal = QtWidgets.QHBoxLayout() + hbox_nomal.addWidget(send_button) + hbox_nomal.addWidget(cancel_button) + hbox_nomal.addWidget(algo_stop_button) + + algo_button = QtWidgets.QPushButton("算法单") + algo_button.clicked.connect(self.send_algo) + + self.win_pips = QtWidgets.QLineEdit() + self.win_pips.setText('10') + self.stop_pips = QtWidgets.QLineEdit() + self.stop_pips.setText('5') + hbox_algo = QtWidgets.QHBoxLayout() + win_lable = QtWidgets.QLabel("止盈跳") + hbox_algo.addWidget(win_lable) + hbox_algo.addWidget(self.win_pips) + stop_lable = QtWidgets.QLabel("止损跳") + hbox_algo.addWidget(stop_lable) + hbox_algo.addWidget(self.stop_pips) + hbox_algo.addWidget(algo_button) + self.checkFixed = QtWidgets.QCheckBox("价格") # 价格固定选择框 form1 = QtWidgets.QFormLayout() @@ -658,8 +682,8 @@ class TradingWidget(QtWidgets.QWidget): form1.addRow(self.checkFixed, self.price_line) form1.addRow("数量", self.volume_line) form1.addRow("接口", self.gateway_combo) - form1.addRow(send_button) - form1.addRow(cancel_button) + form1.addRow(hbox_nomal) + form1.addRow(hbox_algo) # Market depth display area bid_color = "rgb(255,174,201)" @@ -852,6 +876,75 @@ class TradingWidget(QtWidgets.QWidget): self.ap4_label.setText("") self.ap5_label.setText("") + def stop_algo(self) -> None: + + if not self.main_engine.algo_engine: + QtWidgets.QMessageBox.critical(self, "算法引擎未启动", "请先启动算法引擎") + return + try: + self.main_engine.algo_engine.stop_all() + except Exception as ex: + QtWidgets.QMessageBox.critical(self, f"算法引擎异常{str(ex)}", "请查看详细日志") + + + def send_algo(self) ->None: + """启动算法""" + if not self.main_engine.algo_engine: + QtWidgets.QMessageBox.critical(self, "算法引擎未启动", "请先启动算法引擎") + return + template_name = 'AutoStopWinAlgo' + algo_template = self.main_engine.algo_engine.algo_templates.get(template_name,None) + if algo_template is None: + QtWidgets.QMessageBox.critical(self, f"算法[{template_name}]不存在", "请先部署算法") + return + symbol = str(self.symbol_line.text()) + if not symbol: + QtWidgets.QMessageBox.critical(self, "委托失败", "请输入合约代码") + return + + volume_text = str(self.volume_line.text()) + if not volume_text: + QtWidgets.QMessageBox.critical(self, "委托失败", "请输入委托数量") + return + volume = float(volume_text) + + price_text = str(self.price_line.text()) + if not price_text: + price = 0 + else: + price = float(price_text) + + exchange = Exchange(str(self.exchange_combo.currentText())) + + win_pips = str(self.win_pips.text()) + if int(win_pips) <=0: + QtWidgets.QMessageBox.critical(self, "止盈点数须大于0", "请输入正确止盈点数") + return + stop_pips = str(self.stop_pips.text()) + if int(stop_pips) <= 0: + QtWidgets.QMessageBox.critical(self, "止损点数须大于0", "请输入正确止损点数") + return + offset = Offset(str(self.offset_combo.currentText())) + if offset != Offset.OPEN: + QtWidgets.QMessageBox.critical(self, "算法只支持开仓", "请选择开仓方式") + return + + setting = { + "vt_symbol": f"{symbol}.{exchange.value}", + "direction": Direction(str(self.direction_combo.currentText())), + "open_price": price, + "win_pips": int(win_pips), + "stop_pips": int(stop_pips), + "volume": volume, + "near_pips": 2, # 价格接近多少个跳动才开始挂单(开仓) + "offset": offset + } + algo = algo_template.new(self.main_engine.algo_engine, setting) + algo.start() + self.main_engine.algo_engine.algos[algo.algo_name] = algo + + self.main_engine.write_log(msg=f'算法{algo.algo_name}启动') + def send_order(self) -> None: """ Send new order manually. diff --git a/vnpy/trader/utility.py b/vnpy/trader/utility.py index ff888fba..faaaa4c6 100644 --- a/vnpy/trader/utility.py +++ b/vnpy/trader/utility.py @@ -699,6 +699,16 @@ def save_data_to_pkb2(data: Any, pkb2_file_name): pickle.dump(data, f) +def get_months_diff(act_symbol, pas_symbol): + """获取合约得相隔月份""" + pas_month = int(pas_symbol[-2:]) + act_month = int(act_symbol[-2:]) + if pas_month < act_month: + pas_month += 12 + + return max(1, pas_month - act_month) + + class BarGenerator: """ For: