移除trader目录下的eventEngine.py和vnrpc.py

This commit is contained in:
chenxy123 2017-05-04 23:34:03 +08:00
parent 1fac0d7f92
commit aab32bb1ee
14 changed files with 19 additions and 719 deletions

0
vnpy/event/__init__.py Normal file
View File

0
vnpy/rpc/__init__.py Normal file
View File

View File

@ -26,7 +26,8 @@ from datetime import datetime, timedelta
from ctaBase import *
from strategy import STRATEGY_CLASS
from eventEngine import *
from vnpy.event.eventEngine import *
from vnpy.trader.eventType import *
from vtConstant import *
from vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData
from vtFunction import todayDate

View File

@ -6,7 +6,7 @@ CTA模块相关的GUI控制组件
from uiBasicWidget import QtGui, QtCore, BasicCell
from eventEngine import *
from vnpy.event.eventEngine import *
from language import text

View File

@ -14,7 +14,8 @@ from datetime import datetime, timedelta
from Queue import Queue
from threading import Thread
from eventEngine import *
from vnpy.event.eventEngine import *
from vnpy.trader.eventType import *
from vtGateway import VtSubscribeReq, VtLogData
from drBase import *
from vtFunction import todayDate

View File

@ -7,7 +7,7 @@
import json
from uiBasicWidget import QtGui, QtCore
from eventEngine import *
from vnpy.event.eventEngine import *
from language import text

View File

@ -1,356 +0,0 @@
# encoding: UTF-8
# 系统模块
from Queue import Queue, Empty
from threading import Thread
from time import sleep
from collections import defaultdict
# 第三方模块
from PyQt4.QtCore import QTimer
# 自己开发的模块
from eventType import *
########################################################################
class EventEngine(object):
"""
事件驱动引擎
事件驱动引擎中所有的变量都设置为了私有这是为了防止不小心
从外部修改了这些变量的值或状态导致bug
变量说明
__queue私有变量事件队列
__active私有变量事件引擎开关
__thread私有变量事件处理线程
__timer私有变量计时器
__handlers私有变量事件处理函数字典
方法说明
__run: 私有方法事件处理线程连续运行用
__process: 私有方法处理事件调用注册在引擎中的监听函数
__onTimer私有方法计时器固定事件间隔触发后向事件队列中存入计时器事件
start: 公共方法启动引擎
stop公共方法停止引擎
register公共方法向引擎中注册监听函数
unregister公共方法向引擎中注销监听函数
put公共方法向事件队列中存入新的事件
事件监听函数必须定义为输入参数仅为一个event对象
函数
def func(event)
...
对象方法
def method(self, event)
...
"""
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件引擎"""
# 事件队列
self.__queue = Queue()
# 事件引擎开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target = self.__run)
# 计时器,用于触发计时器事件
self.__timer = QTimer()
self.__timer.timeout.connect(self.__onTimer)
# 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = defaultdict(list)
# __generalHandlers是一个列表用来保存通用回调函数所有事件均调用
self.__generalHandlers = []
#----------------------------------------------------------------------
def __run(self):
"""引擎运行"""
while self.__active == True:
try:
event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒
self.__process(event)
except Empty:
pass
#----------------------------------------------------------------------
def __process(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
# 若存在,则按顺序将事件传递给处理函数执行
[handler(event) for handler in self.__handlers[event.type_]]
# 以上语句为Python列表解析方式的写法对应的常规循环写法为
#for handler in self.__handlers[event.type_]:
#handler(event)
# 调用通用处理函数进行处理
if self.__generalHandlers:
[handler(event) for handler in self.__generalHandlers]
#----------------------------------------------------------------------
def __onTimer(self):
"""向事件队列中存入计时器事件"""
# 创建计时器事件
event = Event(type_=EVENT_TIMER)
# 向队列中存入计时器事件
self.put(event)
#----------------------------------------------------------------------
def start(self, timer=True):
"""
引擎启动
timer是否要启动计时器
"""
# 将引擎设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start()
# 启动计时器计时器事件间隔默认设定为1秒
if timer:
self.__timer.start(1000)
#----------------------------------------------------------------------
def stop(self):
"""停止引擎"""
# 将引擎设为停止
self.__active = False
# 停止计时器
self.__timer.stop()
# 等待事件处理线程退出
self.__thread.join()
#----------------------------------------------------------------------
def register(self, type_, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
handlerList = self.__handlers[type_]
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
def unregister(self, type_, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type_]
#----------------------------------------------------------------------
def put(self, event):
"""向事件队列中存入事件"""
self.__queue.put(event)
#----------------------------------------------------------------------
def registerGeneralHandler(self, handler):
"""注册通用事件处理函数监听"""
if handler not in self.__generalHandlers:
self.__generalHandlers.append(handler)
#----------------------------------------------------------------------
def unregisterGeneralHandler(self, handler):
"""注销通用事件处理函数监听"""
if handler in self.__generalHandlers:
self.__generalHandlers.remove(handler)
########################################################################
class EventEngine2(object):
"""
计时器使用python线程的事件驱动引擎
"""
#----------------------------------------------------------------------
def __init__(self):
"""初始化事件引擎"""
# 事件队列
self.__queue = Queue()
# 事件引擎开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target = self.__run)
# 计时器,用于触发计时器事件
self.__timer = Thread(target = self.__runTimer)
self.__timerActive = False # 计时器工作状态
self.__timerSleep = 1 # 计时器触发间隔默认1秒
# 这里的__handlers是一个字典用来保存对应的事件调用关系
# 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能
self.__handlers = defaultdict(list)
# __generalHandlers是一个列表用来保存通用回调函数所有事件均调用
self.__generalHandlers = []
#----------------------------------------------------------------------
def __run(self):
"""引擎运行"""
while self.__active == True:
try:
event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒
self.__process(event)
except Empty:
pass
#----------------------------------------------------------------------
def __process(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
# 若存在,则按顺序将事件传递给处理函数执行
[handler(event) for handler in self.__handlers[event.type_]]
# 以上语句为Python列表解析方式的写法对应的常规循环写法为
#for handler in self.__handlers[event.type_]:
#handler(event)
# 调用通用处理函数进行处理
if self.__generalHandlers:
[handler(event) for handler in self.__generalHandlers]
#----------------------------------------------------------------------
def __runTimer(self):
"""运行在计时器线程中的循环函数"""
while self.__timerActive:
# 创建计时器事件
event = Event(type_=EVENT_TIMER)
# 向队列中存入计时器事件
self.put(event)
# 等待
sleep(self.__timerSleep)
#----------------------------------------------------------------------
def start(self, timer=True):
"""
引擎启动
timer是否要启动计时器
"""
# 将引擎设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start()
# 启动计时器计时器事件间隔默认设定为1秒
if timer:
self.__timerActive = True
self.__timer.start()
#----------------------------------------------------------------------
def stop(self):
"""停止引擎"""
# 将引擎设为停止
self.__active = False
# 停止计时器
self.__timerActive = False
self.__timer.join()
# 等待事件处理线程退出
self.__thread.join()
#----------------------------------------------------------------------
def register(self, type_, handler):
"""注册事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表若无defaultDict会自动创建新的list
handlerList = self.__handlers[type_]
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler)
#----------------------------------------------------------------------
def unregister(self, type_, handler):
"""注销事件处理函数监听"""
# 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求
handlerList = self.__handlers[type_]
# 如果该函数存在于列表中,则移除
if handler in handlerList:
handlerList.remove(handler)
# 如果函数列表为空,则从引擎中移除该事件类型
if not handlerList:
del self.__handlers[type_]
#----------------------------------------------------------------------
def put(self, event):
"""向事件队列中存入事件"""
self.__queue.put(event)
#----------------------------------------------------------------------
def registerGeneralHandler(self, handler):
"""注册通用事件处理函数监听"""
if handler not in self.__generalHandlers:
self.__generalHandlers.append(handler)
#----------------------------------------------------------------------
def unregisterGeneralHandler(self, handler):
"""注销通用事件处理函数监听"""
if handler in self.__generalHandlers:
self.__generalHandlers.remove(handler)
########################################################################
class Event:
"""事件对象"""
#----------------------------------------------------------------------
def __init__(self, type_=None):
"""Constructor"""
self.type_ = type_ # 事件类型
self.dict_ = {} # 字典用于保存具体的事件数据
#----------------------------------------------------------------------
def test():
"""测试函数"""
import sys
from datetime import datetime
from PyQt4.QtCore import QCoreApplication
def simpletest(event):
print u'处理每秒触发的计时器事件:%s' % str(datetime.now())
app = QCoreApplication(sys.argv)
ee = EventEngine2()
#ee.register(EVENT_TIMER, simpletest)
ee.registerGeneralHandler(simpletest)
ee.start()
app.exec_()
# 直接运行脚本可以进行测试
if __name__ == '__main__':
test()

View File

@ -1,16 +1,11 @@
# encoding: UTF-8
'''
本文件仅用于存放对于事件类型常量的定义
由于python中不存在真正的常量概念因此选择使用全大写的变量名来代替常量
这里设计的命名规则以EVENT_前缀开头
常量的内容通常选择一个能够代表真实意义的字符串便于理解
建议将所有的常量定义放在该文件中便于检查是否存在重复的现象
本文件基于vnpy.event.eventType并添加更多字段
'''
from vnpy.event.eventType import *
# 系统相关
EVENT_TIMER = 'eTimer' # 计时器事件每隔1秒发送一次
EVENT_LOG = 'eLog' # 日志事件,全局通用
@ -32,33 +27,4 @@ EVENT_CTA_STRATEGY = 'eCtaStrategy.' # CTA策略状态变化事件
EVENT_DATARECORDER_LOG = 'eDataRecorderLog' # 行情记录日志更新事件
# Wind接口相关
EVENT_WIND_CONNECTREQ = 'eWindConnectReq' # Wind接口请求连接事件
#----------------------------------------------------------------------
def test():
"""检查是否存在内容重复的常量定义"""
check_dict = {}
global_dict = globals()
for key, value in global_dict.items():
if '__' not in key: # 不检查python内置对象
if value in check_dict:
check_dict[value].append(key)
else:
check_dict[value] = [key]
for key, value in check_dict.items():
if len(value)>1:
print u'存在重复的常量定义:' + str(key)
for name in value:
print name
print ''
print u'测试完毕'
# 直接运行脚本可以进行测试
if __name__ == '__main__':
test()
EVENT_WIND_CONNECTREQ = 'eWindConnectReq' # Wind接口请求连接事件

View File

@ -11,7 +11,8 @@ import json
import os
import platform
from eventEngine import *
from vnpy.event.eventEngine import *
from vnpy.trader.eventType import *
from vtConstant import *
from vtGateway import VtLogData

View File

@ -6,7 +6,7 @@
from uiBasicWidget import QtGui, QtCore
from eventEngine import *
from vnpy.event.eventEngine import *
from language import text

View File

@ -7,7 +7,8 @@ from collections import OrderedDict
from PyQt4 import QtGui, QtCore
from eventEngine import *
from vnpy.event.eventEngine import *
from vnpy.trader.eventType import *
from vtFunction import *
from vtGateway import *
import vtText

View File

@ -1,316 +0,0 @@
# encoding: UTF-8
import threading
import traceback
import signal
import zmq
from msgpack import packb, unpackb
from json import dumps, loads
import cPickle
pDumps = cPickle.dumps
pLoads = cPickle.loads
# 实现Ctrl-c中断recv
signal.signal(signal.SIGINT, signal.SIG_DFL)
########################################################################
class RpcObject(object):
"""
RPC对象
提供对数据的序列化打包和解包接口目前提供了jsonmsgpackcPickle三种工具
msgpack性能更高但通常需要安装msgpack相关工具
json性能略低但通用性更好大部分编程语言都内置了相关的库
cPickle性能一般且仅能用于Python但是可以直接传送Python对象非常方便
因此建议尽量使用msgpack如果要和某些语言通讯没有提供msgpack时再使用json
当传送的数据包含很多自定义的Python对象时建议使用cPickle
如果希望使用其他的序列化工具也可以在这里添加
"""
#----------------------------------------------------------------------
def __init__(self):
"""Constructor"""
# 默认使用msgpack作为序列化工具
#self.useMsgpack()
self.usePickle()
#----------------------------------------------------------------------
def pack(self, data):
"""打包"""
pass
#----------------------------------------------------------------------
def unpack(self, data):
"""解包"""
pass
#----------------------------------------------------------------------
def __jsonPack(self, data):
"""使用json打包"""
return dumps(data)
#----------------------------------------------------------------------
def __jsonUnpack(self, data):
"""使用json解包"""
return loads(data)
#----------------------------------------------------------------------
def __msgpackPack(self, data):
"""使用msgpack打包"""
return packb(data)
#----------------------------------------------------------------------
def __msgpackUnpack(self, data):
"""使用msgpack解包"""
return unpackb(data)
#----------------------------------------------------------------------
def __picklePack(self, data):
"""使用cPickle打包"""
return pDumps(data)
#----------------------------------------------------------------------
def __pickleUnpack(self, data):
"""使用cPickle解包"""
return pLoads(data)
#----------------------------------------------------------------------
def useJson(self):
"""使用json作为序列化工具"""
self.pack = self.__jsonPack
self.unpack = self.__jsonUnpack
#----------------------------------------------------------------------
def useMsgpack(self):
"""使用msgpack作为序列化工具"""
self.pack = self.__msgpackPack
self.unpack = self.__msgpackUnpack
#----------------------------------------------------------------------
def usePickle(self):
"""使用cPickle作为序列化工具"""
self.pack = self.__picklePack
self.unpack = self.__pickleUnpack
########################################################################
class RpcServer(RpcObject):
"""RPC服务器"""
#----------------------------------------------------------------------
def __init__(self, repAddress, pubAddress):
"""Constructor"""
super(RpcServer, self).__init__()
# 保存功能函数的字典key是函数名value是函数对象
self.__functions = {}
# zmq端口相关
self.__context = zmq.Context()
self.__socketREP = self.__context.socket(zmq.REP) # 请求回应socket
self.__socketREP.bind(repAddress)
self.__socketPUB = self.__context.socket(zmq.PUB) # 数据广播socket
self.__socketPUB.bind(pubAddress)
# 工作线程相关
self.__active = False # 服务器的工作状态
self.__thread = threading.Thread(target=self.run) # 服务器的工作线程
#----------------------------------------------------------------------
def start(self):
"""启动服务器"""
# 将服务器设为启动
self.__active = True
# 启动工作线程
if not self.__thread.isAlive():
self.__thread.start()
#----------------------------------------------------------------------
def stop(self):
"""停止服务器"""
# 将服务器设为停止
self.__active = False
# 等待工作线程退出
if self.__thread.isAlive():
self.__thread.join()
#----------------------------------------------------------------------
def run(self):
"""服务器运行函数"""
while self.__active:
# 使用poll来等待事件到达等待1秒1000毫秒
if not self.__socketREP.poll(1000):
continue
# 从请求响应socket收取请求数据
reqb = self.__socketREP.recv()
# 序列化解包
req = self.unpack(reqb)
# 获取函数名和参数
name, args, kwargs = req
# 获取引擎中对应的函数对象,并执行调用,如果有异常则捕捉后返回
try:
func = self.__functions[name]
r = func(*args, **kwargs)
rep = [True, r]
except Exception as e:
rep = [False, traceback.format_exc()]
# 序列化打包
repb = self.pack(rep)
# 通过请求响应socket返回调用结果
self.__socketREP.send(repb)
#----------------------------------------------------------------------
def publish(self, topic, data):
"""
广播推送数据
topic主题内容
data具体的数据
"""
# 序列化数据
datab = self.pack(data)
# 通过广播socket发送数据
self.__socketPUB.send_multipart([topic, datab])
#----------------------------------------------------------------------
def register(self, func):
"""注册函数"""
self.__functions[func.__name__] = func
########################################################################
class RpcClient(RpcObject):
"""RPC客户端"""
#----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress):
"""Constructor"""
super(RpcClient, self).__init__()
# zmq端口相关
self.__reqAddress = reqAddress
self.__subAddress = subAddress
self.__context = zmq.Context()
self.__socketREQ = self.__context.socket(zmq.REQ) # 请求发出socket
self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket
# 工作线程相关,用于处理服务器推送的数据
self.__active = False # 客户端的工作状态
self.__thread = threading.Thread(target=self.run) # 客户端的工作线程
#----------------------------------------------------------------------
def __getattr__(self, name):
"""实现远程调用功能"""
# 执行远程调用任务
def dorpc(*args, **kwargs):
# 生成请求
req = [name, args, kwargs]
# 序列化打包请求
reqb = self.pack(req)
# 发送请求并等待回应
self.__socketREQ.send(reqb)
repb = self.__socketREQ.recv()
# 序列化解包回应
rep = self.unpack(repb)
# 若正常则返回结果,调用失败则触发异常
if rep[0]:
return rep[1]
else:
raise RemoteException(rep[1])
return dorpc
#----------------------------------------------------------------------
def start(self):
"""启动客户端"""
# 连接端口
self.__socketREQ.connect(self.__reqAddress)
self.__socketSUB.connect(self.__subAddress)
# 将服务器设为启动
self.__active = True
# 启动工作线程
if not self.__thread.isAlive():
self.__thread.start()
#----------------------------------------------------------------------
def stop(self):
"""停止客户端"""
# 将客户端设为停止
self.__active = False
# 等待工作线程退出
if self.__thread.isAlive():
self.__thread.join()
#----------------------------------------------------------------------
def run(self):
"""客户端运行函数"""
while self.__active:
# 使用poll来等待事件到达等待1秒1000毫秒
if not self.__socketSUB.poll(1000):
continue
# 从订阅socket收取广播数据
topic, datab = self.__socketSUB.recv_multipart()
# 序列化解包
data = self.unpack(datab)
# 调用回调函数处理
self.callback(topic, data)
#----------------------------------------------------------------------
def callback(self, topic, data):
"""回调函数,必须由用户实现"""
raise NotImplementedError
#----------------------------------------------------------------------
def subscribeTopic(self, topic):
"""
订阅特定主题的广播数据
可以使用topic=''来订阅所有的主题
"""
self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic)
########################################################################
class RemoteException(Exception):
"""RPC远程异常"""
#----------------------------------------------------------------------
def __init__(self, value):
"""Constructor"""
self.__value = value
#----------------------------------------------------------------------
def __str__(self):
"""输出错误信息"""
return self.__value

View File

@ -7,7 +7,8 @@ from datetime import datetime
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
from eventEngine import *
from vnpy.event.eventEngine import *
from vnpy.trader.eventType import *
from vtGateway import *
from vtFunction import loadMongoSetting
from language import text

View File

@ -2,7 +2,8 @@
import time
from eventEngine import *
from vnpy.event.eventEngine import *
from vnpy.trader.eventType import *
from vtConstant import *