[Add] SpreadStrategyEngine for managing spread trading strategies

This commit is contained in:
vn.py 2019-09-17 16:26:48 +08:00
parent 68090e4552
commit 3b9f1c3148
2 changed files with 504 additions and 49 deletions

View File

@ -1,6 +1,10 @@
from typing import List, Dict, Set
import traceback
import importlib
import os
from typing import List, Dict, Set, Callable, Any, Type
from collections import defaultdict
from copy import copy
from path import Path
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import BaseEngine, MainEngine
@ -40,6 +44,7 @@ class SpreadEngine(BaseEngine):
self.data_engine: SpreadDataEngine = SpreadDataEngine(self)
self.algo_engine: SpreadAlgoEngine = SpreadAlgoEngine(self)
self.strategy_engine: SpreadStrategyEngine = SpreadStrategyEngine(self)
self.add_spread = self.data_engine.add_spread
self.remove_spread = self.data_engine.remove_spread
@ -506,6 +511,8 @@ class SpreadAlgoEngine:
class SpreadStrategyEngine:
""""""
setting_filename = "spraed_trading_strategy.json"
def __init__(self, spread_engine: SpreadEngine):
""""""
self.spread_engine: SpreadEngine = spread_engine
@ -514,28 +521,107 @@ class SpreadStrategyEngine:
self.write_log = spread_engine.write_log
self.strategy_setting: Dict[str: Dict] = {}
self.classes: Dict[str: Type[SpreadStrategyTemplate]] = {}
self.strategies: Dict[str: SpreadStrategyTemplate] = {}
self.order_strategy_map: dict[str: SpreadStrategyTemplate] = {}
self.name_strategy_map: dict[str: SpreadStrategyTemplate] = defaultdict(
self.algo_strategy_map: dict[str: SpreadStrategyTemplate] = {}
self.spread_strategy_map: dict[str: SpreadStrategyTemplate] = defaultdict(
list)
self.vt_tradeids: Set = set()
def start(self):
""""""
self.load_setting()
self.load_strategy_class()
self.load_strategy_setting()
self.register_event()
self.write_log("价差策略引擎启动成功")
def load_setting(self):
def close(self):
""""""
pass
self.stop_all_strategies()
def save_setting(self):
""""""
pass
def load_strategy_class(self):
"""
Load strategy class from source code.
"""
path1 = Path(__file__).parent.joinpath("strategies")
self.load_strategy_class_from_folder(
path1, "vnpy.app.cta_strategy.strategies")
path2 = Path.cwd().joinpath("strategies")
self.load_strategy_class_from_folder(path2, "strategies")
def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
"""
Load strategy class from certain folder.
"""
for dirpath, dirnames, filenames in os.walk(str(path)):
for filename in filenames:
if filename.endswith(".py"):
strategy_module_name = ".".join(
[module_name, filename.replace(".py", "")])
elif filename.endswith(".pyd"):
strategy_module_name = ".".join(
[module_name, filename.split(".")[0]])
self.load_strategy_class_from_module(strategy_module_name)
def load_strategy_class_from_module(self, module_name: str):
"""
Load strategy class from module file.
"""
try:
module = importlib.import_module(module_name)
for name in dir(module):
value = getattr(module, name)
if (isinstance(value, type) and issubclass(value, SpreadStrategyTemplate) and value is not SpreadStrategyTemplate):
self.classes[value.__name__] = value
except: # noqa
msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
self.write_log(msg)
def load_strategy_setting(self):
"""
Load setting file.
"""
self.strategy_setting = load_json(self.setting_filename)
for strategy_name, strategy_config in self.strategy_setting.items():
self.add_strategy(
strategy_config["class_name"],
strategy_name,
strategy_config["spread_name"],
strategy_config["setting"]
)
def update_strategy_setting(self, strategy_name: str, setting: dict):
"""
Update setting file.
"""
strategy = self.strategies[strategy_name]
self.strategy_setting[strategy_name] = {
"class_name": strategy.__class__.__name__,
"spread_name": strategy.spread_name,
"setting": setting,
}
save_json(self.setting_filename, self.strategy_setting)
def remove_strategy_setting(self, strategy_name: str):
"""
Update setting file.
"""
if strategy_name not in self.strategy_setting:
return
self.strategy_setting.pop(strategy_name)
save_json(self.setting_filename, self.strategy_setting)
def register_event(self):
""""""
@ -548,27 +634,152 @@ class SpreadStrategyEngine:
def process_spread_data_event(self, event: Event):
""""""
pass
spread = event.data
strategies = self.spread_strategy_map[spread.name]
for strategy in strategies:
self.call_strategy_func(strategy, strategy.on_spread_data)
def process_spread_pos_event(self, event: Event):
""""""
pass
spread = event.data
strategies = self.spread_strategy_map[spread.name]
for strategy in strategies:
self.call_strategy_func(strategy, strategy.on_spread_pos)
def process_spread_algo_event(self, event: Event):
""""""
pass
algo = event.data
strategy = self.algo_strategy_map.get(algo.algoid, None)
if strategy:
self.call_strategy_func(strategy, strategy.update_spread_algo, algo)
def process_order_event(self, event: Event):
""""""
pass
order = event.data
strategy = self.order_strategy_map.get(order.vt_orderid, None)
if strategy:
self.call_strategy_func(strategy, strategy.update_order, order)
def process_trade_event(self, event: Event):
""""""
pass
trade = event.data
strategy = self.trade_strategy_map.get(trade.vt_orderid, None)
if strategy:
self.call_strategy_func(strategy, strategy.on_trade, trade)
def call_strategy_func(
self, strategy: SpreadStrategyTemplate, func: Callable, params: Any = None
):
"""
Call function of a strategy and catch any exception raised.
"""
try:
if params:
func(params)
else:
func()
except Exception:
strategy.trading = False
strategy.inited = False
msg = f"触发异常已停止\n{traceback.format_exc()}"
self.write_log(msg, strategy)
def add_strategy(
self, class_name: str, strategy_name: str, spread_name: str, setting: dict
):
"""
Add a new strategy.
"""
if strategy_name in self.strategies:
self.write_log(f"创建策略失败,存在重名{strategy_name}")
return
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
self.write_log(f"创建策略失败,找不到策略类{class_name}")
return
strategy = strategy_class(self, strategy_name, spread_name, setting)
self.strategies[strategy_name] = strategy
# Add vt_symbol to strategy map.
strategies = self.spread_strategy_map[spread_name]
strategies.append(strategy)
# Update to setting file.
self.update_strategy_setting(strategy_name, setting)
self.put_strategy_event(strategy)
def init_strategy(self, strategy_name: str):
""""""
strategy = self.strategies[strategy_name]
if strategy.inited:
self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
return
self.call_strategy_func(strategy, strategy.on_init)
strategy.inited = True
self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成")
def start_strategy(self, strategy_name: str):
""""""
strategy = self.strategies[strategy_name]
if not strategy.inited:
self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
return
if strategy.trading:
self.write_log(f"{strategy_name}已经启动,请勿重复操作")
return
self.call_strategy_func(strategy, strategy.on_start)
strategy.trading = True
self.put_strategy_event(strategy)
def stop_strategy(self, strategy_name: str):
""""""
strategy = self.strategies[strategy_name]
if not strategy.trading:
return
self.call_strategy_func(strategy, strategy.on_stop)
strategy.trading = False
strategy.stop_all_algos()
strategy.cancel_all_orders()
self.put_strategy_event(strategy)
def init_all_strategies(self):
""""""
for strategy in self.strategies.values():
self.init_strategy(strategy)
def start_all_strategies(self):
""""""
for strategy in self.strategies.values():
self.start_strategy(strategy)
def stop_all_strategies(self):
""""""
for strategy in self.strategies.values():
self.stop_strategy(strategy)
def start_algo(
self,
strategy: SpreadStrategyTemplate,
spread_name: str,
direction: Direction,
price: float,
volume: float,
@ -577,9 +788,25 @@ class SpreadStrategyEngine:
lock: bool
) -> str:
""""""
pass
algoid = self.spread_engine.start_algo(
spread_name,
direction,
price,
volume,
payup,
interval,
lock
)
def stop_algo(self, algoid: str):
self.algo_strategy_map[algoid] = strategy
return algoid
def stop_algo(self, strategy: SpreadStrategyTemplate, algoid: str):
""""""
self.spread_engine.stop_algo(algoid)
def stop_all_algos(self, strategy: SpreadStrategyTemplate):
""""""
pass
@ -590,19 +817,74 @@ class SpreadStrategyEngine:
price: float,
volume: float,
direction: Direction,
offset: Offset
) -> str:
pass
offset: Offset,
lock: bool
) -> List[str]:
contract = self.main_engine.get_contract(vt_symbol)
def cancel_order(self, vt_orderid: str):
original_req = OrderRequest(
symbol=contract.symbol,
exchange=contract.exchange,
direction=direction,
offset=offset,
type=OrderType.LIMIT,
price=price,
volume=volume
)
# Convert with offset converter
req_list = self.offset_converter.convert_order_request(
original_req, lock)
# Send Orders
vt_orderids = []
for req in req_list:
vt_orderid = self.main_engine.send_order(
req, contract.gateway_name)
# Check if sending order successful
if not vt_orderid:
continue
vt_orderids.append(vt_orderid)
self.offset_converter.update_order_request(req, vt_orderid)
# Save relationship between orderid and strategy.
self.order_strategy_map[vt_orderid] = strategy
return vt_orderids
def cancel_order(self, strategy: SpreadStrategyTemplate, vt_orderid: str):
""""""
order = self.main_engine.get_order(vt_orderid)
if not order:
self.write_strategy_log(strategy, "撤单失败,找不到委托{}".format(vt_orderid))
return
req = order.create_cancel_request()
self.main_engine.cancel_order(req, order.gateway_name)
def cancel_all_orders(self, strategy: SpreadStrategyTemplate):
""""""
pass
def put_strategy_event(self, strategy: SpreadStrategyTemplate):
""""""
pass
event = Event(EVENT_SPREAD_STRATEGY, strategy)
self.event_engine.put(event)
def write_strategy_log(self, strategy: SpreadStrategyTemplate, msg: str):
""""""
pass
msg = f"{strategy.strategy_name}{msg}"
self.write_log(msg)
def send_strategy_email(self, strategy: SpreadStrategyTemplate, msg: str):
""""""
if strategy:
subject = f"{strategy.strategy_name}"
else:
subject = "价差策略引擎"
self.main_engine.send_email(subject, msg)

View File

@ -1,7 +1,8 @@
from collections import defaultdict
from typing import Dict, List
from typing import Dict, List, Set
from math import floor, ceil
from copy import copy
from vnpy.trader.object import TickData, TradeData, OrderData, ContractData
from vnpy.trader.constant import Direction, Status, Offset
@ -272,42 +273,158 @@ class SpreadStrategyTemplate:
"""
Template for implementing spread trading strategies.
"""
strategy_name = "StrategyTemplate"
author: str = ""
parameters: List[str] = []
variables: List[str] = []
def __init__(
self,
strategy_engine,
strategy_id: str,
spread: SpreadData
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
self.strategy_engine = strategy_engine
self.strategy_id = strategy_id
self.strategy_name = strategy_name
self.spread = spread
self.spread_name = spread.name
self.inited = False
self.trading = False
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.vt_orderids: Set[str] = set()
self.algoids: Set[str] = set()
self.update_setting(setting)
def update_setting(self, setting: dict):
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
@classmethod
def get_class_parameters(cls):
"""
Get default parameters dict of strategy class.
"""
class_parameters = {}
for name in cls.parameters:
class_parameters[name] = getattr(cls, name)
return class_parameters
def get_parameters(self):
"""
Get strategy parameters dict.
"""
strategy_parameters = {}
for name in self.parameters:
strategy_parameters[name] = getattr(self, name)
return strategy_parameters
def get_variables(self):
"""
Get strategy variables dict.
"""
strategy_variables = {}
for name in self.variables:
strategy_variables[name] = getattr(self, name)
return strategy_variables
def get_data(self):
"""
Get strategy data.
"""
strategy_data = {
"strategy_name": self.strategy_name,
"spread_name": self.spread_name,
"class_name": self.__class__.__name__,
"author": self.author,
"parameters": self.get_parameters(),
"variables": self.get_variables(),
}
return strategy_data
def update_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
if not algo.is_active() and algo.algoid in self.algoids:
self.algoids.pop(algo.algoid)
self.on_spread_algo(algo)
def update_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
if not order.is_active() and order.vt_orderid in self.vt_orderids:
self.vt_orderids.pop(order.vt_orderid)
self.on_order(order)
@virtual
def on_init(self):
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_start(self):
"""
Callback when strategy is started.
"""
pass
@virtual
def on_stop(self):
"""
Callback when strategy is stopped.
"""
pass
@virtual
def on_spread_data(self):
""""""
"""
Callback when spread price is updated.
"""
pass
@virtual
def on_spread_pos(self):
""""""
"""
Callback when spread position is updated.
"""
pass
@virtual
def on_spread_algo(self, algo: SpreadAlgoTemplate):
""""""
"""
Callback when algo status is updated.
"""
pass
@virtual
def on_order(self, order: OrderData):
""""""
"""
Callback when order status is updated.
"""
pass
@virtual
def on_trade(self, trade: TradeData):
""""""
"""
Callback when new trade data is received.
"""
pass
def start_algo(
@ -320,7 +437,23 @@ class SpreadStrategyTemplate:
lock: bool
) -> str:
""""""
pass
if not self.trading:
return ""
algoid: str = self.strategy_engine.start_algo(
self,
self.spread_name,
direction,
price,
volume,
payup,
interval,
lock
)
self.algoids.add(algoid)
return algoid
def start_long_algo(
self,
@ -346,23 +479,31 @@ class SpreadStrategyTemplate:
def stop_algo(self, algoid: str):
""""""
pass
if not self.trading:
return
def buy(self, vt_symbol: str, price: float, volume: float):
""""""
return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.OPEN)
self.strategy_engine.stop_algo(self, algoid)
def sell(self, vt_symbol: str, price: float, volume: float):
def stop_all_algos(self):
""""""
return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.CLOSE)
for algoid in self.algoids:
self.stop_algo(algoid)
def short(self, vt_symbol: str, price: float, volume: float):
def buy(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.OPEN)
return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.OPEN, lock)
def cover(self, vt_symbol: str, price: float, volume: float):
def sell(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.CLOSE)
return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.CLOSE, lock)
def short(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.OPEN, lock)
def cover(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.CLOSE, lock)
def send_order(
self,
@ -370,22 +511,47 @@ class SpreadStrategyTemplate:
price: float,
volume: float,
direction: Direction,
offset: Offset
):
offset: Offset,
lock: bool
) -> List[str]:
""""""
pass
if not self.trading:
return []
vt_orderids: List[str] = self.strategy_engine.send_order(
self,
vt_symbol,
price,
volume,
direction,
offset,
lock
)
for vt_orderid in vt_orderids:
self.vt_orderids.add(vt_orderid)
return vt_orderids
def cancel_order(self, vt_orderid: str):
""""""
pass
if not self.trading:
return
self.strategy_engine.cancel_order(self, vt_orderid)
def cancel_all_orders(self):
""""""
for vt_orderid in self.vt_orderids:
self.cancel_order(vt_orderid)
def put_event(self):
""""""
pass
self.strategy_engine.put_strategy_event(self)
def write_log(self, msg: str):
""""""
pass
self.strategy_engine.write_strategy_log(self, msg)
def get_spread_tick(self) -> TickData:
""""""
@ -417,3 +583,10 @@ class SpreadStrategyTemplate:
return leg.long_pos
else:
return leg.short_pos
def send_email(self, msg: str):
"""
Send email to default receiver.
"""
if self.inited:
self.strategy_engine.send_email(msg, self)