[Mod] DataRecorder: record spread data tick/bar

This commit is contained in:
vn.py 2019-10-21 16:29:52 +08:00
parent 216a5d3f5a
commit 9e92d989d7
2 changed files with 52 additions and 26 deletions

View File

@ -37,7 +37,7 @@ from vnpy.app.cta_strategy import CtaStrategyApp
# from vnpy.app.csv_loader import CsvLoaderApp # from vnpy.app.csv_loader import CsvLoaderApp
# from vnpy.app.algo_trading import AlgoTradingApp # from vnpy.app.algo_trading import AlgoTradingApp
from vnpy.app.cta_backtester import CtaBacktesterApp from vnpy.app.cta_backtester import CtaBacktesterApp
# from vnpy.app.data_recorder import DataRecorderApp from vnpy.app.data_recorder import DataRecorderApp
# from vnpy.app.risk_manager import RiskManagerApp # from vnpy.app.risk_manager import RiskManagerApp
from vnpy.app.script_trader import ScriptTraderApp from vnpy.app.script_trader import ScriptTraderApp
from vnpy.app.rpc_service import RpcServiceApp from vnpy.app.rpc_service import RpcServiceApp
@ -61,7 +61,7 @@ def main():
# main_engine.add_gateway(FemasGateway) # main_engine.add_gateway(FemasGateway)
# main_engine.add_gateway(IbGateway) # main_engine.add_gateway(IbGateway)
# main_engine.add_gateway(FutuGateway) # main_engine.add_gateway(FutuGateway)
# main_engine.add_gateway(BitmexGateway) main_engine.add_gateway(BitmexGateway)
# main_engine.add_gateway(TigerGateway) # main_engine.add_gateway(TigerGateway)
# main_engine.add_gateway(OesGateway) # main_engine.add_gateway(OesGateway)
# main_engine.add_gateway(OkexGateway) # main_engine.add_gateway(OkexGateway)
@ -85,7 +85,7 @@ def main():
main_engine.add_app(CtaBacktesterApp) main_engine.add_app(CtaBacktesterApp)
# main_engine.add_app(CsvLoaderApp) # main_engine.add_app(CsvLoaderApp)
# main_engine.add_app(AlgoTradingApp) # main_engine.add_app(AlgoTradingApp)
# main_engine.add_app(DataRecorderApp) main_engine.add_app(DataRecorderApp)
# main_engine.add_app(RiskManagerApp) # main_engine.add_app(RiskManagerApp)
# main_engine.add_app(ScriptTraderApp) # main_engine.add_app(ScriptTraderApp)
# main_engine.add_app(RpcServiceApp) # main_engine.add_app(RpcServiceApp)

View File

@ -6,6 +6,7 @@ from copy import copy
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.constant import Exchange
from vnpy.trader.object import ( from vnpy.trader.object import (
SubscribeRequest, SubscribeRequest,
TickData, TickData,
@ -15,6 +16,7 @@ from vnpy.trader.object import (
from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT
from vnpy.trader.utility import load_json, save_json, BarGenerator from vnpy.trader.utility import load_json, save_json, BarGenerator
from vnpy.trader.database import database_manager from vnpy.trader.database import database_manager
from vnpy.app.spread_trading.base import EVENT_SPREAD_DATA, SpreadData
APP_NAME = "DataRecorder" APP_NAME = "DataRecorder"
@ -91,18 +93,22 @@ class RecorderEngine(BaseEngine):
self.write_log(f"已在K线记录列表中{vt_symbol}") self.write_log(f"已在K线记录列表中{vt_symbol}")
return return
contract = self.main_engine.get_contract(vt_symbol) if Exchange.LOCAL.value not in vt_symbol:
if not contract: contract = self.main_engine.get_contract(vt_symbol)
self.write_log(f"找不到合约:{vt_symbol}") if not contract:
return self.write_log(f"找不到合约:{vt_symbol}")
return
self.bar_recordings[vt_symbol] = { self.bar_recordings[vt_symbol] = {
"symbol": contract.symbol, "symbol": contract.symbol,
"exchange": contract.exchange.value, "exchange": contract.exchange.value,
"gateway_name": contract.gateway_name "gateway_name": contract.gateway_name
} }
self.subscribe(contract)
else:
self.tick_recordings[vt_symbol] = {}
self.subscribe(contract)
self.save_setting() self.save_setting()
self.put_event() self.put_event()
@ -114,18 +120,24 @@ class RecorderEngine(BaseEngine):
self.write_log(f"已在Tick记录列表中{vt_symbol}") self.write_log(f"已在Tick记录列表中{vt_symbol}")
return return
contract = self.main_engine.get_contract(vt_symbol) # For normal contract
if not contract: if Exchange.LOCAL.value not in vt_symbol:
self.write_log(f"找不到合约:{vt_symbol}") contract = self.main_engine.get_contract(vt_symbol)
return if not contract:
self.write_log(f"找不到合约:{vt_symbol}")
return
self.tick_recordings[vt_symbol] = { self.tick_recordings[vt_symbol] = {
"symbol": contract.symbol, "symbol": contract.symbol,
"exchange": contract.exchange.value, "exchange": contract.exchange.value,
"gateway_name": contract.gateway_name "gateway_name": contract.gateway_name
} }
self.subscribe(contract)
# No need to subscribe for spread data
else:
self.tick_recordings[vt_symbol] = {}
self.subscribe(contract)
self.save_setting() self.save_setting()
self.put_event() self.put_event()
@ -159,11 +171,11 @@ class RecorderEngine(BaseEngine):
"""""" """"""
self.event_engine.register(EVENT_TICK, self.process_tick_event) self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event) self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)
self.event_engine.register(
EVENT_SPREAD_DATA, self.process_spread_event)
def process_tick_event(self, event: Event): def update_tick(self, tick: TickData):
"""""" """"""
tick = event.data
if tick.vt_symbol in self.tick_recordings: if tick.vt_symbol in self.tick_recordings:
self.record_tick(tick) self.record_tick(tick)
@ -171,6 +183,11 @@ class RecorderEngine(BaseEngine):
bg = self.get_bar_generator(tick.vt_symbol) bg = self.get_bar_generator(tick.vt_symbol)
bg.update_tick(tick) bg.update_tick(tick)
def process_tick_event(self, event: Event):
""""""
tick = event.data
self.update_tick(tick)
def process_contract_event(self, event: Event): def process_contract_event(self, event: Event):
"""""" """"""
contract = event.data contract = event.data
@ -179,6 +196,15 @@ class RecorderEngine(BaseEngine):
if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings): if (vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings):
self.subscribe(contract) self.subscribe(contract)
def process_spread_event(self, event: Event):
""""""
spread: SpreadData = event.data
tick = spread.to_tick()
# Filter not inited spread data
if tick.datetime:
self.update_tick(tick)
def write_log(self, msg: str): def write_log(self, msg: str):
"""""" """"""
event = Event( event = Event(