vnpy/vn.trader/dataRecorder/drEngine.py

233 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# encoding: UTF-8
'''
本文件中实现了行情数据记录引擎用于汇总TICK数据并生成K线插入数据库。
使用DR_setting.json来配置需要收集的合约以及主力合约代码。
'''
import json
import os
import copy
from collections import OrderedDict
from datetime import datetime, timedelta
from Queue import Queue
from threading import Thread
from eventEngine import *
from vtGateway import VtSubscribeReq, VtLogData
from drBase import *
from vtFunction import todayDate
########################################################################
class DrEngine(object):
"""数据记录引擎"""
settingFileName = 'DR_setting.json'
path = os.path.abspath(os.path.dirname(__file__))
settingFileName = os.path.join(path, settingFileName)
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 当前日期
self.today = todayDate()
# 主力合约代码映射字典key为具体的合约代码如IF1604value为主力合约代码如IF0000
self.activeSymbolDict = {}
# Tick对象字典
self.tickDict = {}
# K线对象字典
self.barDict = {}
# 负责执行数据库插入的单独线程相关
self.active = False # 工作状态
self.queue = Queue() # 队列
self.thread = Thread(target=self.run) # 线程
# 载入设置,订阅行情
self.loadSetting()
#----------------------------------------------------------------------
def loadSetting(self):
"""载入设置"""
with open(self.settingFileName) as f:
drSetting = json.load(f)
# 如果working设为False则不启动行情记录功能
working = drSetting['working']
if not working:
return
if 'tick' in drSetting:
l = drSetting['tick']
for setting in l:
symbol = setting[0]
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = setting[0]
# 针对LTS和IB接口订阅行情需要交易所代码
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
# 针对IB接口订阅行情需要货币和产品类型
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, setting[1])
drTick = DrTickData() # 该tick实例可以用于缓存部分数据目前未使用
self.tickDict[vtSymbol] = drTick
if 'bar' in drSetting:
l = drSetting['bar']
for setting in l:
symbol = setting[0]
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = symbol
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, setting[1])
bar = DrBarData()
self.barDict[vtSymbol] = bar
if 'active' in drSetting:
d = drSetting['active']
# 注意这里的vtSymbol对于IB和LTS接口应该后缀.交易所
for activeSymbol, vtSymbol in d.items():
self.activeSymbolDict[vtSymbol] = activeSymbol
# 启动数据插入线程
self.start()
# 注册事件监听
self.registerEvent()
#----------------------------------------------------------------------
def procecssTickEvent(self, event):
"""处理行情推送"""
tick = event.dict_['data']
vtSymbol = tick.vtSymbol
# 转化Tick格式
drTick = DrTickData()
d = drTick.__dict__
for key in d.keys():
if key != 'datetime':
d[key] = tick.__getattribute__(key)
drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
# 更新Tick数据
if vtSymbol in self.tickDict:
self.insertData(TICK_DB_NAME, vtSymbol, drTick)
if vtSymbol in self.activeSymbolDict:
activeSymbol = self.activeSymbolDict[vtSymbol]
self.insertData(TICK_DB_NAME, activeSymbol, drTick)
# 发出日志
self.writeDrLog(u'记录Tick数据%s,时间:%s, last:%s, bid:%s, ask:%s'
%(drTick.vtSymbol, drTick.time, drTick.lastPrice, drTick.bidPrice1, drTick.askPrice1))
# 更新分钟线数据
if vtSymbol in self.barDict:
bar = self.barDict[vtSymbol]
# 如果第一个TICK或者新的一分钟
if not bar.datetime or bar.datetime.minute != drTick.datetime.minute:
if bar.vtSymbol:
newBar = copy.copy(bar)
self.insertData(MINUTE_DB_NAME, vtSymbol, newBar)
if vtSymbol in self.activeSymbolDict:
activeSymbol = self.activeSymbolDict[vtSymbol]
self.insertData(MINUTE_DB_NAME, activeSymbol, newBar)
self.writeDrLog(u'记录分钟线数据%s,时间:%s, O:%s, H:%s, L:%s, C:%s'
%(bar.vtSymbol, bar.time, bar.open, bar.high,
bar.low, bar.close))
bar.vtSymbol = drTick.vtSymbol
bar.symbol = drTick.symbol
bar.exchange = drTick.exchange
bar.open = drTick.lastPrice
bar.high = drTick.lastPrice
bar.low = drTick.lastPrice
bar.close = drTick.lastPrice
bar.date = drTick.date
bar.time = drTick.time
bar.datetime = drTick.datetime
bar.volume = drTick.volume
bar.openInterest = drTick.openInterest
# 否则继续累加新的K线
else:
bar.high = max(bar.high, drTick.lastPrice)
bar.low = min(bar.low, drTick.lastPrice)
bar.close = drTick.lastPrice
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
#----------------------------------------------------------------------
def insertData(self, dbName, collectionName, data):
"""插入数据到数据库这里的data可以是CtaTickData或者CtaBarData"""
self.queue.put((dbName, collectionName, data.__dict__))
#----------------------------------------------------------------------
def run(self):
"""运行插入线程"""
while self.active:
try:
dbName, collectionName, d = self.queue.get(block=True, timeout=1)
self.mainEngine.dbInsert(dbName, collectionName, d)
except Empty:
pass
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.active = True
self.thread.start()
#----------------------------------------------------------------------
def stop(self):
"""退出"""
if self.active:
self.active = False
self.thread.join()
#----------------------------------------------------------------------
def writeDrLog(self, content):
"""快速发出日志事件"""
log = VtLogData()
log.logContent = content
event = Event(type_=EVENT_DATARECORDER_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)