vnpy/trader/app/dataRecorder/drEngine.py

233 lines
8.8 KiB
Python
Raw Normal View History

2016-07-02 03:12:44 +00:00
# encoding: UTF-8
'''
本文件中实现了行情数据记录引擎用于汇总TICK数据并生成K线插入数据库
使用DR_setting.json来配置需要收集的合约以及主力合约代码
'''
import json
import os
import copy
from collections import OrderedDict
from datetime import datetime, timedelta
from Queue import Queue
from threading import Thread
2016-07-02 03:12:44 +00:00
2017-06-11 13:41:04 +00:00
from trader.eventEngine import *
from trader.vtGateway import VtSubscribeReq, VtLogData
from trader.vtFunction import todayDate
2016-07-02 03:12:44 +00:00
2017-06-11 13:41:04 +00:00
from drBase import *
2016-07-02 03:12:44 +00:00
########################################################################
class DrEngine(object):
"""数据记录引擎"""
settingFileName = 'DR_setting.json'
2017-06-11 13:41:04 +00:00
path = os.path.abspath(os.path.dirname(__file__))
settingFileName = os.path.join(path, settingFileName)
2016-07-02 03:12:44 +00:00
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 当前日期
self.today = todayDate()
# 主力合约代码映射字典key为具体的合约代码如IF1604value为主力合约代码如IF0000
self.activeSymbolDict = {}
# Tick对象字典
self.tickDict = {}
# K线对象字典
self.barDict = {}
# 负责执行数据库插入的单独线程相关
self.active = False # 工作状态
self.queue = Queue() # 队列
self.thread = Thread(target=self.run) # 线程
2016-07-02 03:12:44 +00:00
# 载入设置,订阅行情
self.loadSetting()
#----------------------------------------------------------------------
def loadSetting(self):
"""载入设置"""
with open(self.settingFileName) as f:
drSetting = json.load(f)
# 如果working设为False则不启动行情记录功能
working = drSetting['working']
if not working:
return
if 'tick' in drSetting:
l = drSetting['tick']
for setting in l:
symbol = setting[0]
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = setting[0]
# 针对LTS和IB接口订阅行情需要交易所代码
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
# 针对IB接口订阅行情需要货币和产品类型
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, setting[1])
drTick = DrTickData() # 该tick实例可以用于缓存部分数据目前未使用
self.tickDict[vtSymbol] = drTick
if 'bar' in drSetting:
l = drSetting['bar']
for setting in l:
symbol = setting[0]
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = symbol
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, setting[1])
bar = DrBarData()
self.barDict[vtSymbol] = bar
if 'active' in drSetting:
d = drSetting['active']
# 注意这里的vtSymbol对于IB和LTS接口应该后缀.交易所
for activeSymbol, vtSymbol in d.items():
self.activeSymbolDict[vtSymbol] = activeSymbol
# 启动数据插入线程
self.start()
2016-07-02 03:12:44 +00:00
# 注册事件监听
self.registerEvent()
2016-07-02 03:12:44 +00:00
#----------------------------------------------------------------------
def procecssTickEvent(self, event):
"""处理行情推送"""
tick = event.dict_['data']
vtSymbol = tick.vtSymbol
# 转化Tick格式
drTick = DrTickData()
d = drTick.__dict__
for key in d.keys():
if key != 'datetime':
d[key] = tick.__getattribute__(key)
drTick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
# 更新Tick数据
if vtSymbol in self.tickDict:
self.insertData(TICK_DB_NAME, vtSymbol, drTick)
if vtSymbol in self.activeSymbolDict:
activeSymbol = self.activeSymbolDict[vtSymbol]
self.insertData(TICK_DB_NAME, activeSymbol, drTick)
# 发出日志
self.writeDrLog(u'记录Tick数据%s,时间:%s, last:%s, bid:%s, ask:%s'
%(drTick.vtSymbol, drTick.time, drTick.lastPrice, drTick.bidPrice1, drTick.askPrice1))
# 更新分钟线数据
if vtSymbol in self.barDict:
bar = self.barDict[vtSymbol]
# 如果第一个TICK或者新的一分钟
if not bar.datetime or bar.datetime.minute != drTick.datetime.minute:
if bar.vtSymbol:
newBar = copy.copy(bar)
self.insertData(MINUTE_DB_NAME, vtSymbol, newBar)
if vtSymbol in self.activeSymbolDict:
activeSymbol = self.activeSymbolDict[vtSymbol]
self.insertData(MINUTE_DB_NAME, activeSymbol, newBar)
self.writeDrLog(u'记录分钟线数据%s,时间:%s, O:%s, H:%s, L:%s, C:%s'
%(bar.vtSymbol, bar.time, bar.open, bar.high,
bar.low, bar.close))
bar.vtSymbol = drTick.vtSymbol
bar.symbol = drTick.symbol
bar.exchange = drTick.exchange
bar.open = drTick.lastPrice
bar.high = drTick.lastPrice
bar.low = drTick.lastPrice
bar.close = drTick.lastPrice
bar.date = drTick.date
bar.time = drTick.time
bar.datetime = drTick.datetime
bar.volume = drTick.volume
bar.openInterest = drTick.openInterest
# 否则继续累加新的K线
else:
bar.high = max(bar.high, drTick.lastPrice)
bar.low = min(bar.low, drTick.lastPrice)
bar.close = drTick.lastPrice
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
#----------------------------------------------------------------------
def insertData(self, dbName, collectionName, data):
"""插入数据到数据库这里的data可以是CtaTickData或者CtaBarData"""
self.queue.put((dbName, collectionName, data.__dict__))
#----------------------------------------------------------------------
def run(self):
"""运行插入线程"""
while self.active:
try:
dbName, collectionName, d = self.queue.get(block=True, timeout=1)
self.mainEngine.dbInsert(dbName, collectionName, d)
except Empty:
pass
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.active = True
self.thread.start()
#----------------------------------------------------------------------
def stop(self):
"""退出"""
if self.active:
self.active = False
self.thread.join()
2016-07-02 03:12:44 +00:00
#----------------------------------------------------------------------
def writeDrLog(self, content):
"""快速发出日志事件"""
log = VtLogData()
log.logContent = content
event = Event(type_=EVENT_DATARECORDER_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)