增加rpcService模块,以及ServerClient的示例

This commit is contained in:
vn.py 2017-09-02 23:16:58 +08:00
parent aebac741f1
commit 286a8f72a1
10 changed files with 325 additions and 0 deletions

View File

@ -0,0 +1,7 @@
{
"brokerID": "9999",
"mdAddress": "tcp://180.168.146.187:10011",
"tdAddress": "tcp://180.168.146.187:10001",
"userID": "simnow申请",
"password": "simnow申请"
}

View File

@ -0,0 +1,4 @@
{
"repAddress": "tcp://*:2014",
"pubAddress": "tcp://*:0602"
}

View File

@ -0,0 +1,16 @@
{
"fontFamily": "微软雅黑",
"fontSize": 12,
"mongoHost": "localhost",
"mongoPort": 27017,
"mongoLogging": true,
"darkStyle": true,
"language": "chinese",
"logActive": true,
"logLevel": "debug",
"logConsole": true,
"logFile": true
}

View File

@ -0,0 +1,42 @@
# encoding: UTF-8
# 重载sys模块设置默认字符串编码方式为utf8
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# 判断操作系统
import platform
system = platform.system()
# vn.trader模块
from vnpy.event import EventEngine
from vnpy.trader.uiQt import createQApp
from vnpy.trader.uiMainWindow import MainWindow
from vnpy.trader.app.rpcService.rsClient import MainEngineProxy
#----------------------------------------------------------------------
def main():
"""主程序入口"""
# 创建Qt应用对象
qApp = createQApp()
# 创建事件引擎
ee = EventEngine()
# 创建主引擎
reqAddress = 'tcp://localhost:2014'
subAddress = 'tcp://localhost:0602'
me = MainEngineProxy(ee)
me.init(reqAddress, subAddress)
# 创建主窗口
mw = MainWindow(me, ee)
mw.showMaximized()
# 在主线程中启动Qt事件循环
sys.exit(qApp.exec_())
if __name__ == '__main__':
main()

View File

@ -0,0 +1,46 @@
# encoding: UTF-8
# 重载sys模块设置默认字符串编码方式为utf8
import sys
reload(sys)
sys.setdefaultencoding('utf8')
# 判断操作系统
import platform
system = platform.system()
# vn.trader模块
from vnpy.event import EventEngine2
from vnpy.trader.vtEngine import MainEngine
# 加载底层接口
from vnpy.trader.gateway import ctpGateway
# 加载上层应用
from vnpy.trader.app import (riskManager, ctaStrategy, rpcService)
#----------------------------------------------------------------------
def main():
"""主程序入口"""
# 创建事件引擎
ee = EventEngine2()
# 创建主引擎
me = MainEngine(ee)
# 添加交易接口
me.addGateway(ctpGateway)
# 添加上层应用
#me.addApp(riskManager)
#me.addApp(ctaStrategy)
#me.addApp(rpcService)
# 阻塞运行
cmd = ''
while cmd != 'exit':
cmd = raw_input()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,4 @@
{
"repAddress": "tcp://*:2014",
"pubAddress": "tcp://*:0602"
}

View File

@ -0,0 +1,11 @@
# encoding: UTF-8
from rsEngine import RsEngine
#from uiRmWidget import RmEngineManager
from vnpy.trader.uiQt import QtWidgets
appName = 'RpcService'
appDisplayName = u'RPC服务'
appEngine = RsEngine
appWidget = QtWidgets.QWidget
appIco = 'rs.ico'

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -0,0 +1,102 @@
# encoding: UTF-8
import copy
from vnpy.rpc import RpcClient
########################################################################
class AttributeProxy(object):
"""属性代理"""
#----------------------------------------------------------------------
def __init__(self, nameList, client):
"""Constructor"""
self.nameList = nameList # 属性名称关系列表
self.client = client # RPC客户端
#----------------------------------------------------------------------
def __getattr__(self, name):
"""获取某个不存在的属性"""
# 生成属性层级列表
newNameList = copy.copy(self.nameList)
newNameList.append(name)
# 创建代理对象
proxy = AttributeProxy(newNameList, self.client)
# 缓存代理对象
self.__dict__[name] = proxy
# 返回
return proxy
#----------------------------------------------------------------------
def __call__(self, *args, **kwargs):
"""被当做函数调用时"""
d = {}
d['nameList'] = self.nameList
d['args'] = args
d['kwargs'] = kwargs
return self.client.call(d)
########################################################################
class RsClient(RpcClient):
""""""
#----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress):
"""Constructor"""
super(RsClient, self).__init__(reqAddress, subAddress)
self.eventEngine = None
#----------------------------------------------------------------------
def callback(self, topic, data):
""""""
self.eventEngine.put(data)
#----------------------------------------------------------------------
def init(self, eventEngine):
""""""
self.eventEngine = eventEngine
self.usePickle()
self.subscribeTopic('')
self.start()
########################################################################
class MainEngineProxy(object):
""""""
#----------------------------------------------------------------------
def __init__(self, eventEngine):
"""Constructor"""
self.eventEngine = eventEngine
self.eventEngine.start(timer=False)
self.client = None
#----------------------------------------------------------------------
def init(self, reqAddress, subAddress):
""""""
self.client = RsClient(reqAddress, subAddress)
self.client.init(self.eventEngine)
#----------------------------------------------------------------------
def __getattr__(self, name):
""""""
# 生成属性名称层级列表
nameList = [name]
# 生成属性代理对象
proxy = AttributeProxy(nameList, self.client)
# 缓存属性代理对象,使得后续调用无需新建
self.__dict__[name] = proxy
# 返回属性代理
return proxy

View File

@ -0,0 +1,93 @@
# encoding: UTF-8
import json
from vnpy.trader.vtConstant import EMPTY_STRING
from vnpy.rpc import RpcServer
from vnpy.trader.vtFunction import getJsonPath
########################################################################
class RsEngine(object):
"""RPC服务引擎"""
settingFileName = 'RS_setting.json'
settingFilePath = getJsonPath(settingFileName, __file__)
name = u'RPC服务'
#----------------------------------------------------------------------
def __init__(self, mainEngine, eventEngine):
"""Constructor"""
self.mainEngine = mainEngine
self.eventEngine = eventEngine
self.server = None # RPC服务对象
self.repAddress = EMPTY_STRING # REP地址
self.pubAddress = EMPTY_STRING # PUB地址
self.functionDict = {} # 调用过的函数对象缓存字典
self.loadSetting()
self.registerEvent()
#----------------------------------------------------------------------
def loadSetting(self):
"""读取配置"""
with open(self.settingFilePath) as f:
d = json.load(f)
self.repAddress = d['repAddress']
self.pubAddress = d['pubAddress']
self.server = RpcServer(self.repAddress, self.pubAddress)
self.server.usePickle()
self.server.register(self.call)
self.server.start()
#----------------------------------------------------------------------
def registerEvent(self):
"""注册事件监听"""
self.eventEngine.registerGeneralHandler(self.processEvent)
#----------------------------------------------------------------------
def call(self, d):
"""调用函数"""
nameList = d['nameList'] # 对象属性列表
nameTuple = tuple(nameList) # 转化为元组
args = d['args'] # 调用参数
kwargs = d['kwargs']
# 如果已经有缓存,则直接调用
if nameTuple in self.functionDict:
function = self.functionDict[nameTuple]
result = function(*args, **kwargs)
return result
# 逐层寻找函数对象
else:
# 根对象为主引擎
obj = self.mainEngine
# 逐层寻找对象属性
for name in nameTuple:
obj = self.mainEngine.__getattribute__(name)
# 缓存结果
self.functionDict[nameTuple] = obj
# 调用最终对象
result = obj(*args, **kwargs)
return result
#----------------------------------------------------------------------
def processEvent(self, event):
"""处理事件推送"""
self.server.publish('', event)
#----------------------------------------------------------------------
def stop(self):
"""停止"""
pass