[Mod] use ThreadPoolExecutor to run cta strategy init task in parallel

This commit is contained in:
vn.py 2019-08-10 11:58:12 +08:00
parent d151b13ef5
commit f8cdca8cb8
2 changed files with 35 additions and 43 deletions

View File

@ -11,7 +11,7 @@ from vnpy.trader.ui import MainWindow, create_qapp
# from vnpy.gateway.ctp import CtpGateway # from vnpy.gateway.ctp import CtpGateway
# from vnpy.gateway.ctptest import CtptestGateway # from vnpy.gateway.ctptest import CtptestGateway
# from vnpy.gateway.mini import MiniGateway # from vnpy.gateway.mini import MiniGateway
from vnpy.gateway.sec import SecGateway from vnpy.gateway.sopt import SoptGateway
# from vnpy.gateway.minitest import MinitestGateway # from vnpy.gateway.minitest import MinitestGateway
# from vnpy.gateway.femas import FemasGateway # from vnpy.gateway.femas import FemasGateway
# from vnpy.gateway.tiger import TigerGateway # from vnpy.gateway.tiger import TigerGateway
@ -49,7 +49,7 @@ def main():
# main_engine.add_gateway(CtpGateway) # main_engine.add_gateway(CtpGateway)
# main_engine.add_gateway(CtptestGateway) # main_engine.add_gateway(CtptestGateway)
# main_engine.add_gateway(MiniGateway) # main_engine.add_gateway(MiniGateway)
main_engine.add_gateway(SecGateway) main_engine.add_gateway(SoptGateway)
# main_engine.add_gateway(MinitestGateway) # main_engine.add_gateway(MinitestGateway)
# main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway) # main_engine.add_gateway(IbGateway)

View File

@ -7,8 +7,7 @@ from collections import defaultdict
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from datetime import datetime, timedelta from datetime import datetime, timedelta
from threading import Thread from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from copy import copy from copy import copy
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
@ -92,8 +91,7 @@ class CtaEngine(BaseEngine):
self.stop_order_count = 0 # for generating stop_orderid self.stop_order_count = 0 # for generating stop_orderid
self.stop_orders = {} # stop_orderid: stop_order self.stop_orders = {} # stop_orderid: stop_order
self.init_thread = None self.init_executor = ThreadPoolExecutor(max_workers=3)
self.init_queue = Queue()
self.rq_client = None self.rq_client = None
self.rq_symbols = set() self.rq_symbols = set()
@ -607,52 +605,46 @@ class CtaEngine(BaseEngine):
""" """
Init a strategy. Init a strategy.
""" """
self.init_queue.put(strategy_name) self.init_executor.submit(self._init_strategy, strategy_name)
if not self.init_thread: def _init_strategy(self, strategy_name: str):
self.init_thread = Thread(target=self._init_strategy)
self.init_thread.start()
def _init_strategy(self):
""" """
Init strategies in queue. Init strategies in queue.
""" """
while not self.init_queue.empty(): strategy = self.strategies[strategy_name]
strategy_name = self.init_queue.get()
strategy = self.strategies[strategy_name]
if strategy.inited: print(datetime.now(), strategy_name, strategy.vt_symbol)
self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
continue
self.write_log(f"{strategy_name}开始执行初始化") if strategy.inited:
self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
return
# Call on_init function of strategy self.write_log(f"{strategy_name}开始执行初始化")
self.call_strategy_func(strategy, strategy.on_init)
# Restore strategy data(variables) # Call on_init function of strategy
data = self.strategy_data.get(strategy_name, None) self.call_strategy_func(strategy, strategy.on_init)
if data:
for name in strategy.variables:
value = data.get(name, None)
if value:
setattr(strategy, name, value)
# Subscribe market data # Restore strategy data(variables)
contract = self.main_engine.get_contract(strategy.vt_symbol) data = self.strategy_data.get(strategy_name, None)
if contract: if data:
req = SubscribeRequest( for name in strategy.variables:
symbol=contract.symbol, exchange=contract.exchange) value = data.get(name, None)
self.main_engine.subscribe(req, contract.gateway_name) if value:
else: setattr(strategy, name, value)
self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)
# Put event to update init completed status. # Subscribe market data
strategy.inited = True contract = self.main_engine.get_contract(strategy.vt_symbol)
self.put_strategy_event(strategy) if contract:
self.write_log(f"{strategy_name}初始化完成") req = SubscribeRequest(
symbol=contract.symbol, exchange=contract.exchange)
self.main_engine.subscribe(req, contract.gateway_name)
else:
self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)
self.init_thread = None # Put event to update init completed status.
strategy.inited = True
self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成")
def start_strategy(self, strategy_name: str): def start_strategy(self, strategy_name: str):
""" """