From f8cdca8cb8934b2eca45a386b9d5ba98e78a8d35 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Sat, 10 Aug 2019 11:58:12 +0800 Subject: [PATCH] [Mod] use ThreadPoolExecutor to run cta strategy init task in parallel --- examples/vn_trader/run.py | 4 +- vnpy/app/cta_strategy/engine.py | 74 +++++++++++++++------------------ 2 files changed, 35 insertions(+), 43 deletions(-) diff --git a/examples/vn_trader/run.py b/examples/vn_trader/run.py index 35a3567b..5ae053bf 100644 --- a/examples/vn_trader/run.py +++ b/examples/vn_trader/run.py @@ -11,7 +11,7 @@ from vnpy.trader.ui import MainWindow, create_qapp # from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctptest import CtptestGateway # from vnpy.gateway.mini import MiniGateway -from vnpy.gateway.sec import SecGateway +from vnpy.gateway.sopt import SoptGateway # from vnpy.gateway.minitest import MinitestGateway # from vnpy.gateway.femas import FemasGateway # from vnpy.gateway.tiger import TigerGateway @@ -49,7 +49,7 @@ def main(): # main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtptestGateway) # main_engine.add_gateway(MiniGateway) - main_engine.add_gateway(SecGateway) + main_engine.add_gateway(SoptGateway) # main_engine.add_gateway(MinitestGateway) # main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(IbGateway) diff --git a/vnpy/app/cta_strategy/engine.py b/vnpy/app/cta_strategy/engine.py index cd02823a..bc3e7fa7 100644 --- a/vnpy/app/cta_strategy/engine.py +++ b/vnpy/app/cta_strategy/engine.py @@ -7,8 +7,7 @@ from collections import defaultdict from pathlib import Path from typing import Any, Callable from datetime import datetime, timedelta -from threading import Thread -from queue import Queue +from concurrent.futures import ThreadPoolExecutor from copy import copy from vnpy.event import Event, EventEngine @@ -92,8 +91,7 @@ class CtaEngine(BaseEngine): self.stop_order_count = 0 # for generating stop_orderid self.stop_orders = {} # stop_orderid: stop_order - self.init_thread = None - self.init_queue = Queue() + self.init_executor = ThreadPoolExecutor(max_workers=3) self.rq_client = None self.rq_symbols = set() @@ -606,53 +604,47 @@ class CtaEngine(BaseEngine): def init_strategy(self, strategy_name: str): """ Init a strategy. - """ - self.init_queue.put(strategy_name) + """ + self.init_executor.submit(self._init_strategy, strategy_name) - if not self.init_thread: - self.init_thread = Thread(target=self._init_strategy) - self.init_thread.start() - - def _init_strategy(self): + def _init_strategy(self, strategy_name: str): """ Init strategies in queue. """ - while not self.init_queue.empty(): - strategy_name = self.init_queue.get() - strategy = self.strategies[strategy_name] + strategy = self.strategies[strategy_name] - if strategy.inited: - self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作") - continue + print(datetime.now(), strategy_name, strategy.vt_symbol) - self.write_log(f"{strategy_name}开始执行初始化") + if strategy.inited: + self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作") + return - # Call on_init function of strategy - self.call_strategy_func(strategy, strategy.on_init) + self.write_log(f"{strategy_name}开始执行初始化") - # Restore strategy data(variables) - data = self.strategy_data.get(strategy_name, None) - if data: - for name in strategy.variables: - value = data.get(name, None) - if value: - setattr(strategy, name, value) + # Call on_init function of strategy + self.call_strategy_func(strategy, strategy.on_init) - # Subscribe market data - contract = self.main_engine.get_contract(strategy.vt_symbol) - if contract: - req = SubscribeRequest( - symbol=contract.symbol, exchange=contract.exchange) - self.main_engine.subscribe(req, contract.gateway_name) - else: - self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy) + # Restore strategy data(variables) + data = self.strategy_data.get(strategy_name, None) + if data: + for name in strategy.variables: + value = data.get(name, None) + if value: + setattr(strategy, name, value) - # Put event to update init completed status. - strategy.inited = True - self.put_strategy_event(strategy) - self.write_log(f"{strategy_name}初始化完成") - - self.init_thread = None + # Subscribe market data + contract = self.main_engine.get_contract(strategy.vt_symbol) + if contract: + req = SubscribeRequest( + symbol=contract.symbol, exchange=contract.exchange) + self.main_engine.subscribe(req, contract.gateway_name) + else: + self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy) + + # Put event to update init completed status. + strategy.inited = True + self.put_strategy_event(strategy) + self.write_log(f"{strategy_name}初始化完成") def start_strategy(self, strategy_name: str): """