diff --git a/vnpy/amqp/consumer.py b/vnpy/amqp/consumer.py index d7611df1..887d4913 100644 --- a/vnpy/amqp/consumer.py +++ b/vnpy/amqp/consumer.py @@ -62,28 +62,36 @@ class worker(base_broker): self.channel.exchange_declare(exchange=exchange, exchange_type='direct', durable=True) + #print(f'remove queue if exist and none consume') + #self.channel.queue_delete(queue=queue, if_unused=True) + self.queue = self.channel.queue_declare(queue="", durable=False, auto_delete=True, exclusive=True).method.queue + if self.queue !=queue: + print(f'work queue:{queue}=>{self.queue}\n') - self.queue = self.channel.queue_declare(queue=queue, durable=True).method.queue - print('worker use exchange:{},queue:{}'.format(exchange, self.queue)) + print('worker bind {}:{}/ex:{}/queue:{}/r_k:{}'.format(host, port, exchange, self.queue, routing_key)) self.channel.queue_bind(queue=self.queue, exchange=exchange, routing_key=self.routing_key) # 队列名采用服务端分配的临时队列 # 每个worker只执行一个 self.channel.basic_qos(prefetch_count=1) + # 消息接收《=》call_back绑定 + self.channel.basic_consume(self.queue, self.callback, auto_ack=False) def callback(self, chan, method_frame, _header_frame, body, userdata=None): #print(1) - print(" [x] received task: %r" % body) + print(" [x] received task: %r\n" % body) chan.basic_ack(delivery_tag=method_frame.delivery_tag) - print(" [x] task finished ") + print(" [x] task finished \n") def subscribe(self): - print('worker subscribed') - # 消息接收 - self.channel.basic_consume(self.queue, self.callback, auto_ack=False) + print(f'worker subscribed on queue:{self.queue}\n') self.channel.start_consuming() def start(self): + """ + 启动 + :return: + """ print('worker start') try: self.subscribe() diff --git a/vnpy/amqp/producer.py b/vnpy/amqp/producer.py index 8891eded..9c8df12b 100644 --- a/vnpy/amqp/producer.py +++ b/vnpy/amqp/producer.py @@ -1,6 +1,6 @@ # encoding: UTF-8 # 消息生产者类(集合) - +import sys import json import pika import traceback @@ -50,13 +50,15 @@ class sender(base_broker): properties=pika.BasicProperties(content_type=content_type, delivery_mode=1)) except Exception as e: - print(e) - # 重连一次,继续发送 - self.reconnect().channel.basic_publish(exchange=self.exchange, - routing_key=self.routing_key, - body=text, - properties=pika.BasicProperties(content_type=content_type, - delivery_mode=1)) + print(e,file=sys.stderr) + if 'Channel is closed' in str(e): + self.reconnect() + # # 重连一次,继续发送 + # self.reconnect().channel.basic_publish(exchange=self.exchange, + # routing_key=self.routing_key, + # body=text, + # properties=pika.BasicProperties(content_type=content_type, + # delivery_mode=1)) def exit(self): self.connection.close() @@ -77,7 +79,9 @@ class task_creator(base_broker): # 通过channel,创建/使用一个queue。 queue = self.channel.queue_declare(self.queue_name, durable=True).method.queue - print(f'create/use queue:{queue}') + if queue == self.queue_name: + print(f'task queue {self.queue_name} => {queue}') + print(f'create/use queue:{host}:{port}/q_n:{queue}/r_k:{self.routing_key}\n') # 通过channel,创建/使用一个网关 # exchange_type: direct # passive: 只是检查其是否存在 @@ -158,14 +162,16 @@ class publisher(base_broker): properties=pika.BasicProperties(content_type=content_type, delivery_mode=1)) except Exception as e: - print(e) + print(f'pub exception:{str(e)}') # 重连一次,继续发送 - self.reconnect().channel.basic_publish(exchange=self.exchange, + try: + self.reconnect().channel.basic_publish(exchange=self.exchange, routing_key=routing_key, body=text, properties=pika.BasicProperties(content_type=content_type, delivery_mode=1)) - + except Exception as ex: + print(f're pub ex:{ex}') def exit(self): self.connection.close()