In [1]:
import random
import multiprocessing
import numpy as np
from deap import creator, base, tools, algorithms
from vnpy.app.cta_strategy.backtesting import BacktestingEngine,OptimizationSetting
from vnpy.app.cta_strategy.strategies.boll_channel_strategy import BollChannelStrategy
from vnpy.app.cta_strategy.strategies.atr_rsi_strategy import AtrRsiStrategy
from datetime import datetime
import multiprocessing           #多进程
from functools import lru_cache

In [None]:
setting = OptimizationSetting()
#setting.add_parameter('atr_length', 10, 50, 2)
#setting.add_parameter('atr_ma_length', 10, 50, 2)
#setting.add_parameter('rsi_length', 4, 50, 2)
#setting.add_parameter('rsi_entry', 4, 30, 1)
setting.add_parameter('boll_window', 4, 50, 2)
#setting.add_parameter('boll_dev', 4, 50, 2)
setting.add_parameter('cci_window', 4, 50, 2)
setting.add_parameter('atr_window', 4, 50, 2)


local_setting = setting.generate_setting()
total_sample = len(local_setting)
print("数据总体：",total_sample)

In [None]:
setting_names = random.choice(local_setting).keys()
setting_names

In [None]:
def parameter_generate():
    setting_param = list(random.choice(local_setting).values())
    return setting_param

In [None]:
parameter_generate()

In [None]:
setting=dict(zip(setting_names,parameter_generate()))
setting

In [None]:
def object_func(strategy_avg):
    """"""
    return run_backtesting(tuple(strategy_avg))
    #return run_backtesting(strategy_avg)
    

@lru_cache(maxsize=1000000)
def run_backtesting(strategy_avg):
    # 创建回测引擎对象
    engine = BacktestingEngine()
    engine.set_parameters(
        vt_symbol="IF88.CFFEX",
        interval="1m",
        start=datetime(2016, 1, 1),
        end=datetime(2019, 1,1),
        rate=0.3/10000,
        slippage=0.2,
        size=300,
        pricetick=0.2,
        capital=1_000_000,
    )
    
    setting=dict(zip(setting_names,strategy_avg))
           

    #加载策略          
    #engine.initStrategy(TurtleTradingStrategy, setting)
    engine.add_strategy(BollChannelStrategy, setting)
    engine.load_data()
    engine.run_backtesting()
    engine.calculate_result()
    result = engine.calculate_statistics(output=False)

    return_drawdown_ratio = round(result['return_drawdown_ratio'],2)  #收益回撤比
    sharpe_ratio= round(result['sharpe_ratio'],2)                   #夏普比率
    return return_drawdown_ratio , sharpe_ratio

In [None]:
object_func(parameter_generate())

In [None]:
target_names = ["return_drawdown_ratio" , "sharpe_ratio"]
def show_result(hof):
    for i in range(len(hof)):
        solution = hof[i]    
        parameter=dict(zip(setting_names,solution))
        result=dict(zip(target_names,list(object_func(solution))))
        print({**parameter, **result})

In [None]:
from time import time
#设置优化方向：最大化收益回撤比，最大化夏普比率
creator.create("FitnessMax", base.Fitness, weights=(1.0, 1.0)) # 1.0 求最大值；-1.0 求最小值
creator.create("Individual", list, fitness=creator.FitnessMax)

def optimize(population=None):
    """"""           
    start = time()    
    toolbox = base.Toolbox() 

    # 初始化     
    toolbox.register("individual", tools.initIterate, creator.Individual,parameter_generate)                          
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)                                            
    toolbox.register("mate", tools.cxTwoPoint)                                               
    toolbox.register("mutate", tools.mutUniformInt,low = 4,up = 40,indpb=1)               
    toolbox.register("evaluate", object_func)                                                
    toolbox.register("select", tools.selNSGA2)       
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    toolbox.register("map", pool.map)
    #toolbox.register("map", futures.map)
    
    
    #遗传算法参数设置
    MU = 80                                  #设置每一代选择的个体数
    LAMBDA = 100  #设置每一代产生的子女数
    POP=100
    CXPB, MUTPB, NGEN = 0.95, 0.05,30        #分别为种群内部个体的交叉概率、变异概率、产生种群代数
    
    if population==None:
        LAMBDA = POP = int(pow(total_sample, 1/2.7))
        MU = int(0.8*POP)    
    
    pop = toolbox.population(POP)            #设置族群里面的个体数量
    hof = tools.ParetoFront()                #解的集合：帕累托前沿(非占优最优集)

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    np.set_printoptions(suppress=True)            #对numpy默认输出的科学计数法转换
    stats.register("mean", np.mean, axis=0)       #统计目标优化函数结果的平均值
    stats.register("std", np.std, axis=0)         #统计目标优化函数结果的标准差
    stats.register("min", np.min, axis=0)         #统计目标优化函数结果的最小值
    stats.register("max", np.max, axis=0)         #统计目标优化函数结果的最大值
    print("开始运行遗传算法，每代族群总数：%s, 优良品种筛选个数：%s，迭代次数：%s，交叉概率：%s，突变概率：%s" %(POP,MU,NGEN,CXPB,MUTPB))
    

    #运行算法
    algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats,
                              halloffame=hof)     #esMuPlusLambda是一种基于(μ+λ)选择策略的多目标优化分段遗传算法

    end = time()
    cost = int((end - start))

    print("遗传算法优化完成，耗时%s秒"% (cost))
    print("输出帕累托前沿解集：")
    show_result(hof)
    

In [None]:
optimize()

In [None]:
    MU = 80                                  #设置每一代选择的个体数
    POP = 100                             #设置每一代产生的子女数
    CXPB, MUTPB, NGEN = 0.95, 0.05,20 
    print("开始运行遗传算法，每代族群总数：%s, 优良品种筛选个数：%s，迭代次数：%s，交叉概率：%s，突变概率：%s" %(POP,MU,NGEN,CXPB,MUTPB))