初步完成dataRecord的全过程时序

This commit is contained in:
zhu4ling3 2018-04-12 23:57:33 -04:00
parent c36a58d5e3
commit 29dd68e883
8 changed files with 345 additions and 0 deletions

View File

@ -0,0 +1,8 @@
# encoding: UTF-8
from hdcEngine import HdcEngine
appName = 'HistDataCollector'
appDisplayName = u'HistData记录'
appEngine = HdcEngine

View File

@ -0,0 +1,19 @@
# encoding: UTF-8
'''
本文件中包含的数据格式和CTA模块通用用户有必要可以自行添加格式
'''
from __future__ import division
# 数据库名称
SETTING_DB_NAME = 'VnTrader_Setting_Db'
TICK_DB_NAME = 'VnTrader_Tick_Db'
DAILY_DB_NAME = 'VnTrader_Daily_Db'
MINUTE_DB_NAME = 'VnTrader_1Min_Db'
# 行情记录模块事件
EVENT_DATARECORDER_LOG = 'eHistDataCollectorLog' # 行情记录日志更新事件
# CTA引擎中涉及的数据类定义
from vnpy.trader.vtConstant import EMPTY_UNICODE, EMPTY_STRING, EMPTY_FLOAT, EMPTY_INT

View File

@ -0,0 +1,274 @@
# encoding: UTF-8
'''
本文件中实现了行情数据记录引擎用于汇总TICK数据并生成K线插入数据库
使用DR_setting.json来配置需要收集的合约以及主力合约代码
'''
import json
import csv
import os
import copy
import traceback
from collections import OrderedDict
from datetime import datetime, timedelta
from Queue import Queue, Empty
from threading import Thread
from pymongo.errors import DuplicateKeyError
from vnpy.event import Event
from vnpy.trader.vtEvent import *
from vnpy.trader.vtFunction import todayDate, getJsonPath
from vnpy.trader.vtObject import VtSubscribeReq, VtLogData, VtBarData, VtTickData
from vnpy.trader.app.ctaStrategy.ctaTemplate import BarGenerator
from .hdcBase import *
from .language import text
########################################################################
class HdcEngine(object):
"""数据记录引擎"""
settingFileName = 'HD_setting.json'
settingFilePath = getJsonPath(settingFileName, __file__)
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
# 当前日期
self.today = todayDate()
# 主力合约代码映射字典key为具体的合约代码如IF1604value为主力合约代码如IF0000
self.activeSymbolDict = {}
# Tick对象字典
self.tickSymbolSet = set()
# K线合成器字典
self.bgDict = {}
# 配置字典
self.settingDict = OrderedDict()
# 负责执行数据库插入的单独线程相关
self.active = False # 工作状态
self.queue = Queue() # 队列
self.thread = Thread(target=self.run) # 线程
# 载入设置,订阅行情
self.loadSetting()
# 启动数据插入线程
self.start()
# 注册事件监听
self.registerEvent()
#----------------------------------------------------------------------
def loadSetting(self):
"""加载配置"""
with open(self.settingFilePath) as f:
drSetting = json.load(f)
# 如果working设为False则不启动行情记录功能
working = drSetting['working']
if not working:
return
# Tick记录配置
if 'tick' in drSetting:
l = drSetting['tick']
for setting in l:
symbol = setting[0]
gateway = setting[1]
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = setting[0]
# 针对LTS和IB接口订阅行情需要交易所代码
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
# 针对IB接口订阅行情需要货币和产品类型
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, gateway)
#tick = VtTickData() # 该tick实例可以用于缓存部分数据目前未使用
#self.tickDict[vtSymbol] = tick
self.tickSymbolSet.add(vtSymbol)
# 保存到配置字典中
if vtSymbol not in self.settingDict:
d = {
'symbol': symbol,
'gateway': gateway,
'tick': True
}
self.settingDict[vtSymbol] = d
else:
d = self.settingDict[vtSymbol]
d['tick'] = True
# 分钟线记录配置
if 'bar' in drSetting:
l = drSetting['bar']
for setting in l:
symbol = setting[0]
gateway = setting[1]
vtSymbol = symbol
req = VtSubscribeReq()
req.symbol = symbol
if len(setting)>=3:
req.exchange = setting[2]
vtSymbol = '.'.join([symbol, req.exchange])
if len(setting)>=5:
req.currency = setting[3]
req.productClass = setting[4]
self.mainEngine.subscribe(req, gateway)
# 保存到配置字典中
if vtSymbol not in self.settingDict:
d = {
'symbol': symbol,
'gateway': gateway,
'bar': True
}
self.settingDict[vtSymbol] = d
else:
d = self.settingDict[vtSymbol]
d['bar'] = True
# 创建BarManager对象
self.bgDict[vtSymbol] = BarGenerator(self.onBar)
# 主力合约记录配置
if 'active' in drSetting:
d = drSetting['active']
self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()}
#----------------------------------------------------------------------
def getSetting(self):
"""获取配置"""
return self.settingDict, self.activeSymbolDict
#----------------------------------------------------------------------
def procecssTickEvent(self, event):
"""处理行情事件"""
tick = event.dict_['data']
vtSymbol = tick.vtSymbol
# 生成datetime对象
if not tick.datetime:
tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')
self.onTick(tick)
bm = self.bgDict.get(vtSymbol, None)
if bm:
bm.updateTick(tick)
#----------------------------------------------------------------------
def onTick(self, tick):
"""Tick更新"""
vtSymbol = tick.vtSymbol
if vtSymbol in self.tickSymbolSet:
self.insertData(TICK_DB_NAME, vtSymbol, tick)
if vtSymbol in self.activeSymbolDict:
activeSymbol = self.activeSymbolDict[vtSymbol]
self.insertData(TICK_DB_NAME, activeSymbol, tick)
self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol,
time=tick.time,
last=tick.lastPrice,
bid=tick.bidPrice1,
ask=tick.askPrice1))
#----------------------------------------------------------------------
def onBar(self, bar):
"""分钟线更新"""
vtSymbol = bar.vtSymbol
self.insertData(MINUTE_DB_NAME, vtSymbol, bar)
if vtSymbol in self.activeSymbolDict:
activeSymbol = self.activeSymbolDict[vtSymbol]
self.insertData(MINUTE_DB_NAME, activeSymbol, bar)
self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol,
time=bar.time,
open=bar.open,
high=bar.high,
low=bar.low,
close=bar.close))
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.register(EVENT_TICK, self.procecssTickEvent)
#----------------------------------------------------------------------
def insertData(self, dbName, collectionName, data):
"""插入数据到数据库这里的data可以是VtTickData或者VtBarData"""
self.queue.put((dbName, collectionName, data.__dict__))
#----------------------------------------------------------------------
def run(self):
"""运行插入线程"""
while self.active:
try:
dbName, collectionName, d = self.queue.get(block=True, timeout=1)
# 这里采用MongoDB的update模式更新数据在记录tick数据时会由于查询
# 过于频繁导致CPU占用和硬盘读写过高后系统卡死因此不建议使用
#flt = {'datetime': d['datetime']}
#self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True)
# 使用insert模式更新数据可能存在时间戳重复的情况需要用户自行清洗
try:
self.mainEngine.dbInsert(dbName, collectionName, d)
except DuplicateKeyError:
self.writeDrLog(u'键值重复插入失败,报错信息:' %traceback.format_exc())
except Empty:
pass
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.active = True
self.thread.start()
#----------------------------------------------------------------------
def stop(self):
"""退出"""
if self.active:
self.active = False
self.thread.join()
#----------------------------------------------------------------------
def writeDrLog(self, content):
"""快速发出日志事件"""
log = VtLogData()
log.logContent = content
event = Event(type_=EVENT_DATARECORDER_LOG)
event.dict_['data'] = log
self.eventEngine.put(event)

View File

@ -0,0 +1,13 @@
# encoding: UTF-8
import json
import os
import traceback
# 默认设置
from chinese import text
# 是否要使用英文
from vnpy.trader.vtGlobal import globalSetting
if globalSetting['language'] == 'english':
from english import text

View File

@ -0,0 +1,16 @@
# encoding: UTF-8
DATA_RECORDER = u'行情记录'
TICK_RECORD = u'Tick记录'
BAR_RECORD = u'Bar记录'
TICK_RECORD = u'Tick记录'
CONTRACT_SYMBOL = u'合约代码'
GATEWAY = u'接口'
DOMINANT_CONTRACT = u'主力合约'
DOMINANT_SYMBOL = u'主力代码'
TICK_LOGGING_MESSAGE = u'记录Tick数据{symbol},时间:{time}, last:{last}, bid:{bid}, ask:{ask}'
BAR_LOGGING_MESSAGE = u'记录分钟线数据{symbol},时间:{time}, O:{open}, H:{high}, L:{low}, C:{close}'

View File

@ -0,0 +1,15 @@
# encoding: UTF-8
DATA_RECORDER = u'Data Recorder'
TICK_RECORD = u'Tick Record'
BAR_RECORD = u'Bar Record'
CONTRACT_SYMBOL = u'Contract Symbol'
GATEWAY = u'Gateway'
DOMINANT_CONTRACT = u'Dominant Contract'
DOMINANT_SYMBOL = u'Dominant Symbol'
TICK_LOGGING_MESSAGE = u'Record Tick Data {symbol}, Time:{time}, last:{last}, bid:{bid}, ask:{ask}'
BAR_LOGGING_MESSAGE = u'Record Bar Data {symbol}, Time:{time}, O:{open}, H:{high}, L:{low}, C:{close}'