[增强功能] rabbitMQ组件更新

This commit is contained in:
msincenselee 2020-12-11 15:59:09 +08:00
parent e50e21fac3
commit 795d83a6b2
2 changed files with 33 additions and 19 deletions

View File

@ -62,28 +62,36 @@ class worker(base_broker):
self.channel.exchange_declare(exchange=exchange, self.channel.exchange_declare(exchange=exchange,
exchange_type='direct', exchange_type='direct',
durable=True) durable=True)
#print(f'remove queue if exist and none consume')
#self.channel.queue_delete(queue=queue, if_unused=True)
self.queue = self.channel.queue_declare(queue="", durable=False, auto_delete=True, exclusive=True).method.queue
if self.queue !=queue:
print(f'work queue:{queue}=>{self.queue}\n')
self.queue = self.channel.queue_declare(queue=queue, durable=True).method.queue print('worker bind {}:{}/ex:{}/queue:{}/r_k:{}'.format(host, port, exchange, self.queue, routing_key))
print('worker use exchange:{},queue:{}'.format(exchange, self.queue))
self.channel.queue_bind(queue=self.queue, exchange=exchange, self.channel.queue_bind(queue=self.queue, exchange=exchange,
routing_key=self.routing_key) # 队列名采用服务端分配的临时队列 routing_key=self.routing_key) # 队列名采用服务端分配的临时队列
# 每个worker只执行一个 # 每个worker只执行一个
self.channel.basic_qos(prefetch_count=1) self.channel.basic_qos(prefetch_count=1)
# 消息接收《=》call_back绑定
self.channel.basic_consume(self.queue, self.callback, auto_ack=False)
def callback(self, chan, method_frame, _header_frame, body, userdata=None): def callback(self, chan, method_frame, _header_frame, body, userdata=None):
#print(1) #print(1)
print(" [x] received task: %r" % body) print(" [x] received task: %r\n" % body)
chan.basic_ack(delivery_tag=method_frame.delivery_tag) chan.basic_ack(delivery_tag=method_frame.delivery_tag)
print(" [x] task finished ") print(" [x] task finished \n")
def subscribe(self): def subscribe(self):
print('worker subscribed') print(f'worker subscribed on queue:{self.queue}\n')
# 消息接收
self.channel.basic_consume(self.queue, self.callback, auto_ack=False)
self.channel.start_consuming() self.channel.start_consuming()
def start(self): def start(self):
"""
启动
:return:
"""
print('worker start') print('worker start')
try: try:
self.subscribe() self.subscribe()

View File

@ -1,6 +1,6 @@
# encoding: UTF-8 # encoding: UTF-8
# 消息生产者类(集合) # 消息生产者类(集合)
import sys
import json import json
import pika import pika
import traceback import traceback
@ -50,13 +50,15 @@ class sender(base_broker):
properties=pika.BasicProperties(content_type=content_type, properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1)) delivery_mode=1))
except Exception as e: except Exception as e:
print(e) print(e,file=sys.stderr)
# 重连一次,继续发送 if 'Channel is closed' in str(e):
self.reconnect().channel.basic_publish(exchange=self.exchange, self.reconnect()
routing_key=self.routing_key, # # 重连一次,继续发送
body=text, # self.reconnect().channel.basic_publish(exchange=self.exchange,
properties=pika.BasicProperties(content_type=content_type, # routing_key=self.routing_key,
delivery_mode=1)) # body=text,
# properties=pika.BasicProperties(content_type=content_type,
# delivery_mode=1))
def exit(self): def exit(self):
self.connection.close() self.connection.close()
@ -77,7 +79,9 @@ class task_creator(base_broker):
# 通过channel创建/使用一个queue。 # 通过channel创建/使用一个queue。
queue = self.channel.queue_declare(self.queue_name, durable=True).method.queue queue = self.channel.queue_declare(self.queue_name, durable=True).method.queue
print(f'create/use queue:{queue}') if queue == self.queue_name:
print(f'task queue {self.queue_name} => {queue}')
print(f'create/use queue:{host}:{port}/q_n:{queue}/r_k:{self.routing_key}\n')
# 通过channel创建/使用一个网关 # 通过channel创建/使用一个网关
# exchange_type: direct # exchange_type: direct
# passive: 只是检查其是否存在 # passive: 只是检查其是否存在
@ -158,14 +162,16 @@ class publisher(base_broker):
properties=pika.BasicProperties(content_type=content_type, properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1)) delivery_mode=1))
except Exception as e: except Exception as e:
print(e) print(f'pub exception:{str(e)}')
# 重连一次,继续发送 # 重连一次,继续发送
self.reconnect().channel.basic_publish(exchange=self.exchange, try:
self.reconnect().channel.basic_publish(exchange=self.exchange,
routing_key=routing_key, routing_key=routing_key,
body=text, body=text,
properties=pika.BasicProperties(content_type=content_type, properties=pika.BasicProperties(content_type=content_type,
delivery_mode=1)) delivery_mode=1))
except Exception as ex:
print(f're pub ex:{ex}')
def exit(self): def exit(self):
self.connection.close() self.connection.close()