From 0edd9783e9eebda0816b401c909187bfac20eb01 Mon Sep 17 00:00:00 2001 From: 1122455801 Date: Mon, 29 Apr 2019 14:33:15 +0800 Subject: [PATCH] Update cta_strategy.md --- docs/cta_strategy.md | 398 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 375 insertions(+), 23 deletions(-) diff --git a/docs/cta_strategy.md b/docs/cta_strategy.md index 30024fee..1ac9d3e6 100644 --- a/docs/cta_strategy.md +++ b/docs/cta_strategy.md @@ -14,21 +14,163 @@ CTA策略模块主要由7部分构成,如下图: - engine:定义了CTA策略实盘引擎,其中包括:RQData客户端初始化和数据载入、策略的初始化和启动、推送Tick订阅行情到策略中、挂撤单操作、策略的停止和移除等。 - ui:基于PyQt5的GUI图形应用。 -![enter image description here](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/seix_elementos.png "enter image title here") +![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/seix_elementos.png "enter image title here")   -## 2. 历史数据 +## 历史数据 + +### 回测历史数据 +回测所需要的历史数据可通过运行getdata.py文件进行下载。该文件处于根目录下tests\backtesting文件夹内。 +下载历史数据的原理是调用RQData的get_price()函数把数据下载到内存里面;再通过generate_bar_from_row()函数,以固定格式把数据从内存载入到硬盘数据库中。 + +下面介绍具体流程: + +- 填写RQData的账号密码,初始化RQData +``` +import rqdatac as rq + + +USERNAME = "" +PASSWORD = "" +FIELDS = ["open", "high", "low", "close", "volume"] + +rq.init(USERNAME, PASSWORD, ("rqdatad-pro.ricequant.com", 16011)) +``` + +  + +- 定义数据插入格式。需要插入的数据包括:合约代码、交易所、K线周期、开盘价、最高价、最低价、收盘价、成交量、数据库名称、vt_symbol(注意:K线周期可以是"1m"、"1h"、"d"、"w"。to_pydatetime()用于时间转换成datetime格式) +``` +def generate_bar_from_row(row, symbol, exchange): + """""" + bar = DbBarData() + + bar.symbol = symbol + bar.exchange = exchange + bar.interval = "1m" + bar.open_price = row["open"] + bar.high_price = row["high"] + bar.low_price = row["low"] + bar.close_price = row["close"] + bar.volume = row["volume"] + bar.datetime = row.name.to_pydatetime() + bar.gateway_name = "DB" + bar.vt_symbol = f"{symbol}.{exchange}" + + return bar +``` + +  + +- 定义数据下载函数。主要调用RQData的get_price()获取指定合约或合约列表的历史数据(包含起止日期,日线或分钟线)。目前仅支持中国市场的股票、期货、ETF和上金所现货的行情数据,如黄金、铂金和白银产品。(注意:起始日期默认是2013-01-04,结束日期默认是2014-01-04) + +``` +def download_minute_bar(vt_symbol): + """下载某一合约的分钟线数据""" + print(f"开始下载合约数据{vt_symbol}") + symbol, exchange = vt_symbol.split(".") + + start = time() + + df = rq.get_price(symbol, start_date="2018-01-01", end_date="2019-01-01", frequency="1m", fields=FIELDS) + + with DB.atomic(): + for ix, row in df.iterrows(): + print(row.name) + bar = generate_bar_from_row(row, symbol, exchange) + DbBarData.replace(bar.__data__).execute() + + end = time() + cost = (end - start) * 1000 + + print( + "合约%s的分钟K线数据下载完成%s - %s,耗时%s毫秒" + % (symbol, df.index[0], df.index[-1], cost) + ) + +```   -## 3. 策略开发 + +### 实盘历史数据 +在实盘中,RQData通过实时载入数据进行策略的初始化。该功能主要在CTA实盘引擎engine.py内实现。 +下面介绍具体流程: +- 配置json文件:在用户目录下.vntrader文件夹找到vt_setting.json,输入RQData的账号和密码,如图。 + +![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/RQData_setting.png "enter image title here") + +- 初始化RQData客户端:从vt_setting.json中读取RQData的账户、密码到rq_client.init()函数进行初始化 + +``` + def init_rqdata(self): + """ + Init RQData client. + """ + username = SETTINGS["rqdata.username"] + password = SETTINGS["rqdata.password"] + if not username or not password: + return + + import rqdatac + + self.rq_client = rqdatac + self.rq_client.init(username, password, + ('rqdatad-pro.ricequant.com', 16011)) +``` + + +- RQData载入实盘数据:输入vt_symbol后,首先会转换成符合RQData格式的rq_symbol,通过get_price()函数下载数据,并且插入到数据库中。 + +``` + def query_bar_from_rq( + self, vt_symbol: str, interval: Interval, start: datetime, end: datetime + ): + """ + Query bar data from RQData. + """ + symbol, exchange_str = vt_symbol.split(".") + rq_symbol = to_rq_symbol(vt_symbol) + if rq_symbol not in self.rq_symbols: + return None + + end += timedelta(1) # For querying night trading period data + + df = self.rq_client.get_price( + rq_symbol, + frequency=interval.value, + fields=["open", "high", "low", "close", "volume"], + start_date=start, + end_date=end + ) + + data = [] + for ix, row in df.iterrows(): + bar = BarData( + symbol=symbol, + exchange=Exchange(exchange_str), + interval=interval, + datetime=row.name.to_pydatetime(), + open_price=row["open"], + high_price=row["high"], + low_price=row["low"], + close_price=row["close"], + volume=row["volume"], + gateway_name="RQ" + ) + data.append(bar) +``` + +  + +## 策略开发 CTA策略模板提供完整的信号生成和委托管理功能,用户可以基于该模板自行开发策略。新策略可以放在根目录下vnpy\app\cta_strategy\strategies文件夹内,也可以放在用户运行的文件内(VN Station模式)。注意:策略文件命名是以下划线模式,如boll_channel_strategy.py;而策略类命名采用的是驼峰式,如BollChannelStrategy。 下面通过BollChannelStrategy策略示例,来展示策略开发的具体步骤: -### 3.1 参数设置 +### 参数设置 定义策略参数并且初始化策略变量。策略参数为策略类的公有属性,用户可以通过创建新的实例来调用或者改变策略参数。 @@ -54,7 +196,7 @@ CTA策略模板提供完整的信号生成和委托管理功能,用户可以 short_stop = 0 ``` -### 3.2 类的初始化 +### 类的初始化 初始化分3步: - 通过super( )的方法继承CTA策略模板,在__init__( )函数传入CTA引擎、策略名称、vt_symbol、参数设置。 - 调用K线生成模块:通过时间切片来把Tick数据合成1分钟K线数据,然后更大的时间周期数据,如15分钟K线。 @@ -71,7 +213,7 @@ CTA策略模板提供完整的信号生成和委托管理功能,用户可以 self.am = ArrayManager() ``` -### 3.3 策略的初始化、启动、停止 +### 策略的初始化、启动、停止 通过“CTA策略”组件的相关功能按钮实现。 注意:函数load_bar(10),代表策略初始化需要载入10个交易日的历史数据。该历史数据可以是Tick数据,也可以是K线数据。 @@ -96,7 +238,7 @@ CTA策略模板提供完整的信号生成和委托管理功能,用户可以 """ self.write_log("策略停止") ``` -### 3.4 Tick数据回报 +### Tick数据回报 策略订阅某品种合约行情,交易所会推送Tick数据到该策略上。 由于BollChannelStrategy是基于15分钟K线来生成交易信号的,故收到Tick数据后,需要用到K线生成模块里面的update_tick函数,通过时间切片的方法,聚合成1分钟K线数据,并且推送到on_bar函数。 @@ -109,7 +251,7 @@ CTA策略模板提供完整的信号生成和委托管理功能,用户可以 self.bg.update_tick(tick) ``` -### 3.5 K线数据回报 +### K线数据回报 收到推送过来的1分钟K线数据后,通过K线生成模块里面的update_bar函数,以分钟切片的方法,合成15分钟K线数据,并且推送到on_15min_bar函数。 ``` @@ -120,7 +262,7 @@ CTA策略模板提供完整的信号生成和委托管理功能,用户可以 self.bg.update_bar(bar) ``` -### 3.6 15分钟K线数据回报 +### 15分钟K线数据回报 负责CTA信号的生成,由3部分组成: - 清空未成交委托:为了防止之前下的单子在上一个15分钟没有成交,但是下一个15分钟可能已经调整了价格,就用cancel_all()方法立刻撤销之前未成交的所有委托,保证策略在当前这15分钟开始时的整个状态是清晰和唯一的。 @@ -169,7 +311,7 @@ CTA策略模板提供完整的信号生成和委托管理功能,用户可以 self.put_event() ``` -### 3.7 委托回报、成交回报、停止单回报 +### 委托回报、成交回报、停止单回报 在策略中可以直接pass,其具体逻辑应用交给回测/实盘引擎负责。 ``` @@ -197,10 +339,10 @@ CTA策略模板提供完整的信号生成和委托管理功能,用户可以   -## 4. 回测研究 +## 回测研究 backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以及回测引擎应用示例: -### 4.1 加载策略 +### 加载策略 把CTA策略逻辑,对应合约品种,以及参数设置(可在策略文件外修改)载入到回测引擎中。 ``` @@ -213,7 +355,7 @@ backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以 ```   -### 4.2 载入历史数据 +### 载入历史数据 负责载入对应品种的历史数据,大概有4个步骤: - 根据数据类型不同,分成K线模式和Tick模式; @@ -254,7 +396,7 @@ backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以 ```   -### 4.3 撮合成交 +### 撮合成交 载入CTA策略以及相关历史数据后,策略会根据最新的数据来计算相关指标。若符合条件会生成交易信号,发出具体委托(buy/sell/short/cover),并且在下一根K线成交。 @@ -349,7 +491,7 @@ backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以   -### 4.4 计算策略盈亏情况 +### 计算策略盈亏情况 基于收盘价、当日持仓量、合约规模、滑点、手续费率等计算总盈亏与净盈亏,并且其计算结果以DataFrame格式输出,完成基于逐日盯市盈亏统计。 @@ -404,7 +546,7 @@ backtesting.py定义了回测引擎,下面主要介绍相关功能函数,以 -### 4.5 计算策略统计指标 +### 计算策略统计指标 calculate_statistics函数是基于逐日盯市盈亏情况(DateFrame格式)来计算衍生指标,如最大回撤、年化收益、盈亏比、夏普比率等。 ``` @@ -456,7 +598,7 @@ calculate_statistics函数是基于逐日盯市盈亏情况(DateFrame格式) ```   -### 4.6 统计指标绘图 +### 统计指标绘图 通过matplotlib绘制4幅图: - 资金曲线图 - 资金回撤图 @@ -495,7 +637,7 @@ calculate_statistics函数是基于逐日盯市盈亏情况(DateFrame格式)   -### 4.7 回测引擎使用示例 +### 回测引擎使用示例 - 导入回测引擎和CTA策略 - 设置回测相关参数,如:品种、K线周期、回测开始和结束日期、手续费、滑点、合约规模、起始资金 @@ -533,10 +675,10 @@ engine.show_chart()   -## 5. 参数优化 +## 参数优化 参数优化模块主要由3部分构成: -### 5.1 参数设置 +### 参数设置 - 设置参数优化区间:如boll_window设置起始值为18,终止值为24,步进为2,这样就得到了[18, 20, 22, 24] 这4个待优化的参数了。 - 设置优化目标字段:如夏普比率、盈亏比、总收益率等。 @@ -598,7 +740,7 @@ class OptimizationSetting:   -### 5.2 参数对组合回测 +### 参数对组合回测 多进程优化时,每个进程都会运行optimize函数,输出参数对组合以及目标优化字段的结果。其步骤如下: - 调用回测引擎 @@ -652,7 +794,7 @@ def optimize(   -### 5.3 多进程优化 +### 多进程优化 - 根据CPU的核数来创建进程:若CPU为4核,则创建4个进程 - 在每个进程都调用apply_async( )的方法运行参数对组合回测,其回测结果添加到results中 (apply_async是异步非阻塞的,即不用等待当前进程执行完毕,随时根据系统调度来进行进程切换。) @@ -697,5 +839,215 @@ def optimize(   -## 6. 实盘运行 +## 实盘运行 +在实盘环境,用户可以基于编写好的CTA策略来创建新的实例,一键初始化、启动、停止策略。 + +### 创建策略实例 +用户可以基于编写好的CTA策略来创建新的实例,策略实例的好处在于同一个策略可以同时去运行多个品种合约,并且每个实例的参数可以是不同的。 +在创建实例的时候需要填写如图的实例名称、合约品种、参数设置。注意:实例名称不能重名;合约名称是vt_symbol的格式,如IF1905.CFFEX。 + +![](https://vnpy-community.oss-cn-shanghai.aliyuncs.com/forum_experience/yazhang/cta_strategy/add_strategy.png) + +创建策略流程如下: +- 检查策略实例重名 +- 添加策略配置信息(strategy_name, vt_symbol, setting)到strategies字典上 +- 添加该策略要订阅行情的合约信息到symbol_strategy_map字典中; +- 把策略配置信息保存到json文件内; +- 在图形化界面更新状态信息。 + +``` + def add_strategy( + self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict + ): + """ + Add a new strategy. + """ + if strategy_name in self.strategies: + self.write_log(f"创建策略失败,存在重名{strategy_name}") + return + + strategy_class = self.classes[class_name] + + strategy = strategy_class(self, strategy_name, vt_symbol, setting) + self.strategies[strategy_name] = strategy + + # Add vt_symbol to strategy map. + strategies = self.symbol_strategy_map[vt_symbol] + strategies.append(strategy) + + # Update to setting file. + self.update_strategy_setting(strategy_name, setting) + + self.put_strategy_event(strategy) +``` + +  + +### 初始化策略 +- 调用策略类的on_init()回调函数,并且载入历史数据; +- 恢复上次退出之前的策略状态; +- 调用接口的subcribe()函数订阅指定行情信息; +- 策略初始化状态变成True,并且更新到日志上。 + +``` + def _init_strategy(self): + """ + Init strategies in queue. + """ + while not self.init_queue.empty(): + strategy_name = self.init_queue.get() + strategy = self.strategies[strategy_name] + + if strategy.inited: + self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作") + continue + + self.write_log(f"{strategy_name}开始执行初始化") + + # Call on_init function of strategy + self.call_strategy_func(strategy, strategy.on_init) + + # Restore strategy data(variables) + data = self.strategy_data.get(strategy_name, None) + if data: + for name in strategy.variables: + value = data.get(name, None) + if value: + setattr(strategy, name, value) + + # Subscribe market data + contract = self.main_engine.get_contract(strategy.vt_symbol) + if contract: + req = SubscribeRequest( + symbol=contract.symbol, exchange=contract.exchange) + self.main_engine.subscribe(req, contract.gateway_name) + else: + self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy) + + # Put event to update init completed status. + strategy.inited = True + self.put_strategy_event(strategy) + self.write_log(f"{strategy_name}初始化完成") + + self.init_thread = None +``` + +  + +### 启动策略 +- 检查策略初始化状态; +- 检查策略启动状态,避免重复启动; +- 调用策略类的on_start()函数启动策略; +- 策略启动状态变成True,并且更新到图形化界面上。 + +``` + def start_strategy(self, strategy_name: str): + """ + Start a strategy. + """ + strategy = self.strategies[strategy_name] + if not strategy.inited: + self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化") + return + + if strategy.trading: + self.write_log(f"{strategy_name}已经启动,请勿重复操作") + return + + self.call_strategy_func(strategy, strategy.on_start) + strategy.trading = True + + self.put_strategy_event(strategy) +``` + +  + +### 停止策略 +- 检查策略启动状态; +- 调用策略类的on_stop()函数停止策略; +- 更新策略启动状态为False; +- 对所有为成交的委托(市价单/限价单/本地停止单)进行撤单操作; +- 在图形化界面更新策略状态。 + +``` + def stop_strategy(self, strategy_name: str): + """ + Stop a strategy. + """ + strategy = self.strategies[strategy_name] + if not strategy.trading: + return + + # Call on_stop function of the strategy + self.call_strategy_func(strategy, strategy.on_stop) + + # Change trading status of strategy to False + strategy.trading = False + + # Cancel all orders of the strategy + self.cancel_all(strategy) + + # Update GUI + self.put_strategy_event(strategy) +``` + +  + +### 编辑策略 +- 重新配置策略参数字典setting; +- 更新参数字典到策略中; +- 在图像化界面更新策略状态。 + +``` + def edit_strategy(self, strategy_name: str, setting: dict): + """ + Edit parameters of a strategy. + """ + strategy = self.strategies[strategy_name] + strategy.update_setting(setting) + + self.update_strategy_setting(strategy_name, setting) + self.put_strategy_event(strategy) +``` + +  + +### 移除策略 +- 检查策略状态,只有停止策略后从可以移除策略; +- 从json文件移除策略配置信息(strategy_name, vt_symbol, setting); +- 从symbol_strategy_map字典中移除该策略订阅的合约信息; +- 从strategy_orderid_map字典移除活动委托记录; +- 从strategies字典移除该策略的相关配置信息。 + +``` + def remove_strategy(self, strategy_name: str): + """ + Remove a strategy. + """ + strategy = self.strategies[strategy_name] + if strategy.trading: + self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止") + return + + # Remove setting + self.remove_strategy_setting(strategy_name) + + # Remove from symbol strategy map + strategies = self.symbol_strategy_map[strategy.vt_symbol] + strategies.remove(strategy) + + # Remove from active orderid map + if strategy_name in self.strategy_orderid_map: + vt_orderids = self.strategy_orderid_map.pop(strategy_name) + + # Remove vt_orderid strategy map + for vt_orderid in vt_orderids: + if vt_orderid in self.orderid_strategy_map: + self.orderid_strategy_map.pop(vt_orderid) + + # Remove from strategies + self.strategies.pop(strategy_name) + + return True +```