vnpy/vn.trader/eventEngine.py

348 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# encoding: UTF-8
# 系统模块
from Queue import Queue, Empty
from threading import Thread
from time import sleep
from collections import defaultdict
# 第三方模块
from PyQt4.QtCore import QTimer
# 自己开发的模块
from eventType import *
########################################################################
class EventEngine(object):
"""
事件驱动引擎
事件驱动引擎中所有的变量都设置为了私有,这是为了防止不小心
从外部修改了这些变量的值或状态导致bug。
变量说明
__queue私有变量事件队列
__active私有变量事件引擎开关
__thread私有变量事件处理线程
__timer私有变量计时器
__handlers私有变量事件处理函数字典
方法说明
__run: 私有方法,事件处理线程连续运行用
__process: 私有方法,处理事件,调用注册在引擎中的监听函数
__onTimer私有方法计时器固定事件间隔触发后向事件队列中存入计时器事件
start: 公共方法,启动引擎
stop公共方法停止引擎
register公共方法向引擎中注册监听函数
unregister公共方法向引擎中注销监听函数
put公共方法向事件队列中存入新的事件
事件监听函数必须定义为输入参数仅为一个event对象
函数
def func(event)
...
对象方法
def method(self, event)
...
"""
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件引擎"""
# 事件队列
self.__queue = Queue()
# 事件引擎开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target = self.__run)
# 计时器,用于触发计时器事件
self.__timer = QTimer()
self.__timer.timeout.connect(self.__onTimer)
# 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = defaultdict(list)
# __generalHandlers是一个列表用来保存通用回调函数所有事件均调用
self.__generalHandlers = []
#----------------------------------------------------------------------
def __run(self):
"""引擎运行"""
while self.__active == True:
try:
event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒
self.__process(event)
except Empty:
pass
#----------------------------------------------------------------------
def __process(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
# 若存在,则按顺序将事件传递给处理函数执行
[handler(event) for handler in self.__handlers[event.type_]]
# 以上语句为Python列表解析方式的写法对应的常规循环写法为
#for handler in self.__handlers[event.type_]:
#handler(event)
# 调用通用处理函数进行处理
if self.__generalHandlers:
[handler(event) for handler in self.__generalHandlers]
#----------------------------------------------------------------------
def __onTimer(self):
"""向事件队列中存入计时器事件"""
# 创建计时器事件
event = Event(type_=EVENT_TIMER)
# 向队列中存入计时器事件
self.put(event)
#----------------------------------------------------------------------
def start(self):
"""引擎启动"""
# 将引擎设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start()
# 启动计时器计时器事件间隔默认设定为1秒
self.__timer.start(1000)
#----------------------------------------------------------------------
def stop(self):
"""停止引擎"""
# 将引擎设为停止
self.__active = False
# 停止计时器
self.__timer.stop()
# 等待事件处理线程退出
self.__thread.join()
#----------------------------------------------------------------------
def register(self, type_, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
handlerList = self.__handlers[type_]
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
def unregister(self, type_, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type_]
#----------------------------------------------------------------------
def put(self, event):
"""向事件队列中存入事件"""
self.__queue.put(event)
#----------------------------------------------------------------------
def registerGeneralHandler(self, handler):
"""注册通用事件处理函数监听"""
if handler not in self.__generalHandlers:
self.__generalHandlers.append(handler)
#----------------------------------------------------------------------
def unregisterGeneralHandler(self, handler):
"""注销通用事件处理函数监听"""
if handler in self.__generalHandlers:
self.__generalHandlers.remove(handler)
########################################################################
class EventEngine2(object):
"""
计时器使用python线程的事件驱动引擎
"""
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件引擎"""
# 事件队列
self.__queue = Queue()
# 事件引擎开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target = self.__run)
# 计时器,用于触发计时器事件
self.__timer = Thread(target = self.__runTimer)
self.__timerActive = False # 计时器工作状态
self.__timerSleep = 1 # 计时器触发间隔默认1秒
# 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = defaultdict(list)
# __generalHandlers是一个列表用来保存通用回调函数所有事件均调用
self.__generalHandlers = []
#----------------------------------------------------------------------
def __run(self):
"""引擎运行"""
while self.__active == True:
try:
event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒
self.__process(event)
except Empty:
pass
#----------------------------------------------------------------------
def __process(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
# 若存在,则按顺序将事件传递给处理函数执行
[handler(event) for handler in self.__handlers[event.type_]]
# 以上语句为Python列表解析方式的写法对应的常规循环写法为
#for handler in self.__handlers[event.type_]:
#handler(event)
# 调用通用处理函数进行处理
if self.__generalHandlers:
[handler(event) for handler in self.__generalHandlers]
#----------------------------------------------------------------------
def __runTimer(self):
"""运行在计时器线程中的循环函数"""
while self.__timerActive:
# 创建计时器事件
event = Event(type_=EVENT_TIMER)
# 向队列中存入计时器事件
self.put(event)
# 等待
sleep(self.__timerSleep)
#----------------------------------------------------------------------
def start(self):
"""引擎启动"""
# 将引擎设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start()
# 启动计时器计时器事件间隔默认设定为1秒
self.__timerActive = True
self.__timer.start()
#----------------------------------------------------------------------
def stop(self):
"""停止引擎"""
# 将引擎设为停止
self.__active = False
# 停止计时器
self.__timerActive = False
self.__timer.join()
# 等待事件处理线程退出
self.__thread.join()
#----------------------------------------------------------------------
def register(self, type_, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
handlerList = self.__handlers[type_]
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
def unregister(self, type_, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type_]
#----------------------------------------------------------------------
def put(self, event):
"""向事件队列中存入事件"""
self.__queue.put(event)
#----------------------------------------------------------------------
def registerGeneralHandler(self, handler):
"""注册通用事件处理函数监听"""
if handler not in self.__generalHandlers:
self.__generalHandlers.append(handler)
#----------------------------------------------------------------------
def unregisterGeneralHandler(self, handler):
"""注销通用事件处理函数监听"""
if handler in self.__generalHandlers:
self.__generalHandlers.remove(handler)
########################################################################
class Event:
"""事件对象"""
#----------------------------------------------------------------------
def __init__(self, type_=None):
"""Constructor"""
self.type_ = type_ # 事件类型
self.dict_ = {} # 字典用于保存具体的事件数据
#----------------------------------------------------------------------
def test():
"""测试函数"""
import sys
from datetime import datetime
from PyQt4.QtCore import QCoreApplication
def simpletest(event):
print u'处理每秒触发的计时器事件:%s' % str(datetime.now())
app = QCoreApplication(sys.argv)
ee = EventEngine2()
#ee.register(EVENT_TIMER, simpletest)
ee.registerGeneralHandler(simpletest)
ee.start()
app.exec_()
# 直接运行脚本可以进行测试
if __name__ == '__main__':
test()