[Add] use rqdata client for query history data in cta engine

This commit is contained in:
vn.py 2019-02-18 13:10:00 +08:00
parent d32c3100af
commit 144ca19b08
2 changed files with 90 additions and 20 deletions

View File

@ -8,7 +8,9 @@ from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from datetime import datetime, timedelta from datetime import datetime, timedelta
from threading import Thread from threading import Thread
from queue import Queue, Empty from queue import Queue
import rqdatac
from vnpy.event import Event, EventEngine from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine from vnpy.trader.engine import BaseEngine, MainEngine
@ -17,11 +19,13 @@ from vnpy.trader.object import (
SubscribeRequest, SubscribeRequest,
LogData, LogData,
TickData, TickData,
BarData
) )
from vnpy.trader.event import EVENT_TICK, EVENT_ORDER, EVENT_TRADE from vnpy.trader.event import EVENT_TICK, EVENT_ORDER, EVENT_TRADE
from vnpy.trader.constant import Direction, PriceType, Interval from vnpy.trader.constant import Direction, PriceType, Interval, Exchange
from vnpy.trader.utility import load_json, save_json from vnpy.trader.utility import load_json, save_json
from vnpy.trader.database import DbTickData, DbBarData from vnpy.trader.database import DbTickData, DbBarData
from vnpy.trader.setting import SETTINGS
from .base import ( from .base import (
EVENT_CTA_LOG, EVENT_CTA_LOG,
@ -69,9 +73,13 @@ class CtaEngine(BaseEngine):
self.init_thread = None self.init_thread = None
self.init_queue = Queue() self.init_queue = Queue()
self.rq_client = None
self.rq_symbols = set()
def init_engine(self): def init_engine(self):
""" """
""" """
self.init_rqdata()
self.load_strategy_class() self.load_strategy_class()
self.load_strategy_setting() self.load_strategy_setting()
self.load_strategy_data() self.load_strategy_data()
@ -88,6 +96,65 @@ class CtaEngine(BaseEngine):
self.event_engine.register(EVENT_ORDER, self.process_order_event) self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event) self.event_engine.register(EVENT_TRADE, self.process_trade_event)
def init_rqdata(self):
"""
Init RQData client.
"""
username = SETTINGS["rqdata.username"]
password = SETTINGS["rqdata.password"]
if not username or not password:
return
self.rq_client = rqdatac
self.rq_client.init(username, password,
('rqdatad-pro.ricequant.com', 16011))
try:
df = self.rq_client.all_instruments(
type='Future', date=datetime.now())
for ix, row in df.iterrows():
self.rq_symbols.add(row['order_book_id'])
except RuntimeError:
pass
self.write_log("RQData数据接口初始化成功")
def query_bar_from_rq(
self, vt_symbol: str, interval: Interval, start: datetime, end: datetime
):
"""
Query bar data from RQData.
"""
symbol, exchange_str = vt_symbol.split(".")
if symbol.upper() not in self.rq_symbols:
return None
df = self.rq_client.get_price(
symbol.upper(),
frequency=interval.value,
fields=["open", "high", "low", "close", "volume"],
start_date=start,
end_date=end
)
data = []
for ix, row in df.iterrows():
bar = BarData(
symbol=symbol,
exchange=Exchange(exchange_str),
interval=interval,
datetime=row.name.to_pydatetime(),
open_price=row["open"],
high_price=row["high"],
low_price=row["low"],
close_price=row["close"],
volume=row["volume"],
gateway_name="RQ"
)
data.append(bar)
return data
def process_tick_event(self, event: Event): def process_tick_event(self, event: Event):
"""""" """"""
tick = event.data tick = event.data
@ -111,7 +178,7 @@ class CtaEngine(BaseEngine):
return return
# Remove vt_orderid if order is no longer active. # Remove vt_orderid if order is no longer active.
vt_orderids = self.strategy_orderid_map[strategy.name] vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if order.vt_orderid in vt_orderids and not order.is_active(): if order.vt_orderid in vt_orderids and not order.is_active():
vt_orderids.remove(order.vt_orderid) vt_orderids.remove(order.vt_orderid)
@ -172,7 +239,7 @@ class CtaEngine(BaseEngine):
# Remove from relation map. # Remove from relation map.
self.stop_orders.pop(stop_order.stop_orderid) self.stop_orders.pop(stop_order.stop_orderid)
vt_orderids = self.strategy_orderid_map[strategy.name] vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if stop_order.stop_orderid in vt_orderids: if stop_order.stop_orderid in vt_orderids:
vt_orderids.remove(stop_order.stop_orderid) vt_orderids.remove(stop_order.stop_orderid)
@ -217,7 +284,7 @@ class CtaEngine(BaseEngine):
# Save relationship between orderid and strategy. # Save relationship between orderid and strategy.
self.orderid_strategy_map[vt_orderid] = strategy self.orderid_strategy_map[vt_orderid] = strategy
vt_orderids = self.strategy_orderid_map[strategy.name] vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
vt_orderids.add(vt_orderid) vt_orderids.add(vt_orderid)
return vt_orderid return vt_orderid
@ -248,7 +315,7 @@ class CtaEngine(BaseEngine):
self.stop_orders[stop_orderid] = stop_order self.stop_orders[stop_orderid] = stop_order
vt_orderids = self.strategy_orderid_map[strategy.name] vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
vt_orderids.add(stop_orderid) vt_orderids.add(stop_orderid)
self.call_strategy_func(strategy, strategy.on_stop_order, stop_order) self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
@ -279,7 +346,7 @@ class CtaEngine(BaseEngine):
# Remove from relation map. # Remove from relation map.
self.stop_orders.pop(stop_orderid) self.stop_orders.pop(stop_orderid)
vt_orderids = self.strategy_orderid_map[strategy.name] vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if stop_orderid in vt_orderids: if stop_orderid in vt_orderids:
vt_orderids.remove(stop_orderid) vt_orderids.remove(stop_orderid)
@ -315,7 +382,7 @@ class CtaEngine(BaseEngine):
""" """
Cancel all active orders of a strategy. Cancel all active orders of a strategy.
""" """
vt_orderids = self.strategy_orderid_map[strategy.name] vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if not vt_orderids: if not vt_orderids:
return return
@ -333,7 +400,10 @@ class CtaEngine(BaseEngine):
end = datetime.now() end = datetime.now()
start = end - timedelta(days) start = end - timedelta(days)
s = ( # Query data from RQData by default, if not found, load from database.
data = self.query_bar_from_rq(vt_symbol, interval, start, end)
if not data:
data = (
DbBarData.select() DbBarData.select()
.where( .where(
(DbBarData.vt_symbol == vt_symbol) & (DbBarData.vt_symbol == vt_symbol) &
@ -344,7 +414,7 @@ class CtaEngine(BaseEngine):
.order_by(DbBarData.datetime) .order_by(DbBarData.datetime)
) )
for bar in s: for bar in data:
callback(bar) callback(bar)
def load_tick(self, vt_symbol: str, days: int, callback: Callable): def load_tick(self, vt_symbol: str, days: int, callback: Callable):
@ -676,7 +746,7 @@ class CtaEngine(BaseEngine):
Send email to default receiver. Send email to default receiver.
""" """
if strategy: if strategy:
subject = f"{strategy.name}" subject = f"{strategy.strategy_name}"
else: else:
subject = "CTA策略引擎" subject = "CTA策略引擎"

View File

@ -193,7 +193,7 @@ class CtaTemplate(ABC):
""" """
Write a log message. Write a log message.
""" """
self.cta_engine.write_log(self, msg) self.cta_engine.write_log(msg, self)
def get_engine_type(self): def get_engine_type(self):
""" """