[Add] spread trading algo and engine

This commit is contained in:
vn.py 2019-09-15 17:44:08 +08:00
parent 0f4402833d
commit 0ad86c8638
8 changed files with 591 additions and 43 deletions

View File

@ -5,7 +5,7 @@ from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
# from vnpy.gateway.binance import BinanceGateway
# from vnpy.gateway.bitmex import BitmexGateway
from vnpy.gateway.bitmex import BitmexGateway
# from vnpy.gateway.futu import FutuGateway
# from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
@ -38,6 +38,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.risk_manager import RiskManagerApp
from vnpy.app.script_trader import ScriptTraderApp
from vnpy.app.rpc_service import RpcServiceApp
from vnpy.app.spread_trading import SpreadTradingApp
def main():
@ -57,7 +58,7 @@ def main():
# main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway)
# main_engine.add_gateway(BitmexGateway)
main_engine.add_gateway(BitmexGateway)
# main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(OesGateway)
# main_engine.add_gateway(OkexGateway)
@ -74,14 +75,15 @@ def main():
# main_engine.add_gateway(DaGateway)
main_engine.add_gateway(CoinbaseGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CtaStrategyApp)
# main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CsvLoaderApp)
# main_engine.add_app(AlgoTradingApp)
# main_engine.add_app(DataRecorderApp)
# main_engine.add_app(RiskManagerApp)
main_engine.add_app(ScriptTraderApp)
main_engine.add_app(RpcServiceApp)
# main_engine.add_app(ScriptTraderApp)
# main_engine.add_app(RpcServiceApp)
main_engine.add_app(SpreadTradingApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()

View File

@ -5,7 +5,7 @@ from vnpy.trader.app import BaseApp
from .engine import SpreadEngine, APP_NAME
class AlgoTradingApp(BaseApp):
class SpreadTradingApp(BaseApp):
""""""
app_name = APP_NAME

View File

@ -0,0 +1,132 @@
from typing import Any
from math import floor, ceil
from vnpy.trader.constant import Direction
from vnpy.trader.object import (TickData, OrderData, TradeData)
from .template import SpreadAlgoTemplate
from .base import SpreadData
class SpreadTakerAlgo(SpreadAlgoTemplate):
""""""
algo_name = "SpreadTaker"
def __init__(
self,
algo_engine: Any,
algoid: str,
spread: SpreadData,
direction: Direction,
price: float,
volume: float
):
""""""
super().__init__(algo_engine, algoid, spread, direction, price, volume)
self.cancel_interval: int = 2
self.timer_count: int = 0
def on_tick(self, tick: TickData):
""""""
# Return if there are any existing orders
if not self.check_order_finished():
return
# Hedge if active leg is not fully hedged
if not self.check_hedge_finished():
self.hedge_passive_legs()
return
# Otherwise check if should take active leg
if self.direction == Direction.LONG:
if self.spread.ask_price <= self.price:
self.take_active_leg()
else:
if self.spread.bid_price >= self.price:
self.take_active_leg()
def on_order(self, order: OrderData):
""""""
# Only care active leg order update
if order.vt_symbol != self.spread.active_leg.vt_symbol:
return
# Do nothing if still any existing orders
if not self.check_order_finished():
return
# Hedge passive legs if necessary
if not self.check_hedge_finished():
self.hedge_passive_legs()
def on_trade(self, trade: TradeData):
""""""
pass
def on_interval(self):
""""""
if not self.check_order_finished():
self.cancel_all_order()
def take_active_leg(self):
""""""
# Calculate spread order volume of new round trade
spread_volume_left = self.target - self.traded
if self.direction == Direction.LONG:
spread_order_volume = self.spread.ask_volume
spread_order_volume = min(spread_order_volume, spread_volume_left)
else:
spread_order_volume = -self.spread.bid_volume
spread_order_volume = max(spread_order_volume, spread_volume_left)
# Calculate active leg order volume
leg_order_volume = self.spread.caculate_leg_volume(
self.spread.active_leg.vt_symbol,
spread_order_volume
)
# Send active leg order
self.send_leg_order(
self.spread.active_leg.vt_symbol,
leg_order_volume
)
def hedge_passive_legs(self):
"""
Send orders to hedge all passive legs.
"""
# Calcualte spread volume to hedge
active_leg = self.spread.active_leg
active_traded = self.leg_traded[active_leg.vt_symbol]
hedge_volume = self.spread.calculate_spread_volume(
active_leg.vt_symbol,
active_traded
)
# Calculate passive leg target volume and do hedge
for leg in self.spread.passive_legs:
passive_traded = self.leg_orders[leg.vt_symbol]
passive_target = self.spread.caculate_leg_volume(
leg.vt_symbol,
hedge_volume
)
leg_order_volume = passive_target - passive_traded
if leg_order_volume:
self.send_leg_order(leg.vt_symbol, leg_order_volume)
def send_leg_order(self, vt_symbol: str, leg_volume: float):
""""""
leg = self.spread.legs[vt_symbol]
leg_tick = self.get_tick(vt_symbol)
leg_contract = self.get_contract(vt_symbol)
if leg_volume > 0:
price = leg_tick.ask_price_1 + leg_contract.pricetick * self.payup
self.send_long_order(leg.vt_symbol, price, abs(leg_volume))
else:
price = leg_tick.bid_price_1 - leg_contract.pricetick * self.payup
self.send_short_order(leg.vt_symbol, price, abs(leg_volume))

View File

@ -6,6 +6,12 @@ from vnpy.trader.object import TickData, PositionData
from vnpy.trader.constant import Direction
EVENT_SPREAD_DATA = "eSpreadData"
EVENT_SPREAD_LOG = "eSpreadLog"
EVENT_SPREAD_ALGO = "eSpreadAlgo"
EVENT_SPREAD_STRATEGY = "eSpreadStrategy"
class LegData:
""""""
@ -69,6 +75,9 @@ class SpreadData:
self.active_leg: LegData = None
self.passive_legs: List[LegData] = []
self.price_formula: str = ""
self.trading_formula: str = ""
for leg in legs:
self.legs[leg.vt_symbol] = leg
if leg.vt_symbol == active_symbol:
@ -76,6 +85,16 @@ class SpreadData:
else:
self.passive_legs.append(leg)
if leg.price_multiplier > 0:
self.price_formula += f"+{leg.trading_multiplier}*{leg.vt_symbol}"
else:
self.price_formula += f"{leg.trading_multiplier}*{leg.vt_symbol}"
if leg.trading_multiplier > 0:
self.trading_formula += f"+{leg.trading_multiplier}*{leg.vt_symbol}"
else:
self.trading_formula += f"{leg.trading_multiplier}*{leg.vt_symbol}"
# Spread data
self.bid_price: float = 0
self.ask_price: float = 0
@ -154,3 +173,21 @@ class SpreadData:
self.ask_price = 0
self.bid_volume = 0
self.ask_volume = 0
def calculate_leg_volume(self, vt_symbol: str, spread_volume: float) -> float:
""""""
leg = self.legs[vt_symbol]
leg_volume = spread_volume * leg.trading_multiplier
return leg_volume
def calculate_spread_volume(self, vt_symbol: str, leg_volume: float) -> float:
""""""
leg = self.legs[vt_symbol]
spread_volume = leg_volume / leg.trading_multiplier
if spread_volume > 0:
spread_volume = floor(spread_volume)
else:
spread_volume = ceil(spread_volume)
return spread_volume

View File

@ -1,13 +1,27 @@
from typing import List, Dict
from collections import defaultdict
from copy import copy
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.event import EVENT_TICK, EVENT_POSITION
from vnpy.trader.event import (
EVENT_TICK, EVENT_POSITION, EVENT_CONTRACT,
EVENT_ORDER, EVENT_TRADE, EVENT_TIMER
)
from vnpy.trader.utility import load_json, save_json
from vnpy.trader.object import TickData, ContractData
from vnpy.trader.object import (
TickData, ContractData, LogData,
SubscribeRequest, OrderRequest, CancelRequest
)
from vnpy.trader.constant import Direction
from .base import LegData, SpreadData
from .base import (
LegData, SpreadData,
EVENT_SPREAD_DATA, EVENT_SPREAD_ALGO,
EVENT_SPREAD_LOG, EVENT_SPREAD_STRATEGY
)
from .template import SpreadAlgoTemplate
from .algo import SpreadTakerAlgo
APP_NAME = "SpreadTrading"
@ -20,9 +34,28 @@ class SpreadEngine(BaseEngine):
"""Constructor"""
super().__init__(main_engine, event_engine, APP_NAME)
self.active = False
self.data_engine: SpreadDataEngine = SpreadDataEngine(self)
self.algo_engine: SpreadAlgoEngine = SpreadAlgoEngine(self)
def start(self):
""""""
if self.active:
return
self.active = True
self.data_engine.start()
self.algo_engine.start()
def write_log(self, msg: str):
""""""
pass
log = LogData(
msg=msg,
gateway_name=APP_NAME
)
event = Event(EVENT_SPREAD_LOG, log)
self.event_engine.put(event)
class SpreadDataEngine:
@ -41,10 +74,33 @@ class SpreadDataEngine:
self.spreads: Dict[str, SpreadData] = {} # name: spread
self.symbol_spread_map: Dict[str, List[SpreadData]] = defaultdict(list)
def start(self):
""""""
self.load_setting()
self.register_event()
self.test()
def load_setting(self):
self.write_log("价差数据引擎启动成功")
def test(self):
""""""
name = "test"
leg_settings = [
{
"vt_symbol": "XBTUSD.BITMEX",
"price_multiplier": 1,
"trading_multiplier": 1
},
{
"vt_symbol": "XBTZ19.BITMEX",
"price_multiplier": -1,
"trading_multiplier": -1
}
]
active_symbol = "XBTUSD.BITMEX"
self.add_spread(name, leg_settings, active_symbol, True)
def load_setting(self) -> None:
""""""
setting = load_json(self.setting_filename)
@ -56,13 +112,13 @@ class SpreadDataEngine:
save=False
)
def save_setting(self):
def save_setting(self) -> None:
""""""
setting = []
for spread in self.spreads.values():
leg_settings = []
for leg in spread.legs:
for leg in spread.legs.values():
leg_setting = {
"vt_symbol": leg.vt_symbol,
"price_multiplier": leg.price_multiplier,
@ -79,12 +135,13 @@ class SpreadDataEngine:
save_json(self.setting_filename, setting)
def register_event(self):
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
def process_tick_event(self, event: Event):
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
@ -96,7 +153,9 @@ class SpreadDataEngine:
for spread in self.symbol_spread_map[tick.vt_symbol]:
spread.calculate_price()
def process_position_event(self, event: Event):
self.put_data_event(spread)
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
@ -108,13 +167,30 @@ class SpreadDataEngine:
for spread in self.symbol_spread_map[position.vt_symbol]:
spread.calculate_pos()
self.put_data_event(spread)
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
if contract.vt_symbol in self.legs:
req = SubscribeRequest(
contract.symbol, contract.exchange
)
self.main_engine.subscribe(req, contract.gateway_name)
def put_data_event(self, spread: SpreadData) -> None:
""""""
event = Event(EVENT_SPREAD_DATA, spread)
self.event_engine.put(event)
def add_spread(
self,
name: str,
leg_settings: List[Dict],
active_symbol: str,
save: bool = True
):
) -> None:
""""""
if name in self.spreads:
self.write_log("价差创建失败,名称重复:{}".format(name))
@ -138,15 +214,16 @@ class SpreadDataEngine:
spread = SpreadData(name, legs, active_symbol)
self.spreads[name] = spread
for leg in spread.legs:
for leg in spread.legs.values():
self.symbol_spread_map[leg.vt_symbol].append(spread)
if save:
self.save_setting()
self.write_log("价差创建成功:{}".format(name))
self.put_data_event(spread)
def remove_spread(self, name: str):
def remove_spread(self, name: str) -> None:
""""""
if name not in self.spreads:
return
@ -161,6 +238,7 @@ class SpreadDataEngine:
class SpreadAlgoEngine:
""""""
algo_class = SpreadTakerAlgo
def __init__(self, spread_engine: SpreadEngine):
""""""
@ -170,23 +248,146 @@ class SpreadAlgoEngine:
self.write_log = spread_engine.write_log
def put_event(self, algo) -> None:
self.spreads: Dict[str: SpreadData] = {}
self.algos: Dict[str: SpreadAlgoTemplate] = {}
self.order_algo_map: dict[str: SpreadAlgoTemplate] = {}
self.symbol_algo_map: dict[str: SpreadAlgoTemplate] = defaultdict(list)
self.algo_count: int = 0
def start(self):
""""""
pass
self.register_event()
self.write_log("价差算法引擎启动成功")
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
def process_spread_event(self, event: Event):
""""""
spread: SpreadData = event.data
self.spreads[spread.name] = spread
def process_tick_event(self, event: Event):
""""""
tick = event.data
algos = self.symbol_algo_map[tick.vt_symbol]
if not algos:
return
buf = copy(algos)
for algo in buf:
if not algo.is_active():
algos.remove(algo)
else:
algo.update_tick(tick)
def process_order_event(self, event: Event):
""""""
order = event.data
algo = self.order_algo_map.get(order.vt_orderid, None)
if algo and algo.is_active():
algo.update_order(order)
def process_trade_event(self, event: Event):
""""""
trade = event.data
algo = self.order_algo_map.get(trade.vt_orderid, None)
if algo and algo.is_active():
algo.update_trade(trade)
def process_timer_event(self, event: Event):
""""""
buf = self.algos.values()
for algo in buf:
if not algo.is_active():
self.algos.pop(algo.algoid)
else:
algo.update_timer()
def start_algo(
self,
spread_name: str,
direction: Direction,
price: float,
volume: float,
payup: int,
interval: int
) -> str:
# Find spread object
spread = self.spreads.get(spread_name, None)
if not spread:
self.write_log("创建价差算法失败,找不到价差:{}".format(spread_name))
return ""
# Generate algoid str
self.algo_count += 1
algo_count_str = str(self.algo_count).rjust(6, "0")
algoid = f"{self.algo_class.algo_name}_{algo_count_str}"
# Create algo object
algo = self.algo_class(
self,
algoid,
spread,
direction,
price,
volume,
payup,
interval
)
self.algos[algoid] = algo
# Generate map between vt_symbol and algo
for leg in spread.legs.values():
self.symbol_algo_map[leg.vt_symbol].append(algo)
# Put event to update GUI
self.put_algo_event(algo)
return algoid
def stop_algo(
self,
algoid: str
):
""""""
algo = self.algos.get(algoid, None)
if not algo:
self.write_log("停止价差算法失败,找不到算法:{}".format(algoid))
return
algo.stop()
def put_algo_event(self, algo: SpreadAlgoTemplate) -> None:
""""""
event = Event(EVENT_SPREAD_ALGO, algo)
self.event_engine.put(event)
def write_algo_log(self, algo: SpreadAlgoTemplate, msg: str) -> None:
""""""
msg = f"{algo.algoid}{msg}"
self.write_log(msg)
def send_order(
self,
algo,
vt_symbol,
price,
volume,
direction,
offset
algo: SpreadAlgoTemplate,
vt_symbol: str,
price: float,
volume: float,
direction: Direction,
) -> List[str]:
""""""
pass
def cancel_order(self, algo, vt_orderid) -> None:
def cancel_order(self, algo: SpreadAlgoTemplate, vt_orderid: str) -> None:
""""""
pass

View File

@ -4,11 +4,10 @@ from typing import Dict, List
from math import floor, ceil
from vnpy.trader.object import TickData, TradeData, OrderData, ContractData
from vnpy.trader.constant import Direction, Status, Offset
from vnpy.trader.constant import Direction, Status
from vnpy.trader.utility import virtual
from .base import SpreadData
from .engine import SpreadAlgoEngine
class SpreadAlgoTemplate:
@ -19,31 +18,37 @@ class SpreadAlgoTemplate:
def __init__(
self,
algo_engine: SpreadAlgoEngine,
algo_engine,
algoid: str,
spread: SpreadData,
direction: Direction,
price: float,
volume: float,
payup: int
payup: int,
interval: int
):
""""""
self.algo_engine: SpreadAlgoEngine = algo_engine
self.algo_engine = algo_engine
self.algoid: str = algoid
self.spread: SpreadData = spread
self.spread_name: str = spread.name
self.direction: Direction = direction
self.price: float = price
self.volume: float = volume
self.payup: int = payup
self.interval = interval
if direction == Direction.LONG:
self.target = volume
else:
self.target = -volume
self.status: Status = Status.NOTTRADED
self.traded: float = 0
self.status: Status = Status.NOTTRADED # Algo status
self.count: int = 0 # Timer count
self.traded: float = 0 # Volume traded
self.traded_volume: float = 0 # Volume traded (Abs value)
self.leg_traded: Dict[str, float] = defaultdict(int)
self.leg_orders: Dict[str, List[str]] = defaultdict[list]
@ -55,12 +60,50 @@ class SpreadAlgoTemplate:
else:
return False
def check_order_finished(self):
""""""
finished = True
for leg in self.spread.legs.values():
vt_orderids = self.leg_orders[leg.vt_symbol]
if vt_orderids:
finished = False
break
return finished
def check_hedge_finished(self):
""""""
active_symbol = self.spread.active_leg.vt_symbol
active_traded = self.leg_traded[active_symbol]
spread_volume = self.spread.calculate_spread_volume(
active_symbol, active_traded
)
finished = True
for leg in self.spread.passive_legs:
passive_symbol = leg.vt_symbol
leg_target = self.spread.calculate_leg_volume(
passive_symbol, spread_volume
)
leg_traded = self.leg_traded[passive_symbol]
if leg_traded != leg_target:
finished = False
break
return finished
def stop(self):
""""""
if self.is_active():
self.cancel_leg_order()
self.cancel_all_order()
self.status = Status.CANCELLED
self.put_event()
self.put_algo_event()
def update_tick(self, tick: TickData):
""""""
@ -86,7 +129,12 @@ class SpreadAlgoTemplate:
def update_timer(self):
""""""
self.on_timer()
self.count += 1
if self.count < self.interval:
return
self.count = 0
self.on_interval()
def put_event(self):
""""""
@ -94,7 +142,7 @@ class SpreadAlgoTemplate:
def write_log(self, msg: str):
""""""
self.algo_engine.write_log(msg)
self.algo_engine.write_algo_log(msg)
def send_long_order(self, vt_symbol: str, price: float, volume: float):
""""""
@ -153,6 +201,8 @@ class SpreadAlgoTemplate:
else:
self.traded = max(self.traded, adjusted_leg_traded)
self.traded_volume = abs(self.traded)
if self.traded == self.target:
self.status = Status.ALLTRADED
elif not self.traded:
@ -184,6 +234,6 @@ class SpreadAlgoTemplate:
pass
@virtual
def on_timer(self):
def on_interval(self):
""""""
pass

View File

@ -5,11 +5,21 @@ Widget for spread trading.
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import QtWidgets, QtCore
from vnpy.trader.ui.widget import (
BaseMonitor, BaseCell,
BidCell, AskCell,
TimeCell, MsgCell,
PnlCell, DirectionCell,
EnumCell,
)
from ..engine import (
AlgoEngine,
AlgoTemplate,
SpreadEngine,
APP_NAME,
EVENT_SPREAD_DATA,
EVENT_SPREAD_LOG,
EVENT_SPREAD_ALGO,
EVENT_SPREAD_STRATEGY
)
@ -23,3 +33,116 @@ class SpreadManager(QtWidgets.QWidget):
self.main_engine = main_engine
self.event_engine = event_engine
self.spread_engine = main_engine.get_engine(APP_NAME)
self.init_ui()
def init_ui(self):
""""""
self.setWindowTitle("价差交易")
self.data_monitor = SpreadDataMonitor(
self.main_engine,
self.event_engine
)
self.log_monitor = SpreadLogMonitor(
self.main_engine,
self.event_engine
)
self.algo_monitor = SpreadAlgoMonitor(
self.main_engine,
self.event_engine
)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(self.data_monitor)
vbox.addWidget(self.log_monitor)
hbox = QtWidgets.QHBoxLayout()
hbox.addLayout(vbox)
hbox.addWidget(self.algo_monitor)
self.setLayout(hbox)
def show(self):
""""""
self.spread_engine.start()
self.showMaximized()
class SpreadDataMonitor(BaseMonitor):
"""
Monitor for spread data.
"""
event_type = EVENT_SPREAD_DATA
data_key = "name"
sorting = False
headers = {
"name": {"display": "名称", "cell": BaseCell, "update": False},
"price_formula": {"display": "定价", "cell": BaseCell, "update": False},
"trading_formula": {"display": "交易", "cell": BaseCell, "update": False},
"bid_volume": {"display": "买量", "cell": BidCell, "update": True},
"bid_price": {"display": "买价", "cell": BidCell, "update": True},
"ask_price": {"display": "卖价", "cell": AskCell, "update": True},
"ask_volume": {"display": "卖量", "cell": AskCell, "update": True},
"net_pos": {"display": "净仓", "cell": PnlCell, "update": True},
"datetime": {"display": "时间", "cell": TimeCell, "update": True},
}
class SpreadLogMonitor(QtWidgets.QTextEdit):
"""
Monitor for log data.
"""
signal = QtCore.pyqtSignal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.init_ui()
self.register_event()
def init_ui(self):
""""""
self.setReadOnly(True)
def register_event(self):
""""""
self.signal.connect(self.process_log_event)
self.event_engine.register(EVENT_SPREAD_LOG, self.signal.emit)
def process_log_event(self, event: Event):
""""""
log = event.data
msg = f"{log.time}{log.msg}"
self.append(msg)
class SpreadAlgoMonitor(BaseMonitor):
"""
Monitor for algo status.
"""
event_type = EVENT_SPREAD_ALGO
data_key = "algoid"
sorting = False
headers = {
"algoid": {"display": "算法", "cell": BaseCell, "update": False},
"spread_name": {"display": "价差", "cell": BaseCell, "update": False},
"direction": {"display": "方向", "cell": EnumCell, "update": False},
"price": {"display": "价格", "cell": BaseCell, "update": False},
"payup": {"display": "超价", "cell": BaseCell, "update": False},
"volume": {"display": "数量", "cell": BaseCell, "update": False},
"traded_volume": {"display": "成交", "cell": BaseCell, "update": True},
"interval": {"display": "间隔", "cell": BaseCell, "update": False},
"count": {"display": "计数", "cell": BaseCell, "update": True},
"status": {"display": "状态", "cell": EnumCell, "update": True},
}

View File

@ -156,6 +156,9 @@ class TimeCell(BaseCell):
"""
Time format is 12:12:12.5
"""
if content is None:
return
timestamp = content.strftime("%H:%M:%S")
millisecond = int(content.microsecond / 1000)