vnpy/vnpy/amqp/consumer.py
msincenselee 2b0ff99560 rabbitmq
2019-08-25 15:48:21 +08:00

330 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pika
import json
from vnpy.amqp.base import base_broker
import random
import traceback
from threading import Thread
######### 模式1接收者 #########
class receiver(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x',
queue='recv.{}'.format(random.randint(0, 1000000)), routing_key='default'):
super().__init__(host=host, port=port, user=user,
password=password)
# 唯一匹配
self.routing_key = routing_key
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key) # 队列名采用服务端分配的临时队列
# self.channel.basic_qos(prefetch_count=1)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] received: %r" % body)
def subscribe(self):
# 消息接收
self.channel.basic_consume(self.queue, self.callback, auto_ack=True)
self.channel.start_consuming()
def start(self):
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
######### 模式2执行接收者#########
class worker(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x_work_queue',
queue='task_queue', routing_key='default'):
super().__init__(host=host, port=port, user=user, password=password)
# 唯一匹配
self.routing_key = routing_key
self.exchange = exchange
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
durable=True)
self.queue = self.channel.queue_declare(queue=queue,durable=True).method.queue
print('worker use exchange:{},queue:{}'.format(exchange, self.queue))
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key) # 队列名采用服务端分配的临时队列
# 每个worker只执行一个
self.channel.basic_qos(prefetch_count=1)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] received task: %r" % body)
chan.basic_ack(delivery_tag=method_frame.delivery_tag)
print(" [x] task finished ")
def subscribe(self):
print('worker subscribed')
# 消息接收
self.channel.basic_consume(self.queue, self.callback, auto_ack=False)
self.channel.start_consuming()
def start(self):
print('worker start')
try:
self.subscribe()
except Exception as e:
print(str(e))
traceback.print_exc()
######### 模式3发布 / 订阅Pub/Sub模式, 订阅者#########
class subscriber(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x_fanout', queue='sub.{}'.format(random.randint(0, 1000000)),
routing_key='default'):
super().__init__(host=host, port=port, user=user, password=password)
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='fanout',
passive=False,
durable=False,
auto_delete=False)
self.routing_key = routing_key
# 队列名采用服务端分配的临时队列
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key)
# 缺省回调函数地址
self.cb_func = self.callback
def set_callback(self,cb_func):
self.cb_func = cb_func
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] %r" % body)
def subscribe(self):
self.channel.basic_consume(self.queue, self.cb_func, auto_ack=True)
self.channel.start_consuming()
# self.channel.basic_consume(
# self.callback, queue=self.queue_name, no_ack=True) # 消息接收
def start(self):
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
######### 模式4 路由模式 #########
class subscriber_routing(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x_direct', queue='sub_r.{}'.format(random.randint(0, 1000000)), routing_keys=['default']):
super().__init__(host=host, port=port, user=user,
password=password)
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
# 队列名采用服务端分配的临时队列
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
# 逐一绑定所有的routing 标签
for routing_key in routing_keys:
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=routing_key)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] %r" % body)
def subscribe(self):
self.channel.basic_consume(self.queue, self.callback, auto_ack=True)
self.channel.start_consuming()
def start(self):
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
######### 模式5主题模式 #########
class subscriber_topic(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin',
exchange='x_topic', queue='sub_t.{}'.format(random.randint(0, 1000000)), routing_keys=['default']):
super().__init__(host=host, port=port, user=user,
password=password)
self.queue = queue
self.channel.exchange_declare(exchange=exchange,
exchange_type='topic',
passive=False,
durable=False,
auto_delete=False)
# 队列名采用服务端分配的临时队列
self.queue = self.channel.queue_declare(
queue='', auto_delete=True, exclusive=True).method.queue
# 逐一绑定所有的routing 标签
for routing_key in routing_keys:
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=routing_key)
def callback(self, chan, method_frame, _header_frame, body, userdata=None):
print(1)
print(" [x] %r" % body)
def subscribe(self):
self.channel.basic_consume(self.queue, self.callback, auto_ack=True)
self.channel.start_consuming()
def start(self):
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
######### 模式6RPC模式 (服务端) #########
class rpc_server(base_broker):
def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x_rpc',
queue='rpc_queue', routing_key='default'):
super().__init__(host=host, port=port, user=user,
password=password)
self.exchange = exchange
# 唯一匹配
self.routing_key = routing_key
self.channel.exchange_declare(exchange=exchange,
exchange_type='direct',
passive=False,
durable=False,
auto_delete=False)
# 队列名采用指定队列
self.queue = queue
self.channel.queue_declare(queue=self.queue, auto_delete=True)
# 绑定 exchange->queue->channel,指定routing_key
self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key)
# 支持多个订阅
self.channel.basic_qos(prefetch_count=1)
# method
self.method_dict = {}
def register_method(self, method_name, func):
"""登记方法名称与调用函数"""
self.method_dict.update({method_name: func})
def on_request(self, chan, method_frame, _header_frame, body, userdata=None):
"""
响应rpc请求得处理函数
:param chan:
:param method_frame:
:param _header_frame:
:param body:
:param userdata:
:return:
"""
if isinstance(body, bytes):
body = body.decode('utf-8')
if isinstance(body, str):
body = json.loads(body)
print(" [RPC Server] on_request: %r" % body)
# 判断body内容类型
if not isinstance(body, dict):
print(u'请求不是dict格式')
resp_data = {'ret_code': -1, 'err_msg': u'请求不是dict格式'}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id, method_frame.delivery_tag)
return
method = body.get('method', None)
params = body.get('params', {})
if method is None or method not in self.method_dict:
print(u'请求方法:{}不在配置中'.format(method))
resp_data = {'ret_code': -1, 'err_msg': u'请求方法:{}不在配置中'.format(method)}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id,
method_frame.delivery_tag)
return
function = self.method_dict.get(method)
try:
ret = function(**params)
resp_data = {'ret_code': 0, 'data': ret}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id,
method_frame.delivery_tag)
except Exception as ex:
print(u'mq rpc server exception:{}'.format(str(ex)))
traceback.print_exc()
resp_data = {'ret_code': -1, 'err_msg': '执行异常:{}'.format(str(ex))}
self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id,
method_frame.delivery_tag)
def reply(self, chan, reply_data, reply_to, reply_id, delivery_tag):
"""返回调用结果"""
# data => string
reply_msg = json.dumps(reply_data)
# 发送返回消息
chan.basic_publish(exchange=self.exchange,
routing_key=reply_to,
properties=pika.BasicProperties(content_type='application/json',
correlation_id=reply_id),
body=reply_msg)
# 确认标识
chan.basic_ack(delivery_tag=delivery_tag)
def subscribe(self):
print(' consuming queue:{}'.format(self.queue))
# 绑定消息接收指定当前queue得处理函数为on_request
self.channel.basic_consume(self.queue, self.on_request)
# 进入死循环,不断接收
self.channel.start_consuming()
def start(self):
"""启动接收"""
try:
self.subscribe()
except Exception as e:
print(e)
self.start()
if __name__ == '__main__':
import sys
if len(sys.argv) >=2:
print(sys.argv)
from time import sleep
c = subscriber(user='admin', password='admin')
c.subscribe()
while True:
sleep(1)