From aab32bb1eef893efd15c0f23246821a2fd5821d0 Mon Sep 17 00:00:00 2001 From: chenxy123 Date: Thu, 4 May 2017 23:34:03 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4trader=E7=9B=AE=E5=BD=95?= =?UTF-8?q?=E4=B8=8B=E7=9A=84eventEngine.py=E5=92=8Cvnrpc.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- vnpy/event/__init__.py | 0 vnpy/rpc/__init__.py | 0 vnpy/trader/ctaStrategy/ctaEngine.py | 3 +- vnpy/trader/ctaStrategy/uiCtaWidget.py | 2 +- vnpy/trader/dataRecorder/drEngine.py | 3 +- vnpy/trader/dataRecorder/uiDrWidget.py | 2 +- vnpy/trader/eventEngine.py | 356 ------------------------- vnpy/trader/eventType.py | 42 +-- vnpy/trader/riskManager/rmEngine.py | 3 +- vnpy/trader/riskManager/uiRmWidget.py | 2 +- vnpy/trader/uiBasicWidget.py | 3 +- vnpy/trader/vnrpc.py | 316 ---------------------- vnpy/trader/vtEngine.py | 3 +- vnpy/trader/vtGateway.py | 3 +- 14 files changed, 19 insertions(+), 719 deletions(-) create mode 100644 vnpy/event/__init__.py create mode 100644 vnpy/rpc/__init__.py delete mode 100644 vnpy/trader/eventEngine.py delete mode 100644 vnpy/trader/vnrpc.py diff --git a/vnpy/event/__init__.py b/vnpy/event/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/rpc/__init__.py b/vnpy/rpc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/trader/ctaStrategy/ctaEngine.py b/vnpy/trader/ctaStrategy/ctaEngine.py index 65f136aa..b6117de6 100644 --- a/vnpy/trader/ctaStrategy/ctaEngine.py +++ b/vnpy/trader/ctaStrategy/ctaEngine.py @@ -26,7 +26,8 @@ from datetime import datetime, timedelta from ctaBase import * from strategy import STRATEGY_CLASS -from eventEngine import * +from vnpy.event.eventEngine import * +from vnpy.trader.eventType import * from vtConstant import * from vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData from vtFunction import todayDate diff --git a/vnpy/trader/ctaStrategy/uiCtaWidget.py b/vnpy/trader/ctaStrategy/uiCtaWidget.py index 43b4f000..bc2c789e 100644 --- a/vnpy/trader/ctaStrategy/uiCtaWidget.py +++ b/vnpy/trader/ctaStrategy/uiCtaWidget.py @@ -6,7 +6,7 @@ CTA模块相关的GUI控制组件 from uiBasicWidget import QtGui, QtCore, BasicCell -from eventEngine import * +from vnpy.event.eventEngine import * from language import text diff --git a/vnpy/trader/dataRecorder/drEngine.py b/vnpy/trader/dataRecorder/drEngine.py index 3232bb82..84010c77 100644 --- a/vnpy/trader/dataRecorder/drEngine.py +++ b/vnpy/trader/dataRecorder/drEngine.py @@ -14,7 +14,8 @@ from datetime import datetime, timedelta from Queue import Queue from threading import Thread -from eventEngine import * +from vnpy.event.eventEngine import * +from vnpy.trader.eventType import * from vtGateway import VtSubscribeReq, VtLogData from drBase import * from vtFunction import todayDate diff --git a/vnpy/trader/dataRecorder/uiDrWidget.py b/vnpy/trader/dataRecorder/uiDrWidget.py index a852d913..b004ec5d 100644 --- a/vnpy/trader/dataRecorder/uiDrWidget.py +++ b/vnpy/trader/dataRecorder/uiDrWidget.py @@ -7,7 +7,7 @@ import json from uiBasicWidget import QtGui, QtCore -from eventEngine import * +from vnpy.event.eventEngine import * from language import text diff --git a/vnpy/trader/eventEngine.py b/vnpy/trader/eventEngine.py deleted file mode 100644 index d892177f..00000000 --- a/vnpy/trader/eventEngine.py +++ /dev/null @@ -1,356 +0,0 @@ -# encoding: UTF-8 - -# 系统模块 -from Queue import Queue, Empty -from threading import Thread -from time import sleep -from collections import defaultdict - -# 第三方模块 -from PyQt4.QtCore import QTimer - -# 自己开发的模块 -from eventType import * - - -######################################################################## -class EventEngine(object): - """ - 事件驱动引擎 - 事件驱动引擎中所有的变量都设置为了私有,这是为了防止不小心 - 从外部修改了这些变量的值或状态,导致bug。 - - 变量说明 - __queue:私有变量,事件队列 - __active:私有变量,事件引擎开关 - __thread:私有变量,事件处理线程 - __timer:私有变量,计时器 - __handlers:私有变量,事件处理函数字典 - - - 方法说明 - __run: 私有方法,事件处理线程连续运行用 - __process: 私有方法,处理事件,调用注册在引擎中的监听函数 - __onTimer:私有方法,计时器固定事件间隔触发后,向事件队列中存入计时器事件 - start: 公共方法,启动引擎 - stop:公共方法,停止引擎 - register:公共方法,向引擎中注册监听函数 - unregister:公共方法,向引擎中注销监听函数 - put:公共方法,向事件队列中存入新的事件 - - 事件监听函数必须定义为输入参数仅为一个event对象,即: - - 函数 - def func(event) - ... - - 对象方法 - def method(self, event) - ... - - """ - - #---------------------------------------------------------------------- - def __init__(self): - """初始化事件引擎""" - # 事件队列 - self.__queue = Queue() - - # 事件引擎开关 - self.__active = False - - # 事件处理线程 - self.__thread = Thread(target = self.__run) - - # 计时器,用于触发计时器事件 - self.__timer = QTimer() - self.__timer.timeout.connect(self.__onTimer) - - # 这里的__handlers是一个字典,用来保存对应的事件调用关系 - # 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能 - self.__handlers = defaultdict(list) - - # __generalHandlers是一个列表,用来保存通用回调函数(所有事件均调用) - self.__generalHandlers = [] - - #---------------------------------------------------------------------- - def __run(self): - """引擎运行""" - while self.__active == True: - try: - event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒 - self.__process(event) - except Empty: - pass - - #---------------------------------------------------------------------- - def __process(self, event): - """处理事件""" - # 检查是否存在对该事件进行监听的处理函数 - if event.type_ in self.__handlers: - # 若存在,则按顺序将事件传递给处理函数执行 - [handler(event) for handler in self.__handlers[event.type_]] - - # 以上语句为Python列表解析方式的写法,对应的常规循环写法为: - #for handler in self.__handlers[event.type_]: - #handler(event) - - # 调用通用处理函数进行处理 - if self.__generalHandlers: - [handler(event) for handler in self.__generalHandlers] - - #---------------------------------------------------------------------- - def __onTimer(self): - """向事件队列中存入计时器事件""" - # 创建计时器事件 - event = Event(type_=EVENT_TIMER) - - # 向队列中存入计时器事件 - self.put(event) - - #---------------------------------------------------------------------- - def start(self, timer=True): - """ - 引擎启动 - timer:是否要启动计时器 - """ - # 将引擎设为启动 - self.__active = True - - # 启动事件处理线程 - self.__thread.start() - - # 启动计时器,计时器事件间隔默认设定为1秒 - if timer: - self.__timer.start(1000) - - #---------------------------------------------------------------------- - def stop(self): - """停止引擎""" - # 将引擎设为停止 - self.__active = False - - # 停止计时器 - self.__timer.stop() - - # 等待事件处理线程退出 - self.__thread.join() - - #---------------------------------------------------------------------- - def register(self, type_, handler): - """注册事件处理函数监听""" - # 尝试获取该事件类型对应的处理函数列表,若无defaultDict会自动创建新的list - handlerList = self.__handlers[type_] - - # 若要注册的处理器不在该事件的处理器列表中,则注册该事件 - if handler not in handlerList: - handlerList.append(handler) - - #---------------------------------------------------------------------- - def unregister(self, type_, handler): - """注销事件处理函数监听""" - # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求 - handlerList = self.__handlers[type_] - - # 如果该函数存在于列表中,则移除 - if handler in handlerList: - handlerList.remove(handler) - - # 如果函数列表为空,则从引擎中移除该事件类型 - if not handlerList: - del self.__handlers[type_] - - #---------------------------------------------------------------------- - def put(self, event): - """向事件队列中存入事件""" - self.__queue.put(event) - - #---------------------------------------------------------------------- - def registerGeneralHandler(self, handler): - """注册通用事件处理函数监听""" - if handler not in self.__generalHandlers: - self.__generalHandlers.append(handler) - - #---------------------------------------------------------------------- - def unregisterGeneralHandler(self, handler): - """注销通用事件处理函数监听""" - if handler in self.__generalHandlers: - self.__generalHandlers.remove(handler) - - - -######################################################################## -class EventEngine2(object): - """ - 计时器使用python线程的事件驱动引擎 - """ - - #---------------------------------------------------------------------- - def __init__(self): - """初始化事件引擎""" - # 事件队列 - self.__queue = Queue() - - # 事件引擎开关 - self.__active = False - - # 事件处理线程 - self.__thread = Thread(target = self.__run) - - # 计时器,用于触发计时器事件 - self.__timer = Thread(target = self.__runTimer) - self.__timerActive = False # 计时器工作状态 - self.__timerSleep = 1 # 计时器触发间隔(默认1秒) - - # 这里的__handlers是一个字典,用来保存对应的事件调用关系 - # 其中每个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能 - self.__handlers = defaultdict(list) - - # __generalHandlers是一个列表,用来保存通用回调函数(所有事件均调用) - self.__generalHandlers = [] - - #---------------------------------------------------------------------- - def __run(self): - """引擎运行""" - while self.__active == True: - try: - event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒 - self.__process(event) - except Empty: - pass - - #---------------------------------------------------------------------- - def __process(self, event): - """处理事件""" - # 检查是否存在对该事件进行监听的处理函数 - if event.type_ in self.__handlers: - # 若存在,则按顺序将事件传递给处理函数执行 - [handler(event) for handler in self.__handlers[event.type_]] - - # 以上语句为Python列表解析方式的写法,对应的常规循环写法为: - #for handler in self.__handlers[event.type_]: - #handler(event) - - # 调用通用处理函数进行处理 - if self.__generalHandlers: - [handler(event) for handler in self.__generalHandlers] - - #---------------------------------------------------------------------- - def __runTimer(self): - """运行在计时器线程中的循环函数""" - while self.__timerActive: - # 创建计时器事件 - event = Event(type_=EVENT_TIMER) - - # 向队列中存入计时器事件 - self.put(event) - - # 等待 - sleep(self.__timerSleep) - - #---------------------------------------------------------------------- - def start(self, timer=True): - """ - 引擎启动 - timer:是否要启动计时器 - """ - # 将引擎设为启动 - self.__active = True - - # 启动事件处理线程 - self.__thread.start() - - # 启动计时器,计时器事件间隔默认设定为1秒 - if timer: - self.__timerActive = True - self.__timer.start() - - #---------------------------------------------------------------------- - def stop(self): - """停止引擎""" - # 将引擎设为停止 - self.__active = False - - # 停止计时器 - self.__timerActive = False - self.__timer.join() - - # 等待事件处理线程退出 - self.__thread.join() - - #---------------------------------------------------------------------- - def register(self, type_, handler): - """注册事件处理函数监听""" - # 尝试获取该事件类型对应的处理函数列表,若无defaultDict会自动创建新的list - handlerList = self.__handlers[type_] - - # 若要注册的处理器不在该事件的处理器列表中,则注册该事件 - if handler not in handlerList: - handlerList.append(handler) - - #---------------------------------------------------------------------- - def unregister(self, type_, handler): - """注销事件处理函数监听""" - # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求 - handlerList = self.__handlers[type_] - - # 如果该函数存在于列表中,则移除 - if handler in handlerList: - handlerList.remove(handler) - - # 如果函数列表为空,则从引擎中移除该事件类型 - if not handlerList: - del self.__handlers[type_] - - #---------------------------------------------------------------------- - def put(self, event): - """向事件队列中存入事件""" - self.__queue.put(event) - - #---------------------------------------------------------------------- - def registerGeneralHandler(self, handler): - """注册通用事件处理函数监听""" - if handler not in self.__generalHandlers: - self.__generalHandlers.append(handler) - - #---------------------------------------------------------------------- - def unregisterGeneralHandler(self, handler): - """注销通用事件处理函数监听""" - if handler in self.__generalHandlers: - self.__generalHandlers.remove(handler) - - -######################################################################## -class Event: - """事件对象""" - - #---------------------------------------------------------------------- - def __init__(self, type_=None): - """Constructor""" - self.type_ = type_ # 事件类型 - self.dict_ = {} # 字典用于保存具体的事件数据 - - -#---------------------------------------------------------------------- -def test(): - """测试函数""" - import sys - from datetime import datetime - from PyQt4.QtCore import QCoreApplication - - def simpletest(event): - print u'处理每秒触发的计时器事件:%s' % str(datetime.now()) - - app = QCoreApplication(sys.argv) - - ee = EventEngine2() - #ee.register(EVENT_TIMER, simpletest) - ee.registerGeneralHandler(simpletest) - ee.start() - - app.exec_() - - -# 直接运行脚本可以进行测试 -if __name__ == '__main__': - test() \ No newline at end of file diff --git a/vnpy/trader/eventType.py b/vnpy/trader/eventType.py index 1b3ccaa4..b26175fb 100644 --- a/vnpy/trader/eventType.py +++ b/vnpy/trader/eventType.py @@ -1,16 +1,11 @@ # encoding: UTF-8 ''' -本文件仅用于存放对于事件类型常量的定义。 - -由于python中不存在真正的常量概念,因此选择使用全大写的变量名来代替常量。 -这里设计的命名规则以EVENT_前缀开头。 - -常量的内容通常选择一个能够代表真实意义的字符串(便于理解)。 - -建议将所有的常量定义放在该文件中,便于检查是否存在重复的现象。 +本文件基于vnpy.event.eventType,并添加更多字段 ''' +from vnpy.event.eventType import * + # 系统相关 EVENT_TIMER = 'eTimer' # 计时器事件,每隔1秒发送一次 EVENT_LOG = 'eLog' # 日志事件,全局通用 @@ -32,33 +27,4 @@ EVENT_CTA_STRATEGY = 'eCtaStrategy.' # CTA策略状态变化事件 EVENT_DATARECORDER_LOG = 'eDataRecorderLog' # 行情记录日志更新事件 # Wind接口相关 -EVENT_WIND_CONNECTREQ = 'eWindConnectReq' # Wind接口请求连接事件 - - -#---------------------------------------------------------------------- -def test(): - """检查是否存在内容重复的常量定义""" - check_dict = {} - - global_dict = globals() - - for key, value in global_dict.items(): - if '__' not in key: # 不检查python内置对象 - if value in check_dict: - check_dict[value].append(key) - else: - check_dict[value] = [key] - - for key, value in check_dict.items(): - if len(value)>1: - print u'存在重复的常量定义:' + str(key) - for name in value: - print name - print '' - - print u'测试完毕' - - -# 直接运行脚本可以进行测试 -if __name__ == '__main__': - test() \ No newline at end of file +EVENT_WIND_CONNECTREQ = 'eWindConnectReq' # Wind接口请求连接事件 \ No newline at end of file diff --git a/vnpy/trader/riskManager/rmEngine.py b/vnpy/trader/riskManager/rmEngine.py index 6f270bca..e28570b8 100644 --- a/vnpy/trader/riskManager/rmEngine.py +++ b/vnpy/trader/riskManager/rmEngine.py @@ -11,7 +11,8 @@ import json import os import platform -from eventEngine import * +from vnpy.event.eventEngine import * +from vnpy.trader.eventType import * from vtConstant import * from vtGateway import VtLogData diff --git a/vnpy/trader/riskManager/uiRmWidget.py b/vnpy/trader/riskManager/uiRmWidget.py index 3eebcd52..9780c66b 100644 --- a/vnpy/trader/riskManager/uiRmWidget.py +++ b/vnpy/trader/riskManager/uiRmWidget.py @@ -6,7 +6,7 @@ from uiBasicWidget import QtGui, QtCore -from eventEngine import * +from vnpy.event.eventEngine import * from language import text diff --git a/vnpy/trader/uiBasicWidget.py b/vnpy/trader/uiBasicWidget.py index 79e55697..86400d62 100644 --- a/vnpy/trader/uiBasicWidget.py +++ b/vnpy/trader/uiBasicWidget.py @@ -7,7 +7,8 @@ from collections import OrderedDict from PyQt4 import QtGui, QtCore -from eventEngine import * +from vnpy.event.eventEngine import * +from vnpy.trader.eventType import * from vtFunction import * from vtGateway import * import vtText diff --git a/vnpy/trader/vnrpc.py b/vnpy/trader/vnrpc.py deleted file mode 100644 index c3a979c9..00000000 --- a/vnpy/trader/vnrpc.py +++ /dev/null @@ -1,316 +0,0 @@ -# encoding: UTF-8 - -import threading -import traceback -import signal - -import zmq -from msgpack import packb, unpackb -from json import dumps, loads - -import cPickle -pDumps = cPickle.dumps -pLoads = cPickle.loads - - -# 实现Ctrl-c中断recv -signal.signal(signal.SIGINT, signal.SIG_DFL) - - -######################################################################## -class RpcObject(object): - """ - RPC对象 - - 提供对数据的序列化打包和解包接口,目前提供了json、msgpack、cPickle三种工具。 - - msgpack:性能更高,但通常需要安装msgpack相关工具; - json:性能略低但通用性更好,大部分编程语言都内置了相关的库。 - cPickle:性能一般且仅能用于Python,但是可以直接传送Python对象,非常方便。 - - 因此建议尽量使用msgpack,如果要和某些语言通讯没有提供msgpack时再使用json, - 当传送的数据包含很多自定义的Python对象时建议使用cPickle。 - - 如果希望使用其他的序列化工具也可以在这里添加。 - """ - - #---------------------------------------------------------------------- - def __init__(self): - """Constructor""" - # 默认使用msgpack作为序列化工具 - #self.useMsgpack() - self.usePickle() - - #---------------------------------------------------------------------- - def pack(self, data): - """打包""" - pass - - #---------------------------------------------------------------------- - def unpack(self, data): - """解包""" - pass - - #---------------------------------------------------------------------- - def __jsonPack(self, data): - """使用json打包""" - return dumps(data) - - #---------------------------------------------------------------------- - def __jsonUnpack(self, data): - """使用json解包""" - return loads(data) - - #---------------------------------------------------------------------- - def __msgpackPack(self, data): - """使用msgpack打包""" - return packb(data) - - #---------------------------------------------------------------------- - def __msgpackUnpack(self, data): - """使用msgpack解包""" - return unpackb(data) - - #---------------------------------------------------------------------- - def __picklePack(self, data): - """使用cPickle打包""" - return pDumps(data) - - #---------------------------------------------------------------------- - def __pickleUnpack(self, data): - """使用cPickle解包""" - return pLoads(data) - - #---------------------------------------------------------------------- - def useJson(self): - """使用json作为序列化工具""" - self.pack = self.__jsonPack - self.unpack = self.__jsonUnpack - - #---------------------------------------------------------------------- - def useMsgpack(self): - """使用msgpack作为序列化工具""" - self.pack = self.__msgpackPack - self.unpack = self.__msgpackUnpack - - #---------------------------------------------------------------------- - def usePickle(self): - """使用cPickle作为序列化工具""" - self.pack = self.__picklePack - self.unpack = self.__pickleUnpack - - -######################################################################## -class RpcServer(RpcObject): - """RPC服务器""" - - #---------------------------------------------------------------------- - def __init__(self, repAddress, pubAddress): - """Constructor""" - super(RpcServer, self).__init__() - - # 保存功能函数的字典,key是函数名,value是函数对象 - self.__functions = {} - - # zmq端口相关 - self.__context = zmq.Context() - - self.__socketREP = self.__context.socket(zmq.REP) # 请求回应socket - self.__socketREP.bind(repAddress) - - self.__socketPUB = self.__context.socket(zmq.PUB) # 数据广播socket - self.__socketPUB.bind(pubAddress) - - # 工作线程相关 - self.__active = False # 服务器的工作状态 - self.__thread = threading.Thread(target=self.run) # 服务器的工作线程 - - #---------------------------------------------------------------------- - def start(self): - """启动服务器""" - # 将服务器设为启动 - self.__active = True - - # 启动工作线程 - if not self.__thread.isAlive(): - self.__thread.start() - - #---------------------------------------------------------------------- - def stop(self): - """停止服务器""" - # 将服务器设为停止 - self.__active = False - - # 等待工作线程退出 - if self.__thread.isAlive(): - self.__thread.join() - - #---------------------------------------------------------------------- - def run(self): - """服务器运行函数""" - while self.__active: - # 使用poll来等待事件到达,等待1秒(1000毫秒) - if not self.__socketREP.poll(1000): - continue - - # 从请求响应socket收取请求数据 - reqb = self.__socketREP.recv() - - # 序列化解包 - req = self.unpack(reqb) - - # 获取函数名和参数 - name, args, kwargs = req - - # 获取引擎中对应的函数对象,并执行调用,如果有异常则捕捉后返回 - try: - func = self.__functions[name] - r = func(*args, **kwargs) - rep = [True, r] - except Exception as e: - rep = [False, traceback.format_exc()] - - # 序列化打包 - repb = self.pack(rep) - - # 通过请求响应socket返回调用结果 - self.__socketREP.send(repb) - - #---------------------------------------------------------------------- - def publish(self, topic, data): - """ - 广播推送数据 - topic:主题内容 - data:具体的数据 - """ - # 序列化数据 - datab = self.pack(data) - - # 通过广播socket发送数据 - self.__socketPUB.send_multipart([topic, datab]) - - #---------------------------------------------------------------------- - def register(self, func): - """注册函数""" - self.__functions[func.__name__] = func - - -######################################################################## -class RpcClient(RpcObject): - """RPC客户端""" - - #---------------------------------------------------------------------- - def __init__(self, reqAddress, subAddress): - """Constructor""" - super(RpcClient, self).__init__() - - # zmq端口相关 - self.__reqAddress = reqAddress - self.__subAddress = subAddress - - self.__context = zmq.Context() - self.__socketREQ = self.__context.socket(zmq.REQ) # 请求发出socket - self.__socketSUB = self.__context.socket(zmq.SUB) # 广播订阅socket - - # 工作线程相关,用于处理服务器推送的数据 - self.__active = False # 客户端的工作状态 - self.__thread = threading.Thread(target=self.run) # 客户端的工作线程 - - #---------------------------------------------------------------------- - def __getattr__(self, name): - """实现远程调用功能""" - # 执行远程调用任务 - def dorpc(*args, **kwargs): - # 生成请求 - req = [name, args, kwargs] - - # 序列化打包请求 - reqb = self.pack(req) - - # 发送请求并等待回应 - self.__socketREQ.send(reqb) - repb = self.__socketREQ.recv() - - # 序列化解包回应 - rep = self.unpack(repb) - - # 若正常则返回结果,调用失败则触发异常 - if rep[0]: - return rep[1] - else: - raise RemoteException(rep[1]) - - return dorpc - - #---------------------------------------------------------------------- - def start(self): - """启动客户端""" - # 连接端口 - self.__socketREQ.connect(self.__reqAddress) - self.__socketSUB.connect(self.__subAddress) - - # 将服务器设为启动 - self.__active = True - - # 启动工作线程 - if not self.__thread.isAlive(): - self.__thread.start() - - #---------------------------------------------------------------------- - def stop(self): - """停止客户端""" - # 将客户端设为停止 - self.__active = False - - # 等待工作线程退出 - if self.__thread.isAlive(): - self.__thread.join() - - #---------------------------------------------------------------------- - def run(self): - """客户端运行函数""" - while self.__active: - # 使用poll来等待事件到达,等待1秒(1000毫秒) - if not self.__socketSUB.poll(1000): - continue - - # 从订阅socket收取广播数据 - topic, datab = self.__socketSUB.recv_multipart() - - # 序列化解包 - data = self.unpack(datab) - - # 调用回调函数处理 - self.callback(topic, data) - - #---------------------------------------------------------------------- - def callback(self, topic, data): - """回调函数,必须由用户实现""" - raise NotImplementedError - - #---------------------------------------------------------------------- - def subscribeTopic(self, topic): - """ - 订阅特定主题的广播数据 - - 可以使用topic=''来订阅所有的主题 - """ - self.__socketSUB.setsockopt(zmq.SUBSCRIBE, topic) - - - -######################################################################## -class RemoteException(Exception): - """RPC远程异常""" - - #---------------------------------------------------------------------- - def __init__(self, value): - """Constructor""" - self.__value = value - - #---------------------------------------------------------------------- - def __str__(self): - """输出错误信息""" - return self.__value - - \ No newline at end of file diff --git a/vnpy/trader/vtEngine.py b/vnpy/trader/vtEngine.py index 8bf32dc0..7c2569f5 100644 --- a/vnpy/trader/vtEngine.py +++ b/vnpy/trader/vtEngine.py @@ -7,7 +7,8 @@ from datetime import datetime from pymongo import MongoClient from pymongo.errors import ConnectionFailure -from eventEngine import * +from vnpy.event.eventEngine import * +from vnpy.trader.eventType import * from vtGateway import * from vtFunction import loadMongoSetting from language import text diff --git a/vnpy/trader/vtGateway.py b/vnpy/trader/vtGateway.py index b6670783..16a525a1 100644 --- a/vnpy/trader/vtGateway.py +++ b/vnpy/trader/vtGateway.py @@ -2,7 +2,8 @@ import time -from eventEngine import * +from vnpy.event.eventEngine import * +from vnpy.trader.eventType import * from vtConstant import *