初步完成回测引擎

This commit is contained in:
WOLF 2015-05-28 13:52:59 +08:00
parent 6e4aa7eb8c
commit ae633f5ca0
7 changed files with 532 additions and 17 deletions

View File

@ -6,4 +6,14 @@
完成了一个演示性的双指数均线策略填入账号、密码等信息后直接运行demoStrategy.py就可以启动。
##2015/5/28
完成了一个简单的回测引擎可以读取MongoDB中的历史TICK数据并通过回放的方式完全模拟真实情况下的策略交易情况。
回测结束后将所有的成交记录输出到一个基于shelve的二进制文件中用户可以在IPython或者Spyder之类的交互式环境中读取该文件的数据进行绩效分析。
##计划
与其说是一个成熟的产品该模块更多应该被视作一个Python策略自动交易的Demo目前只是非常粗糙的实现了实盘策略交易和基于TICK的仿实盘回测功能其他诸如交易结果分析之类的工作都需要用户自行完成。
未来将会加入回测结果输出等等的功能模块回测完成后直接生成报表便于用户查看以及基于K线数据的回测TICK数据还是挺难弄到的

View File

@ -0,0 +1,224 @@
# encoding: UTF-8
import shelve
from eventEngine import *
from pymongo import Connection
from pymongo.errors import *
from strategyEngine import *
########################################################################
class LimitOrder(object):
"""限价单对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol
self.price = 0
self.volume = 0
self.direction = None
self.offset = None
########################################################################
class BacktestingEngine(object):
"""
回测引擎作用
1. 从数据库中读取数据并回放
2. 作为StrategyEngine创建时的参数传入
"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.eventEngine = EventEngine()
# 策略引擎
self.strategyEngine = None
# TICK历史数据列表由于要使用For循环来实现仿真回放
# 使用list的速度比Numpy和Pandas都要更快
self.listDataHistory = []
# 限价单字典
self.dictOrder = {}
# 最新的TICK数据
self.currentData = None
# 回测的成交字典
self.listTrade = []
# 报单编号
self.orderRef = 0
# 成交编号
self.tradeID = 0
#----------------------------------------------------------------------
def setStrategyEngine(self, engine):
"""设置策略引擎"""
self.strategyEngine = engine
self.writeLog(u'策略引擎设置完成')
#----------------------------------------------------------------------
def connectMongo(self):
"""连接MongoDB数据库"""
try:
self.__mongoConnection = Connection()
self.__mongoConnected = True
self.__mongoTickDB = self.__mongoConnection['TickDB']
self.writeLog(u'回测引擎连接MongoDB成功')
except ConnectionFailure:
self.writeLog(u'回测引擎连接MongoDB失败')
#----------------------------------------------------------------------
def loadDataHistory(self, symbol, startDate, endDate):
"""载入历史TICK数据"""
if self.__mongoConnected:
collection = self.__mongoTickDB[symbol]
# 如果输入了读取TICK的最后日期
if endDate:
cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}})
elif startDate:
cx = collection.find({'date':{'$gte':startDate}})
else:
cx = collection.find()
# 将TICK数据读入内存
self.listDataHistory = [data for data in cx]
self.writeLog(u'历史TICK数据载入完成')
else:
self.writeLog(u'MongoDB未连接请检查')
#----------------------------------------------------------------------
def processLimitOrder(self):
"""处理限价单"""
for ref, order in self.dictOrder.items():
# 如果是买单且限价大于等于当前TICK的卖一价则假设成交
if order.direction == DIRECTION_BUY and \
order.price >= self.currentData['AskPrice1']:
self.executeLimitOrder(ref, order, self.currentData['AskPrice1'])
# 如果是卖单且限价低于当前TICK的买一价则假设全部成交
if order.direction == DIRECTION_SELL and \
order.price <= self.currentData['BidPrice1']:
self.executeLimitOrder(ref, order, self.currentData['BidPrice1'])
#----------------------------------------------------------------------
def executeLimitOrder(self, ref, order, price):
"""限价单成交处理"""
# 成交回报
self.tradeID = self.tradeID + 1
tradeData = {}
tradeData['InstrumentID'] = order.symbol
tradeData['OrderRef'] = ref
tradeData['TradeID'] = str(self.tradeID)
tradeData['Direction'] = order.direction
tradeData['OffsetFlag'] = order.offset
tradeData['Price'] = price
tradeData['Volume'] = order.volume
tradeEvent = Event()
tradeEvent.dict_['data'] = tradeData
self.strategyEngine.updateTrade(tradeEvent)
# 报单回报
orderData = {}
orderData['InstrumentID'] = order.symbol
orderData['OrderRef'] = ref
orderData['Direction'] = order.direction
orderData['CombOffsetFlag'] = order.offset
orderData['LimitPrice'] = price
orderData['VolumeTotalOriginal'] = order.volume
orderData['VolumeTraded'] = order.volume
orderData['InsertTime'] = ''
orderData['CancelTime'] = ''
orderData['FrontID'] = ''
orderData['SessionID'] = ''
orderData['OrderStatus'] = ''
orderEvent = Event()
orderEvent.dict_['data'] = orderData
self.strategyEngine.updateOrder(orderEvent)
# 记录该成交到列表中
self.listTrade.append(tradeData)
# 删除该限价单
del self.dictOrder[ref]
#----------------------------------------------------------------------
def startBacktesting(self):
"""开始回测"""
self.writeLog(u'开始回测')
for data in self.listDataHistory:
# 记录最新的TICK数据
self.currentData = data
# 处理限价单
self.processLimitOrder()
# 推送到策略引擎中
event = Event()
event.dict_['data'] = data
self.strategyEngine.updateMarketData(event)
self.saveTradeData()
self.writeLog(u'回测结束')
#----------------------------------------------------------------------
def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset):
"""回测发单"""
order = LimitOrder(instrumentid)
order.price = price
order.direction = direction
order.volume = volume
order.offset = offset
self.orderRef = self.orderRef + 1
self.dictOrder[str(self.orderRef)] = order
return str(self.orderRef)
#----------------------------------------------------------------------
def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid):
"""回测撤单"""
try:
del self.dictOrder[orderref]
except KeyError:
pass
#----------------------------------------------------------------------
def writeLog(self, log):
"""写日志"""
print log
#----------------------------------------------------------------------
def selectInstrument(self, symbol):
"""读取合约数据"""
d = {}
d['ExchangeID'] = 'BackTesting'
return d
#----------------------------------------------------------------------
def saveTradeData(self):
"""保存交易记录"""
f = shelve.open('result.vn')
f['listTrade'] = self.listTrade
f.close()
#----------------------------------------------------------------------
def subscribe(self, symbol, exchange):
"""仿真订阅合约"""
pass

View File

@ -0,0 +1,6 @@
# strategydemo说明
该文件夹下:
- demoStrategy包含了一个简单的双EMA均线交易策略填入账号密码等资料后可直接运行
- demoBacktesting包含了一个基于以上策略的回测脚本结果会输出到"result.vn"二进制文件中可以使用Python的shelve模块打开

View File

@ -0,0 +1,224 @@
# encoding: UTF-8
import shelve
from eventEngine import *
from pymongo import Connection
from pymongo.errors import *
from strategyEngine import *
########################################################################
class LimitOrder(object):
"""限价单对象"""
#----------------------------------------------------------------------
def __init__(self, symbol):
"""Constructor"""
self.symbol = symbol
self.price = 0
self.volume = 0
self.direction = None
self.offset = None
########################################################################
class BacktestingEngine(object):
"""
回测引擎作用
1. 从数据库中读取数据并回放
2. 作为StrategyEngine创建时的参数传入
"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
self.eventEngine = EventEngine()
# 策略引擎
self.strategyEngine = None
# TICK历史数据列表由于要使用For循环来实现仿真回放
# 使用list的速度比Numpy和Pandas都要更快
self.listDataHistory = []
# 限价单字典
self.dictOrder = {}
# 最新的TICK数据
self.currentData = None
# 回测的成交字典
self.listTrade = []
# 报单编号
self.orderRef = 0
# 成交编号
self.tradeID = 0
#----------------------------------------------------------------------
def setStrategyEngine(self, engine):
"""设置策略引擎"""
self.strategyEngine = engine
self.writeLog(u'策略引擎设置完成')
#----------------------------------------------------------------------
def connectMongo(self):
"""连接MongoDB数据库"""
try:
self.__mongoConnection = Connection()
self.__mongoConnected = True
self.__mongoTickDB = self.__mongoConnection['TickDB']
self.writeLog(u'回测引擎连接MongoDB成功')
except ConnectionFailure:
self.writeLog(u'回测引擎连接MongoDB失败')
#----------------------------------------------------------------------
def loadDataHistory(self, symbol, startDate, endDate):
"""载入历史TICK数据"""
if self.__mongoConnected:
collection = self.__mongoTickDB[symbol]
# 如果输入了读取TICK的最后日期
if endDate:
cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}})
elif startDate:
cx = collection.find({'date':{'$gte':startDate}})
else:
cx = collection.find()
# 将TICK数据读入内存
self.listDataHistory = [data for data in cx]
self.writeLog(u'历史TICK数据载入完成')
else:
self.writeLog(u'MongoDB未连接请检查')
#----------------------------------------------------------------------
def processLimitOrder(self):
"""处理限价单"""
for ref, order in self.dictOrder.items():
# 如果是买单且限价大于等于当前TICK的卖一价则假设成交
if order.direction == DIRECTION_BUY and \
order.price >= self.currentData['AskPrice1']:
self.executeLimitOrder(ref, order, self.currentData['AskPrice1'])
# 如果是卖单且限价低于当前TICK的买一价则假设全部成交
if order.direction == DIRECTION_SELL and \
order.price <= self.currentData['BidPrice1']:
self.executeLimitOrder(ref, order, self.currentData['BidPrice1'])
#----------------------------------------------------------------------
def executeLimitOrder(self, ref, order, price):
"""限价单成交处理"""
# 成交回报
self.tradeID = self.tradeID + 1
tradeData = {}
tradeData['InstrumentID'] = order.symbol
tradeData['OrderRef'] = ref
tradeData['TradeID'] = str(self.tradeID)
tradeData['Direction'] = order.direction
tradeData['OffsetFlag'] = order.offset
tradeData['Price'] = price
tradeData['Volume'] = order.volume
tradeEvent = Event()
tradeEvent.dict_['data'] = tradeData
self.strategyEngine.updateTrade(tradeEvent)
# 报单回报
orderData = {}
orderData['InstrumentID'] = order.symbol
orderData['OrderRef'] = ref
orderData['Direction'] = order.direction
orderData['CombOffsetFlag'] = order.offset
orderData['LimitPrice'] = price
orderData['VolumeTotalOriginal'] = order.volume
orderData['VolumeTraded'] = order.volume
orderData['InsertTime'] = ''
orderData['CancelTime'] = ''
orderData['FrontID'] = ''
orderData['SessionID'] = ''
orderData['OrderStatus'] = ''
orderEvent = Event()
orderEvent.dict_['data'] = orderData
self.strategyEngine.updateOrder(orderEvent)
# 记录该成交到列表中
self.listTrade.append(tradeData)
# 删除该限价单
del self.dictOrder[ref]
#----------------------------------------------------------------------
def startBacktesting(self):
"""开始回测"""
self.writeLog(u'开始回测')
for data in self.listDataHistory:
# 记录最新的TICK数据
self.currentData = data
# 处理限价单
self.processLimitOrder()
# 推送到策略引擎中
event = Event()
event.dict_['data'] = data
self.strategyEngine.updateMarketData(event)
self.saveTradeData()
self.writeLog(u'回测结束')
#----------------------------------------------------------------------
def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset):
"""回测发单"""
order = LimitOrder(instrumentid)
order.price = price
order.direction = direction
order.volume = volume
order.offset = offset
self.orderRef = self.orderRef + 1
self.dictOrder[str(self.orderRef)] = order
return str(self.orderRef)
#----------------------------------------------------------------------
def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid):
"""回测撤单"""
try:
del self.dictOrder[orderref]
except KeyError:
pass
#----------------------------------------------------------------------
def writeLog(self, log):
"""写日志"""
print log
#----------------------------------------------------------------------
def selectInstrument(self, symbol):
"""读取合约数据"""
d = {}
d['ExchangeID'] = 'BackTesting'
return d
#----------------------------------------------------------------------
def saveTradeData(self):
"""保存交易记录"""
f = shelve.open('result.vn')
f['listTrade'] = self.listTrade
f.close()
#----------------------------------------------------------------------
def subscribe(self, symbol, exchange):
"""仿真订阅合约"""
pass

View File

@ -0,0 +1,37 @@
# encoding: UTF-8
from strategyEngine import *
from backtestingEngine import *
from demoStrategy import SimpleEmaStrategy
# 回测脚本
if __name__ == '__main__':
symbol = 'IF1506'
# 创建回测引擎
be = BacktestingEngine()
# 创建策略引擎对象
se = StrategyEngine(be.eventEngine, be, backtesting=True)
be.setStrategyEngine(se)
# 初始化回测引擎
be.connectMongo()
be.loadDataHistory(symbol, datetime(2015,5,1), datetime.today())
# 创建策略对象
setting = {}
setting['fastAlpha'] = 0.2
setting['slowAlpha'] = 0.05
setting['startDate'] = datetime(year=2015, month=5, day=20)
se.createStrategy(u'EMA演示策略', symbol, SimpleEmaStrategy, setting)
# 启动所有策略
se.startAll()
# 开始回测
be.startBacktesting()

View File

@ -74,14 +74,21 @@ class SimpleEmaStrategy(StrategyTemplate):
except KeyError:
self.engine.writeLog(self.name + u'读取参数设定出错,请检查参数字典')
self.initStrategy()
try:
self.initStrategy(setting['startDate'])
except KeyError:
self.initStrategy()
#----------------------------------------------------------------------
def initStrategy(self):
def initStrategy(self, startDate=None):
"""初始化"""
td = timedelta(days=3) # 读取3天的历史TICK数据
today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
cx = self.engine.loadTick(self.symbol, today-td)
if startDate:
cx = self.engine.loadTick(self.symbol, startDate-td)
else:
today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
cx = self.engine.loadTick(self.symbol, today-td)
if cx:
for data in cx:

View File

@ -68,7 +68,7 @@ class Tick:
########################################################################
class Trade:
class Trade(object):
"""成交数据对象"""
#----------------------------------------------------------------------
@ -86,7 +86,7 @@ class Trade:
########################################################################
class Order:
class Order(object):
"""报单数据对象"""
#----------------------------------------------------------------------
@ -112,7 +112,7 @@ class Order:
########################################################################
class StopOrder:
class StopOrder(object):
"""
停止单对象
用于实现价格突破某一水平后自动追入
@ -135,10 +135,11 @@ class StrategyEngine(object):
"""策略引擎"""
#----------------------------------------------------------------------
def __init__(self, eventEngine, mainEngine):
def __init__(self, eventEngine, mainEngine, backtesting=False):
"""Constructor"""
self.__eventEngine = eventEngine
self.mainEngine = mainEngine
self.backtesting = backtesting # 是否在进行回测
# 获取代表今日的datetime
t = datetime.today()
@ -209,17 +210,22 @@ class StrategyEngine(object):
self.__mongoTickDB[symbol].insert(data)
#----------------------------------------------------------------------
def loadTick(self, symbol, dt):
def loadTick(self, symbol, startDate, endDate=None):
"""从MongoDB中读取Tick数据"""
if self.__mongoConnected:
collection = self.__mongoTickDB[symbol]
cx = collection.find({'date':{'$gte':dt}})
# 如果输入了读取TICK的最后日期
if endDate:
cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}})
else:
cx = collection.find({'date':{'$gte':startDate}})
return cx
else:
return None
#----------------------------------------------------------------------
def __updateMarketData(self, event):
def updateMarketData(self, event):
"""行情更新"""
data = event.dict_['data']
symbol = data['InstrumentID']
@ -275,7 +281,8 @@ class StrategyEngine(object):
strategy.onTick(tick)
# 将数据插入MongoDB数据库实盘建议另开程序记录TICK数据
self.__recordTick(data)
if not self.backtesting:
self.__recordTick(data)
#----------------------------------------------------------------------
def __processStopOrder(self, tick):
@ -325,7 +332,7 @@ class StrategyEngine(object):
del self.__dictStopOrder[symbol]
#----------------------------------------------------------------------
def __updateOrder(self, event):
def updateOrder(self, event):
"""报单更新"""
data = event.dict_['data']
orderRef = data['OrderRef']
@ -358,7 +365,7 @@ class StrategyEngine(object):
self.__dictOrder[orderRef] = data
#----------------------------------------------------------------------
def __updateTrade(self, event):
def updateTrade(self, event):
"""成交更新"""
data = event.dict_['data']
orderRef = data['OrderRef']
@ -425,9 +432,9 @@ class StrategyEngine(object):
#----------------------------------------------------------------------
def __registerEvent(self):
"""注册事件监听"""
self.__eventEngine.register(EVENT_MARKETDATA, self.__updateMarketData)
self.__eventEngine.register(EVENT_ORDER, self.__updateOrder)
self.__eventEngine.register(EVENT_TRADE ,self.__updateTrade)
self.__eventEngine.register(EVENT_MARKETDATA, self.updateMarketData)
self.__eventEngine.register(EVENT_ORDER, self.updateOrder)
self.__eventEngine.register(EVENT_TRADE ,self.updateTrade)
#----------------------------------------------------------------------
def writeLog(self, log):