From 29dd68e88304fcc223cf73f00542c23dc3dcbaca Mon Sep 17 00:00:00 2001 From: zhu4ling3 Date: Thu, 12 Apr 2018 23:57:33 -0400 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90dataRecord?= =?UTF-8?q?=E7=9A=84=E5=85=A8=E8=BF=87=E7=A8=8B=E6=97=B6=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/trader/app/histDataCollector/__init__.py | 8 + vnpy/trader/app/histDataCollector/hdcBase.py | 19 ++ .../trader/app/histDataCollector/hdcEngine.py | 274 ++++++++++++++++++ .../histDataCollector/language/__init__.py | 13 + .../language/chinese/__init__.py | 0 .../language/chinese/text.py | 16 + .../language/english/__init__.py | 0 .../language/english/text.py | 15 + 8 files changed, 345 insertions(+) create mode 100644 vnpy/trader/app/histDataCollector/__init__.py create mode 100644 vnpy/trader/app/histDataCollector/hdcBase.py create mode 100644 vnpy/trader/app/histDataCollector/hdcEngine.py create mode 100644 vnpy/trader/app/histDataCollector/language/__init__.py create mode 100644 vnpy/trader/app/histDataCollector/language/chinese/__init__.py create mode 100644 vnpy/trader/app/histDataCollector/language/chinese/text.py create mode 100644 vnpy/trader/app/histDataCollector/language/english/__init__.py create mode 100644 vnpy/trader/app/histDataCollector/language/english/text.py diff --git a/vnpy/trader/app/histDataCollector/__init__.py b/vnpy/trader/app/histDataCollector/__init__.py new file mode 100644 index 00000000..c177142d --- /dev/null +++ b/vnpy/trader/app/histDataCollector/__init__.py @@ -0,0 +1,8 @@ +# encoding: UTF-8 + +from hdcEngine import HdcEngine + + +appName = 'HistDataCollector' +appDisplayName = u'HistData记录' +appEngine = HdcEngine diff --git a/vnpy/trader/app/histDataCollector/hdcBase.py b/vnpy/trader/app/histDataCollector/hdcBase.py new file mode 100644 index 00000000..c7249443 --- /dev/null +++ b/vnpy/trader/app/histDataCollector/hdcBase.py @@ -0,0 +1,19 @@ +# encoding: UTF-8 + +''' +本文件中包含的数据格式和CTA模块通用,用户有必要可以自行添加格式。 +''' + +from __future__ import division + +# 数据库名称 +SETTING_DB_NAME = 'VnTrader_Setting_Db' +TICK_DB_NAME = 'VnTrader_Tick_Db' +DAILY_DB_NAME = 'VnTrader_Daily_Db' +MINUTE_DB_NAME = 'VnTrader_1Min_Db' + +# 行情记录模块事件 +EVENT_DATARECORDER_LOG = 'eHistDataCollectorLog' # 行情记录日志更新事件 + +# CTA引擎中涉及的数据类定义 +from vnpy.trader.vtConstant import EMPTY_UNICODE, EMPTY_STRING, EMPTY_FLOAT, EMPTY_INT diff --git a/vnpy/trader/app/histDataCollector/hdcEngine.py b/vnpy/trader/app/histDataCollector/hdcEngine.py new file mode 100644 index 00000000..703059af --- /dev/null +++ b/vnpy/trader/app/histDataCollector/hdcEngine.py @@ -0,0 +1,274 @@ +# encoding: UTF-8 + +''' +本文件中实现了行情数据记录引擎,用于汇总TICK数据,并生成K线插入数据库。 + +使用DR_setting.json来配置需要收集的合约,以及主力合约代码。 +''' + +import json +import csv +import os +import copy +import traceback +from collections import OrderedDict +from datetime import datetime, timedelta +from Queue import Queue, Empty +from threading import Thread +from pymongo.errors import DuplicateKeyError + +from vnpy.event import Event +from vnpy.trader.vtEvent import * +from vnpy.trader.vtFunction import todayDate, getJsonPath +from vnpy.trader.vtObject import VtSubscribeReq, VtLogData, VtBarData, VtTickData +from vnpy.trader.app.ctaStrategy.ctaTemplate import BarGenerator + +from .hdcBase import * +from .language import text + + +######################################################################## +class HdcEngine(object): + """数据记录引擎""" + + settingFileName = 'HD_setting.json' + settingFilePath = getJsonPath(settingFileName, __file__) + + #---------------------------------------------------------------------- + def __init__(self, mainEngine, eventEngine): + """Constructor""" + self.mainEngine = mainEngine + self.eventEngine = eventEngine + + # 当前日期 + self.today = todayDate() + + # 主力合约代码映射字典,key为具体的合约代码(如IF1604),value为主力合约代码(如IF0000) + self.activeSymbolDict = {} + + # Tick对象字典 + self.tickSymbolSet = set() + + # K线合成器字典 + self.bgDict = {} + + # 配置字典 + self.settingDict = OrderedDict() + + # 负责执行数据库插入的单独线程相关 + self.active = False # 工作状态 + self.queue = Queue() # 队列 + self.thread = Thread(target=self.run) # 线程 + + # 载入设置,订阅行情 + self.loadSetting() + + # 启动数据插入线程 + self.start() + + # 注册事件监听 + self.registerEvent() + + #---------------------------------------------------------------------- + def loadSetting(self): + """加载配置""" + with open(self.settingFilePath) as f: + drSetting = json.load(f) + + # 如果working设为False则不启动行情记录功能 + working = drSetting['working'] + if not working: + return + + # Tick记录配置 + if 'tick' in drSetting: + l = drSetting['tick'] + + for setting in l: + symbol = setting[0] + gateway = setting[1] + vtSymbol = symbol + + req = VtSubscribeReq() + req.symbol = setting[0] + + # 针对LTS和IB接口,订阅行情需要交易所代码 + if len(setting)>=3: + req.exchange = setting[2] + vtSymbol = '.'.join([symbol, req.exchange]) + + # 针对IB接口,订阅行情需要货币和产品类型 + if len(setting)>=5: + req.currency = setting[3] + req.productClass = setting[4] + + self.mainEngine.subscribe(req, gateway) + + #tick = VtTickData() # 该tick实例可以用于缓存部分数据(目前未使用) + #self.tickDict[vtSymbol] = tick + self.tickSymbolSet.add(vtSymbol) + + # 保存到配置字典中 + if vtSymbol not in self.settingDict: + d = { + 'symbol': symbol, + 'gateway': gateway, + 'tick': True + } + self.settingDict[vtSymbol] = d + else: + d = self.settingDict[vtSymbol] + d['tick'] = True + + # 分钟线记录配置 + if 'bar' in drSetting: + l = drSetting['bar'] + + for setting in l: + symbol = setting[0] + gateway = setting[1] + vtSymbol = symbol + + req = VtSubscribeReq() + req.symbol = symbol + + if len(setting)>=3: + req.exchange = setting[2] + vtSymbol = '.'.join([symbol, req.exchange]) + + if len(setting)>=5: + req.currency = setting[3] + req.productClass = setting[4] + + self.mainEngine.subscribe(req, gateway) + + # 保存到配置字典中 + if vtSymbol not in self.settingDict: + d = { + 'symbol': symbol, + 'gateway': gateway, + 'bar': True + } + self.settingDict[vtSymbol] = d + else: + d = self.settingDict[vtSymbol] + d['bar'] = True + + # 创建BarManager对象 + self.bgDict[vtSymbol] = BarGenerator(self.onBar) + + # 主力合约记录配置 + if 'active' in drSetting: + d = drSetting['active'] + self.activeSymbolDict = {vtSymbol:activeSymbol for activeSymbol, vtSymbol in d.items()} + + #---------------------------------------------------------------------- + def getSetting(self): + """获取配置""" + return self.settingDict, self.activeSymbolDict + + #---------------------------------------------------------------------- + def procecssTickEvent(self, event): + """处理行情事件""" + tick = event.dict_['data'] + vtSymbol = tick.vtSymbol + + # 生成datetime对象 + if not tick.datetime: + tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f') + + self.onTick(tick) + + bm = self.bgDict.get(vtSymbol, None) + if bm: + bm.updateTick(tick) + + #---------------------------------------------------------------------- + def onTick(self, tick): + """Tick更新""" + vtSymbol = tick.vtSymbol + + if vtSymbol in self.tickSymbolSet: + self.insertData(TICK_DB_NAME, vtSymbol, tick) + + if vtSymbol in self.activeSymbolDict: + activeSymbol = self.activeSymbolDict[vtSymbol] + self.insertData(TICK_DB_NAME, activeSymbol, tick) + + + self.writeDrLog(text.TICK_LOGGING_MESSAGE.format(symbol=tick.vtSymbol, + time=tick.time, + last=tick.lastPrice, + bid=tick.bidPrice1, + ask=tick.askPrice1)) + + #---------------------------------------------------------------------- + def onBar(self, bar): + """分钟线更新""" + vtSymbol = bar.vtSymbol + + self.insertData(MINUTE_DB_NAME, vtSymbol, bar) + + if vtSymbol in self.activeSymbolDict: + activeSymbol = self.activeSymbolDict[vtSymbol] + self.insertData(MINUTE_DB_NAME, activeSymbol, bar) + + self.writeDrLog(text.BAR_LOGGING_MESSAGE.format(symbol=bar.vtSymbol, + time=bar.time, + open=bar.open, + high=bar.high, + low=bar.low, + close=bar.close)) + + #---------------------------------------------------------------------- + def registerEvent(self): + """注册事件监听""" + self.eventEngine.register(EVENT_TICK, self.procecssTickEvent) + + #---------------------------------------------------------------------- + def insertData(self, dbName, collectionName, data): + """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)""" + self.queue.put((dbName, collectionName, data.__dict__)) + + #---------------------------------------------------------------------- + def run(self): + """运行插入线程""" + while self.active: + try: + dbName, collectionName, d = self.queue.get(block=True, timeout=1) + + # 这里采用MongoDB的update模式更新数据,在记录tick数据时会由于查询 + # 过于频繁,导致CPU占用和硬盘读写过高后系统卡死,因此不建议使用 + #flt = {'datetime': d['datetime']} + #self.mainEngine.dbUpdate(dbName, collectionName, d, flt, True) + + # 使用insert模式更新数据,可能存在时间戳重复的情况,需要用户自行清洗 + try: + self.mainEngine.dbInsert(dbName, collectionName, d) + except DuplicateKeyError: + self.writeDrLog(u'键值重复插入失败,报错信息:' %traceback.format_exc()) + except Empty: + pass + + #---------------------------------------------------------------------- + def start(self): + """启动""" + self.active = True + self.thread.start() + + #---------------------------------------------------------------------- + def stop(self): + """退出""" + if self.active: + self.active = False + self.thread.join() + + #---------------------------------------------------------------------- + def writeDrLog(self, content): + """快速发出日志事件""" + log = VtLogData() + log.logContent = content + event = Event(type_=EVENT_DATARECORDER_LOG) + event.dict_['data'] = log + self.eventEngine.put(event) + \ No newline at end of file diff --git a/vnpy/trader/app/histDataCollector/language/__init__.py b/vnpy/trader/app/histDataCollector/language/__init__.py new file mode 100644 index 00000000..ecf72358 --- /dev/null +++ b/vnpy/trader/app/histDataCollector/language/__init__.py @@ -0,0 +1,13 @@ +# encoding: UTF-8 + +import json +import os +import traceback + +# 默认设置 +from chinese import text + +# 是否要使用英文 +from vnpy.trader.vtGlobal import globalSetting +if globalSetting['language'] == 'english': + from english import text \ No newline at end of file diff --git a/vnpy/trader/app/histDataCollector/language/chinese/__init__.py b/vnpy/trader/app/histDataCollector/language/chinese/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/trader/app/histDataCollector/language/chinese/text.py b/vnpy/trader/app/histDataCollector/language/chinese/text.py new file mode 100644 index 00000000..fdb2179f --- /dev/null +++ b/vnpy/trader/app/histDataCollector/language/chinese/text.py @@ -0,0 +1,16 @@ +# encoding: UTF-8 + +DATA_RECORDER = u'行情记录' + +TICK_RECORD = u'Tick记录' +BAR_RECORD = u'Bar记录' +TICK_RECORD = u'Tick记录' + +CONTRACT_SYMBOL = u'合约代码' +GATEWAY = u'接口' + +DOMINANT_CONTRACT = u'主力合约' +DOMINANT_SYMBOL = u'主力代码' + +TICK_LOGGING_MESSAGE = u'记录Tick数据{symbol},时间:{time}, last:{last}, bid:{bid}, ask:{ask}' +BAR_LOGGING_MESSAGE = u'记录分钟线数据{symbol},时间:{time}, O:{open}, H:{high}, L:{low}, C:{close}' \ No newline at end of file diff --git a/vnpy/trader/app/histDataCollector/language/english/__init__.py b/vnpy/trader/app/histDataCollector/language/english/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/trader/app/histDataCollector/language/english/text.py b/vnpy/trader/app/histDataCollector/language/english/text.py new file mode 100644 index 00000000..2230e6ca --- /dev/null +++ b/vnpy/trader/app/histDataCollector/language/english/text.py @@ -0,0 +1,15 @@ +# encoding: UTF-8 + +DATA_RECORDER = u'Data Recorder' + +TICK_RECORD = u'Tick Record' +BAR_RECORD = u'Bar Record' + +CONTRACT_SYMBOL = u'Contract Symbol' +GATEWAY = u'Gateway' + +DOMINANT_CONTRACT = u'Dominant Contract' +DOMINANT_SYMBOL = u'Dominant Symbol' + +TICK_LOGGING_MESSAGE = u'Record Tick Data {symbol}, Time:{time}, last:{last}, bid:{bid}, ask:{ask}' +BAR_LOGGING_MESSAGE = u'Record Bar Data {symbol}, Time:{time}, O:{open}, H:{high}, L:{low}, C:{close}' \ No newline at end of file