diff --git a/examples/DataRecording/DR_setting.json b/examples/DataRecording/DR_setting.json index 5ec30169..05ae8279 100644 --- a/examples/DataRecording/DR_setting.json +++ b/examples/DataRecording/DR_setting.json @@ -1,5 +1,6 @@ { "working": true, + "marketCloseTime": "15:05:00", "tick": [ diff --git a/vnpy/trader/app/ctaStrategy/ctaTemplate.py b/vnpy/trader/app/ctaStrategy/ctaTemplate.py index 5f829d0f..f6b6a465 100644 --- a/vnpy/trader/app/ctaStrategy/ctaTemplate.py +++ b/vnpy/trader/app/ctaStrategy/ctaTemplate.py @@ -449,6 +449,13 @@ class BarGenerator(object): # 清空老K线缓存对象 self.xminBar = None + #---------------------------------------------------------------------- + def generate(self): + """手动强制立即完成K线合成""" + self.onBar(self.bar) + self.bar = None + + ######################################################################## class ArrayManager(object): diff --git a/vnpy/trader/app/dataRecorder/drEngine.py b/vnpy/trader/app/dataRecorder/drEngine.py index ba6049af..dee29801 100644 --- a/vnpy/trader/app/dataRecorder/drEngine.py +++ b/vnpy/trader/app/dataRecorder/drEngine.py @@ -12,7 +12,7 @@ import os import copy import traceback from collections import OrderedDict -from datetime import datetime, timedelta +from datetime import datetime, timedelta, time from Queue import Queue, Empty from threading import Thread from pymongo.errors import DuplicateKeyError @@ -60,6 +60,11 @@ class DrEngine(object): self.queue = Queue() # 队列 self.thread = Thread(target=self.run) # 线程 + # 收盘相关 + self.marketCloseTime = None # 收盘时间 + self.timerCount = 0 # 定时器计数 + self.lastTimerTime = None # 上一次记录时间 + # 载入设置,订阅行情 self.loadSetting() @@ -79,6 +84,11 @@ class DrEngine(object): working = drSetting['working'] if not working: return + + # 加载收盘时间 + if 'marketCloseTime' in drSetting: + timestamp = drSetting['marketCloseTime'] + self.marketCloseTime = datetime.strptime(timestamp, '%H:%M:%S').time() # Tick记录配置 if 'tick' in drSetting: @@ -185,7 +195,37 @@ class DrEngine(object): bm = self.bgDict.get(vtSymbol, None) if bm: bm.updateTick(tick) + + #---------------------------------------------------------------------- + def processTimerEvent(self, event): + """处理定时事件""" + # 如果没有设置收盘时间,则无需处理 + if not self.marketCloseTime: + return + # 10秒检查一次 + self.timerCount += 1 + if self.timerCount < 10: + return + self.timerCount = 0 + + # 获取当前时间 + currentTime = datetime.now().time() + + if not self.lastTimerTime: + self.lastTimerTime = currentTime + return + + # 上一个时间戳尚未到收盘时间,且当前时间戳已经到收盘时间 + if (self.lastTimerTime < self.marketCloseTime and + currentTime >= self.marketCloseTime): + # 强制所有的K线生成器立即完成K线 + for bg in self.bgDict.values(): + bg.generate() + + # 记录新的时间 + self.lastTimerTime = currentTime + #---------------------------------------------------------------------- def onTick(self, tick): """Tick更新""" @@ -227,6 +267,7 @@ class DrEngine(object): def registerEvent(self): """注册事件监听""" self.eventEngine.register(EVENT_TICK, self.procecssTickEvent) + self.eventEngine.register(EVENT_TIMER, self.processTimerEvent) #---------------------------------------------------------------------- def insertData(self, dbName, collectionName, data):