From 144ca19b08d8ddcb7cbe5af169eb9d1948ad2981 Mon Sep 17 00:00:00 2001 From: "vn.py" Date: Mon, 18 Feb 2019 13:10:00 +0800 Subject: [PATCH] [Add] use rqdata client for query history data in cta engine --- vnpy/app/cta_strategy/engine.py | 108 ++++++++++++++++++++++++------ vnpy/app/cta_strategy/template.py | 2 +- 2 files changed, 90 insertions(+), 20 deletions(-) diff --git a/vnpy/app/cta_strategy/engine.py b/vnpy/app/cta_strategy/engine.py index f7a0ce93..1abb44d9 100644 --- a/vnpy/app/cta_strategy/engine.py +++ b/vnpy/app/cta_strategy/engine.py @@ -8,7 +8,9 @@ from pathlib import Path from typing import Any, Callable from datetime import datetime, timedelta from threading import Thread -from queue import Queue, Empty +from queue import Queue + +import rqdatac from vnpy.event import Event, EventEngine from vnpy.trader.engine import BaseEngine, MainEngine @@ -17,11 +19,13 @@ from vnpy.trader.object import ( SubscribeRequest, LogData, TickData, + BarData ) from vnpy.trader.event import EVENT_TICK, EVENT_ORDER, EVENT_TRADE -from vnpy.trader.constant import Direction, PriceType, Interval +from vnpy.trader.constant import Direction, PriceType, Interval, Exchange from vnpy.trader.utility import load_json, save_json from vnpy.trader.database import DbTickData, DbBarData +from vnpy.trader.setting import SETTINGS from .base import ( EVENT_CTA_LOG, @@ -69,9 +73,13 @@ class CtaEngine(BaseEngine): self.init_thread = None self.init_queue = Queue() + self.rq_client = None + self.rq_symbols = set() + def init_engine(self): """ """ + self.init_rqdata() self.load_strategy_class() self.load_strategy_setting() self.load_strategy_data() @@ -88,6 +96,65 @@ class CtaEngine(BaseEngine): self.event_engine.register(EVENT_ORDER, self.process_order_event) self.event_engine.register(EVENT_TRADE, self.process_trade_event) + def init_rqdata(self): + """ + Init RQData client. + """ + username = SETTINGS["rqdata.username"] + password = SETTINGS["rqdata.password"] + if not username or not password: + return + + self.rq_client = rqdatac + self.rq_client.init(username, password, + ('rqdatad-pro.ricequant.com', 16011)) + + try: + df = self.rq_client.all_instruments( + type='Future', date=datetime.now()) + for ix, row in df.iterrows(): + self.rq_symbols.add(row['order_book_id']) + except RuntimeError: + pass + + self.write_log("RQData数据接口初始化成功") + + def query_bar_from_rq( + self, vt_symbol: str, interval: Interval, start: datetime, end: datetime + ): + """ + Query bar data from RQData. + """ + symbol, exchange_str = vt_symbol.split(".") + if symbol.upper() not in self.rq_symbols: + return None + + df = self.rq_client.get_price( + symbol.upper(), + frequency=interval.value, + fields=["open", "high", "low", "close", "volume"], + start_date=start, + end_date=end + ) + + data = [] + for ix, row in df.iterrows(): + bar = BarData( + symbol=symbol, + exchange=Exchange(exchange_str), + interval=interval, + datetime=row.name.to_pydatetime(), + open_price=row["open"], + high_price=row["high"], + low_price=row["low"], + close_price=row["close"], + volume=row["volume"], + gateway_name="RQ" + ) + data.append(bar) + + return data + def process_tick_event(self, event: Event): """""" tick = event.data @@ -111,7 +178,7 @@ class CtaEngine(BaseEngine): return # Remove vt_orderid if order is no longer active. - vt_orderids = self.strategy_orderid_map[strategy.name] + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] if order.vt_orderid in vt_orderids and not order.is_active(): vt_orderids.remove(order.vt_orderid) @@ -172,7 +239,7 @@ class CtaEngine(BaseEngine): # Remove from relation map. self.stop_orders.pop(stop_order.stop_orderid) - vt_orderids = self.strategy_orderid_map[strategy.name] + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] if stop_order.stop_orderid in vt_orderids: vt_orderids.remove(stop_order.stop_orderid) @@ -217,7 +284,7 @@ class CtaEngine(BaseEngine): # Save relationship between orderid and strategy. self.orderid_strategy_map[vt_orderid] = strategy - vt_orderids = self.strategy_orderid_map[strategy.name] + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] vt_orderids.add(vt_orderid) return vt_orderid @@ -248,7 +315,7 @@ class CtaEngine(BaseEngine): self.stop_orders[stop_orderid] = stop_order - vt_orderids = self.strategy_orderid_map[strategy.name] + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] vt_orderids.add(stop_orderid) self.call_strategy_func(strategy, strategy.on_stop_order, stop_order) @@ -279,7 +346,7 @@ class CtaEngine(BaseEngine): # Remove from relation map. self.stop_orders.pop(stop_orderid) - vt_orderids = self.strategy_orderid_map[strategy.name] + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] if stop_orderid in vt_orderids: vt_orderids.remove(stop_orderid) @@ -315,7 +382,7 @@ class CtaEngine(BaseEngine): """ Cancel all active orders of a strategy. """ - vt_orderids = self.strategy_orderid_map[strategy.name] + vt_orderids = self.strategy_orderid_map[strategy.strategy_name] if not vt_orderids: return @@ -333,18 +400,21 @@ class CtaEngine(BaseEngine): end = datetime.now() start = end - timedelta(days) - s = ( - DbBarData.select() - .where( - (DbBarData.vt_symbol == vt_symbol) & - (DbBarData.interval == interval) & - (DbBarData.datetime >= start) & - (DbBarData.datetime <= end) + # Query data from RQData by default, if not found, load from database. + data = self.query_bar_from_rq(vt_symbol, interval, start, end) + if not data: + data = ( + DbBarData.select() + .where( + (DbBarData.vt_symbol == vt_symbol) & + (DbBarData.interval == interval) & + (DbBarData.datetime >= start) & + (DbBarData.datetime <= end) + ) + .order_by(DbBarData.datetime) ) - .order_by(DbBarData.datetime) - ) - for bar in s: + for bar in data: callback(bar) def load_tick(self, vt_symbol: str, days: int, callback: Callable): @@ -676,7 +746,7 @@ class CtaEngine(BaseEngine): Send email to default receiver. """ if strategy: - subject = f"{strategy.name}" + subject = f"{strategy.strategy_name}" else: subject = "CTA策略引擎" diff --git a/vnpy/app/cta_strategy/template.py b/vnpy/app/cta_strategy/template.py index 2b5ab498..1489f205 100644 --- a/vnpy/app/cta_strategy/template.py +++ b/vnpy/app/cta_strategy/template.py @@ -193,7 +193,7 @@ class CtaTemplate(ABC): """ Write a log message. """ - self.cta_engine.write_log(self, msg) + self.cta_engine.write_log(msg, self) def get_engine_type(self): """