[Add] spread price and pos calculation

This commit is contained in:
vn.py 2019-09-14 13:20:09 +08:00
parent 5a9dfa747d
commit 98912462f6
2 changed files with 301 additions and 0 deletions

View File

@ -0,0 +1,156 @@
from typing import Dict, List
from math import floor, ceil
from datetime import datetime
from vnpy.trader.object import TickData, PositionData
from vnpy.trader.constant import Direction
class LegData:
""""""
def __init__(
self,
vt_symbol: str,
price_multiplier: float,
trading_multiplier: float
):
""""""
self.vt_symbol: str = vt_symbol
# For calculating spread price
self.price_multiplier: float = price_multiplier
# For calculating spread pos and sending orders
self.trading_multiplier: float = trading_multiplier
# Price and position data
self.bid_price: float = 0
self.ask_price: float = 0
self.bid_volume: float = 0
self.ask_volume: float = 0
self.long_pos: float = 0
self.short_pos: float = 0
self.net_pos: float = 0
def update_tick(self, tick: TickData):
""""""
self.bid_price = tick.bid_price_1
self.ask_price = tick.ask_price_1
self.bid_volume = tick.bid_volume_1
self.ask_volume = tick.ask_volume_1
def update_position(self, position: PositionData):
""""""
if position.direction == Direction.NET:
self.net_pos = position.volume
else:
if position.direction == Direction.LONG:
self.long_pos = position.volume
else:
self.short_pos = position.volume
self.net_pos = self.long_pos - self.short_pos
class SpreadData:
""""""
def __init__(
self,
name: str,
legs: List[LegData],
active_symbol: str
):
""""""
self.name: str = name
self.legs: Dict[str, LegData] = {}
self.active_leg: LegData = None
self.passive_legs: List[LegData] = []
for leg in legs:
self.legs[leg.vt_symbol] = leg
if leg.vt_symbol == active_symbol:
self.active_leg = leg
else:
self.passive_legs.append(leg)
# Spread data
self.bid_price: float = 0
self.ask_price: float = 0
self.bid_volume: float = 0
self.ask_volume: float = 0
self.net_pos: float = 0
self.datetime: datetime = None
def calculate_price(self):
""""""
self.clear_price()
# Go through all legs to calculate price
for n, leg in enumerate(self.legs.values()):
# Filter not all leg price data has been received
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return
# Calculate price
if leg.price_multiplier > 0:
self.bid_price += leg.bid_price * leg.price_multiplier
self.ask_price += leg.ask_price * leg.price_multiplier
else:
self.bid_price += leg.ask_price * leg.price_multiplier
self.ask_price += leg.bid_price * leg.price_multiplier
# Calculate volume
if leg.trading_multiplier > 0:
adjusted_bid_volume = floor(
leg.bid_volume / leg.trading_multiplier)
adjusted_ask_volume = floor(
leg.ask_volume / leg.trading_multiplier)
else:
adjusted_bid_volume = floor(
leg.ask_volume / abs(leg.trading_multiplier))
adjusted_ask_volume = floor(
leg.bid_volume / abs(leg.trading_multiplier))
# For the first leg, just initialize
if not n:
self.bid_volume = adjusted_bid_volume
self.ask_volume = adjusted_ask_volume
# For following legs, use min value of each leg quoting volume
else:
self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
self.ask_volume = min(self.ask_volume, adjusted_ask_volume)
# Update calculate time
self.datetime = datetime.now()
def calculate_pos(self):
""""""
self.net_pos = 0
for n, leg in enumerate(self.legs.values()):
adjusted_net_pos = leg.net_pos / leg.trading_multiplier
if adjusted_net_pos > 0:
adjusted_net_pos = floor(adjusted_net_pos)
else:
adjusted_net_pos = ceil(adjusted_net_pos)
if not n:
self.net_pos = adjusted_net_pos
else:
if adjusted_net_pos > 0:
self.net_pos = min(self.net_pos, adjusted_net_pos)
else:
self.net_pos = max(self.net_pos, adjusted_net_pos)
def clear_price(self):
""""""
self.bid_price = 0
self.ask_price = 0
self.bid_volume = 0
self.ask_volume = 0

View File

@ -1,6 +1,13 @@
from typing import List, Dict
from collections import defaultdict
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.event import EVENT_TICK, EVENT_POSITION
from vnpy.trader.utility import load_json, save_json
from .base import LegData, SpreadData
APP_NAME = "SpreadTrading"
@ -11,3 +18,141 @@ class SpreadEngine(BaseEngine):
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
"""Constructor"""
super().__init__(main_engine, event_engine, APP_NAME)
def write_log(self, msg: str):
""""""
pass
class SpreadDataEngine:
""""""
setting_filename = "spread_trading_setting.json"
def __init__(self, spread_engine: SpreadEngine):
""""""
self.spread_engine: SpreadEngine = spread_engine
self.main_engine: MainEngine = spread_engine.main_engine
self.event_engine: EventEngine = spread_engine.event_engine
self.write_log = spread_engine.write_log
self.legs: Dict[str, LegData] = {} # vt_symbol: leg
self.spreads: Dict[str, SpreadData] = {} # name: spread
self.symbol_spread_map: Dict[str, List[SpreadData]] = defaultdict(list)
self.load_setting()
self.register_event()
def load_setting(self):
""""""
setting = load_json(self.setting_filename)
for spread_setting in setting:
self.add_spread(
spread_setting["name"],
spread_setting["leg_settings"],
spread_setting["active_symbol"],
save=False
)
def save_setting(self):
""""""
setting = []
for spread in self.spreads.values():
leg_settings = []
for leg in spread.legs:
leg_setting = {
"vt_symbol": leg.vt_symbol,
"price_multiplier": leg.price_multiplier,
"trading_multiplier": leg.trading_multiplier
}
leg_settings.append(leg_setting)
spread_setting = {
"name": spread.name,
"leg_settings": leg_settings,
"active_symbol": spread.active_leg.vt_symbol
}
setting.append(spread_setting)
save_json(self.setting_filename, setting)
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
def process_tick_event(self, event: Event):
""""""
tick = event.data
leg = self.legs.get(tick.vt_symbol, None)
if not leg:
return
leg.update_tick(tick)
for spread in self.symbol_spread_map[tick.vt_symbol]:
spread.calculate_price()
def process_position_event(self, event: Event):
""""""
position = event.data
leg = self.legs.get(position.vt_symbol, None)
if not leg:
return
leg.update_position(position)
for spread in self.symbol_spread_map[position.vt_symbol]:
spread.calculate_pos()
def add_spread(
self,
name: str,
leg_settings: List[Dict],
active_symbol: str,
save: bool = True
):
""""""
if name in self.spreads:
self.write_log("价差创建失败,名称重复:{}".format(name))
return
legs: List[LegData] = []
for leg_setting in leg_settings:
vt_symbol = leg_setting["vt_symbol"]
leg = self.legs.get(vt_symbol, None)
if not leg:
leg = LegData(
vt_symbol,
leg_setting["price_multiplier"],
leg_setting["trading_multiplier"]
)
self.legs[vt_symbol] = leg
legs.append(leg)
spread = SpreadData(name, legs, active_symbol)
self.spreads[name] = spread
for leg in spread.legs:
self.symbol_spread_map[leg.vt_symbol].append(spread)
if save:
self.save_setting()
self.write_log("价差创建成功:{}".format(name))
def remove_spread(self, name: str):
""""""
if name not in self.spreads:
return
spread = self.spreads.pop(name)
for leg in spread.legs:
self.symbol_spread_map[leg.vt_symbol].remove(spread)
self.write_log("价差删除成功:{}".format(name))