From ae633f5ca0b234e1ce3cbb043f0ad498b985898d Mon Sep 17 00:00:00 2001 From: WOLF Date: Thu, 28 May 2015 13:52:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=E5=9B=9E?= =?UTF-8?q?=E6=B5=8B=E5=BC=95=E6=93=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vn.strategy/README.md | 10 + vn.strategy/backtestingEngine.py | 224 ++++++++++++++++++ vn.strategy/strategydemo/README.md | 6 + vn.strategy/strategydemo/backtestingEngine.py | 224 ++++++++++++++++++ vn.strategy/strategydemo/demoBacktesting.py | 37 +++ vn.strategy/strategydemo/demoStrategy.py | 15 +- vn.strategy/strategydemo/strategyEngine.py | 33 ++- 7 files changed, 532 insertions(+), 17 deletions(-) create mode 100644 vn.strategy/backtestingEngine.py create mode 100644 vn.strategy/strategydemo/README.md create mode 100644 vn.strategy/strategydemo/backtestingEngine.py create mode 100644 vn.strategy/strategydemo/demoBacktesting.py diff --git a/vn.strategy/README.md b/vn.strategy/README.md index c7d7d2e6..d2e90d48 100644 --- a/vn.strategy/README.md +++ b/vn.strategy/README.md @@ -6,4 +6,14 @@ 完成了一个演示性的双指数均线策略,填入账号、密码等信息后直接运行demoStrategy.py就可以启动。 +##2015/5/28 +完成了一个简单的回测引擎,可以读取MongoDB中的历史TICK数据并通过回放的方式完全模拟真实情况下的策略交易情况。 + +回测结束后将所有的成交记录输出到一个基于shelve的二进制文件中,用户可以在IPython或者Spyder之类的交互式环境中读取该文件的数据进行绩效分析。 + +##计划 + +与其说是一个成熟的产品,该模块更多应该被视作一个Python策略自动交易的Demo,目前只是非常粗糙的实现了实盘策略交易和基于TICK的仿实盘回测功能,其他诸如交易结果分析之类的工作都需要用户自行完成。 + +未来将会加入回测结果输出等等的功能模块,回测完成后直接生成报表,便于用户查看;以及基于K线数据的回测(TICK数据还是挺难弄到的)。 diff --git a/vn.strategy/backtestingEngine.py b/vn.strategy/backtestingEngine.py new file mode 100644 index 00000000..f94f5068 --- /dev/null +++ b/vn.strategy/backtestingEngine.py @@ -0,0 +1,224 @@ +# encoding: UTF-8 + +import shelve + +from eventEngine import * +from pymongo import Connection +from pymongo.errors import * + +from strategyEngine import * + + +######################################################################## +class LimitOrder(object): + """限价单对象""" + + #---------------------------------------------------------------------- + def __init__(self, symbol): + """Constructor""" + self.symbol = symbol + self.price = 0 + self.volume = 0 + self.direction = None + self.offset = None + + +######################################################################## +class BacktestingEngine(object): + """ + 回测引擎,作用: + 1. 从数据库中读取数据并回放 + 2. 作为StrategyEngine创建时的参数传入 + """ + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.eventEngine = EventEngine() + + # 策略引擎 + self.strategyEngine = None + + # TICK历史数据列表,由于要使用For循环来实现仿真回放 + # 使用list的速度比Numpy和Pandas都要更快 + self.listDataHistory = [] + + # 限价单字典 + self.dictOrder = {} + + # 最新的TICK数据 + self.currentData = None + + # 回测的成交字典 + self.listTrade = [] + + # 报单编号 + self.orderRef = 0 + + # 成交编号 + self.tradeID = 0 + + #---------------------------------------------------------------------- + def setStrategyEngine(self, engine): + """设置策略引擎""" + self.strategyEngine = engine + self.writeLog(u'策略引擎设置完成') + + #---------------------------------------------------------------------- + def connectMongo(self): + """连接MongoDB数据库""" + try: + self.__mongoConnection = Connection() + self.__mongoConnected = True + self.__mongoTickDB = self.__mongoConnection['TickDB'] + self.writeLog(u'回测引擎连接MongoDB成功') + except ConnectionFailure: + self.writeLog(u'回测引擎连接MongoDB失败') + + #---------------------------------------------------------------------- + def loadDataHistory(self, symbol, startDate, endDate): + """载入历史TICK数据""" + if self.__mongoConnected: + collection = self.__mongoTickDB[symbol] + + # 如果输入了读取TICK的最后日期 + if endDate: + cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}}) + elif startDate: + cx = collection.find({'date':{'$gte':startDate}}) + else: + cx = collection.find() + + # 将TICK数据读入内存 + self.listDataHistory = [data for data in cx] + + self.writeLog(u'历史TICK数据载入完成') + else: + self.writeLog(u'MongoDB未连接,请检查') + + #---------------------------------------------------------------------- + def processLimitOrder(self): + """处理限价单""" + for ref, order in self.dictOrder.items(): + # 如果是买单,且限价大于等于当前TICK的卖一价,则假设成交 + if order.direction == DIRECTION_BUY and \ + order.price >= self.currentData['AskPrice1']: + self.executeLimitOrder(ref, order, self.currentData['AskPrice1']) + # 如果是卖单,且限价低于当前TICK的买一价,则假设全部成交 + if order.direction == DIRECTION_SELL and \ + order.price <= self.currentData['BidPrice1']: + self.executeLimitOrder(ref, order, self.currentData['BidPrice1']) + + #---------------------------------------------------------------------- + def executeLimitOrder(self, ref, order, price): + """限价单成交处理""" + # 成交回报 + self.tradeID = self.tradeID + 1 + + tradeData = {} + tradeData['InstrumentID'] = order.symbol + tradeData['OrderRef'] = ref + tradeData['TradeID'] = str(self.tradeID) + tradeData['Direction'] = order.direction + tradeData['OffsetFlag'] = order.offset + tradeData['Price'] = price + tradeData['Volume'] = order.volume + + tradeEvent = Event() + tradeEvent.dict_['data'] = tradeData + self.strategyEngine.updateTrade(tradeEvent) + + # 报单回报 + orderData = {} + orderData['InstrumentID'] = order.symbol + orderData['OrderRef'] = ref + orderData['Direction'] = order.direction + orderData['CombOffsetFlag'] = order.offset + orderData['LimitPrice'] = price + orderData['VolumeTotalOriginal'] = order.volume + orderData['VolumeTraded'] = order.volume + orderData['InsertTime'] = '' + orderData['CancelTime'] = '' + orderData['FrontID'] = '' + orderData['SessionID'] = '' + orderData['OrderStatus'] = '' + + orderEvent = Event() + orderEvent.dict_['data'] = orderData + self.strategyEngine.updateOrder(orderEvent) + + # 记录该成交到列表中 + self.listTrade.append(tradeData) + + # 删除该限价单 + del self.dictOrder[ref] + + #---------------------------------------------------------------------- + def startBacktesting(self): + """开始回测""" + self.writeLog(u'开始回测') + + for data in self.listDataHistory: + # 记录最新的TICK数据 + self.currentData = data + + # 处理限价单 + self.processLimitOrder() + + # 推送到策略引擎中 + event = Event() + event.dict_['data'] = data + self.strategyEngine.updateMarketData(event) + + self.saveTradeData() + + self.writeLog(u'回测结束') + + #---------------------------------------------------------------------- + def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset): + """回测发单""" + order = LimitOrder(instrumentid) + order.price = price + order.direction = direction + order.volume = volume + order.offset = offset + + self.orderRef = self.orderRef + 1 + self.dictOrder[str(self.orderRef)] = order + + return str(self.orderRef) + + #---------------------------------------------------------------------- + def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid): + """回测撤单""" + try: + del self.dictOrder[orderref] + except KeyError: + pass + + #---------------------------------------------------------------------- + def writeLog(self, log): + """写日志""" + print log + + #---------------------------------------------------------------------- + def selectInstrument(self, symbol): + """读取合约数据""" + d = {} + d['ExchangeID'] = 'BackTesting' + return d + + #---------------------------------------------------------------------- + def saveTradeData(self): + """保存交易记录""" + f = shelve.open('result.vn') + f['listTrade'] = self.listTrade + f.close() + + #---------------------------------------------------------------------- + def subscribe(self, symbol, exchange): + """仿真订阅合约""" + pass + + + \ No newline at end of file diff --git a/vn.strategy/strategydemo/README.md b/vn.strategy/strategydemo/README.md new file mode 100644 index 00000000..19b81e1b --- /dev/null +++ b/vn.strategy/strategydemo/README.md @@ -0,0 +1,6 @@ +# strategydemo说明 + +该文件夹下: + +- demoStrategy包含了一个简单的双EMA均线交易策略,填入账号密码等资料后可直接运行 +- demoBacktesting包含了一个基于以上策略的回测脚本,结果会输出到"result.vn"二进制文件中,可以使用Python的shelve模块打开 \ No newline at end of file diff --git a/vn.strategy/strategydemo/backtestingEngine.py b/vn.strategy/strategydemo/backtestingEngine.py new file mode 100644 index 00000000..f94f5068 --- /dev/null +++ b/vn.strategy/strategydemo/backtestingEngine.py @@ -0,0 +1,224 @@ +# encoding: UTF-8 + +import shelve + +from eventEngine import * +from pymongo import Connection +from pymongo.errors import * + +from strategyEngine import * + + +######################################################################## +class LimitOrder(object): + """限价单对象""" + + #---------------------------------------------------------------------- + def __init__(self, symbol): + """Constructor""" + self.symbol = symbol + self.price = 0 + self.volume = 0 + self.direction = None + self.offset = None + + +######################################################################## +class BacktestingEngine(object): + """ + 回测引擎,作用: + 1. 从数据库中读取数据并回放 + 2. 作为StrategyEngine创建时的参数传入 + """ + + #---------------------------------------------------------------------- + def __init__(self): + """Constructor""" + self.eventEngine = EventEngine() + + # 策略引擎 + self.strategyEngine = None + + # TICK历史数据列表,由于要使用For循环来实现仿真回放 + # 使用list的速度比Numpy和Pandas都要更快 + self.listDataHistory = [] + + # 限价单字典 + self.dictOrder = {} + + # 最新的TICK数据 + self.currentData = None + + # 回测的成交字典 + self.listTrade = [] + + # 报单编号 + self.orderRef = 0 + + # 成交编号 + self.tradeID = 0 + + #---------------------------------------------------------------------- + def setStrategyEngine(self, engine): + """设置策略引擎""" + self.strategyEngine = engine + self.writeLog(u'策略引擎设置完成') + + #---------------------------------------------------------------------- + def connectMongo(self): + """连接MongoDB数据库""" + try: + self.__mongoConnection = Connection() + self.__mongoConnected = True + self.__mongoTickDB = self.__mongoConnection['TickDB'] + self.writeLog(u'回测引擎连接MongoDB成功') + except ConnectionFailure: + self.writeLog(u'回测引擎连接MongoDB失败') + + #---------------------------------------------------------------------- + def loadDataHistory(self, symbol, startDate, endDate): + """载入历史TICK数据""" + if self.__mongoConnected: + collection = self.__mongoTickDB[symbol] + + # 如果输入了读取TICK的最后日期 + if endDate: + cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}}) + elif startDate: + cx = collection.find({'date':{'$gte':startDate}}) + else: + cx = collection.find() + + # 将TICK数据读入内存 + self.listDataHistory = [data for data in cx] + + self.writeLog(u'历史TICK数据载入完成') + else: + self.writeLog(u'MongoDB未连接,请检查') + + #---------------------------------------------------------------------- + def processLimitOrder(self): + """处理限价单""" + for ref, order in self.dictOrder.items(): + # 如果是买单,且限价大于等于当前TICK的卖一价,则假设成交 + if order.direction == DIRECTION_BUY and \ + order.price >= self.currentData['AskPrice1']: + self.executeLimitOrder(ref, order, self.currentData['AskPrice1']) + # 如果是卖单,且限价低于当前TICK的买一价,则假设全部成交 + if order.direction == DIRECTION_SELL and \ + order.price <= self.currentData['BidPrice1']: + self.executeLimitOrder(ref, order, self.currentData['BidPrice1']) + + #---------------------------------------------------------------------- + def executeLimitOrder(self, ref, order, price): + """限价单成交处理""" + # 成交回报 + self.tradeID = self.tradeID + 1 + + tradeData = {} + tradeData['InstrumentID'] = order.symbol + tradeData['OrderRef'] = ref + tradeData['TradeID'] = str(self.tradeID) + tradeData['Direction'] = order.direction + tradeData['OffsetFlag'] = order.offset + tradeData['Price'] = price + tradeData['Volume'] = order.volume + + tradeEvent = Event() + tradeEvent.dict_['data'] = tradeData + self.strategyEngine.updateTrade(tradeEvent) + + # 报单回报 + orderData = {} + orderData['InstrumentID'] = order.symbol + orderData['OrderRef'] = ref + orderData['Direction'] = order.direction + orderData['CombOffsetFlag'] = order.offset + orderData['LimitPrice'] = price + orderData['VolumeTotalOriginal'] = order.volume + orderData['VolumeTraded'] = order.volume + orderData['InsertTime'] = '' + orderData['CancelTime'] = '' + orderData['FrontID'] = '' + orderData['SessionID'] = '' + orderData['OrderStatus'] = '' + + orderEvent = Event() + orderEvent.dict_['data'] = orderData + self.strategyEngine.updateOrder(orderEvent) + + # 记录该成交到列表中 + self.listTrade.append(tradeData) + + # 删除该限价单 + del self.dictOrder[ref] + + #---------------------------------------------------------------------- + def startBacktesting(self): + """开始回测""" + self.writeLog(u'开始回测') + + for data in self.listDataHistory: + # 记录最新的TICK数据 + self.currentData = data + + # 处理限价单 + self.processLimitOrder() + + # 推送到策略引擎中 + event = Event() + event.dict_['data'] = data + self.strategyEngine.updateMarketData(event) + + self.saveTradeData() + + self.writeLog(u'回测结束') + + #---------------------------------------------------------------------- + def sendOrder(self, instrumentid, exchangeid, price, pricetype, volume, direction, offset): + """回测发单""" + order = LimitOrder(instrumentid) + order.price = price + order.direction = direction + order.volume = volume + order.offset = offset + + self.orderRef = self.orderRef + 1 + self.dictOrder[str(self.orderRef)] = order + + return str(self.orderRef) + + #---------------------------------------------------------------------- + def cancelOrder(self, instrumentid, exchangeid, orderref, frontid, sessionid): + """回测撤单""" + try: + del self.dictOrder[orderref] + except KeyError: + pass + + #---------------------------------------------------------------------- + def writeLog(self, log): + """写日志""" + print log + + #---------------------------------------------------------------------- + def selectInstrument(self, symbol): + """读取合约数据""" + d = {} + d['ExchangeID'] = 'BackTesting' + return d + + #---------------------------------------------------------------------- + def saveTradeData(self): + """保存交易记录""" + f = shelve.open('result.vn') + f['listTrade'] = self.listTrade + f.close() + + #---------------------------------------------------------------------- + def subscribe(self, symbol, exchange): + """仿真订阅合约""" + pass + + + \ No newline at end of file diff --git a/vn.strategy/strategydemo/demoBacktesting.py b/vn.strategy/strategydemo/demoBacktesting.py new file mode 100644 index 00000000..fc2af7cb --- /dev/null +++ b/vn.strategy/strategydemo/demoBacktesting.py @@ -0,0 +1,37 @@ +# encoding: UTF-8 + +from strategyEngine import * +from backtestingEngine import * +from demoStrategy import SimpleEmaStrategy + + + +# 回测脚本 +if __name__ == '__main__': + symbol = 'IF1506' + + # 创建回测引擎 + be = BacktestingEngine() + + # 创建策略引擎对象 + se = StrategyEngine(be.eventEngine, be, backtesting=True) + be.setStrategyEngine(se) + + # 初始化回测引擎 + be.connectMongo() + be.loadDataHistory(symbol, datetime(2015,5,1), datetime.today()) + + # 创建策略对象 + setting = {} + setting['fastAlpha'] = 0.2 + setting['slowAlpha'] = 0.05 + setting['startDate'] = datetime(year=2015, month=5, day=20) + se.createStrategy(u'EMA演示策略', symbol, SimpleEmaStrategy, setting) + + # 启动所有策略 + se.startAll() + + # 开始回测 + be.startBacktesting() + + diff --git a/vn.strategy/strategydemo/demoStrategy.py b/vn.strategy/strategydemo/demoStrategy.py index 247de73d..95561ac2 100644 --- a/vn.strategy/strategydemo/demoStrategy.py +++ b/vn.strategy/strategydemo/demoStrategy.py @@ -74,14 +74,21 @@ class SimpleEmaStrategy(StrategyTemplate): except KeyError: self.engine.writeLog(self.name + u'读取参数设定出错,请检查参数字典') - self.initStrategy() + try: + self.initStrategy(setting['startDate']) + except KeyError: + self.initStrategy() #---------------------------------------------------------------------- - def initStrategy(self): + def initStrategy(self, startDate=None): """初始化""" td = timedelta(days=3) # 读取3天的历史TICK数据 - today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0) - cx = self.engine.loadTick(self.symbol, today-td) + + if startDate: + cx = self.engine.loadTick(self.symbol, startDate-td) + else: + today = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0) + cx = self.engine.loadTick(self.symbol, today-td) if cx: for data in cx: diff --git a/vn.strategy/strategydemo/strategyEngine.py b/vn.strategy/strategydemo/strategyEngine.py index 9ef0b346..d30a1b09 100644 --- a/vn.strategy/strategydemo/strategyEngine.py +++ b/vn.strategy/strategydemo/strategyEngine.py @@ -68,7 +68,7 @@ class Tick: ######################################################################## -class Trade: +class Trade(object): """成交数据对象""" #---------------------------------------------------------------------- @@ -86,7 +86,7 @@ class Trade: ######################################################################## -class Order: +class Order(object): """报单数据对象""" #---------------------------------------------------------------------- @@ -112,7 +112,7 @@ class Order: ######################################################################## -class StopOrder: +class StopOrder(object): """ 停止单对象 用于实现价格突破某一水平后自动追入 @@ -135,10 +135,11 @@ class StrategyEngine(object): """策略引擎""" #---------------------------------------------------------------------- - def __init__(self, eventEngine, mainEngine): + def __init__(self, eventEngine, mainEngine, backtesting=False): """Constructor""" self.__eventEngine = eventEngine self.mainEngine = mainEngine + self.backtesting = backtesting # 是否在进行回测 # 获取代表今日的datetime t = datetime.today() @@ -209,17 +210,22 @@ class StrategyEngine(object): self.__mongoTickDB[symbol].insert(data) #---------------------------------------------------------------------- - def loadTick(self, symbol, dt): + def loadTick(self, symbol, startDate, endDate=None): """从MongoDB中读取Tick数据""" if self.__mongoConnected: collection = self.__mongoTickDB[symbol] - cx = collection.find({'date':{'$gte':dt}}) + + # 如果输入了读取TICK的最后日期 + if endDate: + cx = collection.find({'date':{'$gte':startDate, '$lte':endDate}}) + else: + cx = collection.find({'date':{'$gte':startDate}}) return cx else: return None #---------------------------------------------------------------------- - def __updateMarketData(self, event): + def updateMarketData(self, event): """行情更新""" data = event.dict_['data'] symbol = data['InstrumentID'] @@ -275,7 +281,8 @@ class StrategyEngine(object): strategy.onTick(tick) # 将数据插入MongoDB数据库,实盘建议另开程序记录TICK数据 - self.__recordTick(data) + if not self.backtesting: + self.__recordTick(data) #---------------------------------------------------------------------- def __processStopOrder(self, tick): @@ -325,7 +332,7 @@ class StrategyEngine(object): del self.__dictStopOrder[symbol] #---------------------------------------------------------------------- - def __updateOrder(self, event): + def updateOrder(self, event): """报单更新""" data = event.dict_['data'] orderRef = data['OrderRef'] @@ -358,7 +365,7 @@ class StrategyEngine(object): self.__dictOrder[orderRef] = data #---------------------------------------------------------------------- - def __updateTrade(self, event): + def updateTrade(self, event): """成交更新""" data = event.dict_['data'] orderRef = data['OrderRef'] @@ -425,9 +432,9 @@ class StrategyEngine(object): #---------------------------------------------------------------------- def __registerEvent(self): """注册事件监听""" - self.__eventEngine.register(EVENT_MARKETDATA, self.__updateMarketData) - self.__eventEngine.register(EVENT_ORDER, self.__updateOrder) - self.__eventEngine.register(EVENT_TRADE ,self.__updateTrade) + self.__eventEngine.register(EVENT_MARKETDATA, self.updateMarketData) + self.__eventEngine.register(EVENT_ORDER, self.updateOrder) + self.__eventEngine.register(EVENT_TRADE ,self.updateTrade) #---------------------------------------------------------------------- def writeLog(self, log):