[Add]CTA策略实盘交易引擎加入对RQData的数据支持

This commit is contained in:
vn.py 2018-12-06 13:00:19 +08:00
parent 02ac91af6f
commit 50f1524c12
5 changed files with 91 additions and 7 deletions

View File

@ -2,18 +2,18 @@
{ {
"name": "double ema", "name": "double ema",
"className": "DoubleMaStrategy", "className": "DoubleMaStrategy",
"vtSymbol": "rb1805" "vtSymbol": "rb1905"
}, },
{ {
"name": "atr rsi", "name": "atr rsi",
"className": "AtrRsiStrategy", "className": "AtrRsiStrategy",
"vtSymbol": "IC1802" "vtSymbol": "IC1901"
}, },
{ {
"name": "king keltner", "name": "king keltner",
"className": "KkStrategy", "className": "KkStrategy",
"vtSymbol": "IH1802" "vtSymbol": "IH1901"
} }
] ]

View File

@ -16,5 +16,8 @@
"tdPenalty": ["IF", "IH", "IC"], "tdPenalty": ["IF", "IH", "IC"],
"maxDecimal": 4 "maxDecimal": 4,
"rqUsername": "",
"rqPassword": ""
} }

View File

@ -16,5 +16,8 @@
"tdPenalty": ["IF", "IH", "IC"], "tdPenalty": ["IF", "IH", "IC"],
"maxDecimal": 4 "maxDecimal": 4,
"rqUsername": "",
"rqPassword": ""
} }

View File

@ -20,6 +20,7 @@ from vnpy.trader.vtObject import VtTickData, VtBarData
from vnpy.trader.vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData from vnpy.trader.vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData
from vnpy.trader.vtFunction import todayDate, getJsonPath from vnpy.trader.vtFunction import todayDate, getJsonPath
from vnpy.trader.app import AppEngine from vnpy.trader.app import AppEngine
from vnpy.trader.vtGlobal import globalSetting
from .ctaBase import * from .ctaBase import *
from .strategy import STRATEGY_CLASS from .strategy import STRATEGY_CLASS
@ -74,6 +75,15 @@ class CtaEngine(AppEngine):
# 引擎类型为实盘 # 引擎类型为实盘
self.engineType = ENGINETYPE_TRADING self.engineType = ENGINETYPE_TRADING
# RQData数据服务
self.rq = None
# RQData能获取的合约代码列表
self.rqSymbolSet = set()
# 初始化RQData服务
self.initRqData()
# 注册日式事件类型 # 注册日式事件类型
self.mainEngine.registerLogEvent(EVENT_CTA_LOG) self.mainEngine.registerLogEvent(EVENT_CTA_LOG)
@ -343,6 +353,12 @@ class CtaEngine(AppEngine):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def loadBar(self, dbName, collectionName, days): def loadBar(self, dbName, collectionName, days):
"""从数据库中读取Bar数据startDate是datetime对象""" """从数据库中读取Bar数据startDate是datetime对象"""
# 优先尝试从RQData获取数据
if dbName == MINUTE_DB_NAME and collectionName.upper() in self.rqSymbolSet:
l = self.loadRqBar(collectionName, days)
return l
# 如果没有则从数据库中读取数据
startDate = self.today - timedelta(days) startDate = self.today - timedelta(days)
d = {'datetime':{'$gte':startDate}} d = {'datetime':{'$gte':startDate}}
@ -663,4 +679,63 @@ class CtaEngine(AppEngine):
return contract.priceTick return contract.priceTick
return 0 return 0
#----------------------------------------------------------------------
def initRqData(self):
"""初始化RQData客户端"""
# 检查是否填写了RQData配置
username = globalSetting.get('rqUsername')
password = globalSetting.get('rqPassword')
if not username or not password:
print globalSetting
return
# 加载RQData
try:
import rqdatac as rq
except ImportError:
print 'import fail'
return
# 登录RQData
self.rq = rq
self.rq.init(username, password)
# 获取本日可交易合约代码
try:
df = self.rq.all_instruments(type='Future', date=datetime.now())
for ix, row in df.iterrows():
self.rqSymbolSet.add(row['order_book_id'])
except RuntimeError:
print 'download fail'
pass
#----------------------------------------------------------------------
def loadRqBar(self, symbol, days):
"""从RQData加载K线数据"""
endDate = datetime.now()
startDate = endDate - timedelta(days)
df = self.rq.get_price(symbol.upper(),
frequency='1m',
fields=['open', 'high', 'low', 'close', 'volume'],
start_date=startDate,
end_date=endDate)
l = []
for ix, row in df.iterrows():
bar = VtBarData()
bar.symbol = symbol
bar.vtSymbol = symbol
bar.open = row['open']
bar.high = row['high']
bar.low = row['low']
bar.close = row['close']
bar.volume = row['volume']
bar.datetime = row.name
bar.date = bar.datetime.strftime("%Y%m%d")
bar.time = bar.datetime.strftime("%H:%M:%S")
l.append(bar)
return l

View File

@ -113,6 +113,8 @@ class MainEngine(object):
if gateway: if gateway:
gateway.connect() gateway.connect()
self.dbConnect()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def subscribe(self, subscribeReq, gatewayName): def subscribe(self, subscribeReq, gatewayName):
@ -196,7 +198,7 @@ class MainEngine(object):
# 读取MongoDB的设置 # 读取MongoDB的设置
try: try:
# 设置MongoDB操作的超时时间为0.5秒 # 设置MongoDB操作的超时时间为0.5秒
self.dbClient = MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'], connectTimeoutMS=500) self.dbClient = MongoClient(globalSetting['mongoHost'], globalSetting['mongoPort'], serverSelectionTimeoutMS=500)
# 调用server_info查询服务器状态防止服务器异常并未连接成功 # 调用server_info查询服务器状态防止服务器异常并未连接成功
self.dbClient.server_info() self.dbClient.server_info()
@ -208,6 +210,7 @@ class MainEngine(object):
self.eventEngine.register(EVENT_LOG, self.dbLogging) self.eventEngine.register(EVENT_LOG, self.dbLogging)
except ConnectionFailure: except ConnectionFailure:
self.dbClient = None
self.writeLog(text.DATABASE_CONNECTING_FAILED) self.writeLog(text.DATABASE_CONNECTING_FAILED)
#---------------------------------------------------------------------- #----------------------------------------------------------------------