# CTA策略模块 ## 模块构成 CTA策略模块主要由7部分构成,如下图: - base:定义了CTA模块中用到的一些基础设置,如引擎类型(回测/实盘)、回测模式(K线/Tick)、本地停止单的定义以及停止单状态(等待中/已撤销/已触发)。 - template:定义了CTA策略模板(包含信号生成和委托管理)、CTA信号(仅负责信号生成)、目标仓位算法(仅负责委托管理,适用于拆分巨型委托,降低冲击成本)。 - strategies: 官方提供的cta策略示例,包含从最基础的双均线策略,到通道突破类型的布林带策略,到跨时间周期策略,再到把信号生成和委托管理独立开来的多信号策略。(用户自定义的策略也可以放在strategies文件夹内运行) - backesting:包含回测引擎和参数优化。其中回测引擎定义了数据载入、委托撮合机制、计算与统计相关盈利指标、结果绘图等函数。 - converter:定义了针对上期所品种平今/平昨模式的委托转换模块;对于其他品种用户也可以通过可选参数lock切换至锁仓模式。 - engine:定义了CTA策略实盘引擎,其中包括:RQData客户端初始化和数据载入、策略的初始化和启动、推送Tick订阅行情到策略中、挂撤单操作、策略的停止和移除等。 - ui:基于PyQt5的GUI图形应用。 ![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/seix_elementos.png "enter image title here")   ## 数据加载 在实盘中,RQData通过实时载入数据进行策略的初始化。该功能主要在CTA实盘引擎engine.py内实现。 下面介绍具体流程: - 在菜单栏点击“配置”,进入全局配置页面输入RQData账号密码;或者直接配置json文件,即在用户目录下.vntrader文件夹找到vt_setting.json,如图。 ![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/RQData_setting.png "enter image title here") - 初始化RQData客户端:从vt_setting.json中读取RQData的账户、密码到rq_client.init()函数进行初始化 ``` def init_rqdata(self): """ Init RQData client. """ username = SETTINGS["rqdata.username"] password = SETTINGS["rqdata.password"] if not username or not password: return import rqdatac self.rq_client = rqdatac self.rq_client.init(username, password, ('rqdatad-pro.ricequant.com', 16011)) ``` - RQData载入实盘数据:输入vt_symbol后,首先会转换成符合RQData格式的rq_symbol,通过get_price()函数下载数据,并且插入到数据库中。 ``` def query_bar_from_rq( self, vt_symbol: str, interval: Interval, start: datetime, end: datetime ): """ Query bar data from RQData. """ symbol, exchange_str = vt_symbol.split(".") rq_symbol = to_rq_symbol(vt_symbol) if rq_symbol not in self.rq_symbols: return None end += timedelta(1) # For querying night trading period data df = self.rq_client.get_price( rq_symbol, frequency=interval.value, fields=["open", "high", "low", "close", "volume"], start_date=start, end_date=end ) data = [] for ix, row in df.iterrows(): bar = BarData( symbol=symbol, exchange=Exchange(exchange_str), interval=interval, datetime=row.name.to_pydatetime(), open_price=row["open"], high_price=row["high"], low_price=row["low"], close_price=row["close"], volume=row["volume"], gateway_name="RQ" ) data.append(bar) ```   ## 策略开发 CTA策略模板提供完整的信号生成和委托管理功能,用户可以基于该模板自行开发策略。新策略可以放在用户运行的文件内(推荐),如在c:\users\administrator.vntrader目录下创建strategies文件夹;可以放在根目录下vnpy\app\cta_strategy\strategies文件夹内。 注意:策略文件命名是以下划线模式,如boll_channel_strategy.py;而策略类命名采用的是驼峰式,如BollChannelStrategy。 下面通过BollChannelStrategy策略示例,来展示策略开发的具体步骤: ### 参数设置 定义策略参数并且初始化策略变量。策略参数为策略类的公有属性,用户可以通过创建新的实例来调用或者改变策略参数。 如针对rb1905品种,用户可以创建基于BollChannelStrategy的策略示例,如RB_BollChannelStrategy,boll_window可以由18改成30。 创建策略实例的方法有效地实现了一个策略跑多个品种,并且其策略参数可以通过品种的特征进行调整。 ``` boll_window = 18 boll_dev = 3.4 cci_window = 10 atr_window = 30 sl_multiplier = 5.2 fixed_size = 1 boll_up = 0 boll_down = 0 cci_value = 0 atr_value = 0 intra_trade_high = 0 intra_trade_low = 0 long_stop = 0 short_stop = 0 ``` ### 类的初始化 初始化分3步: - 通过super( )的方法继承CTA策略模板,在__init__( )函数传入CTA引擎、策略名称、vt_symbol、参数设置。 - 调用K线生成模块:通过时间切片来把Tick数据合成1分钟K线数据,然后更大的时间周期数据,如15分钟K线。 - 调用K线时间序列管理模块:基于K线数据,如1分钟、15分钟,来生成相应的技术指标。 ``` def __init__(self, cta_engine, strategy_name, vt_symbol, setting): """""" super(BollChannelStrategy, self).__init__( cta_engine, strategy_name, vt_symbol, setting ) self.bg = BarGenerator(self.on_bar, 15, self.on_15min_bar) self.am = ArrayManager() ``` ### 策略的初始化、启动、停止 通过“CTA策略”组件的相关功能按钮实现。 注意:函数load_bar(10),代表策略初始化需要载入10个交易日的历史数据。该历史数据可以是Tick数据,也可以是K线数据。在策略初始化时候,会调用K线时间序列管理器计算并缓存相关的计算指标,但是并不触发交易。 ``` def on_init(self): """ Callback when strategy is inited. """ self.write_log("策略初始化") self.load_bar(10) def on_start(self): """ Callback when strategy is started. """ self.write_log("策略启动") def on_stop(self): """ Callback when strategy is stopped. """ self.write_log("策略停止") ``` ### Tick数据回报 策略订阅某品种合约行情,交易所会推送Tick数据到该策略上。 由于BollChannelStrategy是基于15分钟K线来生成交易信号的,故收到Tick数据后,需要用到K线生成模块里面的update_tick函数,通过时间切片的方法,聚合成1分钟K线数据,并且推送到on_bar函数。 ``` def on_tick(self, tick: TickData): """ Callback of new tick data update. """ self.bg.update_tick(tick) ``` ### K线数据回报 收到推送过来的1分钟K线数据后,通过K线生成模块里面的update_bar函数,以分钟切片的方法,合成15分钟K线数据,并且推送到on_15min_bar函数。 ``` def on_bar(self, bar: BarData): """ Callback of new bar data update. """ self.bg.update_bar(bar) ``` ### 15分钟K线数据回报 负责CTA信号的生成,由3部分组成: - 清空未成交委托:为了防止之前下的单子在上一个15分钟没有成交,但是下一个15分钟可能已经调整了价格,就用cancel_all()方法立刻撤销之前未成交的所有委托,保证策略在当前这15分钟开始时的整个状态是清晰和唯一的。 - 调用K线时间序列管理模块:基于最新的15分钟K线数据来计算相应计算指标,如布林带通道上下轨、CCI指标、ATR指标 - 信号计算:通过持仓的判断以及结合CCI指标、布林带通道、ATR指标在通道突破点挂出停止单委托(buy/sell),同时设置离场点(short/cover)。 注意:CTA策略具有低胜率和高盈亏比的特定:在难以提升胜率的情况下,研究提高策略盈亏比有利于策略盈利水平的上升。 ``` def on_15min_bar(self, bar: BarData): """""" self.cancel_all() am = self.am am.update_bar(bar) if not am.inited: return self.boll_up, self.boll_down = am.boll(self.boll_window, self.boll_dev) self.cci_value = am.cci(self.cci_window) self.atr_value = am.atr(self.atr_window) if self.pos == 0: self.intra_trade_high = bar.high_price self.intra_trade_low = bar.low_price if self.cci_value > 0: self.buy(self.boll_up, self.fixed_size, True) elif self.cci_value < 0: self.short(self.boll_down, self.fixed_size, True) elif self.pos > 0: self.intra_trade_high = max(self.intra_trade_high, bar.high_price) self.intra_trade_low = bar.low_price self.long_stop = self.intra_trade_high - self.atr_value * self.sl_multiplier self.sell(self.long_stop, abs(self.pos), True) elif self.pos < 0: self.intra_trade_high = bar.high_price self.intra_trade_low = min(self.intra_trade_low, bar.low_price) self.short_stop = self.intra_trade_low + self.atr_value * self.sl_multiplier self.cover(self.short_stop, abs(self.pos), True) self.put_event() ``` ### 委托回报、成交回报、停止单回报 在策略中可以直接pass,其具体逻辑应用交给回测/实盘引擎负责。 ``` def on_order(self, order: OrderData): """ Callback of new order data update. """ pass def on_trade(self, trade: TradeData): """ Callback of new trade data update. """ self.put_event() def on_stop_order(self, stop_order: StopOrder): """ Callback of stop order update. """ pass ```   ## 回测研究 backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以及回测引擎应用示例: ### 加载策略 把CTA策略逻辑,对应合约品种,以及参数设置(可在策略文件外修改)载入到回测引擎中。 ``` def add_strategy(self, strategy_class: type, setting: dict): """""" self.strategy_class = strategy_class self.strategy = strategy_class( self, strategy_class.__name__, self.vt_symbol, setting ) ```   ### 载入历史数据 负责载入对应品种的历史数据,大概有4个步骤: - 根据数据类型不同,分成K线模式和Tick模式; - 通过select().where()方法,有条件地从数据库中选取数据,其筛选标准包括:vt_symbol、 回测开始日期、回测结束日期、K线周期(K线模式下); - order_by(DbBarData.datetime)表示需要按照时间顺序载入数据; - 载入数据是以迭代方式进行的,数据最终存入self.history_data。 ``` def load_data(self): """""" self.output("开始加载历史数据") if self.mode == BacktestingMode.BAR: s = ( DbBarData.select() .where( (DbBarData.vt_symbol == self.vt_symbol) & (DbBarData.interval == self.interval) & (DbBarData.datetime >= self.start) & (DbBarData.datetime <= self.end) ) .order_by(DbBarData.datetime) ) self.history_data = [db_bar.to_bar() for db_bar in s] else: s = ( DbTickData.select() .where( (DbTickData.vt_symbol == self.vt_symbol) & (DbTickData.datetime >= self.start) & (DbTickData.datetime <= self.end) ) .order_by(DbTickData.datetime) ) self.history_data = [db_tick.to_tick() for db_tick in s] self.output(f"历史数据加载完成,数据量:{len(self.history_data)}") ```   ### 撮合成交 载入CTA策略以及相关历史数据后,策略会根据最新的数据来计算相关指标。若符合条件会生成交易信号,发出具体委托(buy/sell/short/cover),并且在下一根K线成交。 根据委托类型的不同,回测引擎提供2种撮合成交机制来尽量模仿真实交易环节: - 限价单撮合成交:(以买入方向为例)先确定是否发生成交,成交标准为委托价>= 下一根K线的最低价;然后确定成交价格,成交价格为委托价与下一根K线开盘价的最小值。 - 停止单撮合成交:(以买入方向为例)先确定是否发生成交,成交标准为委托价<= 下一根K线的最高价;然后确定成交价格,成交价格为委托价与下一根K线开盘价的最大值。   下面展示在引擎中限价单撮合成交的流程: - 确定会撮合成交的价格; - 遍历限价单字典中的所有限价单,推送委托进入未成交队列的更新状态; - 判断成交状态,若出现成交,推送成交数据和委托数据; - 从字典中删除已成交的限价单。 ``` def cross_limit_order(self): """ Cross limit order with last bar/tick data. """ if self.mode == BacktestingMode.BAR: long_cross_price = self.bar.low_price short_cross_price = self.bar.high_price long_best_price = self.bar.open_price short_best_price = self.bar.open_price else: long_cross_price = self.tick.ask_price_1 short_cross_price = self.tick.bid_price_1 long_best_price = long_cross_price short_best_price = short_cross_price for order in list(self.active_limit_orders.values()): # Push order update with status "not traded" (pending) if order.status == Status.SUBMITTING: order.status = Status.NOTTRADED self.strategy.on_order(order) # Check whether limit orders can be filled. long_cross = ( order.direction == Direction.LONG and order.price >= long_cross_price and long_cross_price > 0 ) short_cross = ( order.direction == Direction.SHORT and order.price <= short_cross_price and short_cross_price > 0 ) if not long_cross and not short_cross: continue # Push order udpate with status "all traded" (filled). order.traded = order.volume order.status = Status.ALLTRADED self.strategy.on_order(order) self.active_limit_orders.pop(order.vt_orderid) # Push trade update self.trade_count += 1 if long_cross: trade_price = min(order.price, long_best_price) pos_change = order.volume else: trade_price = max(order.price, short_best_price) pos_change = -order.volume trade = TradeData( symbol=order.symbol, exchange=order.exchange, orderid=order.orderid, tradeid=str(self.trade_count), direction=order.direction, offset=order.offset, price=trade_price, volume=order.volume, time=self.datetime.strftime("%H:%M:%S"), gateway_name=self.gateway_name, ) trade.datetime = self.datetime self.strategy.pos += pos_change self.strategy.on_trade(trade) self.trades[trade.vt_tradeid] = trade ```   ### 计算策略盈亏情况 基于收盘价、当日持仓量、合约规模、滑点、手续费率等计算总盈亏与净盈亏,并且其计算结果以DataFrame格式输出,完成基于逐日盯市盈亏统计。 下面展示盈亏情况的计算过程 - 浮动盈亏 = 持仓量 \*(当日收盘价 - 昨日收盘价)\* 合约规模 - 实际盈亏 = 持仓变化量 \* (当时收盘价 - 开仓成交价)\* 合约规模 - 总盈亏 = 浮动盈亏 + 实际盈亏 - 净盈亏 = 总盈亏 - 总手续费 - 总滑点 ``` def calculate_pnl( self, pre_close: float, start_pos: float, size: int, rate: float, slippage: float, ): """""" self.pre_close = pre_close # Holding pnl is the pnl from holding position at day start self.start_pos = start_pos self.end_pos = start_pos self.holding_pnl = self.start_pos * \ (self.close_price - self.pre_close) * size # Trading pnl is the pnl from new trade during the day self.trade_count = len(self.trades) for trade in self.trades: if trade.direction == Direction.LONG: pos_change = trade.volume else: pos_change = -trade.volume turnover = trade.price * trade.volume * size self.trading_pnl += pos_change * \ (self.close_price - trade.price) * size self.end_pos += pos_change self.turnover += turnover self.commission += turnover * rate self.slippage += trade.volume * size * slippage # Net pnl takes account of commission and slippage cost self.total_pnl = self.trading_pnl + self.holding_pnl self.net_pnl = self.total_pnl - self.commission - self.slippage ```   ### 计算策略统计指标 calculate_statistics函数是基于逐日盯市盈亏情况(DateFrame格式)来计算衍生指标,如最大回撤、年化收益、盈亏比、夏普比率等。 ``` df["balance"] = df["net_pnl"].cumsum() + self.capital df["return"] = np.log(df["balance"] / df["balance"].shift(1)).fillna(0) df["highlevel"] = ( df["balance"].rolling( min_periods=1, window=len(df), center=False).max() ) df["drawdown"] = df["balance"] - df["highlevel"] df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100 # Calculate statistics value start_date = df.index[0] end_date = df.index[-1] total_days = len(df) profit_days = len(df[df["net_pnl"] > 0]) loss_days = len(df[df["net_pnl"] < 0]) end_balance = df["balance"].iloc[-1] max_drawdown = df["drawdown"].min() max_ddpercent = df["ddpercent"].min() total_net_pnl = df["net_pnl"].sum() daily_net_pnl = total_net_pnl / total_days total_commission = df["commission"].sum() daily_commission = total_commission / total_days total_slippage = df["slippage"].sum() daily_slippage = total_slippage / total_days total_turnover = df["turnover"].sum() daily_turnover = total_turnover / total_days total_trade_count = df["trade_count"].sum() daily_trade_count = total_trade_count / total_days total_return = (end_balance / self.capital - 1) * 100 annual_return = total_return / total_days * 240 daily_return = df["return"].mean() * 100 return_std = df["return"].std() * 100 if return_std: sharpe_ratio = daily_return / return_std * np.sqrt(240) else: sharpe_ratio = 0 ```   ### 统计指标绘图 通过matplotlib绘制4幅图: - 资金曲线图 - 资金回撤图 - 每日盈亏图 - 每日盈亏分布图 ``` def show_chart(self, df: DataFrame = None): """""" if not df: df = self.daily_df if df is None: return plt.figure(figsize=(10, 16)) balance_plot = plt.subplot(4, 1, 1) balance_plot.set_title("Balance") df["balance"].plot(legend=True) drawdown_plot = plt.subplot(4, 1, 2) drawdown_plot.set_title("Drawdown") drawdown_plot.fill_between(range(len(df)), df["drawdown"].values) pnl_plot = plt.subplot(4, 1, 3) pnl_plot.set_title("Daily Pnl") df["net_pnl"].plot(kind="bar", legend=False, grid=False, xticks=[]) distribution_plot = plt.subplot(4, 1, 4) distribution_plot.set_title("Daily Pnl Distribution") df["net_pnl"].hist(bins=50) plt.show() ```   ### 单策略回测示例 - 导入回测引擎和CTA策略 - 设置回测相关参数,如:品种、K线周期、回测开始和结束日期、手续费、滑点、合约规模、起始资金 - 载入策略和数据到引擎中,运行回测。 - 计算基于逐日统计盈利情况,计算统计指标,统计指标绘图。 ``` from vnpy.app.cta_strategy.backtesting import BacktestingEngine from vnpy.app.cta_strategy.strategies.boll_channel_strategy import ( BollChannelStrategy, ) from datetime import datetime engine = BacktestingEngine() engine.set_parameters( vt_symbol="IF88.CFFEX", interval="1m", start=datetime(2018, 1, 1), end=datetime(2019, 1, 1), rate=3.0/10000, slippage=0.2, size=300, pricetick=0.2, capital=1_000_000, ) engine.add_strategy(AtrRsiStrategy, {}) engine.load_data() engine.run_backtesting() df = engine.calculate_result() engine.calculate_statistics() engine.show_chart() ```   ### 投资组合回测示例 投资组合回测是基于单策略回测的,其关键是每个策略都对应着各自的BacktestingEngine对象,下面介绍具体流程: - 创建回测函数run_backtesting(),这样每添加一个策略就创建其BacktestingEngine对象。 ``` from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting from vnpy.app.cta_strategy.strategies.atr_rsi_strategy import AtrRsiStrategy from vnpy.app.cta_strategy.strategies.boll_channel_strategy import BollChannelStrategy from datetime import datetime def run_backtesting(strategy_class, setting, vt_symbol, interval, start, end, rate, slippage, size, pricetick, capital): engine = BacktestingEngine() engine.set_parameters( vt_symbol=vt_symbol, interval=interval, start=start, end=end, rate=rate, slippage=slippage, size=size, pricetick=pricetick, capital=capital ) engine.add_strategy(strategy_class, setting) engine.load_data() engine.run_backtesting() df = engine.calculate_result() return df ```   - 分别进行单策略回测,得到各自的DataFrame,(该DataFrame包含交易时间、今仓、昨仓、手续费、滑点、当日净盈亏、累计净盈亏等基本信息,但是不包括最大回撤,夏普比率等统计信息),然后把DataFrame相加并且去除空值后即得到投资组合的DataFrame。 ``` df1 = run_backtesting( strategy_class=AtrRsiStrategy, setting={}, vt_symbol="IF88.CFFEX", interval="1m", start=datetime(2019, 1, 1), end=datetime(2019, 4, 30), rate=0.3/10000, slippage=0.2, size=300, pricetick=0.2, capital=1_000_000, ) df2 = run_backtesting( strategy_class=BollChannelStrategy, setting={'fixed_size': 16}, vt_symbol="RB88.SHFE", interval="1m", start=datetime(2019, 1, 1), end=datetime(2019, 4, 30), rate=1/10000, slippage=1, size=10, pricetick=1, capital=1_000_000, ) dfp = df1 + df2 dfp =dfp.dropna() ```   - 创建show_portafolio()函数,同样也是创建新的BacktestingEngine对象,对传入的DataFrame计算如夏普比率等统计指标,并且画图。故该函数不仅能显示单策略回测效果,也能展示投资组合回测效果。 ``` def show_portafolio(df): engine = BacktestingEngine() engine.calculate_statistics(df) engine.show_chart(df) show_portafolio(dfp) ```   ## 参数优化 参数优化模块主要由3部分构成: ### 参数设置 - 设置参数优化区间:如boll_window设置起始值为18,终止值为24,步进为2,这样就得到了[18, 20, 22, 24] 这4个待优化的参数了。 - 设置优化目标字段:如夏普比率、盈亏比、总收益率等。 - 随机生成参数对组合:使用迭代工具产生参数对组合,然后把参数对组合打包到一个个字典组成的列表中 ``` class OptimizationSetting: """ Setting for runnning optimization. """ def __init__(self): """""" self.params = {} self.target_name = "" def add_parameter( self, name: str, start: float, end: float = None, step: float = None ): """""" if not end and not step: self.params[name] = [start] return if start >= end: print("参数优化起始点必须小于终止点") return if step <= 0: print("参数优化步进必须大于0") return value = start value_list = [] while value <= end: value_list.append(value) value += step self.params[name] = value_list def set_target(self, target_name: str): """""" self.target_name = target_name def generate_setting(self): """""" keys = self.params.keys() values = self.params.values() products = list(product(*values)) settings = [] for p in products: setting = dict(zip(keys, p)) settings.append(setting) return settings ```   ### 参数对组合回测 多进程优化时,每个进程都会运行optimize函数,输出参数对组合以及目标优化字段的结果。其步骤如下: - 调用回测引擎 - 输入回测相关设置 - 输入参数对组合到策略中 - 运行回测 - 返回回测结果,包括:参数对组合、目标优化字段数值、策略统计指标 ``` def optimize( target_name: str, strategy_class: CtaTemplate, setting: dict, vt_symbol: str, interval: Interval, start: datetime, rate: float, slippage: float, size: float, pricetick: float, capital: int, end: datetime, mode: BacktestingMode, ): """ Function for running in multiprocessing.pool """ engine = BacktestingEngine() engine.set_parameters( vt_symbol=vt_symbol, interval=interval, start=start, rate=rate, slippage=slippage, size=size, pricetick=pricetick, capital=capital, end=end, mode=mode ) engine.add_strategy(strategy_class, setting) engine.load_data() engine.run_backtesting() engine.calculate_result() statistics = engine.calculate_statistics() target_value = statistics[target_name] return (str(setting), target_value, statistics) ```   ### 多进程优化 - 根据CPU的核数来创建进程:若CPU为4核,则创建4个进程 - 在每个进程都调用apply_async( )的方法运行参数对组合回测,其回测结果添加到results中 (apply_async是异步非阻塞的,即不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。) - pool.close()与pool.join()用于进程跑完任务后,去关闭进程。 - 对results的内容通过目标优化字段标准进行排序,输出结果。 ``` pool = multiprocessing.Pool(multiprocessing.cpu_count()) results = [] for setting in settings: result = (pool.apply_async(optimize, ( target_name, self.strategy_class, setting, self.vt_symbol, self.interval, self.start, self.rate, self.slippage, self.size, self.pricetick, self.capital, self.end, self.mode ))) results.append(result) pool.close() pool.join() # Sort results and output result_values = [result.get() for result in results] result_values.sort(reverse=True, key=lambda result: result[1]) for value in result_values: msg = f"参数:{value[0]}, 目标:{value[1]}" self.output(msg) return result_values ```   ## 实盘运行 在实盘环境,用户可以基于编写好的CTA策略来创建新的实例,一键初始化、启动、停止策略。 ### 创建策略实例 用户可以基于编写好的CTA策略来创建新的实例,策略实例的好处在于同一个策略可以同时去运行多个品种合约,并且每个实例的参数可以是不同的。 在创建实例的时候需要填写如图的实例名称、合约品种、参数设置。注意:实例名称不能重名;合约名称是vt_symbol的格式,如IF1905.CFFEX。 ![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/add_strategy.png) 创建策略流程如下: - 检查策略实例重名 - 添加策略配置信息(strategy_name, vt_symbol, setting)到strategies字典上 - 添加该策略要订阅行情的合约信息到symbol_strategy_map字典中; - 把策略配置信息保存到json文件内; - 在图形化界面更新状态信息。 ``` def add_strategy( self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict ): """ Add a new strategy. """ if strategy_name in self.strategies: self.write_log(f"创建策略失败,存在重名{strategy_name}") return strategy_class = self.classes[class_name] strategy = strategy_class(self, strategy_name, vt_symbol, setting) self.strategies[strategy_name] = strategy # Add vt_symbol to strategy map. strategies = self.symbol_strategy_map[vt_symbol] strategies.append(strategy) # Update to setting file. self.update_strategy_setting(strategy_name, setting) self.put_strategy_event(strategy) ```   ### 初始化策略 - 调用策略类的on_init()回调函数,并且载入历史数据; - 恢复上次退出之前的策略状态; - 从.vntrader/cta_strategy_data.json内读取策略参数,最新的技术指标,以及持仓数量; - 调用接口的subcribe()函数订阅指定行情信息; - 策略初始化状态变成True,并且更新到日志上。 ``` def _init_strategy(self): """ Init strategies in queue. """ while not self.init_queue.empty(): strategy_name = self.init_queue.get() strategy = self.strategies[strategy_name] if strategy.inited: self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作") continue self.write_log(f"{strategy_name}开始执行初始化") # Call on_init function of strategy self.call_strategy_func(strategy, strategy.on_init) # Restore strategy data(variables) data = self.strategy_data.get(strategy_name, None) if data: for name in strategy.variables: value = data.get(name, None) if value: setattr(strategy, name, value) # Subscribe market data contract = self.main_engine.get_contract(strategy.vt_symbol) if contract: req = SubscribeRequest( symbol=contract.symbol, exchange=contract.exchange) self.main_engine.subscribe(req, contract.gateway_name) else: self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy) # Put event to update init completed status. strategy.inited = True self.put_strategy_event(strategy) self.write_log(f"{strategy_name}初始化完成") self.init_thread = None ```   ### 启动策略 - 检查策略初始化状态; - 检查策略启动状态,避免重复启动; - 调用策略类的on_start()函数启动策略; - 策略启动状态变成True,并且更新到图形化界面上。 ``` def start_strategy(self, strategy_name: str): """ Start a strategy. """ strategy = self.strategies[strategy_name] if not strategy.inited: self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化") return if strategy.trading: self.write_log(f"{strategy_name}已经启动,请勿重复操作") return self.call_strategy_func(strategy, strategy.on_start) strategy.trading = True self.put_strategy_event(strategy) ```   ### 停止策略 - 检查策略启动状态; - 调用策略类的on_stop()函数停止策略; - 更新策略启动状态为False; - 对所有为成交的委托(市价单/限价单/本地停止单)进行撤单操作; - 把策略参数,最新的技术指标,以及持仓数量保存到.vntrader/cta_strategy_data.json内; - 在图形化界面更新策略状态。 ``` def stop_strategy(self, strategy_name: str): """ Stop a strategy. """ strategy = self.strategies[strategy_name] if not strategy.trading: return # Call on_stop function of the strategy self.call_strategy_func(strategy, strategy.on_stop) # Change trading status of strategy to False strategy.trading = False # Cancel all orders of the strategy self.cancel_all(strategy) # Sync strategy variables to data file self.sync_strategy_data(strategy) # Update GUI self.put_strategy_event(strategy) ```   ### 编辑策略 - 重新配置策略参数字典setting; - 更新参数字典到策略中; - 在图像化界面更新策略状态。 ``` def edit_strategy(self, strategy_name: str, setting: dict): """ Edit parameters of a strategy. """ strategy = self.strategies[strategy_name] strategy.update_setting(setting) self.update_strategy_setting(strategy_name, setting) self.put_strategy_event(strategy) ```   ### 移除策略 - 检查策略状态,只有停止策略后从可以移除策略; - 从json文件移除策略配置信息(strategy_name, vt_symbol, setting); - 从symbol_strategy_map字典中移除该策略订阅的合约信息; - 从strategy_orderid_map字典移除活动委托记录; - 从strategies字典移除该策略的相关配置信息。 ``` def remove_strategy(self, strategy_name: str): """ Remove a strategy. """ strategy = self.strategies[strategy_name] if strategy.trading: self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止") return # Remove setting self.remove_strategy_setting(strategy_name) # Remove from symbol strategy map strategies = self.symbol_strategy_map[strategy.vt_symbol] strategies.remove(strategy) # Remove from active orderid map if strategy_name in self.strategy_orderid_map: vt_orderids = self.strategy_orderid_map.pop(strategy_name) # Remove vt_orderid strategy map for vt_orderid in vt_orderids: if vt_orderid in self.orderid_strategy_map: self.orderid_strategy_map.pop(vt_orderid) # Remove from strategies self.strategies.pop(strategy_name) return True ```   ### 锁仓操作 用户在编写策略时,可以通过填写lock字段来让策略完成锁仓操作,即禁止平今,通过反向开仓来代替。 - 在cta策略模板template中,可以看到如下具体委托函数都有lock字段,并且默认为False。 ``` def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False): """ Send buy order to open a long position. """ return self.send_order(Direction.LONG, Offset.OPEN, price, volume, stop, lock) def sell(self, price: float, volume: float, stop: bool = False, lock: bool = False): """ Send sell order to close a long position. """ return self.send_order(Direction.SHORT, Offset.CLOSE, price, volume, stop, lock) def short(self, price: float, volume: float, stop: bool = False, lock: bool = False): """ Send short order to open as short position. """ return self.send_order(Direction.SHORT, Offset.OPEN, price, volume, stop, lock) def cover(self, price: float, volume: float, stop: bool = False, lock: bool = False): """ Send cover order to close a short position. """ return self.send_order(Direction.LONG, Offset.CLOSE, price, volume, stop, lock) def send_order( self, direction: Direction, offset: Offset, price: float, volume: float, stop: bool = False, lock: bool = False ): """ Send a new order. """ if self.trading: vt_orderids = self.cta_engine.send_order( self, direction, offset, price, volume, stop, lock ) return vt_orderids else: return [] ```   - 设置lock=True后,cta实盘引擎send_order()函数发生响应,并且调用其最根本的委托函数send_server_order()去处理锁仓委托转换。首先是创建原始委托original_req,然后调用converter文件里面OffsetConverter类的convert_order_request来进行相关转换。 ``` def send_order( self, strategy: CtaTemplate, direction: Direction, offset: Offset, price: float, volume: float, stop: bool, lock: bool ): """ """ contract = self.main_engine.get_contract(strategy.vt_symbol) if not contract: self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy) return "" if stop: if contract.stop_supported: return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock) else: return self.send_local_stop_order(strategy, direction, offset, price, volume, lock) else: return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock) def send_limit_order( self, strategy: CtaTemplate, contract: ContractData, direction: Direction, offset: Offset, price: float, volume: float, lock: bool ): """ Send a limit order to server. """ return self.send_server_order( strategy, contract, direction, offset, price, volume, OrderType.LIMIT, lock ) def send_server_order( self, strategy: CtaTemplate, contract: ContractData, direction: Direction, offset: Offset, price: float, volume: float, type: OrderType, lock: bool ): """ Send a new order to server. """ # Create request and send order. original_req = OrderRequest( symbol=contract.symbol, exchange=contract.exchange, direction=direction, offset=offset, type=type, price=price, volume=volume, ) # Convert with offset converter req_list = self.offset_converter.convert_order_request(original_req, lock) # Send Orders vt_orderids = [] for req in req_list: vt_orderid = self.main_engine.send_order( req, contract.gateway_name) vt_orderids.append(vt_orderid) self.offset_converter.update_order_request(req, vt_orderid) # Save relationship between orderid and strategy. self.orderid_strategy_map[vt_orderid] = strategy self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid) return vt_orderids ```   - 在convert_order_request_lock()函数中,先计算今仓的量和昨可用量;然后进行判断:若有今仓,只能开仓(锁仓);无今仓时候,若平仓量小于等于昨可用,全部平昨,反之,先平昨,剩下的反向开仓。 ``` def convert_order_request_lock(self, req: OrderRequest): """""" if req.direction == Direction.LONG: td_volume = self.short_td yd_available = self.short_yd - self.short_yd_frozen else: td_volume = self.long_td yd_available = self.long_yd - self.long_yd_frozen # If there is td_volume, we can only lock position if td_volume: req_open = copy(req) req_open.offset = Offset.OPEN return [req_open] # If no td_volume, we close opposite yd position first # then open new position else: open_volume = max(0, req.volume - yd_available) req_list = [] if yd_available: req_yd = copy(req) if self.exchange == Exchange.SHFE: req_yd.offset = Offset.CLOSEYESTERDAY else: req_yd.offset = Offset.CLOSE req_list.append(req_yd) if open_volume: req_open = copy(req) req_open.offset = Offset.OPEN req_open.volume = open_volume req_list.append(req_open) return req_list ```