From 3b9f1c314826b202f26f17012d37b9a7f32bb916 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Tue, 17 Sep 2019 16:26:48 +0800 Subject: [PATCH] [Add] SpreadStrategyEngine for managing spread trading strategies --- vnpy/app/spread_trading/engine.py | 326 ++++++++++++++++++++++++++-- vnpy/app/spread_trading/template.py | 227 ++++++++++++++++--- 2 files changed, 504 insertions(+), 49 deletions(-) diff --git a/vnpy/app/spread_trading/engine.py b/vnpy/app/spread_trading/engine.py index deb7e801..6025d30e 100644 --- a/vnpy/app/spread_trading/engine.py +++ b/vnpy/app/spread_trading/engine.py @@ -1,6 +1,10 @@ -from typing import List, Dict, Set +import traceback +import importlib +import os +from typing import List, Dict, Set, Callable, Any, Type from collections import defaultdict from copy import copy +from path import Path from vnpy.event import EventEngine, Event from vnpy.trader.engine import BaseEngine, MainEngine @@ -40,6 +44,7 @@ class SpreadEngine(BaseEngine): self.data_engine: SpreadDataEngine = SpreadDataEngine(self) self.algo_engine: SpreadAlgoEngine = SpreadAlgoEngine(self) + self.strategy_engine: SpreadStrategyEngine = SpreadStrategyEngine(self) self.add_spread = self.data_engine.add_spread self.remove_spread = self.data_engine.remove_spread @@ -506,6 +511,8 @@ class SpreadAlgoEngine: class SpreadStrategyEngine: """""" + setting_filename = "spraed_trading_strategy.json" + def __init__(self, spread_engine: SpreadEngine): """""" self.spread_engine: SpreadEngine = spread_engine @@ -514,28 +521,107 @@ class SpreadStrategyEngine: self.write_log = spread_engine.write_log + self.strategy_setting: Dict[str: Dict] = {} + + self.classes: Dict[str: Type[SpreadStrategyTemplate]] = {} self.strategies: Dict[str: SpreadStrategyTemplate] = {} self.order_strategy_map: dict[str: SpreadStrategyTemplate] = {} - self.name_strategy_map: dict[str: SpreadStrategyTemplate] = defaultdict( + self.algo_strategy_map: dict[str: SpreadStrategyTemplate] = {} + self.spread_strategy_map: dict[str: SpreadStrategyTemplate] = defaultdict( list) self.vt_tradeids: Set = set() def start(self): """""" - self.load_setting() + self.load_strategy_class() + self.load_strategy_setting() self.register_event() self.write_log("价差策略引擎启动成功") - def load_setting(self): + def close(self): """""" - pass + self.stop_all_strategies() - def save_setting(self): - """""" - pass + def load_strategy_class(self): + """ + Load strategy class from source code. + """ + path1 = Path(__file__).parent.joinpath("strategies") + self.load_strategy_class_from_folder( + path1, "vnpy.app.cta_strategy.strategies") + + path2 = Path.cwd().joinpath("strategies") + self.load_strategy_class_from_folder(path2, "strategies") + + def load_strategy_class_from_folder(self, path: Path, module_name: str = ""): + """ + Load strategy class from certain folder. + """ + for dirpath, dirnames, filenames in os.walk(str(path)): + for filename in filenames: + if filename.endswith(".py"): + strategy_module_name = ".".join( + [module_name, filename.replace(".py", "")]) + elif filename.endswith(".pyd"): + strategy_module_name = ".".join( + [module_name, filename.split(".")[0]]) + + self.load_strategy_class_from_module(strategy_module_name) + + def load_strategy_class_from_module(self, module_name: str): + """ + Load strategy class from module file. + """ + try: + module = importlib.import_module(module_name) + + for name in dir(module): + value = getattr(module, name) + if (isinstance(value, type) and issubclass(value, SpreadStrategyTemplate) and value is not SpreadStrategyTemplate): + self.classes[value.__name__] = value + except: # noqa + msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}" + self.write_log(msg) + + def load_strategy_setting(self): + """ + Load setting file. + """ + self.strategy_setting = load_json(self.setting_filename) + + for strategy_name, strategy_config in self.strategy_setting.items(): + self.add_strategy( + strategy_config["class_name"], + strategy_name, + strategy_config["spread_name"], + strategy_config["setting"] + ) + + def update_strategy_setting(self, strategy_name: str, setting: dict): + """ + Update setting file. + """ + strategy = self.strategies[strategy_name] + + self.strategy_setting[strategy_name] = { + "class_name": strategy.__class__.__name__, + "spread_name": strategy.spread_name, + "setting": setting, + } + save_json(self.setting_filename, self.strategy_setting) + + def remove_strategy_setting(self, strategy_name: str): + """ + Update setting file. + """ + if strategy_name not in self.strategy_setting: + return + + self.strategy_setting.pop(strategy_name) + save_json(self.setting_filename, self.strategy_setting) def register_event(self): """""" @@ -548,27 +634,152 @@ class SpreadStrategyEngine: def process_spread_data_event(self, event: Event): """""" - pass + spread = event.data + strategies = self.spread_strategy_map[spread.name] + + for strategy in strategies: + self.call_strategy_func(strategy, strategy.on_spread_data) def process_spread_pos_event(self, event: Event): """""" - pass + spread = event.data + strategies = self.spread_strategy_map[spread.name] + + for strategy in strategies: + self.call_strategy_func(strategy, strategy.on_spread_pos) def process_spread_algo_event(self, event: Event): """""" - pass + algo = event.data + strategy = self.algo_strategy_map.get(algo.algoid, None) + + if strategy: + self.call_strategy_func(strategy, strategy.update_spread_algo, algo) def process_order_event(self, event: Event): """""" - pass + order = event.data + strategy = self.order_strategy_map.get(order.vt_orderid, None) + + if strategy: + self.call_strategy_func(strategy, strategy.update_order, order) def process_trade_event(self, event: Event): """""" - pass + trade = event.data + strategy = self.trade_strategy_map.get(trade.vt_orderid, None) + + if strategy: + self.call_strategy_func(strategy, strategy.on_trade, trade) + + def call_strategy_func( + self, strategy: SpreadStrategyTemplate, func: Callable, params: Any = None + ): + """ + Call function of a strategy and catch any exception raised. + """ + try: + if params: + func(params) + else: + func() + except Exception: + strategy.trading = False + strategy.inited = False + + msg = f"触发异常已停止\n{traceback.format_exc()}" + self.write_log(msg, strategy) + + def add_strategy( + self, class_name: str, strategy_name: str, spread_name: str, setting: dict + ): + """ + Add a new strategy. + """ + if strategy_name in self.strategies: + self.write_log(f"创建策略失败,存在重名{strategy_name}") + return + + strategy_class = self.classes.get(class_name, None) + if not strategy_class: + self.write_log(f"创建策略失败,找不到策略类{class_name}") + return + + strategy = strategy_class(self, strategy_name, spread_name, setting) + self.strategies[strategy_name] = strategy + + # Add vt_symbol to strategy map. + strategies = self.spread_strategy_map[spread_name] + strategies.append(strategy) + + # Update to setting file. + self.update_strategy_setting(strategy_name, setting) + + self.put_strategy_event(strategy) + + def init_strategy(self, strategy_name: str): + """""" + strategy = self.strategies[strategy_name] + + if strategy.inited: + self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作") + return + + self.call_strategy_func(strategy, strategy.on_init) + strategy.inited = True + + self.put_strategy_event(strategy) + self.write_log(f"{strategy_name}初始化完成") + + def start_strategy(self, strategy_name: str): + """""" + strategy = self.strategies[strategy_name] + if not strategy.inited: + self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化") + return + + if strategy.trading: + self.write_log(f"{strategy_name}已经启动,请勿重复操作") + return + + self.call_strategy_func(strategy, strategy.on_start) + strategy.trading = True + + self.put_strategy_event(strategy) + + def stop_strategy(self, strategy_name: str): + """""" + strategy = self.strategies[strategy_name] + if not strategy.trading: + return + + self.call_strategy_func(strategy, strategy.on_stop) + strategy.trading = False + + strategy.stop_all_algos() + strategy.cancel_all_orders() + + self.put_strategy_event(strategy) + + def init_all_strategies(self): + """""" + for strategy in self.strategies.values(): + self.init_strategy(strategy) + + def start_all_strategies(self): + """""" + for strategy in self.strategies.values(): + self.start_strategy(strategy) + + def stop_all_strategies(self): + """""" + for strategy in self.strategies.values(): + self.stop_strategy(strategy) def start_algo( self, strategy: SpreadStrategyTemplate, + spread_name: str, direction: Direction, price: float, volume: float, @@ -577,9 +788,25 @@ class SpreadStrategyEngine: lock: bool ) -> str: """""" - pass + algoid = self.spread_engine.start_algo( + spread_name, + direction, + price, + volume, + payup, + interval, + lock + ) - def stop_algo(self, algoid: str): + self.algo_strategy_map[algoid] = strategy + + return algoid + + def stop_algo(self, strategy: SpreadStrategyTemplate, algoid: str): + """""" + self.spread_engine.stop_algo(algoid) + + def stop_all_algos(self, strategy: SpreadStrategyTemplate): """""" pass @@ -590,19 +817,74 @@ class SpreadStrategyEngine: price: float, volume: float, direction: Direction, - offset: Offset - ) -> str: - pass + offset: Offset, + lock: bool + ) -> List[str]: + contract = self.main_engine.get_contract(vt_symbol) - def cancel_order(self, vt_orderid: str): + original_req = OrderRequest( + symbol=contract.symbol, + exchange=contract.exchange, + direction=direction, + offset=offset, + type=OrderType.LIMIT, + price=price, + volume=volume + ) + + # Convert with offset converter + req_list = self.offset_converter.convert_order_request( + original_req, lock) + + # Send Orders + vt_orderids = [] + + for req in req_list: + vt_orderid = self.main_engine.send_order( + req, contract.gateway_name) + + # Check if sending order successful + if not vt_orderid: + continue + + vt_orderids.append(vt_orderid) + + self.offset_converter.update_order_request(req, vt_orderid) + + # Save relationship between orderid and strategy. + self.order_strategy_map[vt_orderid] = strategy + + return vt_orderids + + def cancel_order(self, strategy: SpreadStrategyTemplate, vt_orderid: str): + """""" + order = self.main_engine.get_order(vt_orderid) + if not order: + self.write_strategy_log(strategy, "撤单失败,找不到委托{}".format(vt_orderid)) + return + + req = order.create_cancel_request() + self.main_engine.cancel_order(req, order.gateway_name) + + def cancel_all_orders(self, strategy: SpreadStrategyTemplate): """""" pass def put_strategy_event(self, strategy: SpreadStrategyTemplate): """""" - pass + event = Event(EVENT_SPREAD_STRATEGY, strategy) + self.event_engine.put(event) def write_strategy_log(self, strategy: SpreadStrategyTemplate, msg: str): """""" - pass - + msg = f"{strategy.strategy_name}:{msg}" + self.write_log(msg) + + def send_strategy_email(self, strategy: SpreadStrategyTemplate, msg: str): + """""" + if strategy: + subject = f"{strategy.strategy_name}" + else: + subject = "价差策略引擎" + + self.main_engine.send_email(subject, msg) diff --git a/vnpy/app/spread_trading/template.py b/vnpy/app/spread_trading/template.py index 8f428d4b..88f572ed 100644 --- a/vnpy/app/spread_trading/template.py +++ b/vnpy/app/spread_trading/template.py @@ -1,7 +1,8 @@ from collections import defaultdict -from typing import Dict, List +from typing import Dict, List, Set from math import floor, ceil +from copy import copy from vnpy.trader.object import TickData, TradeData, OrderData, ContractData from vnpy.trader.constant import Direction, Status, Offset @@ -272,42 +273,158 @@ class SpreadStrategyTemplate: """ Template for implementing spread trading strategies. """ - strategy_name = "StrategyTemplate" + + author: str = "" + parameters: List[str] = [] + variables: List[str] = [] def __init__( self, strategy_engine, - strategy_id: str, - spread: SpreadData + strategy_name: str, + spread: SpreadData, + setting: dict ): """""" self.strategy_engine = strategy_engine - self.strategy_id = strategy_id + self.strategy_name = strategy_name self.spread = spread + self.spread_name = spread.name + + self.inited = False + self.trading = False + + self.variables = copy(self.variables) + self.variables.insert(0, "inited") + self.variables.insert(1, "trading") + + self.vt_orderids: Set[str] = set() + self.algoids: Set[str] = set() + + self.update_setting(setting) + + def update_setting(self, setting: dict): + """ + Update strategy parameter wtih value in setting dict. + """ + for name in self.parameters: + if name in setting: + setattr(self, name, setting[name]) + + @classmethod + def get_class_parameters(cls): + """ + Get default parameters dict of strategy class. + """ + class_parameters = {} + for name in cls.parameters: + class_parameters[name] = getattr(cls, name) + return class_parameters + + def get_parameters(self): + """ + Get strategy parameters dict. + """ + strategy_parameters = {} + for name in self.parameters: + strategy_parameters[name] = getattr(self, name) + return strategy_parameters + + def get_variables(self): + """ + Get strategy variables dict. + """ + strategy_variables = {} + for name in self.variables: + strategy_variables[name] = getattr(self, name) + return strategy_variables + + def get_data(self): + """ + Get strategy data. + """ + strategy_data = { + "strategy_name": self.strategy_name, + "spread_name": self.spread_name, + "class_name": self.__class__.__name__, + "author": self.author, + "parameters": self.get_parameters(), + "variables": self.get_variables(), + } + return strategy_data + + def update_spread_algo(self, algo: SpreadAlgoTemplate): + """ + Callback when algo status is updated. + """ + if not algo.is_active() and algo.algoid in self.algoids: + self.algoids.pop(algo.algoid) + + self.on_spread_algo(algo) + + def update_order(self, order: OrderData): + """ + Callback when order status is updated. + """ + if not order.is_active() and order.vt_orderid in self.vt_orderids: + self.vt_orderids.pop(order.vt_orderid) + + self.on_order(order) + + @virtual + def on_init(self): + """ + Callback when strategy is inited. + """ + pass + + @virtual + def on_start(self): + """ + Callback when strategy is started. + """ + pass + + @virtual + def on_stop(self): + """ + Callback when strategy is stopped. + """ + pass @virtual def on_spread_data(self): - """""" + """ + Callback when spread price is updated. + """ pass @virtual def on_spread_pos(self): - """""" + """ + Callback when spread position is updated. + """ pass @virtual def on_spread_algo(self, algo: SpreadAlgoTemplate): - """""" + """ + Callback when algo status is updated. + """ pass @virtual def on_order(self, order: OrderData): - """""" + """ + Callback when order status is updated. + """ pass @virtual def on_trade(self, trade: TradeData): - """""" + """ + Callback when new trade data is received. + """ pass def start_algo( @@ -320,7 +437,23 @@ class SpreadStrategyTemplate: lock: bool ) -> str: """""" - pass + if not self.trading: + return "" + + algoid: str = self.strategy_engine.start_algo( + self, + self.spread_name, + direction, + price, + volume, + payup, + interval, + lock + ) + + self.algoids.add(algoid) + + return algoid def start_long_algo( self, @@ -346,23 +479,31 @@ class SpreadStrategyTemplate: def stop_algo(self, algoid: str): """""" - pass + if not self.trading: + return - def buy(self, vt_symbol: str, price: float, volume: float): - """""" - return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.OPEN) + self.strategy_engine.stop_algo(self, algoid) - def sell(self, vt_symbol: str, price: float, volume: float): + def stop_all_algos(self): """""" - return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.CLOSE) + for algoid in self.algoids: + self.stop_algo(algoid) - def short(self, vt_symbol: str, price: float, volume: float): + def buy(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]: """""" - return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.OPEN) + return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.OPEN, lock) - def cover(self, vt_symbol: str, price: float, volume: float): + def sell(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]: """""" - return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.CLOSE) + return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.CLOSE, lock) + + def short(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]: + """""" + return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.OPEN, lock) + + def cover(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]: + """""" + return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.CLOSE, lock) def send_order( self, @@ -370,22 +511,47 @@ class SpreadStrategyTemplate: price: float, volume: float, direction: Direction, - offset: Offset - ): + offset: Offset, + lock: bool + ) -> List[str]: """""" - pass + if not self.trading: + return [] + + vt_orderids: List[str] = self.strategy_engine.send_order( + self, + vt_symbol, + price, + volume, + direction, + offset, + lock + ) + + for vt_orderid in vt_orderids: + self.vt_orderids.add(vt_orderid) + + return vt_orderids def cancel_order(self, vt_orderid: str): """""" - pass + if not self.trading: + return + + self.strategy_engine.cancel_order(self, vt_orderid) + + def cancel_all_orders(self): + """""" + for vt_orderid in self.vt_orderids: + self.cancel_order(vt_orderid) def put_event(self): """""" - pass + self.strategy_engine.put_strategy_event(self) def write_log(self, msg: str): """""" - pass + self.strategy_engine.write_strategy_log(self, msg) def get_spread_tick(self) -> TickData: """""" @@ -417,3 +583,10 @@ class SpreadStrategyTemplate: return leg.long_pos else: return leg.short_pos + + def send_email(self, msg: str): + """ + Send email to default receiver. + """ + if self.inited: + self.strategy_engine.send_email(msg, self)