1. 修复CTA回测功能里TICK回测的撮合bug(之前偷懒忘记了)

2. 行情记录引擎增加单独的数据插入线程和队列,用于解决事件引擎线程阻塞的问题
This commit is contained in:
chenxy123 2016-06-13 22:48:23 +08:00
parent c47c27db51
commit 005df0bcd6
4 changed files with 49 additions and 12 deletions

View File

@ -279,13 +279,15 @@ class BacktestingEngine(object):
"""基于最新数据撮合限价单""" """基于最新数据撮合限价单"""
# 先确定会撮合成交的价格 # 先确定会撮合成交的价格
if self.mode == self.BAR_MODE: if self.mode == self.BAR_MODE:
buyCrossPrice = self.bar.low # 若买入方向限价单价格高于该价格,则会成交 buyCrossPrice = self.bar.low # 若买入方向限价单价格高于该价格,则会成交
sellCrossPrice = self.bar.high # 若卖出方向限价单价格低于该价格,则会成交 sellCrossPrice = self.bar.high # 若卖出方向限价单价格低于该价格,则会成交
bestCrossPrice = self.bar.open # 在当前时间点前发出的委托可能的最优成交价 buyBestCrossPrice = self.bar.open # 在当前时间点前发出的买入委托可能的最优成交价
sellBestCrossPrice = self.bar.open # 在当前时间点前发出的卖出委托可能的最优成交价
else: else:
buyCrossPrice = self.tick.lastPrice buyCrossPrice = self.tick.askPrice
sellCrossPrice = self.tick.lastPrice sellCrossPrice = self.tick.bidPrice
bestCrossPrice = self.tick.lastPrice buyBestCrossPrice = self.tick.askPrice
sellBestCrossPrice = self.tick.bidPrice
# 遍历限价单字典中的所有限价单 # 遍历限价单字典中的所有限价单
for orderID, order in self.workingLimitOrderDict.items(): for orderID, order in self.workingLimitOrderDict.items():
@ -312,10 +314,10 @@ class BacktestingEngine(object):
# 2. 假设在上一根K线结束(也是当前K线开始)的时刻策略发出的委托为限价105 # 2. 假设在上一根K线结束(也是当前K线开始)的时刻策略发出的委托为限价105
# 3. 则在实际中的成交价会是100而不是105因为委托发出时市场的最优价格是100 # 3. 则在实际中的成交价会是100而不是105因为委托发出时市场的最优价格是100
if buyCross: if buyCross:
trade.price = min(order.price, bestCrossPrice) trade.price = min(order.price, buyBestCrossPrice)
self.strategy.pos += order.totalVolume self.strategy.pos += order.totalVolume
else: else:
trade.price = max(order.price, bestCrossPrice) trade.price = max(order.price, sellBestCrossPrice)
self.strategy.pos -= order.totalVolume self.strategy.pos -= order.totalVolume
trade.volume = order.totalVolume trade.volume = order.totalVolume

View File

@ -3,7 +3,7 @@
"tick": "tick":
[ [
["IF1605", "SGIT"], ["m1609", "XSPEED"],
["IF1606", "SGIT"], ["IF1606", "SGIT"],
["IH1606", "SGIT"], ["IH1606", "SGIT"],
["IH1606", "SGIT"], ["IH1606", "SGIT"],

View File

@ -11,6 +11,8 @@ import os
import copy import copy
from collections import OrderedDict from collections import OrderedDict
from datetime import datetime, timedelta from datetime import datetime, timedelta
from Queue import Queue
from threading import Thread
from eventEngine import * from eventEngine import *
from vtGateway import VtSubscribeReq, VtLogData from vtGateway import VtSubscribeReq, VtLogData
@ -43,6 +45,11 @@ class DrEngine(object):
# K线对象字典 # K线对象字典
self.barDict = {} self.barDict = {}
# 负责执行数据库插入的单独线程相关
self.active = False # 工作状态
self.queue = Queue() # 队列
self.thread = Thread(target=self.run) # 线程
# 载入设置,订阅行情 # 载入设置,订阅行情
self.loadSetting() self.loadSetting()
@ -111,9 +118,12 @@ class DrEngine(object):
# 注意这里的vtSymbol对于IB和LTS接口应该后缀.交易所 # 注意这里的vtSymbol对于IB和LTS接口应该后缀.交易所
for activeSymbol, vtSymbol in d.items(): for activeSymbol, vtSymbol in d.items():
self.activeSymbolDict[vtSymbol] = activeSymbol self.activeSymbolDict[vtSymbol] = activeSymbol
# 启动数据插入线程
self.start()
# 注册事件监听 # 注册事件监听
self.registerEvent() self.registerEvent()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def procecssTickEvent(self, event): def procecssTickEvent(self, event):
@ -187,7 +197,29 @@ class DrEngine(object):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def insertData(self, dbName, collectionName, data): def insertData(self, dbName, collectionName, data):
"""插入数据到数据库这里的data可以是CtaTickData或者CtaBarData""" """插入数据到数据库这里的data可以是CtaTickData或者CtaBarData"""
self.mainEngine.dbInsert(dbName, collectionName, data.__dict__) self.queue.put((dbName, collectionName, data.__dict__))
#----------------------------------------------------------------------
def run(self):
"""运行插入线程"""
while self.active:
try:
dbName, collectionName, d = self.queue.get(block=True, timeout=1)
self.mainEngine.dbInsert(dbName, collectionName, d)
except Empty:
pass
#----------------------------------------------------------------------
def start(self):
"""启动"""
self.active = True
self.thread.start()
#----------------------------------------------------------------------
def stop(self):
"""退出"""
if self.active:
self.active = False
self.thread.join()
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def writeDrLog(self, content): def writeDrLog(self, content):

View File

@ -188,6 +188,9 @@ class MainEngine(object):
# 停止事件引擎 # 停止事件引擎
self.eventEngine.stop() self.eventEngine.stop()
# 停止数据记录引擎
self.drEngine.stop()
# 保存数据引擎里的合约数据到硬盘 # 保存数据引擎里的合约数据到硬盘
self.dataEngine.saveContracts() self.dataEngine.saveContracts()