# encoding: UTF-8
"""
Batch backtesting helpers.

Runs one strategy over several bar periods or several parameter sets in
worker processes and combines the per-run daily net-value CSV reports.

Author: Huafu Asset, Li Laijia
"""
import sys
import os
import platform
import gc
import copy
import multiprocessing
from datetime import datetime
from time import sleep

# Make the vnpy package root importable when this file is run from its folder.
vnpy_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(vnpy_root)

import numpy as np
import pandas as pd
import talib as ta  # technical-analysis library
import statsmodels.api as sm  # statistics library
import matplotlib
import matplotlib.pyplot as plt
import math  # math helpers
matplotlib.rcParams['figure.figsize'] = (20.0, 10.0)
import traceback
from vnpy.trader.setup_logger import *
from vnpy.trader.app.ctaStrategy.ctaBacktesting import BacktestingEngine, OptimizationSetting, MINUTE_DB_NAME


def combine_results(results_list):
    """
    Merge the daily net-value CSV reports of several backtests.

    Each existing report is read with pandas, indexed by its 'date' column and
    its 'rate' column is stored under the entry's 'test_item' name.  Two
    summary columns are appended:

    * 'avg_rate'   - sum of all rates divided by the number of reports read
    * 'group_rate' - sum of all rates minus (number of reports - 1), i.e. the
      profits of all runs accumulated on the capital of a single run

    :param results_list: list of dicts with keys 'test_item' and 'result_file'
    :return: combined DataFrame, or None when the list is empty or none of the
             report files exists
    """
    # Nothing to merge (also tolerates None being passed in).
    if not results_list:
        print('no records')
        return None

    result_df = None
    effected_results = 0

    for item in results_list:
        test_item = item['test_item']       # column name for this run
        file_name = item['result_file']     # csv report of this run
        if not os.path.isfile(file_name):
            # The run produced no report; leave it out of the summary.
            continue

        effected_results += 1

        df = pd.read_csv(file_name)
        # Re-index by date so that runs are aligned on the calendar day.
        df = df.set_index(pd.DatetimeIndex(df['date']))

        if result_df is None:
            # First report: it defines the index; 'rate' starts the running sum.
            result_df = df['rate'].to_frame(name=test_item)
            result_df['rate'] = result_df[test_item]
        else:
            result_df[test_item] = df['rate']
            result_df['rate'] = result_df['rate'] + result_df[test_item]

        # Drop the reference before the next (possibly large) read.
        del df

    if effected_results > 0:
        result_df['avg_rate'] = result_df['rate'] / effected_results
        result_df['group_rate'] = result_df['rate'] - effected_results + 1
        # The running sum was only a helper column.
        result_df.drop('rate', axis=1, inplace=True)

    return result_df


def calculate_result(result_df, rate_column):
    """
    Describe the largest drawdown of one net-value column.

    Walks the column in index order, tracking the running maximum and the
    largest distance below it.

    :param result_df: DataFrame whose index entries provide ``strftime``
    :param rate_column: name of the net-value column to scan
    :return: text describing the largest drawdown, or '-' when the column
             never falls below its running maximum
    """
    max_rate = 0
    max_loss = 0
    max_rate_date = None
    max_loss_date = None
    max_loss_info = '-'

    # Iterating the values directly (instead of .loc[idx] per row) also works
    # when the index holds duplicated dates, where .loc would return a Series.
    for idx, cur_rate in result_df[rate_column].items():
        if cur_rate > max_rate:
            max_rate = cur_rate
            max_rate_date = idx.strftime('%Y-%m-%d')

        cur_loss = max_rate - cur_rate
        if cur_loss > max_loss:
            max_loss = cur_loss
            max_loss_date = idx.strftime('%Y-%m-%d')
            max_loss_percent = max_loss / max_rate
            max_loss_info = u'{} from {} to {},rate {}=>{},max loss rate {}'.format(
                rate_column, max_rate_date, max_loss_date, max_rate, cur_rate, max_loss_percent)

    return max_loss_info


def _remove_stale_strategy_files(data_path, strategy_name):
    """Delete leftover grid/policy json files of a strategy; return False if a delete fails."""
    # (file name template, message template) - messages are kept as they were.
    stale_files = (
        ('{0}_upGrids.json', u'{0} exist,remove it'),
        ('{0}_dnGrids.json', u'{0}exist,remove it'),
        ('{0}_Grids.json', u'{0}exist,remove it'),
        ('{0}_Policy.json', u'{0}exist,remove it'),
    )
    for name_template, message in stale_files:
        json_file = os.path.abspath(os.path.join(data_path, name_template.format(strategy_name)))
        if not os.path.isfile(json_file):
            continue
        print(message.format(json_file))
        try:
            os.remove(json_file)
        except Exception as ex:
            print(u'{0}:{1}'.format(Exception, ex))
            return False
    return True


def single_strategy_test(test_settings):
    """
    Run one backtest described by ``test_settings``.

    Required keys: 'name', 'mode' ('tick' selects tick mode, anything else bar
    mode), 'strategy', 'vtSymbol', 'size', 'minDiff', 'margin_rate',
    'initCapital', 'percentLimit', 'bar_interval' (bar period of the csv, in
    minutes).

    Optional keys: 'debug', 'filenamePrefix', 'is_7x24', 'start_date'
    (default '20110101'), 'end_date' (default '20171201'), 'initDays'
    (default 10), 'slippage' (default 0), 'rate' (default 0.0001),
    'fixCommission', 'report_file' (default 'DailyList.csv'), 'bar_file'
    (csv of bars; without it the engine's data source is used),
    'pre_functions' (names of no-argument strategy methods called before the
    run).

    A shallow copy of the settings - minus 'initCapital', 'report_file',
    'start_date' and 'end_date' - is handed to the strategy.

    :param test_settings: dict of engine and strategy settings
    :return: True when the backtest completed, False when removing a stale
             json file failed or the run raised an exception
    """
    from vnpy.trader.vtEvent import EventEngine2
    eventEngine = EventEngine2()
    eventEngine.start()
    # NOTE(review): the event engine is started but never stopped here - confirm
    # whether worker processes rely on process exit to clean it up.

    # Create the backtesting engine.
    engine = BacktestingEngine(eventEngine=eventEngine)

    # Name the run before the logger is created.
    engine.setStrategyName(test_settings['name'])

    if 'debug' in test_settings:
        engine.createLogger(debug=test_settings['debug'])
    else:
        engine.createLogger()

    # Backtesting mode.
    if test_settings['mode'] == 'tick':
        engine.setBacktestingMode(engine.TICK_MODE)
    else:
        engine.setBacktestingMode(engine.BAR_MODE)

    # The strategy gets its own (shallow) copy so engine-only keys can be removed.
    strategy_settings = copy.copy(test_settings)

    # A file-name prefix, when given, replaces the strategy name set above.
    if 'filenamePrefix' in strategy_settings:
        engine.setStrategyName(strategy_settings["filenamePrefix"])
    else:
        engine.setStrategyName(test_settings['name'])

    if 'is_7x24' in test_settings:
        engine.is_7x24 = test_settings['is_7x24']

    # 'size' and 'margin_rate' are intentionally left in the strategy settings.
    del strategy_settings['initCapital']
    if 'report_file' in strategy_settings:
        del strategy_settings['report_file']

    # First day of backtest data.
    init_days = strategy_settings.get('initDays', 10)
    if 'start_date' in strategy_settings:
        engine.setStartDate(test_settings['start_date'], initDays=init_days)
        del strategy_settings['start_date']
    else:
        engine.setStartDate('20110101', initDays=init_days)

    # Last day of backtest data.
    if 'end_date' in test_settings:
        engine.setEndDate(test_settings['end_date'])
        del strategy_settings['end_date']
    else:
        engine.setEndDate('20171201')

    # engine.connectMysql()
    engine.setDatabase(dbName=MINUTE_DB_NAME, symbol=test_settings['vtSymbol'])

    # Product parameters.
    if 'slippage' in test_settings and test_settings['slippage'] > 0:
        engine.setSlippage(test_settings['slippage'])
    else:
        engine.setSlippage(0)

    engine.setRate(test_settings['rate'] if 'rate' in test_settings else float(0.0001))  # default 1/10000
    engine.setSize(test_settings['size'])  # contract size
    engine.setMinDiff(test_settings['minDiff'])  # minimum price tick
    engine.setMarginRate(test_settings['margin_rate'])  # margin rate
    if 'fixCommission' in test_settings:
        engine.fixCommission = float(test_settings['fixCommission'])  # fixed fee per open/close

    # Several worker processes may get here at the same time; exist_ok avoids
    # the check-then-create race of os.path.exists() + os.mkdir().
    data_path = os.path.abspath(os.path.join(os.getcwd(), 'data'))
    os.makedirs(data_path, exist_ok=True)

    logs_path = os.path.abspath(os.path.join(os.getcwd(), 'logs'))
    os.makedirs(logs_path, exist_ok=True)

    # Remove the json files a previous run with the same name left behind.
    if not _remove_stale_strategy_files(data_path, test_settings['name']):
        return False

    # Create the strategy object inside the engine.
    print(u'run {} using:{}'.format(test_settings['strategy'], strategy_settings))
    engine.initStrategy(test_settings['strategy'], setting=strategy_settings)

    # Where the daily net-value report is written.
    daily_report_file = 'DailyList.csv' if 'report_file' not in test_settings else test_settings['report_file']
    engine.setDailyReportName(daily_report_file)

    # Compounding is switched off (the original note says True only matters in FINAL_MODE).
    engine.usageCompounding = False
    # REALTIME_MODE computes net value while running; FINAL_MODE computes it once at the end.
    engine.calculateMode = engine.REALTIME_MODE

    # Opening capital.
    init_capital = test_settings['initCapital']
    engine.capital = init_capital
    engine.initCapital = init_capital
    engine.avaliable = init_capital
    engine.netCapital = init_capital
    engine.maxCapital = init_capital
    engine.maxNetCapital = init_capital
    engine.percentLimit = test_settings['percentLimit']  # upper limit of capital usage (%)
    # Bar period of the csv in seconds; used to shift the bar timestamps.
    engine.barTimeInterval = 60 * test_settings['bar_interval']

    try:
        # Pre-run hooks: no-argument strategy methods, looked up by name.
        pre_functions = test_settings.get('pre_functions', [])
        for fun_name in pre_functions:
            try:
                if not isinstance(fun_name, str):
                    continue
                if hasattr(engine.strategy, fun_name):
                    fun = getattr(engine.strategy, fun_name)
                    if fun is not None:
                        fun()
            except Exception as ex:
                # A failing hook is reported but does not abort the backtest.
                print(u'调用前置动作异常:{},{}'.format(str(ex), traceback.format_exc()), file=sys.stderr)

        # Run the backtest.
        if 'bar_file' in test_settings:
            engine.runBackTestingWithBarFile(test_settings['bar_file'])
        else:
            engine.runBackTestingWithDataSource()

        print('{}finished loop bars'.format(test_settings['name']))
        # Show the result.
        engine.showBacktestingResult()

        # Persist the strategy's internal data.
        engine.saveStrategyData()
        print('{} finished'.format(test_settings['name']))
        return True
    except Exception as ex:
        print(u'single_strategy_test exception:{}'.format(str(ex)))
        traceback.print_exc()
        return False


def multi_period_test(gid, group_setting):
    """
    Backtest one symbol on several bar periods in parallel.

    For every entry of ``group_setting['minute_list']`` a copy of the group
    settings is specialised (name, 'MinInterval', an equal share of
    'percentLimit', report file) and submitted to a process pool running
    ``single_strategy_test``.

    :param gid: name of the test group, used as prefix of the run names
    :param group_setting: dict; reads 'minute_list', 'strategy', 'symbol',
                          'percentLimit', 'report_folder' and optionally
                          'bar_interval' (default 1)
    :return: list of {'test_item', 'result_file'} dicts of the runs that
             returned True
    """
    minutes_interval_list = group_setting['minute_list']  # e.g. 3, 5, 10
    strategyClass = group_setting['strategy']

    # One timestamp for the whole batch.
    test_dt = datetime.now().strftime('%Y%m%d_%H%M')

    if not minutes_interval_list:
        # Nothing to run; do not spin up a pool.
        return []

    daily_results = []
    pending = []

    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    try:
        for m_i in minutes_interval_list:
            settings = copy.copy(group_setting)
            del settings['minute_list']
            settings['name'] = '{}_{}_{}_M{}'.format(gid, strategyClass.className, group_setting['symbol'], m_i)
            # Capital usage is split evenly between the periods of the group.
            settings['percentLimit'] = group_setting['percentLimit'] / len(minutes_interval_list)
            settings['vtSymbol'] = group_setting['symbol']
            settings['MinInterval'] = m_i
            settings['mode'] = 'bar'
            settings['backtesting'] = True
            settings['bar_interval'] = group_setting['bar_interval'] if 'bar_interval' in group_setting else 1
            settings['strategy'] = strategyClass

            # Report file: <report_folder>/<run name>_daily_<batch time>.csv
            daily_report_file = os.path.abspath(
                os.path.join(group_setting['report_folder'],
                             u'{}_daily_{}.csv'.format(settings['name'], test_dt)))
            settings['report_file'] = daily_report_file

            daily_results.append({'test_item': 'M{}'.format(m_i), 'result_file': daily_report_file})
            pending.append(pool.apply_async(single_strategy_test, (settings,)))

            gc.collect()
            # Stagger the submissions.
            sleep(10)

        result_list = [res.get() for res in pending]
    finally:
        # Always release the workers, also when a run raised in res.get().
        pool.close()
        pool.join()

    # Keep only the runs that reported success.
    return [entry for entry, ok in zip(daily_results, result_list) if ok]


def run_multiperiod_test(gid, group_settings):
    """
    Run a multi-period group test, save the combined report and plot it.

    When 'report_folder' is missing from ``group_settings`` it is set to
    ``<cwd>/logs/<gid>``.  The combined report is written to
    ``<report_folder>/<gid>_<symbol>_Report_<periods>.csv``.

    :param gid: name of the test group
    :param group_settings: dict, see ``multi_period_test``; 'report_folder'
                           may be added to it
    :return: None
    """
    m = '_'.join(str(e) for e in group_settings['minute_list'])

    if 'report_folder' not in group_settings:
        # Default report folder.
        group_settings['report_folder'] = os.path.abspath(os.path.join(os.getcwd(), 'logs', gid))

    # makedirs also creates a missing 'logs' parent, which os.mkdir could not.
    os.makedirs(group_settings['report_folder'], exist_ok=True)

    # Combined report file.
    final_file = os.path.abspath(
        os.path.join(group_settings['report_folder'],
                     '{}_{}_Report_{}.csv'.format(gid, group_settings['symbol'], m)))

    # Run the backtests and merge their daily reports.
    daily_results = multi_period_test(gid, group_settings)
    backtest_df = combine_results(daily_results)

    if backtest_df is None:
        # No run succeeded or no report file was written: nothing to save or plot.
        print('no backtest results to combine')
        gc.collect()
        return

    backtest_df.to_csv(final_file)

    # Plot one labelled net-value curve per period so the legend has entries;
    # periods whose report file was skipped have no column.
    fig, ax1 = plt.subplots()
    fig.patch.set_facecolor('white')
    for column in (item['test_item'] for item in daily_results):
        if column in backtest_df.columns:
            ax1.plot(backtest_df[column], label=column)
    ax1.legend()

    del backtest_df
    gc.collect()
    print('finished run_multiperiod_test')


def multi_parameter_test(gid, settings_list):
    """
    Backtest several parameter sets in parallel.

    Every settings dict is copied, completed with 'vtSymbol', 'mode',
    'backtesting', a default 'bar_interval' of 1 and a 'report_file', and
    submitted to a process pool running ``single_strategy_test``.

    :param gid: id of the test group (not used by this function)
    :param settings_list: list of dicts; each needs 'paraName', 'symbol',
                          'name' and 'report_folder'; 'log_file' and
                          'minute_list' are removed from the copy if present
    :return: list of {'test_item', 'result_file'} dicts of the runs that
             returned True
    """
    # Fails early when a settings dict has no 'paraName'.
    para_list = [i['paraName'] for i in settings_list]
    if not para_list:
        return []

    daily_results = []
    pending = []

    print('multi_parameter_test,total:{}'.format(len(settings_list)))

    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    try:
        for strategy_settings in settings_list:
            settings = copy.copy(strategy_settings)

            # Time of this submission (part of the report file name).
            test_dt = datetime.now().strftime('%Y%m%d_%H%M')

            # Keys that are not passed on to the single backtest; either may be absent.
            settings.pop('log_file', None)
            settings.pop('minute_list', None)

            settings['vtSymbol'] = settings['symbol']
            settings['mode'] = 'bar'
            settings['backtesting'] = True
            if 'bar_interval' not in settings:
                settings['bar_interval'] = 1

            # Report file: <report_folder>/<run name>_daily_<time>.csv
            daily_report_file = os.path.abspath(
                os.path.join(settings['report_folder'],
                             u'{}_daily_{}.csv'.format(settings['name'], test_dt)))
            settings['report_file'] = daily_report_file

            pending.append(pool.apply_async(single_strategy_test, (settings,)))
            daily_results.append({'test_item': settings['paraName'], 'result_file': daily_report_file})

            gc.collect()
            # Stagger the submissions.
            sleep(10)

        result_list = [res.get() for res in pending]
    finally:
        # Always release the workers, also when a run raised in res.get().
        pool.close()
        pool.join()

    # Only runs that reported success are returned.
    return [entry for entry, ok in zip(daily_results, result_list) if ok]


def run_multiparameter_test(gid, settings_list):
    """
    Run a group of backtests that differ in their parameters.

    The report folder is taken from the first settings dict (default
    ``<cwd>/logs``), created if needed and written into every settings dict.

    :param gid: id of the test group
    :param settings_list: list of settings dicts, see ``multi_parameter_test``
    :return: None
    :raises ReferenceError: when ``settings_list`` is empty
    """
    if len(settings_list) == 0:
        raise ReferenceError('Zero settings')

    first_setting = settings_list[0]
    paraNames = '_'.join(i['paraName'] for i in settings_list)

    if 'report_folder' in first_setting:
        report_folder = first_setting['report_folder']
    else:
        # Default report folder.
        report_folder = os.path.abspath(os.path.join(os.getcwd(), 'logs'))

    # Path of the combined report; only needed by the disabled step noted below.
    final_file = os.path.abspath(
        os.path.join(report_folder, '{}_{}_Report_{}.csv'.format(gid, first_setting['symbol'], paraNames)))

    os.makedirs(report_folder, exist_ok=True)

    # All runs of the group write into the same folder.
    for settings in settings_list:
        settings['report_folder'] = report_folder

    # Run the backtests.
    daily_results = multi_parameter_test(gid, settings_list)

    # NOTE: combining the reports (combine_results -> final_file) and plotting the
    # curves to '<report_folder>/rate.png' is disabled for parameter groups.

    gc.collect()
    print('finished run_multiparameter_test')


def single_func(para):
    """
    Worker used by ``multi_func`` to try out logging from a pool process.

    :param para: value compared against 5
    :return: True when ``para`` is greater than 5, otherwise False
    """
    logger = setup_logger('MyLog', name='my{}'.format(para))
    if para > 5:
        print(u'more than 5')
        logger.info('More than 5')
        return True

    print('less')
    logger.info('Less than 5')
    return False


def multi_func():
    """Run ``single_func`` for 0..9 in a process pool (logging smoke test)."""
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    try:
        logger = setup_logger('MyLog')
        logger.info('main process')
        pending = [pool.apply_async(single_func, (i,)) for i in range(0, 10)]
        # Wait for every worker; the values themselves are not used.
        results = [res.get() for res in pending]
    finally:
        # Release the workers even when a worker raised.
        pool.close()
        pool.join()