[新功能] RabbitMQ 通信组件

This commit is contained in:
msincenselee 2019-11-30 09:50:43 +08:00
parent 763341d2dc
commit 18f3062686
14 changed files with 1100 additions and 0 deletions

70
vnpy/amqp/README.md Normal file
View File

@ -0,0 +1,70 @@
代码源自余总得 https://github.com/yutiansut/QAPUBSUB/
**RabbitMQ and AMQP**
RabbitMQ 是采用 Erlang 语言实现的 AMQP 协议的消息中间件,
最初起源于金融系统,用于在分布式系统中存储转发消息。
RabbitMQ 发展到今天,被越来越多的人认可,
这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。
消息支持多种传递模式详细见案例https://www.rabbitmq.com/getstarted.html
RabbitMQ 是一种典型的点对点模式,也可以通过设置交换器类型来实现发布订阅模式而达到广播消费的效果
**1、点对点**
发布者只需要定义queue的名称
接收者只需要定义queue的名称。 隐含上auto_ack=True即收到消息后自动确认就会从Queue中删除
**2、工作队列**
任务发布者指定queue名称durable=True。发送任务时发送到指定的queue设置delivery_mode=2(持久化该任务)
执行接收者指定queue名称durable=True。执行任务后才确认. ch.basic_ack(delivery_tag=method.delivery_tag
**3、发布 / 订阅Pub/Sub模式**
发布者创建channel时指定Exchange名称类型为fanout。
发布时指定Exchange名称无routing_key。
订阅者创建channel时指定Exchagne名称类型为fanout。
创建动态queue名绑定私有。
绑定channel与queue指定Exchange和queue名。
**4、路由模式**
发布者发布者创建channel时指定Exchange名称类型为direct。
发布时指定Exchange名称打上route_key作为标签
订阅者创建channel时指定Exchagne名称类型为direct。
创建动态queue名绑定私有。
绑定channel与queue指定Exchange、queue名和期望获取的标签
如订阅多个标签,绑定多次即可。
**5、主题模式**
发布者创建channel时指定Exchange名称类型为topic。
发布时指定Exchange名称打上route_key作为标签
订阅者创建channel时指定Exchagne名称类型为topic。
创建动态queue名绑定私有。
绑定channel与queue指定Exchange、queue名和期望获取的标签匹配符
如订阅多个标签匹配符,绑定多次即可。
匹配有两个关键字:
* 1~多个字母
# 0~多个字母
**6、远程调用模式**
服务端响应创建channel时指定queue定义basic_qos为只有一个执行
定义消息的响应执行方法。
消息执行方法执行完毕后发布结果消息使用指定routing_key标签为回复的queue
推送属性包含参照ID确认ACK.
客户端请求创建2个queue一个是接收执行结果的queue并绑定执行结果回调响应自动ack。
另一个是推送请求的queue推送时增加reply_to 和 参照ID。
**consumer.py 提供两种消费者:**
1. 订阅模式下得subscriber
2. 点对点/路由/主题模式下得subscriber_routing
**producter.py提供两种生产者:**
1. 订阅模式下得publisher
2. 点对点/路由/主题模式下得publisher_routing

0
vnpy/amqp/__init__.py Normal file
View File

BIN
vnpy/amqp/amqp_arch.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 40 KiB

60
vnpy/amqp/base.py Normal file
View File

@ -0,0 +1,60 @@
# encoding: UTF-8
import pika
class base_broker():
def __init__(self, host='localhost', port=5672, user='guest', password='guest',
channel_number=1):
"""
:param host: 连接rabbitmq的服务器地址或者群集地址
:param port: 端口
:param user: 用户名
:param password: 密码
:param channel_number: 频道的数字大于1
"""
self.host = host
self.port = port
self.user = user
self.password = password
self.channel_number = channel_number
# 身份鉴权
self.credentials = pika.PlainCredentials(self.user, self.password, erase_on_connect=True)
# 创建连接
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port,
credentials=self.credentials,
heartbeat=0, socket_timeout=5,
)
)
# 创建一个频道,或者指定频段数字编号
self.channel = self.connection.channel(
channel_number=self.channel_number)
def reconnect(self):
"""
重新连接
:return:
"""
try:
self.connection.close()
except:
pass
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port,
credentials=self.credentials,
heartbeat=0, socket_timeout=5,))
self.channel = self.connection.channel(
channel_number=self.channel_number)
return self
def close(self):
if self.connection:
self.connection.close()

339
vnpy/amqp/consumer.py Normal file
View File

@ -0,0 +1,339 @@
# encoding: UTF-8
# 消息消费者类(集合)
import json
import pika
import random
import traceback
from vnpy.amqp.base import base_broker
from threading import Thread
######### 模式1接收者 #########
class receiver(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x',
queue='recv.{}'.format(random.randint(0, 1000000)), routing_key='default'):
super().__init__(host=host, port=port, user=user,
password=password)
# 唯一匹配
self.routing_key = routing_key
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key) # 队列名采用服务端分配的临时队列
# self.channel.basic_qos(prefetch_count=1)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] received: %r" % body)
def subscribe(self):
# 消息接收
self.channel.basic_consume(self.queue, self.callback, auto_ack=True)
self.channel.start_consuming()
def start(self):
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
######### 模式2执行接收者#########
class worker(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x_work_queue',
queue='task_queue', routing_key='default'):
super().__init__(host=host, port=port, user=user, password=password)
# 唯一匹配
self.routing_key = routing_key
self.exchange = exchange
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
durable=True)
self.queue = self.channel.queue_declare(queue=queue,durable=True).method.queue
print('worker use exchange:{},queue:{}'.format(exchange, self.queue))
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key) # 队列名采用服务端分配的临时队列
# 每个worker只执行一个
self.channel.basic_qos(prefetch_count=1)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] received task: %r" % body)
chan.basic_ack(delivery_tag=method_frame.delivery_tag)
print(" [x] task finished ")
def subscribe(self):
print('worker subscribed')
# 消息接收
self.channel.basic_consume(self.queue, self.callback, auto_ack=False)
self.channel.start_consuming()
def start(self):
print('worker start')
try:
self.subscribe()
except Exception as e:
print(str(e))
traceback.print_exc()
######### 模式3发布 / 订阅Pub/Sub模式, 订阅者#########
class subscriber(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x_fanout', queue='sub.{}'.format(random.randint(0, 1000000)),
routing_key='default'):
super().__init__(host=host, port=port, user=user, password=password)
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='fanout',
passive=False,
durable=False,
auto_delete=False)
self.routing_key = routing_key
# 队列名采用服务端分配的临时队列
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key)
# 缺省回调函数地址
self.cb_func = self.callback
def set_callback(self,cb_func):
self.cb_func = cb_func
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] %r" % body)
def subscribe(self):
self.channel.basic_consume(self.queue, self.cb_func, auto_ack=True)
self.channel.start_consuming()
# self.channel.basic_consume(
# self.callback, queue=self.queue_name, no_ack=True) # 消息接收
def start(self):
try:
self.subscribe()
except Exception as ex:
print('subscriber exception:{}'.format(str(ex)))
traceback.print_exc()
#self.start()
######### 模式4 路由模式 #########
class subscriber_routing(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x_direct', queue='sub_r.{}'.format(random.randint(0, 1000000)), routing_keys=['default']):
super().__init__(host=host, port=port, user=user,
password=password)
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
# 队列名采用服务端分配的临时队列
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
# 逐一绑定所有的routing 标签
for routing_key in routing_keys:
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=routing_key)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] %r" % body)
def subscribe(self):
self.channel.basic_consume(self.queue, self.callback, auto_ack=True)
self.channel.start_consuming()
def start(self):
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
######### 模式5主题模式 #########
class subscriber_topic(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x_topic', queue='sub_t.{}'.format(random.randint(0, 1000000)), routing_keys=['default']):
super().__init__(host=host, port=port, user=user,
password=password)
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='topic',
passive=False,
durable=False,
auto_delete=False)
# 队列名采用服务端分配的临时队列
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
# 逐一绑定所有的routing 标签
for routing_key in routing_keys:
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=routing_key)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] %r" % body)
def subscribe(self):
self.channel.basic_consume(self.queue, self.callback, auto_ack=True)
self.channel.start_consuming()
def start(self):
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
######### 模式6RPC模式 (服务端) #########
class rpc_server(base_broker):
# 接收:
# exchange: x_rpc
# queue: rpc_queue
# routing_key: gateway_name
# 发送执行结果:
#
def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x_rpc',
queue='rpc_queue', routing_key='default'):
super().__init__(host=host, port=port, user=user,
password=password)
self.exchange = exchange
# 唯一匹配
self.routing_key = routing_key
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
# 队列名采用指定队列( 若建立多个server方式随机分配任务使用指定queue
self.queue = queue
self.channel.queue_declare(queue=self.queue, auto_delete=True)
# 绑定 exchange->queue->channel,指定routing_key
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key)
# 支持多个订阅
self.channel.basic_qos(prefetch_count=1)
# method
self.method_dict = {}
def register_method(self, method_name, func):
"""登记方法名称与调用函数"""
self.method_dict.update({method_name: func})
def on_request(self, chan, method_frame, _header_frame, body, userdata=None):
"""
响应rpc请求得处理函数
:param chan:
:param method_frame:
:param _header_frame:
:param body:
:param userdata:
:return:
"""
if isinstance(body, bytes):
body = body.decode('utf-8')
if isinstance(body, str):
body = json.loads(body)
print(" [RPC Server] on_request: %r" % body)
# 判断body内容类型
if not isinstance(body, dict):
print(u'请求不是dict格式')
resp_data = {'ret_code': -1, 'err_msg': u'请求不是dict格式'}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id, method_frame.delivery_tag)
return
method = body.get('method', None)
params = body.get('params', {})
if method is None or method not in self.method_dict:
print(u'请求方法:{}不在配置中'.format(method))
resp_data = {'ret_code': -1, 'err_msg': u'请求方法:{}不在配置中'.format(method)}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id,
method_frame.delivery_tag)
return
function = self.method_dict.get(method)
try:
ret = function(**params)
resp_data = {'ret_code': 0, 'data': ret}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id,
method_frame.delivery_tag)
except Exception as ex:
print(u'mq rpc server exception:{}'.format(str(ex)))
traceback.print_exc()
resp_data = {'ret_code': -1, 'err_msg': '执行异常:{}'.format(str(ex))}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id,
method_frame.delivery_tag)
def reply(self, chan, reply_data, reply_to, reply_id, delivery_tag):
"""返回调用结果"""
# data => string
reply_msg = json.dumps(reply_data)
# 发送返回消息
chan.basic_publish(exchange=self.exchange,
routing_key=reply_to,
properties=pika.BasicProperties(content_type='application/json',
correlation_id=reply_id),
body=reply_msg)
# 确认标识
chan.basic_ack(delivery_tag=delivery_tag)
def subscribe(self):
print(' consuming queue:{}'.format(self.queue))
# 绑定消息接收指定当前queue得处理函数为on_request
self.channel.basic_consume(self.queue, self.on_request)
# 进入死循环,不断接收
self.channel.start_consuming()
def start(self):
"""启动接收"""
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
if __name__ == '__main__':
import sys
if len(sys.argv) >=2:
print(sys.argv)
from time import sleep
c = subscriber(user='admin', password='admin')
c.subscribe()
while True:
sleep(1)

391
vnpy/amqp/producer.py Normal file
View File

@ -0,0 +1,391 @@
# encoding: UTF-8
# 消息生产者类(集合)
import sys
import json
import pika
import random
import traceback
from threading import Thread
from uuid import uuid1
from vnpy.amqp.base import base_broker
######### 模式1发送者 #########
class sender(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x', queue_name='', routing_key='default'):
# 通过基类创建connection & channel
super().__init__(host, port, user, password, channel_number=1)
self.exchange = exchange
self.queue_name = queue_name
self.routing_key = routing_key
# 通过channel创建/使用一个queue。
# auto_delete 当所有已绑定在queue的consumer不使用此queue时自动删除此queue
# exclusive private queue,它是True时auto_delete也是True
self.channel.queue_declare(self.queue_name, auto_delete=True, exclusive=True)
# 通过channel创建/使用一个网关
# exchange_type: direct 点对点
# passive: 只是检查其是否存在
# durable: 是否需要queue持久化消息。开启将降低性能至1/10
# auto_delete: 当没有queue绑定时就自动删除
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
def pub(self, text):
# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式
content_type = 'application/json' if isinstance(text, dict) else 'text/plain'
try:
self.channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
except Exception as e:
print(e)
# 重连一次,继续发送
self.reconnect().channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
def exit(self):
self.connection.close()
######### 模式2工作队列,任务发布者 #########
class task_creator(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
channel_number=1, queue_name='task_queue', routing_key='default',
exchange='x_work_queue'):
# 通过基类创建connection & channel
super().__init__(host, port, user, password, channel_number)
self.queue_name = queue_name
self.exchange = exchange
self.routing_key = routing_key
# 通过channel创建/使用一个queue。
queue = self.channel.queue_declare(self.queue_name, durable=True).method.queue
print(u'create/use queue:{}')
# 通过channel创建/使用一个网关
# exchange_type: direct
# passive: 只是检查其是否存在
# durable: 是否需要queue持久化消息。开启将降低性能至1/10
# auto_delete: 当没有queue绑定时就自动删除
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
durable=True)
def pub(self, text):
# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式
content_type = 'application/json' if isinstance(text, dict) else 'text/plain'
try:
self.channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=2))
except Exception as e:
print(e)
# 重连一次,继续发送
self.reconnect().channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=2))
def exit(self):
self.connection.close()
######### 3、发布 / 订阅Pub/Sub模式发布者 #########
class publisher(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
channel_number=1, queue_name='', routing_key='default',
exchange='x_fanout'):
# 通过基类创建connection & channel
super().__init__(host, port, user, password, channel_number)
self.queue_name = queue_name
self.exchange = exchange
self.routing_key = routing_key
# 通过channel创建/使用一个queue。
# auto_delete 当所有已绑定在queue的consumer不使用此queue时自动删除此queue
# exclusive private queue,它是True时auto_delete也是True
self.channel.queue_declare(self.queue_name,
auto_delete=True, exclusive=True)
# 通过channel创建/使用一个网关
# exchange_type: fanout1对多的广播式 topic: 主题匹配
# passive: 只是检查其是否存在
# durable: 是否需要queue持久化消息。开启将降低性能至1/10
# auto_delete: 当没有queue绑定时就自动删除
self.channel.exchange_declare(exchange=exchange,
exchange_type='fanout',
passive=False,
durable=False,
auto_delete=False)
def pub(self, text, routing_key=None):
# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式
content_type = 'application/json' if isinstance(text, dict) else 'text/plain'
if routing_key is None:
routing_key = self.routing_key
try:
self.channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
except Exception as e:
print(e)
# 重连一次,继续发送
self.reconnect().channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
def exit(self):
self.connection.close()
######### 4、路由模式发布者 #########
class publisher_routing(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
channel_number=1, queue_name='', routing_key='default', exchange='x_direct'):
super().__init__(host, port, user, password, channel_number)
self.queue_name = queue_name
self.exchange = exchange
self.routing_key = routing_key
self.channel.queue_declare(
self.queue_name, auto_delete=True, exclusive=True)
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
def pub(self, text, routing_key):
# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式
content_type = 'application/json' if isinstance(text, dict) else 'text/plain'
try:
self.channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
except Exception as e:
print(e)
self.reconnect().channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
def exit(self):
self.connection.close()
######### 5、主题模式发布者 #########
class publisher_topic(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
channel_number=1, queue_name='', routing_key='default', exchange='x_topic'):
super().__init__(host, port, user, password, channel_number)
self.queue_name = queue_name
self.exchange = exchange
self.routing_key = routing_key
self.channel.queue_declare(
self.queue_name, auto_delete=True, exclusive=True)
self.channel.exchange_declare(exchange=exchange,
exchange_type='topic',
passive=False,
durable=False,
auto_delete=False)
def pub(self, text, routing_key):
# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式
content_type = 'application/json' if isinstance(text, dict) else 'text/plain'
try:
self.channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
except Exception as e:
print(e)
self.reconnect().channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
body=text,
properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1))
def exit(self):
self.connection.close()
######### 6、RPC模式调用者) #########
class rpc_client(base_broker):
# 发送:
# exchange: x_rpc
# queue: rpc_queue
# 接收结果:
# exchange: x_rpc
# queue: 动态生成 cb_queue_name
# 执行结果回调:
# 暂时无用得 queue_name='rpc_queue',
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x_rpc', routing_key='default'):
# 通过基类创建connection & channel
super().__init__(host, port, user, password, channel_number=1)
self.exchange = exchange
#self.queue_name = queue_name
self.routing_key = routing_key
# 通过channel创建/使用一个网关
# exchange_type: direct 点对点
# passive: 只是检查其是否存在
# durable: 是否需要queue持久化消息。开启将降低性能至1/10
# auto_delete: 当没有queue绑定时就自动删除
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
# 创建/声明rpc结果消息队列
result = self.channel.queue_declare(queue='', exclusive=True)
self.cb_queue_name = result.method.queue
print('call back queue name:{}'.format(self.cb_queue_name))
# 绑定 回调消息队列exchange和channnel
self.channel.queue_bind(queue=self.cb_queue_name, exchange=exchange)
# 绑定 回调消息队列得接受处理函数为on_respone
self.channel.basic_consume(queue=self.cb_queue_name,
on_message_callback=self.on_respone,
auto_ack=True)
# 回调函数字典
self.cb_dict = {}
self.thread = Thread(target=self.start)
self.thread.start()
def on_respone(self, ch, method, props, body):
"""
相应 cb_queue返回的结果处理信息
:param ch:
:param method:
:param props:
:param body:
:return:
"""
if isinstance(body, bytes):
body = body.decode('utf-8')
if isinstance(body, str):
body = json.loads(body)
cb = self.cb_dict.pop(props.correlation_id, None)
if cb:
try:
cb(body)
except Exception as ex:
print('on_respone exception when call cb.{}'.format(str(ex)))
traceback.print_exc()
def call(self, req_text, correlation_id=None, cb_func=None):
"""
远程调用
:param req_text: 调用指令及内容
:param cb_func: 回调函数地址
:return:
"""
# channel.basic_publish向队列中发送信息
# exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
# routing_key 指定向哪个队列中发送消息
# body是要插入的内容, 字符串格式
#
content_type = 'application/json' if isinstance(req_text, dict) else 'text/plain'
if correlation_id is None:
correlation_id = str(uuid1())
try:
print(u'sending request message, exchange:{},routing_key:{},body:{},reply_queue:{}'
.format(self.exchange, self.routing_key, req_text, self.cb_queue_name))
self.channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=req_text,
properties=pika.BasicProperties(content_type=content_type,
reply_to=self.cb_queue_name,
correlation_id=correlation_id
))
except Exception as e:
print(e)
# 重连一次,继续发送
self.reconnect().channel.basic_publish(exchange=self.exchange,
routing_key=self.routing_key,
body=req_text,
properties=pika.BasicProperties(content_type=content_type,
reply_to=self.cb_queue_name,
correlation_id=correlation_id
))
# 登记参照id和回调函数
if cb_func:
self.cb_dict.update({correlation_id: cb_func})
def start(self):
try:
self.channel.start_consuming()
except Exception as ex:
print('rpc_client consuming exception:{}'.format(str(ex)))
traceback.print_exc()
self.channel.start_consuming()
def exit(self):
try:
self.channel.stop_consuming()
self.channel.close()
self.connection.close()
if self.thread:
self.thread.join()
except:
pass
if __name__ == '__main__':
import datetime
import time
p = publisher()
while True:
time.sleep(1)
msg = '{}'.format(datetime.datetime.now())
print(msg)
p.pub(msg)

View File

@ -0,0 +1,13 @@
from vnpy.amqp.consumer import receiver
if __name__ == '__main__':
import sys
from time import sleep
c = receiver(user='admin', password='admin')
c.subscribe()
while True:
sleep(1)

View File

@ -0,0 +1,11 @@
from vnpy.amqp.producer import sender
if __name__ == '__main__':
import datetime
import time
p = sender()
while True:
time.sleep(1)
msg = '{}'.format(datetime.datetime.now())
print(u'[x] send :{}'.format(msg))
p.pub(msg)

27
vnpy/amqp/test02_task.py Normal file
View File

@ -0,0 +1,27 @@
# encoding: UTF-8
from uuid import uuid1
import json
from vnpy.amqp.producer import task_creator
from vnpy.trader.constant import Direction
if __name__ == '__main__':
import datetime
import time
p = task_creator(host='192.168.0.202')
while True:
time.sleep(10)
mission = {}
mission.update({'id':str(uuid1())})
mission.update({'templateName': u'TWAP 时间加权平均'})
mission.update({'direction': Direction.LONG})
mission.update({'vtSymbol': '518880'})
mission.update({'is_stock': True})
mission.update({'totalVolume': 300})
mission.update({'target_price': 3.20})
mission.update({'minVolume': 100})
mission.update({'orderTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
msg = json.dumps(mission)
print(u'[x] create task :{}'.format(msg))
p.pub(msg)
break

13
vnpy/amqp/test02_woker.py Normal file
View File

@ -0,0 +1,13 @@
# encoding: UTF-8
from vnpy.amqp.consumer import worker
if __name__ == '__main__':
import sys
from time import sleep
c = worker(host='192.168.0.202', user='admin', password='admin')
c.start()
while True:
sleep(1)

View File

@ -0,0 +1,15 @@
# encoding: UTF-8
from vnpy.amqp.consumer import subscriber
if __name__ == '__main__':
import sys
from time import sleep
c = subscriber(user='admin', password='admin', exchange='x_fanout_md_tick')
c.subscribe()
while True:
sleep(1)

View File

@ -0,0 +1,37 @@
# encoding: UTF-8
from uuid import uuid1
import json
import random
from vnpy.amqp.producer import rpc_client
def cb_function(*args):
print('resp call back')
for arg in args:
print(u'{}'.format(arg))
if __name__ == '__main__':
import datetime
import time
c = rpc_client(host='localhost', user='admin', password='admin')
counter = 0
while True:
time.sleep(0.1)
mission = {'method': 'test_01'}
params = {}
params.update({'p2': random.random()})
params.update({'p3': random.random()})
params.update({'p1': counter})
mission.update({'params': params})
msg = json.dumps(mission)
print(u'[x] rpc call :{}'.format(msg))
c.call(msg,str(uuid1()), cb_function)
counter += 1
if counter > 100:
break
print('exit')
c.exit()

View File

@ -0,0 +1,63 @@
import os, sys, copy
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
routing_key = 'default'
from vnpy.amqp.consumer import rpc_server
import argparse
def test_func01(p1,p2,p3):
print('test_func01:', p1, p2, p3)
return p1+p2+p3
def test_func02(p1, p2=0):
print('test_func02:', p1, p2)
return str(p1 + p2)
def get_strategy_names():
print(u'{}'.format(routing_key))
return ['stratege_name_01', 'strategy_name_02']
if __name__ == '__main__':
# 参数分析
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--host', type=str, default='localhost',
help='rabbit mq host')
parser.add_argument('-p', '--port', type=int, default=5672,
help='rabbit mq port')
parser.add_argument('-U', '--user', type=str, default='admin',
help='rabbit mq user')
parser.add_argument('-P', '--password', type=str, default='admin',
help='rabbit mq password')
parser.add_argument('-x', '--exchange', type=str, default='exchange',
help='rabbit mq exchange')
parser.add_argument('-q', '--queue', type=str, default='queue',
help='rabbit mq queue')
parser.add_argument('-r', '--routing_key', type=str, default='default',
help='rabbit mq routing_key')
args = parser.parse_args()
routing_key = copy.copy(args.routing_key)
from time import sleep
s = rpc_server(host=args.host,
port=args.port,
user=args.user,
password=args.password,
exchange=args.exchange,
queue=args.queue,
routing_key=args.routing_key)
s.register_method('test_01', test_func01)
s.register_method('test_02', test_func02)
s.register_method('get_strategy_names', get_strategy_names)
s.start()
while True:
sleep(1)

View File

@ -0,0 +1,61 @@
import os, sys
# 将repostory的目录i作为根目录添加到系统环境中。
ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
sys.path.append(ROOT_PATH)
from vnpy.amqp.producer import rpc_client
from uuid import uuid1
import json
import random
import argparse
def cb_function(*args):
print('resp call back')
for arg in args:
if isinstance(arg,bytes):
print(u'{}'.format(arg.decode('utf-8')))
else:
print(u'{}'.format(arg))
from vnpy.trader.vtConstant import *
if __name__ == '__main__':
# 参数分析
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--host', type=str, default='localhost',
help='rabbit mq host')
parser.add_argument('-p', '--port', type=int, default=5672,
help='rabbit mq port')
parser.add_argument('-U', '--user', type=str, default='admin',
help='rabbit mq user')
parser.add_argument('-P', '--password', type=str, default='admin',
help='rabbit mq password')
parser.add_argument('-x', '--exchange', type=str, default='exchange',
help='rabbit mq exchange')
parser.add_argument('-q', '--queue', type=str, default='queue',
help='rabbit mq queue')
parser.add_argument('-r', '--routing_key', type=str, default='default',
help='rabbit mq routing_key')
args = parser.parse_args()
import datetime
import time
c = rpc_client(host=args.host, port=args.port, user=args.user, password=args.password, exchange=args.exchange, queue_name=args.queue, routing_key=args.routing_key)
counter = 0
while True:
time.sleep(10)
mission = {'method': 'get_strategy_names'}
params = {}
mission.update({'params': params})
msg = json.dumps(mission)
print(u'[x] rpc call :{}'.format(msg))
c.call(msg, str(uuid1()), cb_function)
counter += 1
if counter > 1:
break
print('exit')
c.exit()