1. 修改eventEngine.py的部分代码,更加清晰

2. 在CTA引擎中增加策略对象函数执行时的异常捕捉功能,避免因为个别策略代码问题导致程序停止
This commit is contained in:
chenxy123 2016-11-01 22:41:08 +08:00
parent f1c5122e93
commit f17b710d36
3 changed files with 65 additions and 65 deletions

View File

@ -17,7 +17,6 @@ from eventType import *
class EventEngine(object): class EventEngine(object):
""" """
事件驱动引擎 事件驱动引擎
事件驱动引擎中所有的变量都设置为了私有这是为了防止不小心 事件驱动引擎中所有的变量都设置为了私有这是为了防止不小心
从外部修改了这些变量的值或状态导致bug 从外部修改了这些变量的值或状态导致bug
@ -129,9 +128,12 @@ class EventEngine(object):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def register(self, type_, handler): def register(self, type_, handler):
"""注册事件处理函数监听""" """注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
handlerList = self.__handlers[type_]
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件 # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in self.__handlers[type_]: if handler not in handlerList:
self.__handlers[type_].append(handler) handlerList.append(handler)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def unregister(self, type_, handler): def unregister(self, type_, handler):
@ -146,6 +148,7 @@ class EventEngine(object):
# 如果函数列表为空,则从引擎中移除该事件类型 # 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList: if not handlerList:
del self.__handlers[type_] del self.__handlers[type_]
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def put(self, event): def put(self, event):
"""向事件队列中存入事件""" """向事件队列中存入事件"""
@ -177,7 +180,7 @@ class EventEngine2(object):
# 这里的__handlers是一个字典用来保存对应的事件调用关系 # 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能 # 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = {} self.__handlers = defaultdict(list)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __run(self): def __run(self):
@ -243,12 +246,8 @@ class EventEngine2(object):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def register(self, type_, handler): def register(self, type_, handler):
"""注册事件处理函数监听""" """注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建 # 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
try:
handlerList = self.__handlers[type_] handlerList = self.__handlers[type_]
except KeyError:
handlerList = []
self.__handlers[type_] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件 # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList: if handler not in handlerList:
@ -258,7 +257,6 @@ class EventEngine2(object):
def unregister(self, type_, handler): def unregister(self, type_, handler):
"""注销事件处理函数监听""" """注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求 # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type_] handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除 # 如果该函数存在于列表中,则移除
@ -268,8 +266,6 @@ class EventEngine2(object):
# 如果函数列表为空,则从引擎中移除该事件类型 # 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList: if not handlerList:
del self.__handlers[type_] del self.__handlers[type_]
except KeyError:
pass
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def put(self, event): def put(self, event):

View File

@ -18,6 +18,7 @@
import json import json
import os import os
import traceback
from collections import OrderedDict from collections import OrderedDict
from datetime import datetime, timedelta from datetime import datetime, timedelta
@ -256,7 +257,7 @@ class CtaEngine(object):
# 逐个推送到策略实例中 # 逐个推送到策略实例中
l = self.tickStrategyDict[tick.vtSymbol] l = self.tickStrategyDict[tick.vtSymbol]
for strategy in l: for strategy in l:
strategy.onTick(ctaTick) self.callStrategyFunc(strategy, strategy.onTick, ctaTick)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def processOrderEvent(self, event): def processOrderEvent(self, event):
@ -265,7 +266,7 @@ class CtaEngine(object):
if order.vtOrderID in self.orderStrategyDict: if order.vtOrderID in self.orderStrategyDict:
strategy = self.orderStrategyDict[order.vtOrderID] strategy = self.orderStrategyDict[order.vtOrderID]
strategy.onOrder(order) self.callStrategyFunc(strategy, strategy.onOrder, order)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def processTradeEvent(self, event): def processTradeEvent(self, event):
@ -281,7 +282,7 @@ class CtaEngine(object):
else: else:
strategy.pos -= trade.volume strategy.pos -= trade.volume
strategy.onTrade(trade) self.callStrategyFunc(strategy, strategy.onTrade, trade)
# 更新持仓缓存数据 # 更新持仓缓存数据
if trade.vtSymbol in self.tickStrategyDict: if trade.vtSymbol in self.tickStrategyDict:
@ -417,7 +418,7 @@ class CtaEngine(object):
if not strategy.inited: if not strategy.inited:
strategy.inited = True strategy.inited = True
strategy.onInit() self.callStrategyFunc(strategy, strategy.onInit)
else: else:
self.writeCtaLog(u'请勿重复初始化策略实例:%s' %name) self.writeCtaLog(u'请勿重复初始化策略实例:%s' %name)
else: else:
@ -431,7 +432,7 @@ class CtaEngine(object):
if strategy.inited and not strategy.trading: if strategy.inited and not strategy.trading:
strategy.trading = True strategy.trading = True
strategy.onStart() self.callStrategyFunc(strategy, strategy.onStart)
else: else:
self.writeCtaLog(u'策略实例不存在:%s' %name) self.writeCtaLog(u'策略实例不存在:%s' %name)
@ -443,7 +444,7 @@ class CtaEngine(object):
if strategy.trading: if strategy.trading:
strategy.trading = False strategy.trading = False
strategy.onStop() self.callStrategyFunc(strategy, strategy.onStop)
# 对该策略发出的所有限价单进行撤单 # 对该策略发出的所有限价单进行撤单
for vtOrderID, s in self.orderStrategyDict.items(): for vtOrderID, s in self.orderStrategyDict.items():
@ -517,6 +518,23 @@ class CtaEngine(object):
event = Event(EVENT_CTA_STRATEGY+name) event = Event(EVENT_CTA_STRATEGY+name)
self.eventEngine.put(event) self.eventEngine.put(event)
#----------------------------------------------------------------------
def callStrategyFunc(self, strategy, func, params=None):
"""调用策略的函数,若触发异常则捕捉"""
try:
if params:
func(params)
else:
func()
except Exception:
# 停止策略,修改状态为未初始化
strategy.trading = False
strategy.inited = False
# 发出日志
content = '\n'.join([u'策略%s触发异常已停止' %strategy.name,
traceback.format_exc()])
self.writeCtaLog(content)
######################################################################## ########################################################################

View File

@ -4,6 +4,7 @@
from Queue import Queue, Empty from Queue import Queue, Empty
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from collections import defaultdict
# 第三方模块 # 第三方模块
from PyQt4.QtCore import QTimer from PyQt4.QtCore import QTimer
@ -16,7 +17,6 @@ from eventType import *
class EventEngine(object): class EventEngine(object):
""" """
事件驱动引擎 事件驱动引擎
事件驱动引擎中所有的变量都设置为了私有这是为了防止不小心 事件驱动引擎中所有的变量都设置为了私有这是为了防止不小心
从外部修改了这些变量的值或状态导致bug 从外部修改了这些变量的值或状态导致bug
@ -68,7 +68,7 @@ class EventEngine(object):
# 这里的__handlers是一个字典用来保存对应的事件调用关系 # 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能 # 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = {} self.__handlers = defaultdict(list)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __run(self): def __run(self):
@ -128,12 +128,8 @@ class EventEngine(object):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def register(self, type_, handler): def register(self, type_, handler):
"""注册事件处理函数监听""" """注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建 # 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
try:
handlerList = self.__handlers[type_] handlerList = self.__handlers[type_]
except KeyError:
handlerList = []
self.__handlers[type_] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件 # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList: if handler not in handlerList:
@ -143,7 +139,6 @@ class EventEngine(object):
def unregister(self, type_, handler): def unregister(self, type_, handler):
"""注销事件处理函数监听""" """注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求 # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type_] handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除 # 如果该函数存在于列表中,则移除
@ -153,8 +148,6 @@ class EventEngine(object):
# 如果函数列表为空,则从引擎中移除该事件类型 # 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList: if not handlerList:
del self.__handlers[type_] del self.__handlers[type_]
except KeyError:
pass
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def put(self, event): def put(self, event):
@ -187,7 +180,7 @@ class EventEngine2(object):
# 这里的__handlers是一个字典用来保存对应的事件调用关系 # 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能 # 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = {} self.__handlers = defaultdict(list)
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def __run(self): def __run(self):
@ -253,12 +246,8 @@ class EventEngine2(object):
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def register(self, type_, handler): def register(self, type_, handler):
"""注册事件处理函数监听""" """注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建 # 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
try:
handlerList = self.__handlers[type_] handlerList = self.__handlers[type_]
except KeyError:
handlerList = []
self.__handlers[type_] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件 # 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList: if handler not in handlerList:
@ -268,7 +257,6 @@ class EventEngine2(object):
def unregister(self, type_, handler): def unregister(self, type_, handler):
"""注销事件处理函数监听""" """注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求 # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
try:
handlerList = self.__handlers[type_] handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除 # 如果该函数存在于列表中,则移除
@ -278,8 +266,6 @@ class EventEngine2(object):
# 如果函数列表为空,则从引擎中移除该事件类型 # 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList: if not handlerList:
del self.__handlers[type_] del self.__handlers[type_]
except KeyError:
pass
#---------------------------------------------------------------------- #----------------------------------------------------------------------
def put(self, event): def put(self, event):