diff --git a/vnpy/app/cta_strategy_pro/__init__.py b/vnpy/app/cta_strategy_pro/__init__.py new file mode 100644 index 00000000..0a7f5467 --- /dev/null +++ b/vnpy/app/cta_strategy_pro/__init__.py @@ -0,0 +1,22 @@ +from pathlib import Path + +from vnpy.trader.app import BaseApp +from vnpy.trader.constant import Direction +from vnpy.trader.object import TickData, BarData, TradeData, OrderData +from vnpy.trader.utility import BarGenerator, ArrayManager + +from .base import APP_NAME, StopOrder +from .engine import CtaEngine + +from .template import CtaTemplate, CtaSignal, TargetPosTemplate + +class CtaStrategyProApp(BaseApp): + """""" + + app_name = APP_NAME + app_module = __module__ + app_path = Path(__file__).parent + display_name = "CTA策略PRO" + engine_class = CtaEngine + widget_name = "CtaManager" + icon_name = "cta.ico" diff --git a/vnpy/app/cta_strategy_pro/base.py b/vnpy/app/cta_strategy_pro/base.py new file mode 100644 index 00000000..cf27f3dd --- /dev/null +++ b/vnpy/app/cta_strategy_pro/base.py @@ -0,0 +1,93 @@ +""" +Defines constants and objects used in CtaStrategyPro App. +""" + +from dataclasses import dataclass, field +from enum import Enum +from datetime import timedelta + +from vnpy.trader.constant import Direction, Offset, Interval + +APP_NAME = "CtaStrategyPro" +STOPORDER_PREFIX = "STOP" + + +class StopOrderStatus(Enum): + WAITING = "等待中" + CANCELLED = "已撤销" + TRIGGERED = "已触发" + + +class EngineType(Enum): + LIVE = "实盘" + BACKTESTING = "回测" + + +class BacktestingMode(Enum): + BAR = 1 + TICK = 2 + + +class Area(Enum): + """ Kline area """ + LONG_A = 'LONG_A' + LONG_B = 'LONG_B' + LONG_C = 'LONG_C' + LONG_D = 'LONG_D' + LONG_E = 'LONG_E' + SHORT_A = 'SHORT_A' + SHORT_B = 'SHORT_B' + SHORT_C = 'SHORT_C' + SHORT_D = 'SHORT_D' + SHORT_E = 'SHORT_E' + + +# 各类商品所在市场,underly_symbol: price_tick +# 上期所夜盘,9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~2:30 +NIGHT_MARKET_SQ1 = {'AU': 0.05, 'AG': 1, 'SC': 0.1} +# 上期所夜盘,9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~1:00 +NIGHT_MARKET_SQ2 = {'CU': 10, 'PB': 5, 'AL': 5, 'ZN': 5, 'WR': 1, 'NI': 10} +# 上期所夜盘,9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00 +NIGHT_MARKET_SQ3 = {'RU': 5, 'RB': 1, 'HC': 1, 'SP': 2, 'FU': 1, 'BU': 2, 'NR': 5, 'C': 1, 'CS': 1} +# 郑商所夜盘,9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00 +NIGHT_MARKET_ZZ = {'TA': 2, 'JR': 1, 'OI': 0, 'RO': 1, 'PM': 1, 'WH': 1, 'CF': 5, 'SR': 0, 'FG': 1, + 'MA': 1, 'RS': 1, 'RM': 1, 'RI': 1, 'ZC': 0.2} +# 大商所夜盘,9:00~10:15, 10:30~11:30, 13:30~15:00, 21:00 ~23:00 +NIGHT_MARKET_DL = {'V': 5, 'L': 5, 'BB': 0.05, 'I': 0.5, 'FB': 0.05, 'C': 1, 'PP': 1, 'A': 1, 'B': 1, 'M': 1, 'Y': 2, + 'P': 2, + 'JM': 0.5, 'J': 0.5, 'EG': 1} +# 中金日盘,9:15 ~11:30, 13:00~15:15 +MARKET_ZJ = {'IC': 0.2, 'IF': 0.2, 'IH': 0.2, 'T': 0.005, 'TF': 0.005, 'TS': 0.005} + +# 只有日盘得合约 +MARKET_DAY_ONLY = {'IC': 0.2, 'IF': 0.2, 'IH': 0.2, 'T': 0.005, 'TF': 0.005, 'TS': 0.005, + 'JD': 1, 'BB': 0.05, 'CS': 1, 'FB': 0.05, 'L': 5, 'V': 5, + 'JR': 1, 'LR': 1, 'PM': 1, 'RI': 1, 'RS': 1, 'SM': 2, 'WH': 1, 'AP': 1, 'CJ': 1, 'UR': 1} + +# 夜盘23:00收盘的合约 +NIGHT_MARKET_23 = {**NIGHT_MARKET_DL, **NIGHT_MARKET_ZZ, **NIGHT_MARKET_SQ3} + + +@dataclass +class StopOrder: + vt_symbol: str + direction: Direction + offset: Offset + price: float + volume: float + stop_orderid: str + strategy_name: str + lock: bool = False + vt_orderids: list = field(default_factory=list) + status: StopOrderStatus = StopOrderStatus.WAITING + + +EVENT_CTA_LOG = "eCtaLog" +EVENT_CTA_STRATEGY = "eCtaStrategy" +EVENT_CTA_STOPORDER = "eCtaStopOrder" + +INTERVAL_DELTA_MAP = { + Interval.MINUTE: timedelta(minutes=1), + Interval.HOUR: timedelta(hours=1), + Interval.DAILY: timedelta(days=1), +} diff --git a/vnpy/app/cta_strategy_pro/engine.py b/vnpy/app/cta_strategy_pro/engine.py new file mode 100644 index 00000000..3f2b7b04 --- /dev/null +++ b/vnpy/app/cta_strategy_pro/engine.py @@ -0,0 +1,1004 @@ +"""""" + +import importlib +import os +import sys +import traceback +from collections import defaultdict +from pathlib import Path +from typing import Any, Callable +from datetime import datetime, timedelta +from concurrent.futures import ThreadPoolExecutor +from copy import copy +from functools import lru_cache + +from vnpy.event import Event, EventEngine +from vnpy.trader.engine import BaseEngine, MainEngine +from vnpy.trader.object import ( + OrderRequest, + SubscribeRequest, + HistoryRequest, + LogData, + TickData, + BarData, + ContractData +) +from vnpy.trader.event import ( + EVENT_TICK, + EVENT_ORDER, + EVENT_TRADE, + EVENT_POSITION +) +from vnpy.trader.constant import ( + Direction, + OrderType, + Interval, + Exchange, + Offset, + Status +) +from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to, get_folder_path +from vnpy.trader.util_logger import setup_logger, logging +from vnpy.trader.converter import OffsetConverter + +from .base import ( + APP_NAME, + EVENT_CTA_LOG, + EVENT_CTA_STRATEGY, + EVENT_CTA_STOPORDER, + EngineType, + StopOrder, + StopOrderStatus, + STOPORDER_PREFIX +) +from .template import CtaTemplate + +STOP_STATUS_MAP = { + Status.SUBMITTING: StopOrderStatus.WAITING, + Status.NOTTRADED: StopOrderStatus.WAITING, + Status.PARTTRADED: StopOrderStatus.TRIGGERED, + Status.ALLTRADED: StopOrderStatus.TRIGGERED, + Status.CANCELLED: StopOrderStatus.CANCELLED, + Status.REJECTED: StopOrderStatus.CANCELLED +} + + +class CtaEngine(BaseEngine): + """ + 策略引擎【增强版】 + 1、策略日志单独输出=》log/strategy_name_yyyy-mm-dd.log + 2、使用免费的tdx源,替代rqdata源 + 3、取消初始化数据时,从全局的cta_strategy_data中恢复数据,改为策略自己初始化恢复数据 + 4、支持多合约订阅和多合约交易. 扩展的合约在setting中配置,由策略进行订阅 + """ + + engine_type = EngineType.LIVE # live trading engine + + setting_filename = "cta_strategy_setting.json" + data_filename = "cta_strategy_data.json" + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + """""" + super(CtaEngine, self).__init__( + main_engine, event_engine, APP_NAME) + + self.strategy_setting = {} # strategy_name: dict + self.strategy_data = {} # strategy_name: dict + + self.classes = {} # class_name: stategy_class + self.class_module_map = {} # class_name: mudule_name + self.strategies = {} # strategy_name: strategy + + self.strategy_loggers = {} # strategy_name: logger + + self.symbol_strategy_map = defaultdict(list) # vt_symbol: strategy list + self.strategy_symbol_map = defaultdict(set) # strategy_name: vt_symbol set + + self.orderid_strategy_map = {} # vt_orderid: strategy + self.strategy_orderid_map = defaultdict( + set) # strategy_name: orderid list + + self.stop_order_count = 0 # for generating stop_orderid + self.stop_orders = {} # stop_orderid: stop_order + + self.init_executor = ThreadPoolExecutor(max_workers=1) + + self.vt_tradeids = set() # for filtering duplicate trade + + self.offset_converter = OffsetConverter(self.main_engine) + + def init_engine(self): + """ + """ + self.load_strategy_class() + self.load_strategy_setting() + self.register_event() + self.write_log("CTA策略引擎初始化成功") + + def close(self): + """""" + self.stop_all_strategies() + + def register_event(self): + """""" + self.event_engine.register(EVENT_TICK, self.process_tick_event) + self.event_engine.register(EVENT_ORDER, self.process_order_event) + self.event_engine.register(EVENT_TRADE, self.process_trade_event) + self.event_engine.register(EVENT_POSITION, self.process_position_event) + + def process_tick_event(self, event: Event): + """""" + tick = event.data + + strategies = self.symbol_strategy_map[tick.vt_symbol] + if not strategies: + return + + self.check_stop_order(tick) + + for strategy in strategies: + if strategy.inited: + self.call_strategy_func(strategy, strategy.on_tick, tick) + + def process_order_event(self, event: Event): + """""" + order = event.data + + self.offset_converter.update_order(order) + + strategy = self.orderid_strategy_map.get(order.vt_orderid, None) + if not strategy: + return + + # Remove vt_orderid if order is no longer active. + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] + if order.vt_orderid in vt_orderids and not order.is_active(): + vt_orderids.remove(order.vt_orderid) + + # For server stop order, call strategy on_stop_order function + if order.type == OrderType.STOP: + so = StopOrder( + vt_symbol=order.vt_symbol, + direction=order.direction, + offset=order.offset, + price=order.price, + volume=order.volume, + stop_orderid=order.vt_orderid, + strategy_name=strategy.strategy_name, + status=STOP_STATUS_MAP[order.status], + vt_orderids=[order.vt_orderid], + ) + self.call_strategy_func(strategy, strategy.on_stop_order, so) + + # Call strategy on_order function + self.call_strategy_func(strategy, strategy.on_order, order) + + def process_trade_event(self, event: Event): + """""" + trade = event.data + + # Filter duplicate trade push + if trade.vt_tradeid in self.vt_tradeids: + return + self.vt_tradeids.add(trade.vt_tradeid) + + self.offset_converter.update_trade(trade) + + strategy = self.orderid_strategy_map.get(trade.vt_orderid, None) + if not strategy: + return + + # Update strategy pos before calling on_trade method + if trade.direction == Direction.LONG: + strategy.pos += trade.volume + else: + strategy.pos -= trade.volume + + self.call_strategy_func(strategy, strategy.on_trade, trade) + + # Sync strategy variables to data file + # 取消此功能,由策略自身完成数据持久化 + # self.sync_strategy_data(strategy) + + # Update GUI + self.put_strategy_event(strategy) + + def process_position_event(self, event: Event): + """""" + position = event.data + + self.offset_converter.update_position(position) + + def check_stop_order(self, tick: TickData): + """""" + for stop_order in list(self.stop_orders.values()): + if stop_order.vt_symbol != tick.vt_symbol: + continue + + long_triggered = ( + stop_order.direction == Direction.LONG and tick.last_price >= stop_order.price + ) + short_triggered = ( + stop_order.direction == Direction.SHORT and tick.last_price <= stop_order.price + ) + + if long_triggered or short_triggered: + strategy = self.strategies[stop_order.strategy_name] + + # To get excuted immediately after stop order is + # triggered, use limit price if available, otherwise + # use ask_price_5 or bid_price_5 + if stop_order.direction == Direction.LONG: + if tick.limit_up: + price = tick.limit_up + else: + price = tick.ask_price_5 + else: + if tick.limit_down: + price = tick.limit_down + else: + price = tick.bid_price_5 + + contract = self.main_engine.get_contract(stop_order.vt_symbol) + + vt_orderids = self.send_limit_order( + strategy, + contract, + stop_order.direction, + stop_order.offset, + price, + stop_order.volume, + stop_order.lock + ) + + # Update stop order status if placed successfully + if vt_orderids: + # Remove from relation map. + self.stop_orders.pop(stop_order.stop_orderid) + + strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name] + if stop_order.stop_orderid in strategy_vt_orderids: + strategy_vt_orderids.remove(stop_order.stop_orderid) + + # Change stop order status to cancelled and update to strategy. + stop_order.status = StopOrderStatus.TRIGGERED + stop_order.vt_orderids = vt_orderids + + self.call_strategy_func( + strategy, strategy.on_stop_order, stop_order + ) + self.put_stop_order_event(stop_order) + + def send_server_order( + self, + strategy: CtaTemplate, + contract: ContractData, + direction: Direction, + offset: Offset, + price: float, + volume: float, + type: OrderType, + lock: bool + ): + """ + Send a new order to server. + """ + # Create request and send order. + original_req = OrderRequest( + symbol=contract.symbol, + exchange=contract.exchange, + direction=direction, + offset=offset, + type=type, + price=price, + volume=volume, + ) + + # Convert with offset converter + req_list = self.offset_converter.convert_order_request(original_req, lock) + + # Send Orders + vt_orderids = [] + + for req in req_list: + vt_orderid = self.main_engine.send_order( + req, contract.gateway_name) + + # Check if sending order successful + if not vt_orderid: + continue + + vt_orderids.append(vt_orderid) + + self.offset_converter.update_order_request(req, vt_orderid) + + # Save relationship between orderid and strategy. + self.orderid_strategy_map[vt_orderid] = strategy + self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid) + + return vt_orderids + + def send_limit_order( + self, + strategy: CtaTemplate, + contract: ContractData, + direction: Direction, + offset: Offset, + price: float, + volume: float, + lock: bool + ): + """ + Send a limit order to server. + """ + return self.send_server_order( + strategy, + contract, + direction, + offset, + price, + volume, + OrderType.LIMIT, + lock + ) + + def send_server_stop_order( + self, + strategy: CtaTemplate, + contract: ContractData, + direction: Direction, + offset: Offset, + price: float, + volume: float, + lock: bool + ): + """ + Send a stop order to server. + + Should only be used if stop order supported + on the trading server. + """ + return self.send_server_order( + strategy, + contract, + direction, + offset, + price, + volume, + OrderType.STOP, + lock + ) + + def send_local_stop_order( + self, + strategy: CtaTemplate, + vt_symbol: str, + direction: Direction, + offset: Offset, + price: float, + volume: float, + lock: bool + ): + """ + Create a new local stop order. + """ + self.stop_order_count += 1 + stop_orderid = f"{STOPORDER_PREFIX}.{self.stop_order_count}" + + stop_order = StopOrder( + vt_symbol=vt_symbol, + direction=direction, + offset=offset, + price=price, + volume=volume, + stop_orderid=stop_orderid, + strategy_name=strategy.strategy_name, + lock=lock + ) + + self.stop_orders[stop_orderid] = stop_order + + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] + vt_orderids.add(stop_orderid) + + self.call_strategy_func(strategy, strategy.on_stop_order, stop_order) + self.put_stop_order_event(stop_order) + + return [stop_orderid] + + def cancel_server_order(self, strategy: CtaTemplate, vt_orderid: str): + """ + Cancel existing order by vt_orderid. + """ + order = self.main_engine.get_order(vt_orderid) + if not order: + self.write_log(msg=f"撤单失败,找不到委托{vt_orderid}", + strategy_Name=strategy.name, + level=logging.ERROR) + return + + req = order.create_cancel_request() + self.main_engine.cancel_order(req, order.gateway_name) + + def cancel_local_stop_order(self, strategy: CtaTemplate, stop_orderid: str): + """ + Cancel a local stop order. + """ + stop_order = self.stop_orders.get(stop_orderid, None) + if not stop_order: + return + strategy = self.strategies[stop_order.strategy_name] + + # Remove from relation map. + self.stop_orders.pop(stop_orderid) + + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] + if stop_orderid in vt_orderids: + vt_orderids.remove(stop_orderid) + + # Change stop order status to cancelled and update to strategy. + stop_order.status = StopOrderStatus.CANCELLED + + self.call_strategy_func(strategy, strategy.on_stop_order, stop_order) + self.put_stop_order_event(stop_order) + + def send_order( + self, + strategy: CtaTemplate, + vt_symbol: str, + direction: Direction, + offset: Offset, + price: float, + volume: float, + stop: bool, + lock: bool + ): + """ + 该方法供策略使用,发送委托。 + """ + contract = self.main_engine.get_contract(vt_symbol) + if not contract: + self.write_log(msg=f"委托失败,找不到合约:{vt_symbol}", + strategy_name=strategy.name, + level=logging.ERROR) + return "" + + # Round order price and volume to nearest incremental value + price = round_to(price, contract.pricetick) + volume = round_to(volume, contract.min_volume) + + if stop: + if contract.stop_supported: + return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock) + else: + return self.send_local_stop_order(strategy, vt_symbol, direction, offset, price, volume, lock) + else: + return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock) + + def cancel_order(self, strategy: CtaTemplate, vt_orderid: str): + """ + """ + if vt_orderid.startswith(STOPORDER_PREFIX): + self.cancel_local_stop_order(strategy, vt_orderid) + else: + self.cancel_server_order(strategy, vt_orderid) + + def cancel_all(self, strategy: CtaTemplate): + """ + Cancel all active orders of a strategy. + """ + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] + if not vt_orderids: + return + + for vt_orderid in copy(vt_orderids): + self.cancel_order(strategy, vt_orderid) + + def subscribe_symbol(self, strategy_name: str, vt_symbol: str): + """订阅合约""" + strategy = self.strategies.get(strategy_name, None) + if not strategy: + return False + + contract = self.main_engine.get_contract(vt_symbol) + if contract: + req = SubscribeRequest( + symbol=contract.symbol, exchange=contract.exchange) + self.main_engine.subscribe(req, contract.gateway_name) + + # 添加 合约订阅 vt_symbol <=> 策略实例 strategy 映射. + strategies = self.symbol_strategy_map[vt_symbol] + strategies.append(strategy) + + # 添加 策略名 strategy_name <=> 合约订阅 vt_symbol 的映射 + subscribe_symbol_set = self.strategy_symbol_map[strategy.name] + subscribe_symbol_set.add(contract.vt_symbol) + return True + + else: + self.write_log(msg=f"行情订阅失败,找不到合约{vt_symbol}", + strategy_name=strategy.name, + level=logging.CRITICAL) + return False + + @lru_cache() + def get_size(self, vt_symbol: str): + """查询合约的size""" + contract = self.main_engine.get_contract(vt_symbol) + if contract is None: + self.write_error(f'查询不到{vt_symbol}合约信息') + return 10 + return contract.size + + @lru_cache() + def get_margin_rate(self, vt_symbol: str): + """查询保证金比率""" + contract = self.main_engine.get_contract(vt_symbol) + if contract is None: + self.write_error(f'查询不到{vt_symbol}合约信息') + return 0.1 + if contract.margin_rate == 0: + return 0.1 + return contract.margin_rate + + @lru_cache() + def get_price_tick(self, vt_symbol: str): + """查询价格最小跳动""" + contract = self.main_engine.get_contract(vt_symbol) + if contract is None: + self.write_error(f'查询不到{vt_symbol}合约信息') + return 0.1 + + return contract.pricetick + + def get_price(self, vt_symbol: str): + """查询合约的最新价格""" + tick = self.main_engine.get_tick(vt_symbol) + if tick: + return tick.last_price + + return None + + def get_engine_type(self): + """""" + return self.engine_type + + def call_strategy_func( + self, strategy: CtaTemplate, func: Callable, params: Any = None + ): + """ + Call function of a strategy and catch any exception raised. + """ + try: + if params: + func(params) + else: + func() + except Exception: + strategy.trading = False + strategy.inited = False + + msg = f"触发异常已停止\n{traceback.format_exc()}" + self.write_log(msg=msg, + strategy_name=strategy.name, + level=logging.CRITICAL) + + def add_strategy( + self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict + ): + """ + Add a new strategy. + """ + if strategy_name in self.strategies: + self.write_log(msg=f"创建策略失败,存在重名{strategy_name}", + level=logging.CRITICAL) + return + + strategy_class = self.classes.get(class_name, None) + if not strategy_class: + self.write_log(msg=f"创建策略失败,找不到策略类{class_name}", + level=logging.CRITICAL) + return + + self.write_log(f'开始添加策略类{class_name},实例名:{strategy_name}') + strategy = strategy_class(self, strategy_name, vt_symbol, setting) + self.strategies[strategy_name] = strategy + + # Add vt_symbol to strategy map. + strategies = self.symbol_strategy_map[vt_symbol] + strategies.append(strategy) + + subscribe_symbol_set = self.strategy_symbol_map[strategy_name] + subscribe_symbol_set.add(vt_symbol) + + # Update to setting file. + self.update_strategy_setting(strategy_name, setting) + + self.put_strategy_event(strategy) + + # 判断设置中是否由自动初始化和自动启动项目 + if setting.get('auto_init', False): + self.init_strategy(strategy_name, auto_start=setting.get('auto_start', False)) + + def init_strategy(self, strategy_name: str, auto_start: bool = False): + """ + Init a strategy. + """ + self.init_executor.submit(self._init_strategy, strategy_name, auto_start) + + def _init_strategy(self, strategy_name: str, auto_start: bool = False): + """ + Init strategies in queue. + """ + strategy = self.strategies[strategy_name] + + if strategy.inited: + self.write_error(f"{strategy_name}已经完成初始化,禁止重复操作") + return + + self.write_log(f"{strategy_name}开始执行初始化") + + # Call on_init function of strategy + self.call_strategy_func(strategy, strategy.on_init) + + # Restore strategy data(variables) + # Pro 版本不使用自动恢复除了内部数据功能,由策略自身初始化时完成 + # data = self.strategy_data.get(strategy_name, None) + # if data: + # for name in strategy.variables: + # value = data.get(name, None) + # if value: + # setattr(strategy, name, value) + + # Subscribe market data 订阅缺省的vt_symbol, 如果有其他合约需要订阅,由策略内部初始化时提交订阅即可。 + self.subscribe_symbol(strategy_name, vt_symbol=strategy.vt_symbol) + + # Put event to update init completed status. + strategy.inited = True + self.put_strategy_event(strategy) + self.write_log(f"{strategy_name}初始化完成") + + # 初始化后,自动启动策略交易 + if auto_start: + self.start_strategy(strategy_name) + + def start_strategy(self, strategy_name: str): + """ + Start a strategy. + """ + strategy = self.strategies[strategy_name] + if not strategy.inited: + self.write_error(f"策略{strategy.strategy_name}启动失败,请先初始化") + return + + if strategy.trading: + self.write_error(f"{strategy_name}已经启动,请勿重复操作") + return + + self.call_strategy_func(strategy, strategy.on_start) + strategy.trading = True + + self.put_strategy_event(strategy) + + def stop_strategy(self, strategy_name: str): + """ + Stop a strategy. + """ + strategy = self.strategies[strategy_name] + if not strategy.trading: + self.write_log(f'{strategy_name}策略实例已处于停止交易状态') + return + + # Call on_stop function of the strategy + self.write_log(f'调用{strategy_name}的on_stop,停止交易') + self.call_strategy_func(strategy, strategy.on_stop) + + # Change trading status of strategy to False + strategy.trading = False + + # Cancel all orders of the strategy + self.write_log(f'撤销{strategy_name}所有委托') + self.cancel_all(strategy) + + # Sync strategy variables to data file + # 取消此功能,由策略自身完成数据的持久化 + # self.sync_strategy_data(strategy) + + # Update GUI + self.put_strategy_event(strategy) + + def edit_strategy(self, strategy_name: str, setting: dict): + """ + Edit parameters of a strategy. + 风险警示: 该方法强行干预策略的配置 + """ + strategy = self.strategies[strategy_name] + strategy.update_setting(setting) + + self.update_strategy_setting(strategy_name, setting) + self.put_strategy_event(strategy) + + def remove_strategy(self, strategy_name: str): + """ + Remove a strategy. + """ + strategy = self.strategies[strategy_name] + if strategy.trading: + self.write_error(f"策略{strategy.strategy_name}移除失败,请先停止") + return + + # Remove setting + self.remove_strategy_setting(strategy_name) + + # 移除订阅合约与策略的关联关系 + for vt_symbol in self.strategy_symbol_map[strategy_name]: + # Remove from symbol strategy map + self.write_log(f'移除{vt_symbol}《=》{strategy_name}的订阅关系') + strategies = self.symbol_strategy_map[vt_symbol] + strategies.remove(strategy) + + # Remove from active orderid map + if strategy_name in self.strategy_orderid_map: + vt_orderids = self.strategy_orderid_map.pop(strategy_name) + self.write_log(f'移除{strategy_name}的所有委托订单映射关系') + # Remove vt_orderid strategy map + for vt_orderid in vt_orderids: + if vt_orderid in self.orderid_strategy_map: + self.orderid_strategy_map.pop(vt_orderid) + + # Remove from strategies + self.write_log(f'移除{strategy_name}策略实例') + self.strategies.pop(strategy_name) + + return True + + def reload_strategy(self, strategy_name: str, vt_symbol: str = '', setting: dict = {}): + """ + 重新加载策略 + 一般使用于在线更新策略代码,或者更新策略参数,需要重新启动策略 + :param strategy_name: + :param setting: + :return: + """ + self.write_log(f'开始重新加载策略{strategy_name}') + + # 优先判断重启的策略,是否已经加载 + if strategy_name not in self.strategies or strategy_name not in self.strategy_setting: + self.write_error(f"{strategy_name}不在运行策略中,不能重启") + return False + old_strategy_config = copy(self.strategy_setting[strategy_name]) + + class_name = old_strategy_config.get('class_name') + if len(vt_symbol) == 0: + vt_symbol = old_strategy_config.get('vt_symbol') + if len(setting) == 0: + setting = old_strategy_config.get('setting') + + module_name = self.class_module_map[class_name] + # 重新load class module + if not self.load_strategy_class_from_module(module_name): + return False + + # 停止当前策略实例的运行,撤单 + self.stop_strategy(strategy_name) + + # 移除运行中的策略实例 + self.remove_strategy(strategy_name) + + # 重新添加策略 + self.add_strategy(class_name=class_name, + strategy_name=strategy_name, + vt_symbol=vt_symbol, + setting=setting) + + self.write_log(f'重新运行策略{strategy_name}执行完毕') + return True + + def load_strategy_class(self): + """ + Load strategy class from source code. + """ + # 加载 vnpy/app/cta_strategy_pro/strategies的所有策略 + path1 = Path(__file__).parent.joinpath("strategies") + self.load_strategy_class_from_folder( + path1, "vnpy.app.cta_strategy_pro.strategies") + + # 加载 当前运行目录下strategies子目录的所有策略 + path2 = Path.cwd().joinpath("strategies") + self.load_strategy_class_from_folder(path2, "strategies") + + def load_strategy_class_from_folder(self, path: Path, module_name: str = ""): + """ + Load strategy class from certain folder. + """ + for dirpath, dirnames, filenames in os.walk(str(path)): + for filename in filenames: + if filename.endswith(".py"): + strategy_module_name = ".".join( + [module_name, filename.replace(".py", "")]) + elif filename.endswith(".pyd"): + strategy_module_name = ".".join( + [module_name, filename.split(".")[0]]) + else: + continue + self.load_strategy_class_from_module(strategy_module_name) + + def load_strategy_class_from_module(self, module_name: str): + """ + Load/Reload strategy class from module file. + """ + try: + module = importlib.import_module(module_name) + + for name in dir(module): + value = getattr(module, name) + if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate): + class_name = value.__name__ + if class_name not in self.classes: + self.write_log(f"加载策略类{module_name}.{class_name}") + else: + self.write_log(f"更新策略类{module_name}.{class_name}") + self.classes[class_name] = value + self.class_module_map[class_name] = module_name + return True + except: # noqa + msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}" + self.write_log(msg=msg, level=logging.CRITICAL) + return False + + def load_strategy_data(self): + """ + Load strategy data from json file. + """ + print(f'load_strategy_data 此功能已取消,由策略自身完成数据的持久化加载', file=sys.stderr) + return + # self.strategy_data = load_json(self.data_filename) + + def sync_strategy_data(self, strategy: CtaTemplate): + """ + Sync strategy data into json file. + """ + # data = strategy.get_variables() + # data.pop("inited") # Strategy status (inited, trading) should not be synced. + # data.pop("trading") + # self.strategy_data[strategy.strategy_name] = data + # save_json(self.data_filename, self.strategy_data) + print(f'sync_strategy_data此功能已取消,由策略自身完成数据的持久化保存', file=sys.stderr) + + def get_all_strategy_class_names(self): + """ + Return names of strategy classes loaded. + """ + return list(self.classes.keys()) + + def get_strategy_class_parameters(self, class_name: str): + """ + Get default parameters of a strategy class. + """ + strategy_class = self.classes[class_name] + + parameters = {} + for name in strategy_class.parameters: + parameters[name] = getattr(strategy_class, name) + + return parameters + + def get_strategy_parameters(self, strategy_name): + """ + Get parameters of a strategy. + """ + strategy = self.strategies[strategy_name] + return strategy.get_parameters() + + def init_all_strategies(self): + """ + """ + for strategy_name in self.strategies.keys(): + self.init_strategy(strategy_name) + + def start_all_strategies(self): + """ + """ + for strategy_name in self.strategies.keys(): + self.start_strategy(strategy_name) + + def stop_all_strategies(self): + """ + """ + for strategy_name in self.strategies.keys(): + self.stop_strategy(strategy_name) + + def load_strategy_setting(self): + """ + Load setting file. + """ + self.strategy_setting = load_json(self.setting_filename) + + for strategy_name, strategy_config in self.strategy_setting.items(): + self.add_strategy( + strategy_config["class_name"], + strategy_name, + strategy_config["vt_symbol"], + strategy_config["setting"] + ) + + def update_strategy_setting(self, strategy_name: str, setting: dict): + """ + Update setting file. + """ + strategy = self.strategies[strategy_name] + + self.strategy_setting[strategy_name] = { + "class_name": strategy.__class__.__name__, + "vt_symbol": strategy.vt_symbol, + "setting": setting, + } + save_json(self.setting_filename, self.strategy_setting) + + def remove_strategy_setting(self, strategy_name: str): + """ + Update setting file. + """ + if strategy_name not in self.strategy_setting: + return + self.write_log(f'移除CTA引擎{strategy_name}的配置') + self.strategy_setting.pop(strategy_name) + save_json(self.setting_filename, self.strategy_setting) + + def put_stop_order_event(self, stop_order: StopOrder): + """ + Put an event to update stop order status. + """ + event = Event(EVENT_CTA_STOPORDER, stop_order) + self.event_engine.put(event) + + def put_strategy_event(self, strategy: CtaTemplate): + """ + Put an event to update strategy status. + """ + data = strategy.get_data() + event = Event(EVENT_CTA_STRATEGY, data) + self.event_engine.put(event) + + def write_log(self, msg: str, strategy_name: str = '', level: int = logging.INFO): + """ + Create cta engine log event. + """ + # 推送至全局CTA_LOG Event + log = LogData(msg=f"{strategy_name}: {msg}" if strategy_name else msg, + gateway_name="CtaStrategy", + level=level) + event = Event(type=EVENT_CTA_LOG, data=log) + self.event_engine.put(event) + + # 保存单独的策略日志 + if strategy_name: + strategy_logger = self.strategy_loggers.get(strategy_name, None) + if not strategy_logger: + log_path = get_folder_path('log') + log_filename = os.path.abspath(os.path.join(log_path, str(strategy_name))) + print(u'create logger:{}'.format(log_filename)) + self.strategy_loggers[strategy_name] = setup_logger(file_name=log_filename, + name=str(strategy_name)) + strategy_logger = self.strategy_loggers.get(strategy_name) + if strategy_logger: + strategy_logger.log(level, msg) + + # 如果日志数据异常,错误和告警,输出至sys.stderr + if level in [logging.CRITICAL, logging.ERROR, logging.WARNING]: + print(f"{strategy_name}: {msg}" if strategy_name else msg, file=sys.stderr) + + def write_error(self, msg: str, strategy_name: str = ''): + """写入错误日志""" + self.write_log(msg=msg, strategy_name=strategy_name, level=logging.ERROR) + + def send_email(self, msg: str, strategy: CtaTemplate = None): + """ + Send email to default receiver. + """ + if strategy: + subject = f"{strategy.strategy_name}" + else: + subject = "CTA策略引擎" + + self.main_engine.send_email(subject, msg) diff --git a/vnpy/app/cta_strategy_pro/strategies/turtle_signal_strategy.py b/vnpy/app/cta_strategy_pro/strategies/turtle_signal_strategy.py new file mode 100644 index 00000000..b2fa0bf2 --- /dev/null +++ b/vnpy/app/cta_strategy_pro/strategies/turtle_signal_strategy.py @@ -0,0 +1,166 @@ +from vnpy.app.cta_strategy_pro import ( + CtaTemplate, + StopOrder, + Direction, + TickData, + BarData, + TradeData, + OrderData, + BarGenerator, + ArrayManager, +) + + +class TurtleSignalStrategy(CtaTemplate): + """""" + author = "用Python的交易员" + + entry_window = 20 + exit_window = 10 + atr_window = 20 + fixed_size = 1 + + entry_up = 0 + entry_down = 0 + exit_up = 0 + exit_down = 0 + atr_value = 0 + + long_entry = 0 + short_entry = 0 + long_stop = 0 + short_stop = 0 + + parameters = ["entry_window", "exit_window", "atr_window", "fixed_size"] + variables = ["entry_up", "entry_down", "exit_up", "exit_down", "atr_value"] + + def __init__(self, cta_engine, strategy_name, vt_symbol, setting): + """""" + super(TurtleSignalStrategy, self).__init__( + cta_engine, strategy_name, vt_symbol, setting + ) + + self.bg = BarGenerator(self.on_bar) + self.am = ArrayManager() + + def on_init(self): + """ + Callback when strategy is inited. + """ + self.write_log("策略初始化") + self.load_bar(20) + + def on_start(self): + """ + Callback when strategy is started. + """ + self.write_log("策略启动") + + def on_stop(self): + """ + Callback when strategy is stopped. + """ + self.write_log("策略停止") + + def on_tick(self, tick: TickData): + """ + Callback of new tick data update. + """ + self.bg.update_tick(tick) + + def on_bar(self, bar: BarData): + """ + Callback of new bar data update. + """ + self.cancel_all() + + self.am.update_bar(bar) + if not self.am.inited: + return + + # Only calculates new entry channel when no position holding + if not self.pos: + self.entry_up, self.entry_down = self.am.donchian( + self.entry_window + ) + + self.exit_up, self.exit_down = self.am.donchian(self.exit_window) + + if not self.pos: + self.atr_value = self.am.atr(self.atr_window) + + self.long_entry = 0 + self.short_entry = 0 + self.long_stop = 0 + self.short_stop = 0 + + self.send_buy_orders(self.entry_up) + self.send_short_orders(self.entry_down) + elif self.pos > 0: + self.send_buy_orders(self.entry_up) + + sell_price = max(self.long_stop, self.exit_down) + self.sell(sell_price, abs(self.pos), True) + + elif self.pos < 0: + self.send_short_orders(self.entry_down) + + cover_price = min(self.short_stop, self.exit_up) + self.cover(cover_price, abs(self.pos), True) + + self.put_event() + + def on_trade(self, trade: TradeData): + """ + Callback of new trade data update. + """ + if trade.direction == Direction.LONG: + self.long_entry = trade.price + self.long_stop = self.long_entry - 2 * self.atr_value + else: + self.short_entry = trade.price + self.short_stop = self.short_entry + 2 * self.atr_value + + def on_order(self, order: OrderData): + """ + Callback of new order data update. + """ + pass + + def on_stop_order(self, stop_order: StopOrder): + """ + Callback of stop order update. + """ + pass + + def send_buy_orders(self, price): + """""" + t = self.pos / self.fixed_size + + if t < 1: + self.buy(price, self.fixed_size, True) + + if t < 2: + self.buy(price + self.atr_value * 0.5, self.fixed_size, True) + + if t < 3: + self.buy(price + self.atr_value, self.fixed_size, True) + + if t < 4: + self.buy(price + self.atr_value * 1.5, self.fixed_size, True) + + def send_short_orders(self, price): + """""" + t = self.pos / self.fixed_size + + if t > -1: + self.short(price, self.fixed_size, True) + + if t > -2: + self.short(price - self.atr_value * 0.5, self.fixed_size, True) + + if t > -3: + self.short(price - self.atr_value, self.fixed_size, True) + + if t > -4: + self.short(price - self.atr_value * 1.5, self.fixed_size, True) diff --git a/vnpy/app/cta_strategy_pro/template.py b/vnpy/app/cta_strategy_pro/template.py new file mode 100644 index 00000000..f9490e31 --- /dev/null +++ b/vnpy/app/cta_strategy_pro/template.py @@ -0,0 +1,451 @@ +"""""" +import sys +from abc import ABC +from copy import copy +from typing import Any, Callable +from logging import INFO, ERROR +from vnpy.trader.constant import Interval, Direction, Offset +from vnpy.trader.object import BarData, TickData, OrderData, TradeData +from vnpy.trader.utility import virtual + +from .base import StopOrder, EngineType + + +class CtaComponent(ABC): + """ CTA策略基础组件""" + def __init__(self, strategy=None, **kwargs): + """ + 构造 + :param strategy: + """ + self.strategy = strategy + + # ---------------------------------------------------------------------- + def write_log(self, content: str): + """记录日志""" + if self.strategy: + self.strategy.write_log(msg=content, level=INFO) + else: + print(content) + + # ---------------------------------------------------------------------- + def write_error(self, content: str, level: int = ERROR): + """记录错误日志""" + if self.strategy: + self.strategy.write_log(msg=content, level=level) + else: + print(content, file=sys.stderr) + + +class CtaTemplate(ABC): + """CTA策略模板""" + + author = "" + parameters = [] + variables = [] + + def __init__( + self, + cta_engine: Any, + strategy_name: str, + vt_symbol: str, + setting: dict, + ): + """""" + self.cta_engine = cta_engine + self.strategy_name = strategy_name + self.vt_symbol = vt_symbol + + self.inited = False + self.trading = False + self.pos = 0 + + # Copy a new variables list here to avoid duplicate insert when multiple + # strategy instances are created with the same strategy class. + self.variables = copy(self.variables) + self.variables.insert(0, "inited") + self.variables.insert(1, "trading") + self.variables.insert(2, "pos") + + self.update_setting(setting) + + def update_setting(self, setting: dict): + """ + Update strategy parameter wtih value in setting dict. + """ + for name in self.parameters: + if name in setting: + setattr(self, name, setting[name]) + + @classmethod + def get_class_parameters(cls): + """ + Get default parameters dict of strategy class. + """ + class_parameters = {} + for name in cls.parameters: + class_parameters[name] = getattr(cls, name) + return class_parameters + + def get_parameters(self): + """ + Get strategy parameters dict. + """ + strategy_parameters = {} + for name in self.parameters: + strategy_parameters[name] = getattr(self, name) + return strategy_parameters + + def get_variables(self): + """ + Get strategy variables dict. + """ + strategy_variables = {} + for name in self.variables: + strategy_variables[name] = getattr(self, name) + return strategy_variables + + def get_data(self): + """ + Get strategy data. + """ + strategy_data = { + "strategy_name": self.strategy_name, + "vt_symbol": self.vt_symbol, + "class_name": self.__class__.__name__, + "author": self.author, + "parameters": self.get_parameters(), + "variables": self.get_variables(), + } + return strategy_data + + @virtual + def on_init(self): + """ + Callback when strategy is inited. + """ + pass + + @virtual + def on_start(self): + """ + Callback when strategy is started. + """ + pass + + @virtual + def on_stop(self): + """ + Callback when strategy is stopped. + """ + pass + + @virtual + def on_tick(self, tick: TickData): + """ + Callback of new tick data update. + """ + pass + + @virtual + def on_bar(self, bar: BarData): + """ + Callback of new bar data update. + """ + pass + + @virtual + def on_trade(self, trade: TradeData): + """ + Callback of new trade data update. + """ + pass + + @virtual + def on_order(self, order: OrderData): + """ + Callback of new order data update. + """ + pass + + @virtual + def on_stop_order(self, stop_order: StopOrder): + """ + Callback of stop order update. + """ + pass + + def buy(self, price: float, volume: float, stop: bool = False, lock: bool = False, vt_symbol: str = ''): + """ + Send buy order to open a long position. + """ + return self.send_order(vt_symbol=vt_symbol, + direction=Direction.LONG, + offset=Offset.OPEN, + price=price, + volume=volume, + stop=stop, + lock=lock) + + def sell(self, price: float, volume: float, stop: bool = False, lock: bool = False, vt_symbol: str = ''): + """ + Send sell order to close a long position. + """ + return self.send_order(vt_symbol=vt_symbol, + direction=Direction.SHORT, + offset=Offset.CLOSE, + price=price, + volume=volume, + stop=stop, + lock=lock) + + def short(self, price: float, volume: float, stop: bool = False, lock: bool = False, vt_symbol: str = ''): + """ + Send short order to open as short position. + """ + return self.send_order(vt_symbol=vt_symbol, + direction=Direction.SHORT, + offset=Offset.OPEN, + price=price, + volume=volume, + stop=stop, + lock=lock) + + def cover(self, price: float, volume: float, stop: bool = False, lock: bool = False, vt_symbol: str = ''): + """ + Send cover order to close a short position. + """ + return self.send_order(vt_symbol=vt_symbol, + direction=Direction.LONG, + offset=Offset.CLOSE, + price=price, + volume=volume, + stop=stop, + lock=lock) + + def send_order( + self, + vt_symbol: str, + direction: Direction, + offset: Offset, + price: float, + volume: float, + stop: bool = False, + lock: bool = False + ): + """ + Send a new order. + """ + # 兼容cta_strategy的模板,缺省不指定vt_symbol时,使用策略配置的vt_symbol + if vt_symbol == '': + vt_symbol = self.vt_symbol + + if self.trading: + vt_orderids = self.cta_engine.send_order( + self, vt_symbol, direction, offset, price, volume, stop, lock + ) + return vt_orderids + else: + return [] + + def cancel_order(self, vt_orderid: str): + """ + Cancel an existing order. + """ + if self.trading: + self.cta_engine.cancel_order(self, vt_orderid) + + def cancel_all(self): + """ + Cancel all orders sent by strategy. + """ + if self.trading: + self.cta_engine.cancel_all(self) + + def write_log(self, msg: str, level: int = INFO): + """ + Write a log message. + """ + self.cta_engine.write_log(msg=msg, strategy_name=self.strategy_name, level=level) + + def get_engine_type(self): + """ + Return whether the cta_engine is backtesting or live trading. + """ + return self.cta_engine.get_engine_type() + + def load_bar( + self, + days: int, + interval: Interval = Interval.MINUTE, + callback: Callable = None, + ): + """ + Load historical bar data for initializing strategy. + """ + if not callback: + callback = self.on_bar + + self.cta_engine.load_bar(self.vt_symbol, days, interval, callback) + + def load_tick(self, days: int): + """ + Load historical tick data for initializing strategy. + """ + self.cta_engine.load_tick(self.vt_symbol, days, self.on_tick) + + def put_event(self): + """ + Put an strategy data event for ui update. + """ + if self.inited: + self.cta_engine.put_strategy_event(self) + + def send_email(self, msg): + """ + Send email to default receiver. + """ + if self.inited: + self.cta_engine.send_email(msg, self) + + def sync_data(self): + """ + Sync strategy variables value into disk storage. + """ + if self.trading: + self.cta_engine.sync_strategy_data(self) + + +class CtaSignal(ABC): + """""" + + def __init__(self): + """""" + self.signal_pos = 0 + + @virtual + def on_tick(self, tick: TickData): + """ + Callback of new tick data update. + """ + pass + + @virtual + def on_bar(self, bar: BarData): + """ + Callback of new bar data update. + """ + pass + + def set_signal_pos(self, pos): + """""" + self.signal_pos = pos + + def get_signal_pos(self): + """""" + return self.signal_pos + + +class TargetPosTemplate(CtaTemplate): + """""" + tick_add = 1 + + last_tick = None + last_bar = None + target_pos = 0 + vt_orderids = [] + + def __init__(self, cta_engine, strategy_name, vt_symbol, setting): + """""" + super(TargetPosTemplate, self).__init__( + cta_engine, strategy_name, vt_symbol, setting + ) + self.variables.append("target_pos") + + @virtual + def on_tick(self, tick: TickData): + """ + Callback of new tick data update. + """ + self.last_tick = tick + + if self.trading: + self.trade() + + @virtual + def on_bar(self, bar: BarData): + """ + Callback of new bar data update. + """ + self.last_bar = bar + + @virtual + def on_order(self, order: OrderData): + """ + Callback of new order data update. + """ + vt_orderid = order.vt_orderid + + if not order.is_active() and vt_orderid in self.vt_orderids: + self.vt_orderids.remove(vt_orderid) + + def set_target_pos(self, target_pos): + """""" + self.target_pos = target_pos + self.trade() + + def trade(self): + """""" + self.cancel_all() + + pos_change = self.target_pos - self.pos + if not pos_change: + return + + long_price = 0 + short_price = 0 + + if self.last_tick: + if pos_change > 0: + long_price = self.last_tick.ask_price_1 + self.tick_add + if self.last_tick.limit_up: + long_price = min(long_price, self.last_tick.limit_up) + else: + short_price = self.last_tick.bid_price_1 - self.tick_add + if self.last_tick.limit_down: + short_price = max(short_price, self.last_tick.limit_down) + + else: + if pos_change > 0: + long_price = self.last_bar.close_price + self.tick_add + else: + short_price = self.last_bar.close_price - self.tick_add + + if self.get_engine_type() == EngineType.BACKTESTING: + if pos_change > 0: + vt_orderids = self.buy(long_price, abs(pos_change)) + else: + vt_orderids = self.short(short_price, abs(pos_change)) + self.vt_orderids.extend(vt_orderids) + + else: + if self.vt_orderids: + return + + if pos_change > 0: + if self.pos < 0: + if pos_change < abs(self.pos): + vt_orderids = self.cover(long_price, pos_change) + else: + vt_orderids = self.cover(long_price, abs(self.pos)) + else: + vt_orderids = self.buy(long_price, abs(pos_change)) + else: + if self.pos > 0: + if abs(pos_change) < self.pos: + vt_orderids = self.sell(short_price, abs(pos_change)) + else: + vt_orderids = self.sell(short_price, abs(self.pos)) + else: + vt_orderids = self.short(short_price, abs(pos_change)) + self.vt_orderids.extend(vt_orderids) diff --git a/vnpy/app/cta_strategy_pro/ui/__init__.py b/vnpy/app/cta_strategy_pro/ui/__init__.py new file mode 100644 index 00000000..592d401a --- /dev/null +++ b/vnpy/app/cta_strategy_pro/ui/__init__.py @@ -0,0 +1 @@ +from .widget import CtaManager diff --git a/vnpy/app/cta_strategy_pro/ui/cta.ico b/vnpy/app/cta_strategy_pro/ui/cta.ico new file mode 100644 index 00000000..25cbaa73 Binary files /dev/null and b/vnpy/app/cta_strategy_pro/ui/cta.ico differ diff --git a/vnpy/app/cta_strategy_pro/ui/widget.py b/vnpy/app/cta_strategy_pro/ui/widget.py new file mode 100644 index 00000000..ab99b68d --- /dev/null +++ b/vnpy/app/cta_strategy_pro/ui/widget.py @@ -0,0 +1,455 @@ +from vnpy.event import Event, EventEngine +from vnpy.trader.engine import MainEngine +from vnpy.trader.ui import QtCore, QtGui, QtWidgets +from vnpy.trader.ui.widget import ( + BaseCell, + EnumCell, + MsgCell, + TimeCell, + BaseMonitor +) +from ..base import ( + APP_NAME, + EVENT_CTA_LOG, + EVENT_CTA_STOPORDER, + EVENT_CTA_STRATEGY +) +from ..engine import CtaEngine + + +class CtaManager(QtWidgets.QWidget): + """""" + + signal_log = QtCore.pyqtSignal(Event) + signal_strategy = QtCore.pyqtSignal(Event) + + def __init__(self, main_engine: MainEngine, event_engine: EventEngine): + super(CtaManager, self).__init__() + + self.main_engine = main_engine + self.event_engine = event_engine + self.cta_engine = main_engine.get_engine(APP_NAME) + + self.managers = {} + + self.init_ui() + self.register_event() + self.cta_engine.init_engine() + self.update_class_combo() + + def init_ui(self): + """""" + self.setWindowTitle("CTA策略") + + # Create widgets + self.class_combo = QtWidgets.QComboBox() + + add_button = QtWidgets.QPushButton("添加策略") + add_button.clicked.connect(self.add_strategy) + + init_button = QtWidgets.QPushButton("全部初始化") + init_button.clicked.connect(self.cta_engine.init_all_strategies) + + start_button = QtWidgets.QPushButton("全部启动") + start_button.clicked.connect(self.cta_engine.start_all_strategies) + + stop_button = QtWidgets.QPushButton("全部停止") + stop_button.clicked.connect(self.cta_engine.stop_all_strategies) + + clear_button = QtWidgets.QPushButton("清空日志") + clear_button.clicked.connect(self.clear_log) + + self.scroll_layout = QtWidgets.QVBoxLayout() + self.scroll_layout.addStretch() + + scroll_widget = QtWidgets.QWidget() + scroll_widget.setLayout(self.scroll_layout) + + scroll_area = QtWidgets.QScrollArea() + scroll_area.setWidgetResizable(True) + scroll_area.setWidget(scroll_widget) + + self.log_monitor = LogMonitor(self.main_engine, self.event_engine) + + self.stop_order_monitor = StopOrderMonitor( + self.main_engine, self.event_engine + ) + + # Set layout + hbox1 = QtWidgets.QHBoxLayout() + hbox1.addWidget(self.class_combo) + hbox1.addWidget(add_button) + hbox1.addStretch() + hbox1.addWidget(init_button) + hbox1.addWidget(start_button) + hbox1.addWidget(stop_button) + hbox1.addWidget(clear_button) + + grid = QtWidgets.QGridLayout() + grid.addWidget(scroll_area, 0, 0, 2, 1) + grid.addWidget(self.stop_order_monitor, 0, 1) + grid.addWidget(self.log_monitor, 1, 1) + + vbox = QtWidgets.QVBoxLayout() + vbox.addLayout(hbox1) + vbox.addLayout(grid) + + self.setLayout(vbox) + + def update_class_combo(self): + """""" + self.class_combo.addItems( + self.cta_engine.get_all_strategy_class_names() + ) + + def register_event(self): + """""" + self.signal_strategy.connect(self.process_strategy_event) + + self.event_engine.register( + EVENT_CTA_STRATEGY, self.signal_strategy.emit + ) + + def process_strategy_event(self, event): + """ + Update strategy status onto its monitor. + """ + data = event.data + strategy_name = data["strategy_name"] + + if strategy_name in self.managers: + manager = self.managers[strategy_name] + manager.update_data(data) + else: + manager = StrategyManager(self, self.cta_engine, data) + self.scroll_layout.insertWidget(0, manager) + self.managers[strategy_name] = manager + + def remove_strategy(self, strategy_name): + """""" + manager = self.managers.pop(strategy_name) + manager.deleteLater() + + def add_strategy(self): + """""" + class_name = str(self.class_combo.currentText()) + if not class_name: + return + + parameters = self.cta_engine.get_strategy_class_parameters(class_name) + editor = SettingEditor(parameters, class_name=class_name) + n = editor.exec_() + + if n == editor.Accepted: + setting = editor.get_setting() + vt_symbol = setting.pop("vt_symbol") + strategy_name = setting.pop("strategy_name") + + self.cta_engine.add_strategy( + class_name, strategy_name, vt_symbol, setting + ) + + def clear_log(self): + """""" + self.log_monitor.setRowCount(0) + + def show(self): + """""" + self.showMaximized() + + +class StrategyManager(QtWidgets.QFrame): + """ + Manager for a strategy + """ + + def __init__( + self, cta_manager: CtaManager, cta_engine: CtaEngine, data: dict + ): + """""" + super(StrategyManager, self).__init__() + + self.cta_manager = cta_manager + self.cta_engine = cta_engine + + self.strategy_name = data["strategy_name"] + self._data = data + + self.init_ui() + + def init_ui(self): + """""" + self.setFixedHeight(300) + self.setFrameShape(self.Box) + self.setLineWidth(1) + + init_button = QtWidgets.QPushButton("初始化") + init_button.clicked.connect(self.init_strategy) + + start_button = QtWidgets.QPushButton("启动") + start_button.clicked.connect(self.start_strategy) + + stop_button = QtWidgets.QPushButton("停止") + stop_button.clicked.connect(self.stop_strategy) + + edit_button = QtWidgets.QPushButton("编辑") + edit_button.clicked.connect(self.edit_strategy) + + remove_button = QtWidgets.QPushButton("移除") + remove_button.clicked.connect(self.remove_strategy) + + reload_button = QtWidgets.QPushButton("重载") + reload_button.clicked.connect(self.reload_strategy) + + strategy_name = self._data["strategy_name"] + vt_symbol = self._data["vt_symbol"] + class_name = self._data["class_name"] + author = self._data["author"] + + label_text = ( + f"{strategy_name} - {vt_symbol} ({class_name} by {author})" + ) + label = QtWidgets.QLabel(label_text) + label.setAlignment(QtCore.Qt.AlignCenter) + + self.parameters_monitor = DataMonitor(self._data["parameters"]) + self.variables_monitor = DataMonitor(self._data["variables"]) + + hbox = QtWidgets.QHBoxLayout() + hbox.addWidget(init_button) + hbox.addWidget(start_button) + hbox.addWidget(stop_button) + hbox.addWidget(edit_button) + hbox.addWidget(remove_button) + hbox.addWidget(reload_button) + + vbox = QtWidgets.QVBoxLayout() + vbox.addWidget(label) + vbox.addLayout(hbox) + vbox.addWidget(self.parameters_monitor) + vbox.addWidget(self.variables_monitor) + self.setLayout(vbox) + + def update_data(self, data: dict): + """""" + self._data = data + + self.parameters_monitor.update_data(data["parameters"]) + self.variables_monitor.update_data(data["variables"]) + + def init_strategy(self): + """""" + self.cta_engine.init_strategy(self.strategy_name) + + def start_strategy(self): + """""" + self.cta_engine.start_strategy(self.strategy_name) + + def stop_strategy(self): + """""" + self.cta_engine.stop_strategy(self.strategy_name) + + def edit_strategy(self): + """""" + strategy_name = self._data["strategy_name"] + + parameters = self.cta_engine.get_strategy_parameters(strategy_name) + editor = SettingEditor(parameters, strategy_name=strategy_name) + n = editor.exec_() + + if n == editor.Accepted: + setting = editor.get_setting() + self.cta_engine.edit_strategy(strategy_name, setting) + + def remove_strategy(self): + """""" + result = self.cta_engine.remove_strategy(self.strategy_name) + + # Only remove strategy gui manager if it has been removed from engine + if result: + self.cta_manager.remove_strategy(self.strategy_name) + + def reload_strategy(self): + """重新加载策略""" + self.cta_engine.reload_strategy(self.strategy_name) + + +class DataMonitor(QtWidgets.QTableWidget): + """ + Table monitor for parameters and variables. + """ + + def __init__(self, data: dict): + """""" + super(DataMonitor, self).__init__() + + self._data = data + self.cells = {} + + self.init_ui() + + def init_ui(self): + """""" + labels = list(self._data.keys()) + self.setColumnCount(len(labels)) + self.setHorizontalHeaderLabels(labels) + + self.setRowCount(1) + self.verticalHeader().setSectionResizeMode( + QtWidgets.QHeaderView.Stretch + ) + self.verticalHeader().setVisible(False) + self.setEditTriggers(self.NoEditTriggers) + + for column, name in enumerate(self._data.keys()): + value = self._data[name] + + cell = QtWidgets.QTableWidgetItem(str(value)) + cell.setTextAlignment(QtCore.Qt.AlignCenter) + + self.setItem(0, column, cell) + self.cells[name] = cell + + def update_data(self, data: dict): + """""" + for name, value in data.items(): + cell = self.cells[name] + cell.setText(str(value)) + + +class StopOrderMonitor(BaseMonitor): + """ + Monitor for local stop order. + """ + + event_type = EVENT_CTA_STOPORDER + data_key = "stop_orderid" + sorting = True + + headers = { + "stop_orderid": { + "display": "停止委托号", + "cell": BaseCell, + "update": False, + }, + "vt_orderids": {"display": "限价委托号", "cell": BaseCell, "update": True}, + "vt_symbol": {"display": "本地代码", "cell": BaseCell, "update": False}, + "direction": {"display": "方向", "cell": EnumCell, "update": False}, + "offset": {"display": "开平", "cell": EnumCell, "update": False}, + "price": {"display": "价格", "cell": BaseCell, "update": False}, + "volume": {"display": "数量", "cell": BaseCell, "update": False}, + "status": {"display": "状态", "cell": EnumCell, "update": True}, + "lock": {"display": "锁仓", "cell": BaseCell, "update": False}, + "strategy_name": {"display": "策略名", "cell": BaseCell, "update": False}, + } + + +class LogMonitor(BaseMonitor): + """ + Monitor for log data. + """ + + event_type = EVENT_CTA_LOG + data_key = "" + sorting = False + + headers = { + "time": {"display": "时间", "cell": TimeCell, "update": False}, + "msg": {"display": "信息", "cell": MsgCell, "update": False}, + } + + def init_ui(self): + """ + Stretch last column. + """ + super(LogMonitor, self).init_ui() + + self.horizontalHeader().setSectionResizeMode( + 1, QtWidgets.QHeaderView.Stretch + ) + + def insert_new_row(self, data): + """ + Insert a new row at the top of table. + """ + super(LogMonitor, self).insert_new_row(data) + self.resizeRowToContents(0) + + +class SettingEditor(QtWidgets.QDialog): + """ + For creating new strategy and editing strategy parameters. + """ + + def __init__( + self, parameters: dict, strategy_name: str = "", class_name: str = "" + ): + """""" + super(SettingEditor, self).__init__() + + self.parameters = parameters + self.strategy_name = strategy_name + self.class_name = class_name + + self.edits = {} + + self.init_ui() + + def init_ui(self): + """""" + form = QtWidgets.QFormLayout() + + # Add vt_symbol and name edit if add new strategy + if self.class_name: + self.setWindowTitle(f"添加策略:{self.class_name}") + button_text = "添加" + parameters = {"strategy_name": "", "vt_symbol": ""} + parameters.update(self.parameters) + else: + self.setWindowTitle(f"参数编辑:{self.strategy_name}") + button_text = "确定" + parameters = self.parameters + + for name, value in parameters.items(): + type_ = type(value) + + edit = QtWidgets.QLineEdit(str(value)) + if type_ is int: + validator = QtGui.QIntValidator() + edit.setValidator(validator) + elif type_ is float: + validator = QtGui.QDoubleValidator() + edit.setValidator(validator) + + form.addRow(f"{name} {type_}", edit) + + self.edits[name] = (edit, type_) + + button = QtWidgets.QPushButton(button_text) + button.clicked.connect(self.accept) + form.addRow(button) + + self.setLayout(form) + + def get_setting(self): + """""" + setting = {} + + if self.class_name: + setting["class_name"] = self.class_name + + for name, tp in self.edits.items(): + edit, type_ = tp + value_text = edit.text() + + if type_ == bool: + if value_text == "True": + value = True + else: + value = False + else: + value = type_(value_text) + + setting[name] = value + + return setting