Merge pull request #2093 from vnpy/dev-spreadtrading

Dev spreadtrading
This commit is contained in:
vn.py 2019-09-17 22:37:35 +08:00 committed by GitHub
commit c06240f8f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 2976 additions and 7 deletions

View File

@ -5,7 +5,7 @@ from vnpy.trader.engine import MainEngine
from vnpy.trader.ui import MainWindow, create_qapp
# from vnpy.gateway.binance import BinanceGateway
# from vnpy.gateway.bitmex import BitmexGateway
from vnpy.gateway.bitmex import BitmexGateway
# from vnpy.gateway.futu import FutuGateway
# from vnpy.gateway.ib import IbGateway
# from vnpy.gateway.ctp import CtpGateway
@ -38,6 +38,7 @@ from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.risk_manager import RiskManagerApp
from vnpy.app.script_trader import ScriptTraderApp
from vnpy.app.rpc_service import RpcServiceApp
from vnpy.app.spread_trading import SpreadTradingApp
def main():
@ -57,7 +58,7 @@ def main():
# main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway)
# main_engine.add_gateway(BitmexGateway)
main_engine.add_gateway(BitmexGateway)
# main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(OesGateway)
# main_engine.add_gateway(OkexGateway)
@ -75,13 +76,14 @@ def main():
main_engine.add_gateway(CoinbaseGateway)
main_engine.add_app(CtaStrategyApp)
main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CsvLoaderApp)
# main_engine.add_app(AlgoTradingApp)
# main_engine.add_app(DataRecorderApp)
# main_engine.add_app(RiskManagerApp)
main_engine.add_app(ScriptTraderApp)
main_engine.add_app(RpcServiceApp)
# main_engine.add_app(ScriptTraderApp)
# main_engine.add_app(RpcServiceApp)
main_engine.add_app(SpreadTradingApp)
main_window = MainWindow(main_engine, event_engine)
main_window.showMaximized()

View File

@ -38,6 +38,7 @@ from vnpy.trader.constant import (
from vnpy.trader.utility import load_json, save_json, extract_vt_symbol, round_to
from vnpy.trader.database import database_manager
from vnpy.trader.rqdata import rqdata_client
from vnpy.trader.converter import OffsetConverter
from .base import (
APP_NAME,
@ -50,7 +51,6 @@ from .base import (
STOPORDER_PREFIX
)
from .template import CtaTemplate
from .converter import OffsetConverter
STOP_STATUS_MAP = {

View File

@ -0,0 +1,28 @@
from pathlib import Path
from vnpy.trader.app import BaseApp
from vnpy.trader.object import (
OrderData,
TradeData
)
from .engine import (
SpreadEngine,
APP_NAME,
SpreadData,
LegData,
SpreadStrategyTemplate,
SpreadAlgoTemplate
)
class SpreadTradingApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "价差交易"
engine_class = SpreadEngine
widget_name = "SpreadManager"
icon_name = "spread.ico"

View File

@ -0,0 +1,141 @@
from typing import Any
from vnpy.trader.constant import Direction
from vnpy.trader.object import (TickData, OrderData, TradeData)
from .template import SpreadAlgoTemplate
from .base import SpreadData
class SpreadTakerAlgo(SpreadAlgoTemplate):
""""""
algo_name = "SpreadTaker"
def __init__(
self,
algo_engine: Any,
algoid: str,
spread: SpreadData,
direction: Direction,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool
):
""""""
super().__init__(
algo_engine, algoid, spread, direction,
price, volume, payup, interval, lock
)
self.cancel_interval: int = 2
self.timer_count: int = 0
def on_tick(self, tick: TickData):
""""""
# Return if tick not inited
if not self.spread.bid_volume or not self.spread.ask_volume:
return
# Return if there are any existing orders
if not self.check_order_finished():
return
# Hedge if active leg is not fully hedged
if not self.check_hedge_finished():
self.hedge_passive_legs()
return
# Otherwise check if should take active leg
if self.direction == Direction.LONG:
if self.spread.ask_price <= self.price:
self.take_active_leg()
else:
if self.spread.bid_price >= self.price:
self.take_active_leg()
def on_order(self, order: OrderData):
""""""
# Only care active leg order update
if order.vt_symbol != self.spread.active_leg.vt_symbol:
return
# Do nothing if still any existing orders
if not self.check_order_finished():
return
# Hedge passive legs if necessary
if not self.check_hedge_finished():
self.hedge_passive_legs()
def on_trade(self, trade: TradeData):
""""""
pass
def on_interval(self):
""""""
if not self.check_order_finished():
self.cancel_all_order()
def take_active_leg(self):
""""""
# Calculate spread order volume of new round trade
spread_volume_left = self.target - self.traded
if self.direction == Direction.LONG:
spread_order_volume = self.spread.ask_volume
spread_order_volume = min(spread_order_volume, spread_volume_left)
else:
spread_order_volume = -self.spread.bid_volume
spread_order_volume = max(spread_order_volume, spread_volume_left)
# Calculate active leg order volume
leg_order_volume = self.spread.calculate_leg_volume(
self.spread.active_leg.vt_symbol,
spread_order_volume
)
# Send active leg order
self.send_leg_order(
self.spread.active_leg.vt_symbol,
leg_order_volume
)
def hedge_passive_legs(self):
"""
Send orders to hedge all passive legs.
"""
# Calcualte spread volume to hedge
active_leg = self.spread.active_leg
active_traded = self.leg_traded[active_leg.vt_symbol]
hedge_volume = self.spread.calculate_spread_volume(
active_leg.vt_symbol,
active_traded
)
# Calculate passive leg target volume and do hedge
for leg in self.spread.passive_legs:
passive_traded = self.leg_traded[leg.vt_symbol]
passive_target = self.spread.calculate_leg_volume(
leg.vt_symbol,
hedge_volume
)
leg_order_volume = passive_target - passive_traded
if leg_order_volume:
self.send_leg_order(leg.vt_symbol, leg_order_volume)
def send_leg_order(self, vt_symbol: str, leg_volume: float):
""""""
leg = self.spread.legs[vt_symbol]
leg_tick = self.get_tick(vt_symbol)
leg_contract = self.get_contract(vt_symbol)
if leg_volume > 0:
price = leg_tick.ask_price_1 + leg_contract.pricetick * self.payup
self.send_long_order(leg.vt_symbol, price, abs(leg_volume))
elif leg_volume < 0:
price = leg_tick.bid_price_1 - leg_contract.pricetick * self.payup
self.send_short_order(leg.vt_symbol, price, abs(leg_volume))

View File

@ -0,0 +1,235 @@
from typing import Dict, List
from math import floor, ceil
from datetime import datetime
from vnpy.trader.object import TickData, PositionData, TradeData
from vnpy.trader.constant import Direction, Offset, Exchange
EVENT_SPREAD_DATA = "eSpreadData"
EVENT_SPREAD_POS = "eSpreadPos"
EVENT_SPREAD_LOG = "eSpreadLog"
EVENT_SPREAD_ALGO = "eSpreadAlgo"
EVENT_SPREAD_STRATEGY = "eSpreadStrategy"
class LegData:
""""""
def __init__(self, vt_symbol: str):
""""""
self.vt_symbol: str = vt_symbol
# Price and position data
self.bid_price: float = 0
self.ask_price: float = 0
self.bid_volume: float = 0
self.ask_volume: float = 0
self.long_pos: float = 0
self.short_pos: float = 0
self.net_pos: float = 0
# Tick data buf
self.tick: TickData = None
def update_tick(self, tick: TickData):
""""""
self.bid_price = tick.bid_price_1
self.ask_price = tick.ask_price_1
self.bid_volume = tick.bid_volume_1
self.ask_volume = tick.ask_volume_1
self.tick = tick
def update_position(self, position: PositionData):
""""""
if position.direction == Direction.NET:
self.net_pos = position.volume
else:
if position.direction == Direction.LONG:
self.long_pos = position.volume
else:
self.short_pos = position.volume
self.net_pos = self.long_pos - self.short_pos
def update_trade(self, trade: TradeData):
""""""
if trade.direction == Direction.LONG:
if trade.offset == Offset.OPEN:
self.long_pos += trade.volume
else:
self.short_pos -= trade.volume
else:
if trade.offset == Offset.OPEN:
self.short_pos += trade.volume
else:
self.long_pos -= trade.volume
self.net_pos = self.long_pos - self.net_pos
class SpreadData:
""""""
def __init__(
self,
name: str,
legs: List[LegData],
price_multipliers: Dict[str, int],
trading_multipliers: Dict[str, int],
active_symbol: str
):
""""""
self.name: str = name
self.legs: Dict[str, LegData] = {}
self.active_leg: LegData = None
self.passive_legs: List[LegData] = []
# For calculating spread price
self.price_multipliers: Dict[str: int] = price_multipliers
# For calculating spread pos and sending orders
self.trading_multipliers: Dict[str: int] = trading_multipliers
self.price_formula: str = ""
self.trading_formula: str = ""
for leg in legs:
self.legs[leg.vt_symbol] = leg
if leg.vt_symbol == active_symbol:
self.active_leg = leg
else:
self.passive_legs.append(leg)
price_multiplier = self.price_multipliers[leg.vt_symbol]
if price_multiplier > 0:
self.price_formula += f"+{price_multiplier}*{leg.vt_symbol}"
else:
self.price_formula += f"{price_multiplier}*{leg.vt_symbol}"
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
if trading_multiplier > 0:
self.trading_formula += f"+{trading_multiplier}*{leg.vt_symbol}"
else:
self.trading_formula += f"{trading_multiplier}*{leg.vt_symbol}"
# Spread data
self.bid_price: float = 0
self.ask_price: float = 0
self.bid_volume: float = 0
self.ask_volume: float = 0
self.net_pos: float = 0
self.datetime: datetime = None
def calculate_price(self):
""""""
self.clear_price()
# Go through all legs to calculate price
for n, leg in enumerate(self.legs.values()):
# Filter not all leg price data has been received
if not leg.bid_volume or not leg.ask_volume:
self.clear_price()
return
# Calculate price
price_multiplier = self.price_multipliers[leg.vt_symbol]
if price_multiplier > 0:
self.bid_price += leg.bid_price * price_multiplier
self.ask_price += leg.ask_price * price_multiplier
else:
self.bid_price += leg.ask_price * price_multiplier
self.ask_price += leg.bid_price * price_multiplier
# Calculate volume
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
if trading_multiplier > 0:
adjusted_bid_volume = floor(
leg.bid_volume / trading_multiplier)
adjusted_ask_volume = floor(
leg.ask_volume / trading_multiplier)
else:
adjusted_bid_volume = floor(
leg.ask_volume / abs(trading_multiplier))
adjusted_ask_volume = floor(
leg.bid_volume / abs(trading_multiplier))
# For the first leg, just initialize
if not n:
self.bid_volume = adjusted_bid_volume
self.ask_volume = adjusted_ask_volume
# For following legs, use min value of each leg quoting volume
else:
self.bid_volume = min(self.bid_volume, adjusted_bid_volume)
self.ask_volume = min(self.ask_volume, adjusted_ask_volume)
# Update calculate time
self.datetime = datetime.now()
def calculate_pos(self):
""""""
self.net_pos = 0
for n, leg in enumerate(self.legs.values()):
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
adjusted_net_pos = leg.net_pos / trading_multiplier
if adjusted_net_pos > 0:
adjusted_net_pos = floor(adjusted_net_pos)
else:
adjusted_net_pos = ceil(adjusted_net_pos)
if not n:
self.net_pos = adjusted_net_pos
else:
if adjusted_net_pos > 0:
self.net_pos = min(self.net_pos, adjusted_net_pos)
else:
self.net_pos = max(self.net_pos, adjusted_net_pos)
def clear_price(self):
""""""
self.bid_price = 0
self.ask_price = 0
self.bid_volume = 0
self.ask_volume = 0
def calculate_leg_volume(self, vt_symbol: str, spread_volume: float) -> float:
""""""
leg = self.legs[vt_symbol]
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
leg_volume = spread_volume * trading_multiplier
return leg_volume
def calculate_spread_volume(self, vt_symbol: str, leg_volume: float) -> float:
""""""
leg = self.legs[vt_symbol]
trading_multiplier = self.trading_multipliers[leg.vt_symbol]
spread_volume = leg_volume / trading_multiplier
if spread_volume > 0:
spread_volume = floor(spread_volume)
else:
spread_volume = ceil(spread_volume)
return spread_volume
def to_tick(self):
""""""
tick = TickData(
symbol=self.name,
exchange=Exchange.LOCAL,
datetime=self.datetime,
name=self.name,
last_price=(self.bid_price + self.ask_price) / 2,
bid_price_1=self.bid_price,
ask_price_1=self.ask_price,
bid_volume_1=self.bid_volume,
ask_volume_1=self.ask_volume,
gateway_name="SPREAD"
)
return tick

View File

@ -0,0 +1,982 @@
import traceback
import importlib
import os
from typing import List, Dict, Set, Callable, Any, Type
from collections import defaultdict
from copy import copy
from pathlib import Path
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.event import (
EVENT_TICK, EVENT_POSITION, EVENT_CONTRACT,
EVENT_ORDER, EVENT_TRADE, EVENT_TIMER
)
from vnpy.trader.utility import load_json, save_json
from vnpy.trader.object import (
TickData, ContractData, LogData,
SubscribeRequest, OrderRequest
)
from vnpy.trader.constant import Direction, Offset, OrderType
from vnpy.trader.converter import OffsetConverter
from .base import (
LegData, SpreadData,
EVENT_SPREAD_DATA, EVENT_SPREAD_POS,
EVENT_SPREAD_ALGO, EVENT_SPREAD_LOG,
EVENT_SPREAD_STRATEGY
)
from .template import SpreadAlgoTemplate, SpreadStrategyTemplate
from .algo import SpreadTakerAlgo
APP_NAME = "SpreadTrading"
class SpreadEngine(BaseEngine):
""""""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
"""Constructor"""
super().__init__(main_engine, event_engine, APP_NAME)
self.active = False
self.data_engine: SpreadDataEngine = SpreadDataEngine(self)
self.algo_engine: SpreadAlgoEngine = SpreadAlgoEngine(self)
self.strategy_engine: SpreadStrategyEngine = SpreadStrategyEngine(self)
self.add_spread = self.data_engine.add_spread
self.remove_spread = self.data_engine.remove_spread
self.get_spread = self.data_engine.get_spread
self.get_all_spreads = self.data_engine.get_all_spreads
self.start_algo = self.algo_engine.start_algo
self.stop_algo = self.algo_engine.stop_algo
def start(self):
""""""
if self.active:
return
self.active = True
self.data_engine.start()
self.algo_engine.start()
self.strategy_engine.start()
def stop(self):
""""""
self.data_engine.stop()
self.algo_engine.stop()
self.strategy_engine.stop()
def write_log(self, msg: str):
""""""
log = LogData(
msg=msg,
gateway_name=APP_NAME
)
event = Event(EVENT_SPREAD_LOG, log)
self.event_engine.put(event)
class SpreadDataEngine:
""""""
setting_filename = "spread_trading_setting.json"
def __init__(self, spread_engine: SpreadEngine):
""""""
self.spread_engine: SpreadEngine = spread_engine
self.main_engine: MainEngine = spread_engine.main_engine
self.event_engine: EventEngine = spread_engine.event_engine
self.write_log = spread_engine.write_log
self.legs: Dict[str, LegData] = {} # vt_symbol: leg
self.spreads: Dict[str, SpreadData] = {} # name: spread
self.symbol_spread_map: Dict[str, List[SpreadData]] = defaultdict(list)
def start(self):
""""""
self.load_setting()
self.register_event()
self.write_log("价差数据引擎启动成功")
def stop(self):
""""""
pass
def load_setting(self) -> None:
""""""
setting = load_json(self.setting_filename)
for spread_setting in setting:
self.add_spread(
spread_setting["name"],
spread_setting["leg_settings"],
spread_setting["active_symbol"],
save=False
)
def save_setting(self) -> None:
""""""
setting = []
for spread in self.spreads.values():
leg_settings = []
for leg in spread.legs.values():
price_multiplier = spread.price_multipliers[leg.vt_symbol]
trading_multiplier = spread.trading_multipliers[leg.vt_symbol]
leg_setting = {
"vt_symbol": leg.vt_symbol,
"price_multiplier": price_multiplier,
"trading_multiplier": trading_multiplier
}
leg_settings.append(leg_setting)
spread_setting = {
"name": spread.name,
"leg_settings": leg_settings,
"active_symbol": spread.active_leg.vt_symbol
}
setting.append(spread_setting)
save_json(self.setting_filename, setting)
def register_event(self) -> None:
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
def process_tick_event(self, event: Event) -> None:
""""""
tick = event.data
leg = self.legs.get(tick.vt_symbol, None)
if not leg:
return
leg.update_tick(tick)
for spread in self.symbol_spread_map[tick.vt_symbol]:
spread.calculate_price()
self.put_data_event(spread)
def process_position_event(self, event: Event) -> None:
""""""
position = event.data
leg = self.legs.get(position.vt_symbol, None)
if not leg:
return
leg.update_position(position)
for spread in self.symbol_spread_map[position.vt_symbol]:
spread.calculate_pos()
self.put_pos_event(spread)
def process_trade_event(self, event: Event) -> None:
""""""
trade = event.data
leg = self.legs.get(trade.vt_symbol, None)
if not leg:
return
leg.update_trade(trade)
for spread in self.symbol_spread_map[trade.vt_symbol]:
spread.calculate_pos()
self.put_pos_event(spread)
def process_contract_event(self, event: Event) -> None:
""""""
contract = event.data
if contract.vt_symbol in self.legs:
req = SubscribeRequest(
contract.symbol, contract.exchange
)
self.main_engine.subscribe(req, contract.gateway_name)
def put_data_event(self, spread: SpreadData) -> None:
""""""
event = Event(EVENT_SPREAD_DATA, spread)
self.event_engine.put(event)
def put_pos_event(self, spread: SpreadData) -> None:
""""""
event = Event(EVENT_SPREAD_POS, spread)
self.event_engine.put(event)
def add_spread(
self,
name: str,
leg_settings: List[Dict],
active_symbol: str,
save: bool = True
) -> None:
""""""
if name in self.spreads:
self.write_log("价差创建失败,名称重复:{}".format(name))
return
legs: List[LegData] = []
price_multipliers: Dict[str, int] = {}
trading_multipliers: Dict[str, int] = {}
for leg_setting in leg_settings:
vt_symbol = leg_setting["vt_symbol"]
leg = self.legs.get(vt_symbol, None)
if not leg:
leg = LegData(vt_symbol)
self.legs[vt_symbol] = leg
legs.append(leg)
price_multipliers[vt_symbol] = leg_setting["price_multiplier"]
trading_multipliers[vt_symbol] = leg_setting["trading_multiplier"]
spread = SpreadData(
name,
legs,
price_multipliers,
trading_multipliers,
active_symbol
)
self.spreads[name] = spread
for leg in spread.legs.values():
self.symbol_spread_map[leg.vt_symbol].append(spread)
if save:
self.save_setting()
self.write_log("价差创建成功:{}".format(name))
self.put_data_event(spread)
def remove_spread(self, name: str) -> None:
""""""
if name not in self.spreads:
return
spread = self.spreads.pop(name)
for leg in spread.legs.values():
self.symbol_spread_map[leg.vt_symbol].remove(spread)
self.save_setting()
self.write_log("价差移除成功:{},重启后生效".format(name))
def get_spread(self, name: str) -> SpreadData:
""""""
spread = self.spreads.get(name, None)
return spread
def get_all_spreads(self) -> List[SpreadData]:
""""""
return list(self.spreads.values())
class SpreadAlgoEngine:
""""""
algo_class = SpreadTakerAlgo
def __init__(self, spread_engine: SpreadEngine):
""""""
self.spread_engine: SpreadEngine = spread_engine
self.main_engine: MainEngine = spread_engine.main_engine
self.event_engine: EventEngine = spread_engine.event_engine
self.write_log = spread_engine.write_log
self.spreads: Dict[str: SpreadData] = {}
self.algos: Dict[str: SpreadAlgoTemplate] = {}
self.order_algo_map: dict[str: SpreadAlgoTemplate] = {}
self.symbol_algo_map: dict[str: SpreadAlgoTemplate] = defaultdict(list)
self.algo_count: int = 0
self.vt_tradeids: Set = set()
self.offset_converter: OffsetConverter = OffsetConverter(
self.main_engine
)
def start(self):
""""""
self.register_event()
self.write_log("价差算法引擎启动成功")
def stop(self):
""""""
for algo in self.algos.values():
self.stop_algo(algo)
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
self.event_engine.register(
EVENT_SPREAD_DATA, self.process_spread_event
)
def process_spread_event(self, event: Event):
""""""
spread: SpreadData = event.data
self.spreads[spread.name] = spread
def process_tick_event(self, event: Event):
""""""
tick = event.data
algos = self.symbol_algo_map[tick.vt_symbol]
if not algos:
return
buf = copy(algos)
for algo in buf:
if not algo.is_active():
algos.remove(algo)
else:
algo.update_tick(tick)
def process_order_event(self, event: Event):
""""""
order = event.data
self.offset_converter.update_order(order)
algo = self.order_algo_map.get(order.vt_orderid, None)
if algo and algo.is_active():
algo.update_order(order)
def process_trade_event(self, event: Event):
""""""
trade = event.data
# Filter duplicate trade push
if trade.vt_tradeid in self.vt_tradeids:
return
self.vt_tradeids.add(trade.vt_tradeid)
self.offset_converter.update_trade(trade)
algo = self.order_algo_map.get(trade.vt_orderid, None)
if algo and algo.is_active():
algo.update_trade(trade)
def process_position_event(self, event: Event):
""""""
position = event.data
self.offset_converter.update_position(position)
def process_timer_event(self, event: Event):
""""""
buf = list(self.algos.values())
for algo in buf:
if not algo.is_active():
self.algos.pop(algo.algoid)
else:
algo.update_timer()
def start_algo(
self,
spread_name: str,
direction: Direction,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool
) -> str:
# Find spread object
spread = self.spreads.get(spread_name, None)
if not spread:
self.write_log("创建价差算法失败,找不到价差:{}".format(spread_name))
return ""
# Generate algoid str
self.algo_count += 1
algo_count_str = str(self.algo_count).rjust(6, "0")
algoid = f"{self.algo_class.algo_name}_{algo_count_str}"
# Create algo object
algo = self.algo_class(
self,
algoid,
spread,
direction,
price,
volume,
payup,
interval,
lock
)
self.algos[algoid] = algo
# Generate map between vt_symbol and algo
for leg in spread.legs.values():
self.symbol_algo_map[leg.vt_symbol].append(algo)
# Put event to update GUI
self.put_algo_event(algo)
return algoid
def stop_algo(
self,
algoid: str
):
""""""
algo = self.algos.get(algoid, None)
if not algo:
self.write_log("停止价差算法失败,找不到算法:{}".format(algoid))
return
algo.stop()
def put_algo_event(self, algo: SpreadAlgoTemplate) -> None:
""""""
event = Event(EVENT_SPREAD_ALGO, algo)
self.event_engine.put(event)
def write_algo_log(self, algo: SpreadAlgoTemplate, msg: str) -> None:
""""""
msg = f"{algo.algoid}{msg}"
self.write_log(msg)
def send_order(
self,
algo: SpreadAlgoTemplate,
vt_symbol: str,
price: float,
volume: float,
direction: Direction,
lock: bool
) -> List[str]:
""""""
holding = self.offset_converter.get_position_holding(vt_symbol)
contract = self.main_engine.get_contract(vt_symbol)
if direction == Direction.LONG:
available = holding.short_pos - holding.short_pos_frozen
else:
available = holding.long_pos - holding.long_pos_frozen
# If no position to close, just open new
if not available:
offset = Offset.OPEN
# If enougth position to close, just close old
elif volume < available:
offset = Offset.CLOSE
# Otherwise, just close existing position
else:
volume = available
offset = Offset.CLOSE
original_req = OrderRequest(
symbol=contract.symbol,
exchange=contract.exchange,
direction=direction,
offset=offset,
type=OrderType.LIMIT,
price=price,
volume=volume
)
# Convert with offset converter
req_list = self.offset_converter.convert_order_request(
original_req, lock)
# Send Orders
vt_orderids = []
for req in req_list:
vt_orderid = self.main_engine.send_order(
req, contract.gateway_name)
# Check if sending order successful
if not vt_orderid:
continue
vt_orderids.append(vt_orderid)
self.offset_converter.update_order_request(req, vt_orderid)
# Save relationship between orderid and algo.
self.order_algo_map[vt_orderid] = algo
return vt_orderids
def cancel_order(self, algo: SpreadAlgoTemplate, vt_orderid: str) -> None:
""""""
order = self.main_engine.get_order(vt_orderid)
if not order:
self.write_algo_log(algo, "撤单失败,找不到委托{}".format(vt_orderid))
return
req = order.create_cancel_request()
self.main_engine.cancel_order(req, order.gateway_name)
def get_tick(self, vt_symbol: str) -> TickData:
""""""
return self.main_engine.get_tick(vt_symbol)
def get_contract(self, vt_symbol: str) -> ContractData:
""""""
return self.main_engine.get_contract(vt_symbol)
class SpreadStrategyEngine:
""""""
setting_filename = "spraed_trading_strategy.json"
def __init__(self, spread_engine: SpreadEngine):
""""""
self.spread_engine: SpreadEngine = spread_engine
self.main_engine: MainEngine = spread_engine.main_engine
self.event_engine: EventEngine = spread_engine.event_engine
self.write_log = spread_engine.write_log
self.strategy_setting: Dict[str: Dict] = {}
self.classes: Dict[str: Type[SpreadStrategyTemplate]] = {}
self.strategies: Dict[str: SpreadStrategyTemplate] = {}
self.order_strategy_map: dict[str: SpreadStrategyTemplate] = {}
self.algo_strategy_map: dict[str: SpreadStrategyTemplate] = {}
self.spread_strategy_map: dict[str: SpreadStrategyTemplate] = defaultdict(
list)
self.vt_tradeids: Set = set()
self.load_strategy_class()
def start(self):
""""""
self.load_strategy_setting()
self.register_event()
self.write_log("价差策略引擎启动成功")
def close(self):
""""""
self.stop_all_strategies()
def load_strategy_class(self):
"""
Load strategy class from source code.
"""
path1 = Path(__file__).parent.joinpath("strategies")
self.load_strategy_class_from_folder(
path1, "vnpy.app.spread_trading.strategies")
path2 = Path.cwd().joinpath("strategies")
self.load_strategy_class_from_folder(path2, "strategies")
def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
"""
Load strategy class from certain folder.
"""
for dirpath, dirnames, filenames in os.walk(str(path)):
for filename in filenames:
if filename.endswith(".py"):
strategy_module_name = ".".join(
[module_name, filename.replace(".py", "")])
elif filename.endswith(".pyd"):
strategy_module_name = ".".join(
[module_name, filename.split(".")[0]])
self.load_strategy_class_from_module(strategy_module_name)
def load_strategy_class_from_module(self, module_name: str):
"""
Load strategy class from module file.
"""
try:
module = importlib.import_module(module_name)
for name in dir(module):
value = getattr(module, name)
if (isinstance(value, type) and issubclass(value, SpreadStrategyTemplate) and value is not SpreadStrategyTemplate):
self.classes[value.__name__] = value
except: # noqa
msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
self.write_log(msg)
def get_all_strategy_class_names(self):
""""""
return list(self.classes.keys())
def load_strategy_setting(self):
"""
Load setting file.
"""
self.strategy_setting = load_json(self.setting_filename)
for strategy_name, strategy_config in self.strategy_setting.items():
self.add_strategy(
strategy_config["class_name"],
strategy_name,
strategy_config["spread_name"],
strategy_config["setting"]
)
def update_strategy_setting(self, strategy_name: str, setting: dict):
"""
Update setting file.
"""
strategy = self.strategies[strategy_name]
self.strategy_setting[strategy_name] = {
"class_name": strategy.__class__.__name__,
"spread_name": strategy.spread_name,
"setting": setting,
}
save_json(self.setting_filename, self.strategy_setting)
def remove_strategy_setting(self, strategy_name: str):
"""
Update setting file.
"""
if strategy_name not in self.strategy_setting:
return
self.strategy_setting.pop(strategy_name)
save_json(self.setting_filename, self.strategy_setting)
def register_event(self):
""""""
ee = self.event_engine
ee.register(EVENT_ORDER, self.process_order_event)
ee.register(EVENT_TRADE, self.process_trade_event)
ee.register(EVENT_SPREAD_DATA, self.process_spread_data_event)
ee.register(EVENT_SPREAD_POS, self.process_spread_pos_event)
ee.register(EVENT_SPREAD_ALGO, self.process_spread_algo_event)
def process_spread_data_event(self, event: Event):
""""""
spread = event.data
strategies = self.spread_strategy_map[spread.name]
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_spread_data)
def process_spread_pos_event(self, event: Event):
""""""
spread = event.data
strategies = self.spread_strategy_map[spread.name]
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_spread_pos)
def process_spread_algo_event(self, event: Event):
""""""
algo = event.data
strategy = self.algo_strategy_map.get(algo.algoid, None)
if strategy:
self.call_strategy_func(strategy, strategy.update_spread_algo, algo)
def process_order_event(self, event: Event):
""""""
order = event.data
strategy = self.order_strategy_map.get(order.vt_orderid, None)
if strategy:
self.call_strategy_func(strategy, strategy.update_order, order)
def process_trade_event(self, event: Event):
""""""
trade = event.data
strategy = self.order_strategy_map.get(trade.vt_orderid, None)
if strategy:
self.call_strategy_func(strategy, strategy.on_trade, trade)
def call_strategy_func(
self, strategy: SpreadStrategyTemplate, func: Callable, params: Any = None
):
"""
Call function of a strategy and catch any exception raised.
"""
try:
if params:
func(params)
else:
func()
except Exception:
strategy.trading = False
strategy.inited = False
msg = f"触发异常已停止\n{traceback.format_exc()}"
self.write_strategy_log(strategy, msg)
def add_strategy(
self, class_name: str, strategy_name: str, spread_name: str, setting: dict
):
"""
Add a new strategy.
"""
if strategy_name in self.strategies:
self.write_log(f"创建策略失败,存在重名{strategy_name}")
return
strategy_class = self.classes.get(class_name, None)
if not strategy_class:
self.write_log(f"创建策略失败,找不到策略类{class_name}")
return
spread = self.spread_engine.get_spread(spread_name)
if not spread:
self.write_log(f"创建策略失败,找不到价差{spread_name}")
return
strategy = strategy_class(self, strategy_name, spread, setting)
self.strategies[strategy_name] = strategy
# Add vt_symbol to strategy map.
strategies = self.spread_strategy_map[spread_name]
strategies.append(strategy)
# Update to setting file.
self.update_strategy_setting(strategy_name, setting)
self.put_strategy_event(strategy)
def edit_strategy(self, strategy_name: str, setting: dict):
"""
Edit parameters of a strategy.
"""
strategy = self.strategies[strategy_name]
strategy.update_setting(setting)
self.update_strategy_setting(strategy_name, setting)
self.put_strategy_event(strategy)
def remove_strategy(self, strategy_name: str):
"""
Remove a strategy.
"""
strategy = self.strategies[strategy_name]
if strategy.trading:
self.write_log(f"策略{strategy.strategy_name}移除失败,请先停止")
return
# Remove setting
self.remove_strategy_setting(strategy_name)
# Remove from symbol strategy map
strategies = self.spread_strategy_map[strategy.spread_name]
strategies.remove(strategy)
# Remove from strategies
self.strategies.pop(strategy_name)
return True
def init_strategy(self, strategy_name: str):
""""""
strategy = self.strategies[strategy_name]
if strategy.inited:
self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
return
self.call_strategy_func(strategy, strategy.on_init)
strategy.inited = True
self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成")
def start_strategy(self, strategy_name: str):
""""""
strategy = self.strategies[strategy_name]
if not strategy.inited:
self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
return
if strategy.trading:
self.write_log(f"{strategy_name}已经启动,请勿重复操作")
return
self.call_strategy_func(strategy, strategy.on_start)
strategy.trading = True
self.put_strategy_event(strategy)
def stop_strategy(self, strategy_name: str):
""""""
strategy = self.strategies[strategy_name]
if not strategy.trading:
return
self.call_strategy_func(strategy, strategy.on_stop)
strategy.stop_all_algos()
strategy.cancel_all_orders()
strategy.trading = False
self.put_strategy_event(strategy)
def init_all_strategies(self):
""""""
for strategy in self.strategies.keys():
self.init_strategy(strategy)
def start_all_strategies(self):
""""""
for strategy in self.strategies.keys():
self.start_strategy(strategy)
def stop_all_strategies(self):
""""""
for strategy in self.strategies.keys():
self.stop_strategy(strategy)
def get_strategy_class_parameters(self, class_name: str):
"""
Get default parameters of a strategy class.
"""
strategy_class = self.classes[class_name]
parameters = {}
for name in strategy_class.parameters:
parameters[name] = getattr(strategy_class, name)
return parameters
def get_strategy_parameters(self, strategy_name):
"""
Get parameters of a strategy.
"""
strategy = self.strategies[strategy_name]
return strategy.get_parameters()
def start_algo(
self,
strategy: SpreadStrategyTemplate,
spread_name: str,
direction: Direction,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool
) -> str:
""""""
algoid = self.spread_engine.start_algo(
spread_name,
direction,
price,
volume,
payup,
interval,
lock
)
self.algo_strategy_map[algoid] = strategy
return algoid
def stop_algo(self, strategy: SpreadStrategyTemplate, algoid: str):
""""""
self.spread_engine.stop_algo(algoid)
def stop_all_algos(self, strategy: SpreadStrategyTemplate):
""""""
pass
def send_order(
self,
strategy: SpreadStrategyTemplate,
vt_symbol: str,
price: float,
volume: float,
direction: Direction,
offset: Offset,
lock: bool
) -> List[str]:
contract = self.main_engine.get_contract(vt_symbol)
original_req = OrderRequest(
symbol=contract.symbol,
exchange=contract.exchange,
direction=direction,
offset=offset,
type=OrderType.LIMIT,
price=price,
volume=volume
)
# Convert with offset converter
req_list = self.offset_converter.convert_order_request(
original_req, lock)
# Send Orders
vt_orderids = []
for req in req_list:
vt_orderid = self.main_engine.send_order(
req, contract.gateway_name)
# Check if sending order successful
if not vt_orderid:
continue
vt_orderids.append(vt_orderid)
self.offset_converter.update_order_request(req, vt_orderid)
# Save relationship between orderid and strategy.
self.order_strategy_map[vt_orderid] = strategy
return vt_orderids
def cancel_order(self, strategy: SpreadStrategyTemplate, vt_orderid: str):
""""""
order = self.main_engine.get_order(vt_orderid)
if not order:
self.write_strategy_log(
strategy, "撤单失败,找不到委托{}".format(vt_orderid))
return
req = order.create_cancel_request()
self.main_engine.cancel_order(req, order.gateway_name)
def cancel_all_orders(self, strategy: SpreadStrategyTemplate):
""""""
pass
def put_strategy_event(self, strategy: SpreadStrategyTemplate):
""""""
data = strategy.get_data()
event = Event(EVENT_SPREAD_STRATEGY, data)
self.event_engine.put(event)
def write_strategy_log(self, strategy: SpreadStrategyTemplate, msg: str):
""""""
msg = f"{strategy.strategy_name}{msg}"
self.write_log(msg)
def send_strategy_email(self, strategy: SpreadStrategyTemplate, msg: str):
""""""
if strategy:
subject = f"{strategy.strategy_name}"
else:
subject = "价差策略引擎"
self.main_engine.send_email(subject, msg)

View File

@ -0,0 +1,168 @@
from vnpy.app.spread_trading import (
SpreadStrategyTemplate,
SpreadAlgoTemplate,
SpreadData,
OrderData,
TradeData
)
class BasicSpreadStrategy(SpreadStrategyTemplate):
""""""
author = "用Python的交易员"
buy_price = 0.0
sell_price = 0.0
cover_price = 0.0
short_price = 0.0
max_pos = 0.0
payup = 10
interval = 5
spread_pos = 0.0
buy_algoid = ""
sell_algoid = ""
short_algoid = ""
cover_algoid = ""
parameters = [
"buy_price",
"sell_price",
"cover_price",
"short_price",
"max_pos",
"payup",
"interval"
]
variables = [
"spread_pos",
"buy_algoid",
"sell_algoid",
"short_algoid",
"cover_algoid",
]
def __init__(
self,
strategy_engine,
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
super().__init__(
strategy_engine, strategy_name, spread, setting
)
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
self.buy_algoid = ""
self.sell_algoid = ""
self.short_algoid = ""
self.cover_algoid = ""
self.put_event()
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
self.spread_pos = self.get_spread_pos()
# No position
if not self.spread_pos:
# Start open algos
if not self.buy_algoid:
self.buy_algoid = self.start_long_algo(
self.buy_price, self.max_pos, self.payup, self.interval
)
if not self.short_algoid:
self.short_algoid = self.start_short_algo(
self.short_price, self.max_pos, self.payup, self.interval
)
# Stop close algos
if self.sell_algoid:
self.stop_algo(self.sell_algoid)
if self.cover_algoid:
self.stop_algo(self.cover_algoid)
# Long position
elif self.spread_pos > 0:
# Start sell close algo
if not self.sell_algoid:
self.sell_algoid = self.start_short_algo(
self.sell_price, self.spread_pos, self.payup, self.interval
)
# Stop buy open algo
if self.buy_algoid:
self.stop_algo(self.buy_algoid)
# Short position
elif self.spread_pos < 0:
# Start cover close algo
if not self.cover_algoid:
self.cover_algoid = self.start_long_algo(
self.cover_price, abs(
self.spread_pos), self.payup, self.interval
)
# Stop short open algo
if self.short_algoid:
self.stop_algo(self.short_algoid)
self.put_event()
def on_spread_pos(self):
"""
Callback when spread position is updated.
"""
self.spread_pos = self.get_spread_pos()
self.put_event()
def on_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
if not algo.is_active():
if self.buy_algoid == algo.algoid:
self.buy_algoid = ""
elif self.sell_algoid == algo.algoid:
self.sell_algoid = ""
elif self.short_algoid == algo.algoid:
self.short_algoid = ""
else:
self.cover_algoid = ""
self.put_event()
def on_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback when new trade data is received.
"""
pass

View File

@ -0,0 +1,592 @@
from collections import defaultdict
from typing import Dict, List, Set
from math import floor, ceil
from copy import copy
from vnpy.trader.object import TickData, TradeData, OrderData, ContractData
from vnpy.trader.constant import Direction, Status, Offset
from vnpy.trader.utility import virtual
from .base import SpreadData
class SpreadAlgoTemplate:
"""
Template for implementing spread trading algos.
"""
algo_name = "AlgoTemplate"
def __init__(
self,
algo_engine,
algoid: str,
spread: SpreadData,
direction: Direction,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool
):
""""""
self.algo_engine = algo_engine
self.algoid: str = algoid
self.spread: SpreadData = spread
self.spread_name: str = spread.name
self.direction: Direction = direction
self.price: float = price
self.volume: float = volume
self.payup: int = payup
self.interval = interval
self.lock = lock
if direction == Direction.LONG:
self.target = volume
else:
self.target = -volume
self.status: Status = Status.NOTTRADED # Algo status
self.count: int = 0 # Timer count
self.traded: float = 0 # Volume traded
self.traded_volume: float = 0 # Volume traded (Abs value)
self.leg_traded: Dict[str, float] = defaultdict(int)
self.leg_orders: Dict[str, List[str]] = defaultdict(list)
self.write_log("算法已启动")
def is_active(self):
""""""
if self.status not in [Status.CANCELLED, Status.ALLTRADED]:
return True
else:
return False
def check_order_finished(self):
""""""
finished = True
for leg in self.spread.legs.values():
vt_orderids = self.leg_orders[leg.vt_symbol]
if vt_orderids:
finished = False
break
return finished
def check_hedge_finished(self):
""""""
active_symbol = self.spread.active_leg.vt_symbol
active_traded = self.leg_traded[active_symbol]
spread_volume = self.spread.calculate_spread_volume(
active_symbol, active_traded
)
finished = True
for leg in self.spread.passive_legs:
passive_symbol = leg.vt_symbol
leg_target = self.spread.calculate_leg_volume(
passive_symbol, spread_volume
)
leg_traded = self.leg_traded[passive_symbol]
if leg_traded != leg_target:
finished = False
break
return finished
def stop(self):
""""""
if self.is_active():
self.cancel_all_order()
self.status = Status.CANCELLED
self.write_log("算法已停止")
self.put_event()
def update_tick(self, tick: TickData):
""""""
self.on_tick(tick)
def update_trade(self, trade: TradeData):
""""""
if trade.direction == Direction.LONG:
self.leg_traded[trade.vt_symbol] += trade.volume
else:
self.leg_traded[trade.vt_symbol] -= trade.volume
msg = "委托成交,{}{}{}@{}".format(
trade.vt_symbol,
trade.direction,
trade.volume,
trade.price
)
self.write_log(msg)
self.calculate_traded()
self.put_event()
self.on_trade(trade)
def update_order(self, order: OrderData):
""""""
if not order.is_active():
vt_orderids = self.leg_orders[order.vt_symbol]
if order.vt_orderid in vt_orderids:
vt_orderids.remove(order.vt_orderid)
self.on_order(order)
def update_timer(self):
""""""
self.count += 1
if self.count > self.interval:
self.count = 0
self.on_interval()
self.put_event()
def put_event(self):
""""""
self.algo_engine.put_algo_event(self)
def write_log(self, msg: str):
""""""
self.algo_engine.write_algo_log(self, msg)
def send_long_order(self, vt_symbol: str, price: float, volume: float):
""""""
self.send_order(vt_symbol, price, volume, Direction.LONG)
def send_short_order(self, vt_symbol: str, price: float, volume: float):
""""""
self.send_order(vt_symbol, price, volume, Direction.SHORT)
def send_order(
self,
vt_symbol: str,
price: float,
volume: float,
direction: Direction,
):
""""""
vt_orderids = self.algo_engine.send_order(
self,
vt_symbol,
price,
volume,
direction,
self.lock
)
self.leg_orders[vt_symbol].extend(vt_orderids)
msg = "发出委托,{}{}{}@{}".format(
vt_symbol,
direction,
volume,
price
)
self.write_log(msg)
def cancel_leg_order(self, vt_symbol: str):
""""""
for vt_orderid in self.leg_orders[vt_symbol]:
self.algo_engine.cancel_order(self, vt_orderid)
def cancel_all_order(self):
""""""
for vt_symbol in self.leg_orders.keys():
self.cancel_leg_order(vt_symbol)
def calculate_traded(self):
""""""
self.traded = 0
for n, leg in enumerate(self.spread.legs.values()):
leg_traded = self.leg_traded[leg.vt_symbol]
trading_multiplier = self.spread.trading_multipliers[
leg.vt_symbol]
adjusted_leg_traded = leg_traded / trading_multiplier
if adjusted_leg_traded > 0:
adjusted_leg_traded = floor(adjusted_leg_traded)
else:
adjusted_leg_traded = ceil(adjusted_leg_traded)
if not n:
self.traded = adjusted_leg_traded
else:
if adjusted_leg_traded > 0:
self.traded = min(self.traded, adjusted_leg_traded)
elif adjusted_leg_traded < 0:
self.traded = max(self.traded, adjusted_leg_traded)
else:
self.traded = 0
self.traded_volume = abs(self.traded)
if self.traded == self.target:
self.status = Status.ALLTRADED
elif not self.traded:
self.status = Status.NOTTRADED
else:
self.status = Status.PARTTRADED
def get_tick(self, vt_symbol: str) -> TickData:
""""""
return self.algo_engine.get_tick(vt_symbol)
def get_contract(self, vt_symbol: str) -> ContractData:
""""""
return self.algo_engine.get_contract(vt_symbol)
@virtual
def on_tick(self, tick: TickData):
""""""
pass
@virtual
def on_order(self, order: OrderData):
""""""
pass
@virtual
def on_trade(self, trade: TradeData):
""""""
pass
@virtual
def on_interval(self):
""""""
pass
class SpreadStrategyTemplate:
"""
Template for implementing spread trading strategies.
"""
author: str = ""
parameters: List[str] = []
variables: List[str] = []
def __init__(
self,
strategy_engine,
strategy_name: str,
spread: SpreadData,
setting: dict
):
""""""
self.strategy_engine = strategy_engine
self.strategy_name = strategy_name
self.spread = spread
self.spread_name = spread.name
self.inited = False
self.trading = False
self.variables = copy(self.variables)
self.variables.insert(0, "inited")
self.variables.insert(1, "trading")
self.vt_orderids: Set[str] = set()
self.algoids: Set[str] = set()
self.update_setting(setting)
def update_setting(self, setting: dict):
"""
Update strategy parameter wtih value in setting dict.
"""
for name in self.parameters:
if name in setting:
setattr(self, name, setting[name])
@classmethod
def get_class_parameters(cls):
"""
Get default parameters dict of strategy class.
"""
class_parameters = {}
for name in cls.parameters:
class_parameters[name] = getattr(cls, name)
return class_parameters
def get_parameters(self):
"""
Get strategy parameters dict.
"""
strategy_parameters = {}
for name in self.parameters:
strategy_parameters[name] = getattr(self, name)
return strategy_parameters
def get_variables(self):
"""
Get strategy variables dict.
"""
strategy_variables = {}
for name in self.variables:
strategy_variables[name] = getattr(self, name)
return strategy_variables
def get_data(self):
"""
Get strategy data.
"""
strategy_data = {
"strategy_name": self.strategy_name,
"spread_name": self.spread_name,
"class_name": self.__class__.__name__,
"author": self.author,
"parameters": self.get_parameters(),
"variables": self.get_variables(),
}
return strategy_data
def update_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
if not algo.is_active() and algo.algoid in self.algoids:
self.algoids.remove(algo.algoid)
self.on_spread_algo(algo)
def update_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
if not order.is_active() and order.vt_orderid in self.vt_orderids:
self.vt_orderids.remove(order.vt_orderid)
self.on_order(order)
@virtual
def on_init(self):
"""
Callback when strategy is inited.
"""
pass
@virtual
def on_start(self):
"""
Callback when strategy is started.
"""
pass
@virtual
def on_stop(self):
"""
Callback when strategy is stopped.
"""
pass
@virtual
def on_spread_data(self):
"""
Callback when spread price is updated.
"""
pass
@virtual
def on_spread_pos(self):
"""
Callback when spread position is updated.
"""
pass
@virtual
def on_spread_algo(self, algo: SpreadAlgoTemplate):
"""
Callback when algo status is updated.
"""
pass
@virtual
def on_order(self, order: OrderData):
"""
Callback when order status is updated.
"""
pass
@virtual
def on_trade(self, trade: TradeData):
"""
Callback when new trade data is received.
"""
pass
def start_algo(
self,
direction: Direction,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool
) -> str:
""""""
if not self.trading:
return ""
algoid: str = self.strategy_engine.start_algo(
self,
self.spread_name,
direction,
price,
volume,
payup,
interval,
lock
)
self.algoids.add(algoid)
return algoid
def start_long_algo(
self,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool = False
) -> str:
""""""
return self.start_algo(Direction.LONG, price, volume, payup, interval, lock)
def start_short_algo(
self,
price: float,
volume: float,
payup: int,
interval: int,
lock: bool = False
) -> str:
""""""
return self.start_algo(Direction.SHORT, price, volume, payup, interval, lock)
def stop_algo(self, algoid: str):
""""""
if not self.trading:
return
self.strategy_engine.stop_algo(self, algoid)
def stop_all_algos(self):
""""""
for algoid in self.algoids:
self.stop_algo(algoid)
def buy(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.OPEN, lock)
def sell(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.CLOSE, lock)
def short(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.SHORT, Offset.OPEN, lock)
def cover(self, vt_symbol: str, price: float, volume: float, lock: bool = False) -> List[str]:
""""""
return self.send_order(vt_symbol, price, volume, Direction.LONG, Offset.CLOSE, lock)
def send_order(
self,
vt_symbol: str,
price: float,
volume: float,
direction: Direction,
offset: Offset,
lock: bool
) -> List[str]:
""""""
if not self.trading:
return []
vt_orderids: List[str] = self.strategy_engine.send_order(
self,
vt_symbol,
price,
volume,
direction,
offset,
lock
)
for vt_orderid in vt_orderids:
self.vt_orderids.add(vt_orderid)
return vt_orderids
def cancel_order(self, vt_orderid: str):
""""""
if not self.trading:
return
self.strategy_engine.cancel_order(self, vt_orderid)
def cancel_all_orders(self):
""""""
for vt_orderid in self.vt_orderids:
self.cancel_order(vt_orderid)
def put_event(self):
""""""
self.strategy_engine.put_strategy_event(self)
def write_log(self, msg: str):
""""""
self.strategy_engine.write_strategy_log(self, msg)
def get_spread_tick(self) -> TickData:
""""""
return self.spread.to_tick()
def get_spread_pos(self) -> float:
""""""
return self.spread.net_pos
def get_leg_tick(self, vt_symbol: str) -> TickData:
""""""
leg = self.spread.legs.get(vt_symbol, None)
if not leg:
return None
return leg.tick
def get_leg_pos(self, vt_symbol: str, direction: Direction = Direction.NET) -> float:
""""""
leg = self.spread.legs.get(vt_symbol, None)
if not leg:
return None
if direction == Direction.NET:
return leg.net_pos
elif direction == Direction.LONG:
return leg.long_pos
else:
return leg.short_pos
def send_email(self, msg: str):
"""
Send email to default receiver.
"""
if self.inited:
self.strategy_engine.send_email(msg, self)

View File

@ -0,0 +1 @@
from .widget import SpreadManager

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -0,0 +1,810 @@
"""
Widget for spread trading.
"""
from vnpy.event import EventEngine, Event
from vnpy.trader.engine import MainEngine
from vnpy.trader.constant import Direction
from vnpy.trader.ui import QtWidgets, QtCore, QtGui
from vnpy.trader.ui.widget import (
BaseMonitor, BaseCell,
BidCell, AskCell,
TimeCell, PnlCell,
DirectionCell, EnumCell,
)
from ..engine import (
SpreadEngine,
SpreadStrategyEngine,
APP_NAME,
EVENT_SPREAD_DATA,
EVENT_SPREAD_POS,
EVENT_SPREAD_LOG,
EVENT_SPREAD_ALGO,
EVENT_SPREAD_STRATEGY
)
class SpreadManager(QtWidgets.QWidget):
""""""
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.spread_engine = main_engine.get_engine(APP_NAME)
self.init_ui()
def init_ui(self):
""""""
self.setWindowTitle("价差交易")
self.algo_dialog = SpreadAlgoWidget(self.spread_engine)
algo_group = self.create_group("交易", self.algo_dialog)
algo_group.setMaximumWidth(300)
self.data_monitor = SpreadDataMonitor(
self.main_engine,
self.event_engine
)
self.log_monitor = SpreadLogMonitor(
self.main_engine,
self.event_engine
)
self.algo_monitor = SpreadAlgoMonitor(
self.spread_engine
)
self.strategy_monitor = SpreadStrategyMonitor(
self.spread_engine
)
grid = QtWidgets.QGridLayout()
grid.addWidget(self.create_group("价差", self.data_monitor), 0, 0)
grid.addWidget(self.create_group("日志", self.log_monitor), 1, 0)
grid.addWidget(self.create_group("算法", self.algo_monitor), 0, 1)
grid.addWidget(self.create_group("策略", self.strategy_monitor), 1, 1)
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(algo_group)
hbox.addLayout(grid)
self.setLayout(hbox)
def show(self):
""""""
self.spread_engine.start()
self.algo_dialog.update_class_combo()
self.showMaximized()
def create_group(self, title: str, widget: QtWidgets.QWidget):
""""""
group = QtWidgets.QGroupBox()
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(widget)
group.setLayout(vbox)
group.setTitle(title)
return group
class SpreadDataMonitor(BaseMonitor):
"""
Monitor for spread data.
"""
event_type = EVENT_SPREAD_DATA
data_key = "name"
sorting = False
headers = {
"name": {"display": "名称", "cell": BaseCell, "update": False},
"bid_volume": {"display": "买量", "cell": BidCell, "update": True},
"bid_price": {"display": "买价", "cell": BidCell, "update": True},
"ask_price": {"display": "卖价", "cell": AskCell, "update": True},
"ask_volume": {"display": "卖量", "cell": AskCell, "update": True},
"net_pos": {"display": "净仓", "cell": PnlCell, "update": True},
"datetime": {"display": "时间", "cell": TimeCell, "update": True},
"price_formula": {"display": "定价", "cell": BaseCell, "update": False},
"trading_formula": {"display": "交易", "cell": BaseCell, "update": False},
}
def register_event(self):
"""
Register event handler into event engine.
"""
super().register_event()
self.event_engine.register(EVENT_SPREAD_POS, self.signal.emit)
class SpreadLogMonitor(QtWidgets.QTextEdit):
"""
Monitor for log data.
"""
signal = QtCore.pyqtSignal(Event)
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super().__init__()
self.main_engine = main_engine
self.event_engine = event_engine
self.init_ui()
self.register_event()
def init_ui(self):
""""""
self.setReadOnly(True)
def register_event(self):
""""""
self.signal.connect(self.process_log_event)
self.event_engine.register(EVENT_SPREAD_LOG, self.signal.emit)
def process_log_event(self, event: Event):
""""""
log = event.data
msg = f"{log.time.strftime('%H:%M:%S')}\t{log.msg}"
self.append(msg)
class SpreadAlgoMonitor(BaseMonitor):
"""
Monitor for algo status.
"""
event_type = EVENT_SPREAD_ALGO
data_key = "algoid"
sorting = False
headers = {
"algoid": {"display": "算法", "cell": BaseCell, "update": False},
"spread_name": {"display": "价差", "cell": BaseCell, "update": False},
"direction": {"display": "方向", "cell": DirectionCell, "update": False},
"price": {"display": "价格", "cell": BaseCell, "update": False},
"payup": {"display": "超价", "cell": BaseCell, "update": False},
"volume": {"display": "数量", "cell": BaseCell, "update": False},
"traded_volume": {"display": "成交", "cell": BaseCell, "update": True},
"interval": {"display": "间隔", "cell": BaseCell, "update": False},
"count": {"display": "计数", "cell": BaseCell, "update": True},
"status": {"display": "状态", "cell": EnumCell, "update": True},
}
def __init__(self, spread_engine: SpreadEngine):
""""""
super().__init__(spread_engine.main_engine, spread_engine.event_engine)
self.spread_engine = spread_engine
def init_ui(self):
"""
Connect signal.
"""
super().init_ui()
self.setToolTip("双击单元格停止算法")
self.itemDoubleClicked.connect(self.stop_algo)
def stop_algo(self, cell):
"""
Stop algo if cell double clicked.
"""
algo = cell.get_data()
self.spread_engine.stop_algo(algo.algoid)
class SpreadAlgoWidget(QtWidgets.QFrame):
""""""
def __init__(self, spread_engine: SpreadEngine):
""""""
super().__init__()
self.spread_engine: SpreadEngine = spread_engine
self.strategy_engine: SpreadStrategyEngine = spread_engine.strategy_engine
self.init_ui()
def init_ui(self):
""""""
self.setWindowTitle("启动算法")
self.setFrameShape(self.Box)
self.setLineWidth(1)
self.name_line = QtWidgets.QLineEdit()
self.direction_combo = QtWidgets.QComboBox()
self.direction_combo.addItems(
[Direction.LONG.value, Direction.SHORT.value]
)
float_validator = QtGui.QDoubleValidator()
self.price_line = QtWidgets.QLineEdit()
self.price_line.setValidator(float_validator)
self.volume_line = QtWidgets.QLineEdit()
self.volume_line.setValidator(float_validator)
int_validator = QtGui.QIntValidator()
self.payup_line = QtWidgets.QLineEdit()
self.payup_line.setValidator(int_validator)
self.interval_line = QtWidgets.QLineEdit()
self.interval_line.setValidator(int_validator)
button_start = QtWidgets.QPushButton("启动")
button_start.clicked.connect(self.start_algo)
self.lock_combo = QtWidgets.QComboBox()
self.lock_combo.addItems(
["", ""]
)
self.class_combo = QtWidgets.QComboBox()
add_button = QtWidgets.QPushButton("添加策略")
add_button.clicked.connect(self.add_strategy)
init_button = QtWidgets.QPushButton("全部初始化")
init_button.clicked.connect(self.strategy_engine.init_all_strategies)
start_button = QtWidgets.QPushButton("全部启动")
start_button.clicked.connect(self.strategy_engine.start_all_strategies)
stop_button = QtWidgets.QPushButton("全部停止")
stop_button.clicked.connect(self.strategy_engine.stop_all_strategies)
add_spread_button = QtWidgets.QPushButton("创建价差")
add_spread_button.clicked.connect(self.add_spread)
remove_spread_button = QtWidgets.QPushButton("移除价差")
remove_spread_button.clicked.connect(self.remove_spread)
form = QtWidgets.QFormLayout()
form.addRow("价差", self.name_line)
form.addRow("方向", self.direction_combo)
form.addRow("价格", self.price_line)
form.addRow("数量", self.volume_line)
form.addRow("超价", self.payup_line)
form.addRow("间隔", self.interval_line)
form.addRow("锁仓", self.lock_combo)
form.addRow(button_start)
vbox = QtWidgets.QVBoxLayout()
vbox.addLayout(form)
vbox.addStretch()
vbox.addWidget(self.class_combo)
vbox.addWidget(add_button)
vbox.addWidget(init_button)
vbox.addWidget(start_button)
vbox.addWidget(stop_button)
vbox.addStretch()
vbox.addWidget(add_spread_button)
vbox.addWidget(remove_spread_button)
self.setLayout(vbox)
def start_algo(self):
""""""
name = self.name_line.text()
direction = Direction(self.direction_combo.currentText())
price = float(self.price_line.text())
volume = float(self.volume_line.text())
payup = int(self.payup_line.text())
interval = int(self.interval_line.text())
lock_str = self.lock_combo.currentText()
if lock_str == "":
lock = True
else:
lock = False
self.spread_engine.start_algo(
name, direction, price, volume, payup, interval, lock
)
def add_spread(self):
""""""
dialog = SpreadDataDialog(self.spread_engine)
dialog.exec_()
def remove_spread(self):
""""""
dialog = SpreadRemoveDialog(self.spread_engine)
dialog.exec_()
def update_class_combo(self):
""""""
self.class_combo.addItems(
self.strategy_engine.get_all_strategy_class_names()
)
def remove_strategy(self, strategy_name):
""""""
manager = self.managers.pop(strategy_name)
manager.deleteLater()
def add_strategy(self):
""""""
class_name = str(self.class_combo.currentText())
if not class_name:
return
parameters = self.strategy_engine.get_strategy_class_parameters(
class_name)
editor = SettingEditor(parameters, class_name=class_name)
n = editor.exec_()
if n == editor.Accepted:
setting = editor.get_setting()
spread_name = setting.pop("spread_name")
strategy_name = setting.pop("strategy_name")
self.strategy_engine.add_strategy(
class_name, strategy_name, spread_name, setting
)
class SpreadDataDialog(QtWidgets.QDialog):
""""""
def __init__(self, spread_engine: SpreadEngine):
""""""
super().__init__()
self.spread_engine: SpreadEngine = spread_engine
self.leg_widgets = []
self.init_ui()
def init_ui(self):
""""""
self.setWindowTitle("创建价差")
self.name_line = QtWidgets.QLineEdit()
self.active_line = QtWidgets.QLineEdit()
self.grid = QtWidgets.QGridLayout()
button_add = QtWidgets.QPushButton("创建价差")
button_add.clicked.connect(self.add_spread)
Label = QtWidgets.QLabel
grid = QtWidgets.QGridLayout()
grid.addWidget(Label("价差名称"), 0, 0)
grid.addWidget(self.name_line, 0, 1, 1, 3)
grid.addWidget(Label("主动腿代码"), 1, 0)
grid.addWidget(self.active_line, 1, 1, 1, 3)
grid.addWidget(Label(""), 2, 0)
grid.addWidget(Label("本地代码"), 3, 1)
grid.addWidget(Label("价格乘数"), 3, 2)
grid.addWidget(Label("交易乘数"), 3, 3)
int_validator = QtGui.QIntValidator()
leg_count = 5
for i in range(leg_count):
symbol_line = QtWidgets.QLineEdit()
price_line = QtWidgets.QLineEdit()
price_line.setValidator(int_validator)
trading_line = QtWidgets.QLineEdit()
trading_line.setValidator(int_validator)
grid.addWidget(Label("{}".format(i + 1)), 4 + i, 0)
grid.addWidget(symbol_line, 4 + i, 1)
grid.addWidget(price_line, 4 + i, 2)
grid.addWidget(trading_line, 4 + i, 3)
d = {
"symbol": symbol_line,
"price": price_line,
"trading": trading_line
}
self.leg_widgets.append(d)
grid.addWidget(Label(""), 4 + leg_count, 0,)
grid.addWidget(button_add, 5 + leg_count, 0, 1, 4)
self.setLayout(grid)
def add_spread(self):
""""""
spread_name = self.name_line.text()
if not spread_name:
QtWidgets.QMessageBox.warning(
self,
"创建失败",
"请输入价差名称",
QtWidgets.QMessageBox.Ok
)
return
active_symbol = self.active_line.text()
leg_settings = {}
for d in self.leg_widgets:
try:
spread_name = d["symbol"].text()
price_multiplier = int(d["price"].text())
trading_multiplier = int(d["trading"].text())
leg_settings[spread_name] = {
"spread_name": spread_name,
"price_multiplier": price_multiplier,
"trading_multiplier": trading_multiplier
}
except ValueError:
pass
if len(leg_settings) < 2:
QtWidgets.QMessageBox.warning(
self,
"创建失败",
"价差最少需要2条腿",
QtWidgets.QMessageBox.Ok
)
return
if active_symbol not in leg_settings:
QtWidgets.QMessageBox.warning(
self,
"创建失败",
"各条腿中找不到主动腿代码",
QtWidgets.QMessageBox.Ok
)
return
self.spread_engine.add_spread(
spread_name,
list(leg_settings.values()),
active_symbol
)
self.accept()
class SpreadRemoveDialog(QtWidgets.QDialog):
""""""
def __init__(self, spread_engine: SpreadEngine):
""""""
super().__init__()
self.spread_engine: SpreadEngine = spread_engine
self.init_ui()
def init_ui(self):
""""""
self.setWindowTitle("移除价差")
self.setMinimumWidth(300)
self.name_combo = QtWidgets.QComboBox()
spreads = self.spread_engine.get_all_spreads()
for spread in spreads:
self.name_combo.addItem(spread.name)
button_remove = QtWidgets.QPushButton("移除")
button_remove.clicked.connect(self.remove_spread)
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(self.name_combo)
hbox.addWidget(button_remove)
self.setLayout(hbox)
def remove_spread(self):
""""""
spread_name = self.name_combo.currentText()
self.spread_engine.remove_spread(spread_name)
self.accept()
class SpreadStrategyMonitor(QtWidgets.QWidget):
""""""
signal_strategy = QtCore.pyqtSignal(Event)
def __init__(self, spread_engine: SpreadEngine):
super().__init__()
self.strategy_engine = spread_engine.strategy_engine
self.main_engine = spread_engine.main_engine
self.event_engine = spread_engine.event_engine
self.managers = {}
self.init_ui()
self.register_event()
def init_ui(self):
""""""
self.scroll_layout = QtWidgets.QVBoxLayout()
self.scroll_layout.addStretch()
scroll_widget = QtWidgets.QWidget()
scroll_widget.setLayout(self.scroll_layout)
scroll_area = QtWidgets.QScrollArea()
scroll_area.setWidgetResizable(True)
scroll_area.setWidget(scroll_widget)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(scroll_area)
self.setLayout(vbox)
def register_event(self):
""""""
self.signal_strategy.connect(self.process_strategy_event)
self.event_engine.register(
EVENT_SPREAD_STRATEGY, self.signal_strategy.emit
)
def process_strategy_event(self, event):
"""
Update strategy status onto its monitor.
"""
data = event.data
strategy_name = data["strategy_name"]
if strategy_name in self.managers:
manager = self.managers[strategy_name]
manager.update_data(data)
else:
manager = SpreadStrategyWidget(self, self.strategy_engine, data)
self.scroll_layout.insertWidget(0, manager)
self.managers[strategy_name] = manager
def remove_strategy(self, strategy_name):
""""""
manager = self.managers.pop(strategy_name)
manager.deleteLater()
class SpreadStrategyWidget(QtWidgets.QFrame):
"""
Manager for a strategy
"""
def __init__(
self,
strategy_monitor: SpreadStrategyMonitor,
strategy_engine: SpreadStrategyEngine,
data: dict
):
""""""
super().__init__()
self.strategy_monitor = strategy_monitor
self.strategy_engine = strategy_engine
self.strategy_name = data["strategy_name"]
self._data = data
self.init_ui()
def init_ui(self):
""""""
self.setFixedHeight(300)
self.setFrameShape(self.Box)
self.setLineWidth(1)
init_button = QtWidgets.QPushButton("初始化")
init_button.clicked.connect(self.init_strategy)
start_button = QtWidgets.QPushButton("启动")
start_button.clicked.connect(self.start_strategy)
stop_button = QtWidgets.QPushButton("停止")
stop_button.clicked.connect(self.stop_strategy)
edit_button = QtWidgets.QPushButton("编辑")
edit_button.clicked.connect(self.edit_strategy)
remove_button = QtWidgets.QPushButton("移除")
remove_button.clicked.connect(self.remove_strategy)
strategy_name = self._data["strategy_name"]
spread_name = self._data["spread_name"]
class_name = self._data["class_name"]
author = self._data["author"]
label_text = (
f"{strategy_name} - {spread_name} ({class_name} by {author})"
)
label = QtWidgets.QLabel(label_text)
label.setAlignment(QtCore.Qt.AlignCenter)
self.parameters_monitor = StrategyDataMonitor(self._data["parameters"])
self.variables_monitor = StrategyDataMonitor(self._data["variables"])
hbox = QtWidgets.QHBoxLayout()
hbox.addWidget(init_button)
hbox.addWidget(start_button)
hbox.addWidget(stop_button)
hbox.addWidget(edit_button)
hbox.addWidget(remove_button)
vbox = QtWidgets.QVBoxLayout()
vbox.addWidget(label)
vbox.addLayout(hbox)
vbox.addWidget(self.parameters_monitor)
vbox.addWidget(self.variables_monitor)
self.setLayout(vbox)
def update_data(self, data: dict):
""""""
self._data = data
self.parameters_monitor.update_data(data["parameters"])
self.variables_monitor.update_data(data["variables"])
def init_strategy(self):
""""""
self.strategy_engine.init_strategy(self.strategy_name)
def start_strategy(self):
""""""
self.strategy_engine.start_strategy(self.strategy_name)
def stop_strategy(self):
""""""
self.strategy_engine.stop_strategy(self.strategy_name)
def edit_strategy(self):
""""""
strategy_name = self._data["strategy_name"]
parameters = self.strategy_engine.get_strategy_parameters(
strategy_name)
editor = SettingEditor(parameters, strategy_name=strategy_name)
n = editor.exec_()
if n == editor.Accepted:
setting = editor.get_setting()
self.strategy_engine.edit_strategy(strategy_name, setting)
def remove_strategy(self):
""""""
result = self.strategy_engine.remove_strategy(self.strategy_name)
# Only remove strategy gui manager if it has been removed from engine
if result:
self.strategy_monitor.remove_strategy(self.strategy_name)
class StrategyDataMonitor(QtWidgets.QTableWidget):
"""
Table monitor for parameters and variables.
"""
def __init__(self, data: dict):
""""""
super().__init__()
self._data = data
self.cells = {}
self.init_ui()
def init_ui(self):
""""""
labels = list(self._data.keys())
self.setColumnCount(len(labels))
self.setHorizontalHeaderLabels(labels)
self.setRowCount(1)
self.verticalHeader().setSectionResizeMode(
QtWidgets.QHeaderView.Stretch
)
self.verticalHeader().setVisible(False)
self.setEditTriggers(self.NoEditTriggers)
for column, name in enumerate(self._data.keys()):
value = self._data[name]
cell = QtWidgets.QTableWidgetItem(str(value))
cell.setTextAlignment(QtCore.Qt.AlignCenter)
self.setItem(0, column, cell)
self.cells[name] = cell
def update_data(self, data: dict):
""""""
for name, value in data.items():
cell = self.cells[name]
cell.setText(str(value))
class SettingEditor(QtWidgets.QDialog):
"""
For creating new strategy and editing strategy parameters.
"""
def __init__(
self, parameters: dict, strategy_name: str = "", class_name: str = ""
):
""""""
super(SettingEditor, self).__init__()
self.parameters = parameters
self.strategy_name = strategy_name
self.class_name = class_name
self.edits = {}
self.init_ui()
def init_ui(self):
""""""
form = QtWidgets.QFormLayout()
# Add spread_name and name edit if add new strategy
if self.class_name:
self.setWindowTitle(f"添加策略:{self.class_name}")
button_text = "添加"
parameters = {"strategy_name": "", "spread_name": ""}
parameters.update(self.parameters)
else:
self.setWindowTitle(f"参数编辑:{self.strategy_name}")
button_text = "确定"
parameters = self.parameters
for name, value in parameters.items():
type_ = type(value)
edit = QtWidgets.QLineEdit(str(value))
if type_ is int:
validator = QtGui.QIntValidator()
edit.setValidator(validator)
elif type_ is float:
validator = QtGui.QDoubleValidator()
edit.setValidator(validator)
form.addRow(f"{name} {type_}", edit)
self.edits[name] = (edit, type_)
button = QtWidgets.QPushButton(button_text)
button.clicked.connect(self.accept)
form.addRow(button)
self.setLayout(form)
def get_setting(self):
""""""
setting = {}
if self.class_name:
setting["class_name"] = self.class_name
for name, tp in self.edits.items():
edit, type_ = tp
value_text = edit.text()
if type_ == bool:
if value_text == "True":
value = True
else:
value = False
else:
value = type_(value_text)
setting[name] = value
return setting

View File

@ -471,7 +471,10 @@ class BitmexRestApi(RestClient):
headers = request.response.headers
self.rate_limit_remaining = int(headers["x-ratelimit-remaining"])
self.rate_limit_sleep = int(headers.get("Retry-After", 0)) + 1 # 1 extra second sleep
self.rate_limit_sleep = int(headers.get("Retry-After", 0))
if self.rate_limit_sleep:
self.rate_limit_sleep += 1 # 1 extra second sleep
def reset_rate_limit(self):
"""

View File

@ -117,6 +117,10 @@ class Exchange(Enum):
BINANCE = "BINANCE"
COINBASE = "COINBASE"
# Special Function
LOCAL = "LOCAL" # For local generated data
class Currency(Enum):
"""
Currency.

View File

@ -156,6 +156,9 @@ class TimeCell(BaseCell):
"""
Time format is 12:12:12.5
"""
if content is None:
return
timestamp = content.strftime("%H:%M:%S")
millisecond = int(content.microsecond / 1000)