diff --git a/vnpy/amqp/README.md b/vnpy/amqp/README.md new file mode 100644 index 00000000..f0c05776 --- /dev/null +++ b/vnpy/amqp/README.md @@ -0,0 +1,70 @@ +代码源自余总得 https://github.com/yutiansut/QAPUBSUB/ + +**RabbitMQ and AMQP** + +RabbitMQ 是采用 Erlang 语言实现的 AMQP 协议的消息中间件, +最初起源于金融系统,用于在分布式系统中存储转发消息。 +RabbitMQ 发展到今天,被越来越多的人认可, +这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。 + +消息支持多种传递模式,详细见案例https://www.rabbitmq.com/getstarted.html +RabbitMQ 是一种典型的点对点模式,也可以通过设置交换器类型来实现发布订阅模式而达到广播消费的效果 + +**1、点对点** + + 发布者:只需要定义queue的名称 + 接收者:只需要定义queue的名称。 隐含上,auto_ack=True,即收到消息后,自动确认,就会从Queue中删除 + +**2、工作队列** + + (任务)发布者:指定queue名称,durable=True。发送任务时,发送到指定的queue,设置delivery_mode=2(持久化该任务) + (执行)接收者:指定queue名称,durable=True。执行任务后,才确认. ch.basic_ack(delivery_tag=method.delivery_tag + +**3、发布 / 订阅(Pub/Sub)模式** + + 发布者:创建channel时,指定Exchange名称,类型为fanout。 + 发布时,指定Exchange名称,无routing_key。 + 订阅者:创建channel时,指定Exchagne名称,类型为fanout。 + 创建动态queue名,绑定私有。 + 绑定channel与queue,指定Exchange和queue名。 + +**4、路由模式** + + 发布者:发布者:创建channel时,指定Exchange名称,类型为direct。 + 发布时,指定Exchange名称,打上route_key作为标签 + 订阅者:创建channel时,指定Exchagne名称,类型为direct。 + 创建动态queue名,绑定私有。 + 绑定channel与queue,指定Exchange、queue名和期望获取的标签 + 如订阅多个标签,绑定多次即可。 + +**5、主题模式** + + 发布者:创建channel时,指定Exchange名称,类型为topic。 + 发布时,指定Exchange名称,打上route_key作为标签 + 订阅者:创建channel时,指定Exchagne名称,类型为topic。 + 创建动态queue名,绑定私有。 + 绑定channel与queue,指定Exchange、queue名和期望获取的标签匹配符 + 如订阅多个标签匹配符,绑定多次即可。 + 匹配有两个关键字: + * 1~多个字母 + # 0~多个字母 + +**6、远程调用模式** + + 服务端(响应):创建channel时,指定queue,定义basic_qos为只有一个执行, + 定义消息的响应执行方法。 + 消息执行方法:执行完毕后,发布结果消息,使用指定routing_key标签为回复的queue + 推送属性包含参照ID,确认ACK. + 客户端(请求):创建2个queue,一个是接收执行结果的queue,并绑定执行结果回调响应,自动ack。 + 另一个是推送请求的queue,推送时,增加reply_to 和 参照ID。 + + +**consumer.py 提供两种消费者:** + + 1. 订阅模式下得subscriber + 2. 点对点/路由/主题模式下得subscriber_routing + +**producter.py,提供两种生产者:** + + 1. 订阅模式下得publisher + 2. 点对点/路由/主题模式下得publisher_routing diff --git a/vnpy/amqp/__init__.py b/vnpy/amqp/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/vnpy/amqp/amqp_arch.jpg b/vnpy/amqp/amqp_arch.jpg new file mode 100644 index 00000000..3597b3e4 Binary files /dev/null and b/vnpy/amqp/amqp_arch.jpg differ diff --git a/vnpy/amqp/base.py b/vnpy/amqp/base.py new file mode 100644 index 00000000..82d4ed4d --- /dev/null +++ b/vnpy/amqp/base.py @@ -0,0 +1,60 @@ +# encoding: UTF-8 + +import pika + +class base_broker(): + + def __init__(self, host='localhost', port=5672, user='guest', password='guest', + channel_number=1): + """ + + :param host: 连接rabbitmq的服务器地址(或者群集地址) + :param port: 端口 + :param user: 用户名 + :param password: 密码 + :param channel_number: 频道的数字(大于1) + """ + self.host = host + self.port = port + self.user = user + self.password = password + + self.channel_number = channel_number + + # 身份鉴权 + self.credentials = pika.PlainCredentials(self.user, self.password, erase_on_connect=True) + + # 创建连接 + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, port=self.port, + credentials=self.credentials, + heartbeat=0, socket_timeout=5, + ) + ) + + # 创建一个频道,或者指定频段数字编号 + self.channel = self.connection.channel( + channel_number=self.channel_number) + + def reconnect(self): + """ + 重新连接 + :return: + """ + try: + self.connection.close() + except: + pass + + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host, port=self.port, + credentials=self.credentials, + heartbeat=0, socket_timeout=5,)) + + self.channel = self.connection.channel( + channel_number=self.channel_number) + return self + + def close(self): + if self.connection: + self.connection.close() diff --git a/vnpy/amqp/consumer.py b/vnpy/amqp/consumer.py new file mode 100644 index 00000000..95204de0 --- /dev/null +++ b/vnpy/amqp/consumer.py @@ -0,0 +1,339 @@ +# encoding: UTF-8 + # 消息消费者类(集合) +import json +import pika +import random +import traceback +from vnpy.amqp.base import base_broker + +from threading import Thread + +######### 模式1:接收者 ######### +class receiver(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x', + queue='recv.{}'.format(random.randint(0, 1000000)), routing_key='default'): + super().__init__(host=host, port=port, user=user, + password=password) + + # 唯一匹配 + self.routing_key = routing_key + + self.queue = queue + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + passive=False, + durable=False, + auto_delete=False) + + self.queue = self.channel.queue_declare( + queue='', auto_delete=True, exclusive=True).method.queue + self.channel.queue_bind(queue=self.queue, exchange=exchange, + routing_key=self.routing_key) # 队列名采用服务端分配的临时队列 + # self.channel.basic_qos(prefetch_count=1) + + def callback(self, chan, method_frame, _header_frame, body, userdata=None): + print(1) + print(" [x] received: %r" % body) + + def subscribe(self): + # 消息接收 + self.channel.basic_consume(self.queue, self.callback, auto_ack=True) + self.channel.start_consuming() + + def start(self): + try: + self.subscribe() + except Exception as e: + print(e) + self.start() + +######### 模式2:(执行)接收者######### +class worker(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x_work_queue', + queue='task_queue', routing_key='default'): + super().__init__(host=host, port=port, user=user, password=password) + + # 唯一匹配 + self.routing_key = routing_key + self.exchange = exchange + + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + durable=True) + + self.queue = self.channel.queue_declare(queue=queue,durable=True).method.queue + print('worker use exchange:{},queue:{}'.format(exchange, self.queue)) + self.channel.queue_bind(queue=self.queue, exchange=exchange, + routing_key=self.routing_key) # 队列名采用服务端分配的临时队列 + + # 每个worker只执行一个 + self.channel.basic_qos(prefetch_count=1) + + def callback(self, chan, method_frame, _header_frame, body, userdata=None): + print(1) + print(" [x] received task: %r" % body) + chan.basic_ack(delivery_tag=method_frame.delivery_tag) + print(" [x] task finished ") + + def subscribe(self): + print('worker subscribed') + # 消息接收 + self.channel.basic_consume(self.queue, self.callback, auto_ack=False) + self.channel.start_consuming() + + def start(self): + print('worker start') + try: + self.subscribe() + except Exception as e: + print(str(e)) + traceback.print_exc() + +######### 模式3:发布 / 订阅(Pub/Sub)模式, 订阅者######### +class subscriber(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + exchange='x_fanout', queue='sub.{}'.format(random.randint(0, 1000000)), + routing_key='default'): + super().__init__(host=host, port=port, user=user, password=password) + self.queue = queue + self.channel.exchange_declare(exchange=exchange, + exchange_type='fanout', + passive=False, + durable=False, + auto_delete=False) + self.routing_key = routing_key + + # 队列名采用服务端分配的临时队列 + self.queue = self.channel.queue_declare( + queue='', auto_delete=True, exclusive=True).method.queue + self.channel.queue_bind(queue=self.queue, exchange=exchange, + routing_key=self.routing_key) + + # 缺省回调函数地址 + self.cb_func = self.callback + + def set_callback(self,cb_func): + self.cb_func = cb_func + + def callback(self, chan, method_frame, _header_frame, body, userdata=None): + print(1) + print(" [x] %r" % body) + + def subscribe(self): + self.channel.basic_consume(self.queue, self.cb_func, auto_ack=True) + self.channel.start_consuming() + # self.channel.basic_consume( + # self.callback, queue=self.queue_name, no_ack=True) # 消息接收 + + def start(self): + try: + self.subscribe() + except Exception as ex: + print('subscriber exception:{}'.format(str(ex))) + traceback.print_exc() + #self.start() + +######### 模式4: 路由模式 ######### +class subscriber_routing(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + exchange='x_direct', queue='sub_r.{}'.format(random.randint(0, 1000000)), routing_keys=['default']): + super().__init__(host=host, port=port, user=user, + password=password) + self.queue = queue + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + passive=False, + durable=False, + auto_delete=False) + # 队列名采用服务端分配的临时队列 + self.queue = self.channel.queue_declare( + queue='', auto_delete=True, exclusive=True).method.queue + + # 逐一绑定所有的routing 标签 + for routing_key in routing_keys: + self.channel.queue_bind(queue=self.queue, exchange=exchange, + routing_key=routing_key) + + def callback(self, chan, method_frame, _header_frame, body, userdata=None): + print(1) + print(" [x] %r" % body) + + def subscribe(self): + self.channel.basic_consume(self.queue, self.callback, auto_ack=True) + self.channel.start_consuming() + + def start(self): + try: + self.subscribe() + except Exception as e: + print(e) + self.start() + + +######### 模式5:主题模式 ######### +class subscriber_topic(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + exchange='x_topic', queue='sub_t.{}'.format(random.randint(0, 1000000)), routing_keys=['default']): + super().__init__(host=host, port=port, user=user, + password=password) + self.queue = queue + self.channel.exchange_declare(exchange=exchange, + exchange_type='topic', + passive=False, + durable=False, + auto_delete=False) + # 队列名采用服务端分配的临时队列 + self.queue = self.channel.queue_declare( + queue='', auto_delete=True, exclusive=True).method.queue + + # 逐一绑定所有的routing 标签 + for routing_key in routing_keys: + self.channel.queue_bind(queue=self.queue, exchange=exchange, + routing_key=routing_key) + + def callback(self, chan, method_frame, _header_frame, body, userdata=None): + print(1) + print(" [x] %r" % body) + + def subscribe(self): + self.channel.basic_consume(self.queue, self.callback, auto_ack=True) + self.channel.start_consuming() + + def start(self): + try: + self.subscribe() + except Exception as e: + print(e) + self.start() + +######### 模式6:RPC模式 (服务端) ######### +class rpc_server(base_broker): + # 接收: + # exchange: x_rpc + # queue: rpc_queue + # routing_key: gateway_name + # 发送执行结果: + # + def __init__(self, host='localhost', port=5672, user='admin', password='admin', exchange='x_rpc', + queue='rpc_queue', routing_key='default'): + super().__init__(host=host, port=port, user=user, + password=password) + + self.exchange = exchange + # 唯一匹配 + self.routing_key = routing_key + + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + passive=False, + durable=False, + auto_delete=False) + + # 队列名采用指定队列( 若建立多个server方式随机分配任务,使用指定queue) + self.queue = queue + self.channel.queue_declare(queue=self.queue, auto_delete=True) + + # 绑定 exchange->queue->channel,指定routing_key + self.channel.queue_bind(queue=self.queue, exchange=exchange, + routing_key=self.routing_key) + # 支持多个订阅 + self.channel.basic_qos(prefetch_count=1) + # method + self.method_dict = {} + + def register_method(self, method_name, func): + """登记方法名称与调用函数""" + self.method_dict.update({method_name: func}) + + def on_request(self, chan, method_frame, _header_frame, body, userdata=None): + """ + 响应rpc请求得处理函数 + :param chan: + :param method_frame: + :param _header_frame: + :param body: + :param userdata: + :return: + """ + if isinstance(body, bytes): + body = body.decode('utf-8') + if isinstance(body, str): + body = json.loads(body) + print(" [RPC Server] on_request: %r" % body) + # 判断body内容类型 + if not isinstance(body, dict): + print(u'请求不是dict格式') + resp_data = {'ret_code': -1, 'err_msg': u'请求不是dict格式'} + self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id, method_frame.delivery_tag) + return + + method = body.get('method', None) + params = body.get('params', {}) + if method is None or method not in self.method_dict: + print(u'请求方法:{}不在配置中'.format(method)) + resp_data = {'ret_code': -1, 'err_msg': u'请求方法:{}不在配置中'.format(method)} + self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id, + method_frame.delivery_tag) + return + + function = self.method_dict.get(method) + try: + ret = function(**params) + resp_data = {'ret_code': 0, 'data': ret} + self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id, + method_frame.delivery_tag) + except Exception as ex: + print(u'mq rpc server exception:{}'.format(str(ex))) + traceback.print_exc() + resp_data = {'ret_code': -1, 'err_msg': '执行异常:{}'.format(str(ex))} + self.reply(chan, resp_data, _header_frame.reply_to, _header_frame.correlation_id, + method_frame.delivery_tag) + + def reply(self, chan, reply_data, reply_to, reply_id, delivery_tag): + """返回调用结果""" + # data => string + reply_msg = json.dumps(reply_data) + # 发送返回消息 + chan.basic_publish(exchange=self.exchange, + routing_key=reply_to, + properties=pika.BasicProperties(content_type='application/json', + correlation_id=reply_id), + body=reply_msg) + # 确认标识 + chan.basic_ack(delivery_tag=delivery_tag) + + def subscribe(self): + print(' consuming queue:{}'.format(self.queue)) + # 绑定消息接收,指定当前queue得处理函数为on_request + self.channel.basic_consume(self.queue, self.on_request) + + # 进入死循环,不断接收 + self.channel.start_consuming() + + def start(self): + """启动接收""" + try: + self.subscribe() + except Exception as e: + print(e) + self.start() + +if __name__ == '__main__': + + import sys + if len(sys.argv) >=2: + print(sys.argv) + from time import sleep + c = subscriber(user='admin', password='admin') + + c.subscribe() + + while True: + sleep(1) + + diff --git a/vnpy/amqp/producer.py b/vnpy/amqp/producer.py new file mode 100644 index 00000000..3726cb06 --- /dev/null +++ b/vnpy/amqp/producer.py @@ -0,0 +1,391 @@ +# encoding: UTF-8 +# 消息生产者类(集合) +import sys +import json +import pika +import random +import traceback +from threading import Thread +from uuid import uuid1 +from vnpy.amqp.base import base_broker + +######### 模式1:发送者 ######### +class sender(base_broker): + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + exchange='x', queue_name='', routing_key='default'): + + # 通过基类,创建connection & channel + super().__init__(host, port, user, password, channel_number=1) + + self.exchange = exchange + self.queue_name = queue_name + self.routing_key = routing_key + + # 通过channel,创建/使用一个queue。 + # auto_delete: 当所有已绑定在queue的consumer不使用此queue时,自动删除此queue + # exclusive: private queue,它是True时,auto_delete也是True + self.channel.queue_declare(self.queue_name, auto_delete=True, exclusive=True) + + # 通过channel,创建/使用一个网关 + # exchange_type: direct 点对点 + # passive: 只是检查其是否存在 + # durable: 是否需要queue持久化消息。开启将降低性能至1/10 + # auto_delete: 当没有queue绑定时,就自动删除 + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + passive=False, + durable=False, + auto_delete=False) + + def pub(self, text): + # channel.basic_publish向队列中发送信息 + # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。 + # routing_key 指定向哪个队列中发送消息 + # body是要插入的内容, 字符串格式 + content_type = 'application/json' if isinstance(text, dict) else 'text/plain' + try: + self.channel.basic_publish(exchange=self.exchange, + routing_key=self.routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + except Exception as e: + print(e) + # 重连一次,继续发送 + self.reconnect().channel.basic_publish(exchange=self.exchange, + routing_key=self.routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + + def exit(self): + self.connection.close() + +######### 模式2:工作队列,任务发布者 ######### +class task_creator(base_broker): + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + channel_number=1, queue_name='task_queue', routing_key='default', + exchange='x_work_queue'): + + # 通过基类,创建connection & channel + super().__init__(host, port, user, password, channel_number) + + self.queue_name = queue_name + self.exchange = exchange + self.routing_key = routing_key + + # 通过channel,创建/使用一个queue。 + queue = self.channel.queue_declare(self.queue_name, durable=True).method.queue + print(u'create/use queue:{}') + # 通过channel,创建/使用一个网关 + # exchange_type: direct + # passive: 只是检查其是否存在 + # durable: 是否需要queue持久化消息。开启将降低性能至1/10 + # auto_delete: 当没有queue绑定时,就自动删除 + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + durable=True) + + def pub(self, text): + # channel.basic_publish向队列中发送信息 + # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。 + # routing_key 指定向哪个队列中发送消息 + # body是要插入的内容, 字符串格式 + content_type = 'application/json' if isinstance(text, dict) else 'text/plain' + try: + self.channel.basic_publish(exchange=self.exchange, + routing_key=self.routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=2)) + except Exception as e: + print(e) + # 重连一次,继续发送 + self.reconnect().channel.basic_publish(exchange=self.exchange, + routing_key=self.routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=2)) + + def exit(self): + self.connection.close() + +######### 3、发布 / 订阅(Pub/Sub)模式,发布者 ######### +class publisher(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + channel_number=1, queue_name='', routing_key='default', + exchange='x_fanout'): + + # 通过基类,创建connection & channel + super().__init__(host, port, user, password, channel_number) + + self.queue_name = queue_name + self.exchange = exchange + self.routing_key = routing_key + + # 通过channel,创建/使用一个queue。 + # auto_delete: 当所有已绑定在queue的consumer不使用此queue时,自动删除此queue + # exclusive: private queue,它是True时,auto_delete也是True + self.channel.queue_declare(self.queue_name, + auto_delete=True, exclusive=True) + + # 通过channel,创建/使用一个网关 + # exchange_type: fanout,1对多的广播式, topic: 主题匹配 + # passive: 只是检查其是否存在 + # durable: 是否需要queue持久化消息。开启将降低性能至1/10 + # auto_delete: 当没有queue绑定时,就自动删除 + self.channel.exchange_declare(exchange=exchange, + exchange_type='fanout', + passive=False, + durable=False, + auto_delete=False) + + def pub(self, text, routing_key=None): + # channel.basic_publish向队列中发送信息 + # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。 + # routing_key 指定向哪个队列中发送消息 + # body是要插入的内容, 字符串格式 + content_type = 'application/json' if isinstance(text, dict) else 'text/plain' + if routing_key is None: + routing_key = self.routing_key + try: + self.channel.basic_publish(exchange=self.exchange, + routing_key=routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + except Exception as e: + print(e) + # 重连一次,继续发送 + self.reconnect().channel.basic_publish(exchange=self.exchange, + routing_key=routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + + def exit(self): + self.connection.close() + +######### 4、路由模式:发布者 ######### +class publisher_routing(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + channel_number=1, queue_name='', routing_key='default', exchange='x_direct'): + super().__init__(host, port, user, password, channel_number) + + self.queue_name = queue_name + self.exchange = exchange + self.routing_key = routing_key + + self.channel.queue_declare( + self.queue_name, auto_delete=True, exclusive=True) + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + passive=False, + durable=False, + auto_delete=False) + + def pub(self, text, routing_key): + # channel.basic_publish向队列中发送信息 + # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。 + # routing_key 指定向哪个队列中发送消息 + # body是要插入的内容, 字符串格式 + content_type = 'application/json' if isinstance(text, dict) else 'text/plain' + try: + self.channel.basic_publish(exchange=self.exchange, + routing_key=routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + except Exception as e: + print(e) + self.reconnect().channel.basic_publish(exchange=self.exchange, + routing_key=routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + + def exit(self): + self.connection.close() + +######### 5、主题模式:发布者 ######### +class publisher_topic(base_broker): + + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + channel_number=1, queue_name='', routing_key='default', exchange='x_topic'): + super().__init__(host, port, user, password, channel_number) + + self.queue_name = queue_name + self.exchange = exchange + self.routing_key = routing_key + + self.channel.queue_declare( + self.queue_name, auto_delete=True, exclusive=True) + self.channel.exchange_declare(exchange=exchange, + exchange_type='topic', + passive=False, + durable=False, + auto_delete=False) + + def pub(self, text, routing_key): + # channel.basic_publish向队列中发送信息 + # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。 + # routing_key 指定向哪个队列中发送消息 + # body是要插入的内容, 字符串格式 + content_type = 'application/json' if isinstance(text, dict) else 'text/plain' + try: + self.channel.basic_publish(exchange=self.exchange, + routing_key=routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + except Exception as e: + print(e) + self.reconnect().channel.basic_publish(exchange=self.exchange, + routing_key=routing_key, + body=text, + properties=pika.BasicProperties(content_type=content_type, + delivery_mode=1)) + + def exit(self): + self.connection.close() + + +######### 6、RPC模式(调用者) ######### +class rpc_client(base_broker): + # 发送: + # exchange: x_rpc + # queue: rpc_queue + # 接收结果: + # exchange: x_rpc + # queue: 动态生成 cb_queue_name + # 执行结果回调: + # 暂时无用得 queue_name='rpc_queue', + def __init__(self, host='localhost', port=5672, user='admin', password='admin', + exchange='x_rpc', routing_key='default'): + + # 通过基类,创建connection & channel + super().__init__(host, port, user, password, channel_number=1) + + self.exchange = exchange + #self.queue_name = queue_name + self.routing_key = routing_key + + # 通过channel,创建/使用一个网关 + # exchange_type: direct 点对点 + # passive: 只是检查其是否存在 + # durable: 是否需要queue持久化消息。开启将降低性能至1/10 + # auto_delete: 当没有queue绑定时,就自动删除 + self.channel.exchange_declare(exchange=exchange, + exchange_type='direct', + passive=False, + durable=False, + auto_delete=False) + + # 创建/声明rpc结果消息队列 + result = self.channel.queue_declare(queue='', exclusive=True) + self.cb_queue_name = result.method.queue + print('call back queue name:{}'.format(self.cb_queue_name)) + + # 绑定 回调消息队列,exchange和channnel + self.channel.queue_bind(queue=self.cb_queue_name, exchange=exchange) + + # 绑定 回调消息队列得接受处理函数为on_respone + self.channel.basic_consume(queue=self.cb_queue_name, + on_message_callback=self.on_respone, + auto_ack=True) + # 回调函数字典 + self.cb_dict = {} + + self.thread = Thread(target=self.start) + self.thread.start() + + def on_respone(self, ch, method, props, body): + """ + 相应 cb_queue返回的结果处理信息 + :param ch: + :param method: + :param props: + :param body: + :return: + """ + if isinstance(body, bytes): + body = body.decode('utf-8') + if isinstance(body, str): + body = json.loads(body) + + cb = self.cb_dict.pop(props.correlation_id, None) + if cb: + try: + cb(body) + except Exception as ex: + print('on_respone exception when call cb.{}'.format(str(ex))) + traceback.print_exc() + + def call(self, req_text, correlation_id=None, cb_func=None): + """ + 远程调用 + :param req_text: 调用指令及内容 + :param cb_func: 回调函数地址 + :return: + """ + # channel.basic_publish向队列中发送信息 + # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。 + # routing_key 指定向哪个队列中发送消息 + # body是要插入的内容, 字符串格式 + # + content_type = 'application/json' if isinstance(req_text, dict) else 'text/plain' + if correlation_id is None: + correlation_id = str(uuid1()) + try: + print(u'sending request message, exchange:{},routing_key:{},body:{},reply_queue:{}' + .format(self.exchange, self.routing_key, req_text, self.cb_queue_name)) + self.channel.basic_publish(exchange=self.exchange, + routing_key=self.routing_key, + body=req_text, + properties=pika.BasicProperties(content_type=content_type, + reply_to=self.cb_queue_name, + correlation_id=correlation_id + )) + except Exception as e: + print(e) + # 重连一次,继续发送 + self.reconnect().channel.basic_publish(exchange=self.exchange, + routing_key=self.routing_key, + body=req_text, + properties=pika.BasicProperties(content_type=content_type, + reply_to=self.cb_queue_name, + correlation_id=correlation_id + )) + # 登记参照id和回调函数 + if cb_func: + self.cb_dict.update({correlation_id: cb_func}) + + def start(self): + try: + self.channel.start_consuming() + except Exception as ex: + print('rpc_client consuming exception:{}'.format(str(ex))) + traceback.print_exc() + self.channel.start_consuming() + + def exit(self): + try: + self.channel.stop_consuming() + self.channel.close() + self.connection.close() + if self.thread: + self.thread.join() + except: + pass + +if __name__ == '__main__': + import datetime + import time + p = publisher() + while True: + time.sleep(1) + msg = '{}'.format(datetime.datetime.now()) + print(msg) + p.pub(msg) diff --git a/vnpy/amqp/test01_receiver.py b/vnpy/amqp/test01_receiver.py new file mode 100644 index 00000000..e6d3f421 --- /dev/null +++ b/vnpy/amqp/test01_receiver.py @@ -0,0 +1,13 @@ +from vnpy.amqp.consumer import receiver + +if __name__ == '__main__': + + import sys + + from time import sleep + c = receiver(user='admin', password='admin') + + c.subscribe() + + while True: + sleep(1) diff --git a/vnpy/amqp/test01_sender.py b/vnpy/amqp/test01_sender.py new file mode 100644 index 00000000..66ed3504 --- /dev/null +++ b/vnpy/amqp/test01_sender.py @@ -0,0 +1,11 @@ +from vnpy.amqp.producer import sender + +if __name__ == '__main__': + import datetime + import time + p = sender() + while True: + time.sleep(1) + msg = '{}'.format(datetime.datetime.now()) + print(u'[x] send :{}'.format(msg)) + p.pub(msg) diff --git a/vnpy/amqp/test02_task.py b/vnpy/amqp/test02_task.py new file mode 100644 index 00000000..37365973 --- /dev/null +++ b/vnpy/amqp/test02_task.py @@ -0,0 +1,27 @@ +# encoding: UTF-8 + +from uuid import uuid1 +import json +from vnpy.amqp.producer import task_creator +from vnpy.trader.constant import Direction + +if __name__ == '__main__': + import datetime + import time + p = task_creator(host='192.168.0.202') + while True: + time.sleep(10) + mission = {} + mission.update({'id':str(uuid1())}) + mission.update({'templateName': u'TWAP 时间加权平均'}) + mission.update({'direction': Direction.LONG}) + mission.update({'vtSymbol': '518880'}) + mission.update({'is_stock': True}) + mission.update({'totalVolume': 300}) + mission.update({'target_price': 3.20}) + mission.update({'minVolume': 100}) + mission.update({'orderTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}) + msg = json.dumps(mission) + print(u'[x] create task :{}'.format(msg)) + p.pub(msg) + break diff --git a/vnpy/amqp/test02_woker.py b/vnpy/amqp/test02_woker.py new file mode 100644 index 00000000..b14c81a0 --- /dev/null +++ b/vnpy/amqp/test02_woker.py @@ -0,0 +1,13 @@ +# encoding: UTF-8 + +from vnpy.amqp.consumer import worker + +if __name__ == '__main__': + + import sys + + from time import sleep + c = worker(host='192.168.0.202', user='admin', password='admin') + c.start() + while True: + sleep(1) diff --git a/vnpy/amqp/test03_subscriber.py b/vnpy/amqp/test03_subscriber.py new file mode 100644 index 00000000..b33951fe --- /dev/null +++ b/vnpy/amqp/test03_subscriber.py @@ -0,0 +1,15 @@ +# encoding: UTF-8 + +from vnpy.amqp.consumer import subscriber + +if __name__ == '__main__': + + import sys + + from time import sleep + c = subscriber(user='admin', password='admin', exchange='x_fanout_md_tick') + + c.subscribe() + + while True: + sleep(1) diff --git a/vnpy/amqp/test06_rpc_client.py b/vnpy/amqp/test06_rpc_client.py new file mode 100644 index 00000000..432fe744 --- /dev/null +++ b/vnpy/amqp/test06_rpc_client.py @@ -0,0 +1,37 @@ +# encoding: UTF-8 + +from uuid import uuid1 +import json +import random +from vnpy.amqp.producer import rpc_client + +def cb_function(*args): + print('resp call back') + for arg in args: + print(u'{}'.format(arg)) + +if __name__ == '__main__': + import datetime + import time + c = rpc_client(host='localhost', user='admin', password='admin') + + counter = 0 + while True: + time.sleep(0.1) + mission = {'method': 'test_01'} + params = {} + params.update({'p2': random.random()}) + params.update({'p3': random.random()}) + params.update({'p1': counter}) + mission.update({'params': params}) + msg = json.dumps(mission) + print(u'[x] rpc call :{}'.format(msg)) + + c.call(msg,str(uuid1()), cb_function) + counter += 1 + + if counter > 100: + break + + print('exit') + c.exit() diff --git a/vnpy/amqp/test06_rpc_server.py b/vnpy/amqp/test06_rpc_server.py new file mode 100644 index 00000000..79863ccb --- /dev/null +++ b/vnpy/amqp/test06_rpc_server.py @@ -0,0 +1,63 @@ +import os, sys, copy +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) + +routing_key = 'default' + +from vnpy.amqp.consumer import rpc_server + +import argparse + +def test_func01(p1,p2,p3): + print('test_func01:', p1, p2, p3) + return p1+p2+p3 + +def test_func02(p1, p2=0): + print('test_func02:', p1, p2) + return str(p1 + p2) + +def get_strategy_names(): + print(u'{}'.format(routing_key)) + return ['stratege_name_01', 'strategy_name_02'] + + +if __name__ == '__main__': + + # 参数分析 + parser = argparse.ArgumentParser() + + parser.add_argument('-s', '--host', type=str, default='localhost', + help='rabbit mq host') + parser.add_argument('-p', '--port', type=int, default=5672, + help='rabbit mq port') + parser.add_argument('-U', '--user', type=str, default='admin', + help='rabbit mq user') + parser.add_argument('-P', '--password', type=str, default='admin', + help='rabbit mq password') + parser.add_argument('-x', '--exchange', type=str, default='exchange', + help='rabbit mq exchange') + parser.add_argument('-q', '--queue', type=str, default='queue', + help='rabbit mq queue') + parser.add_argument('-r', '--routing_key', type=str, default='default', + help='rabbit mq routing_key') + args = parser.parse_args() + + routing_key = copy.copy(args.routing_key) + + from time import sleep + s = rpc_server(host=args.host, + port=args.port, + user=args.user, + password=args.password, + exchange=args.exchange, + queue=args.queue, + routing_key=args.routing_key) + + s.register_method('test_01', test_func01) + s.register_method('test_02', test_func02) + s.register_method('get_strategy_names', get_strategy_names) + + s.start() + while True: + sleep(1) diff --git a/vnpy/amqp/test07_rpc_client.py b/vnpy/amqp/test07_rpc_client.py new file mode 100644 index 00000000..e414c192 --- /dev/null +++ b/vnpy/amqp/test07_rpc_client.py @@ -0,0 +1,61 @@ +import os, sys +# 将repostory的目录i,作为根目录,添加到系统环境中。 +ROOT_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) +sys.path.append(ROOT_PATH) + +from vnpy.amqp.producer import rpc_client +from uuid import uuid1 +import json +import random +import argparse + +def cb_function(*args): + print('resp call back') + for arg in args: + if isinstance(arg,bytes): + print(u'{}'.format(arg.decode('utf-8'))) + else: + print(u'{}'.format(arg)) + +from vnpy.trader.vtConstant import * +if __name__ == '__main__': + # 参数分析 + parser = argparse.ArgumentParser() + + parser.add_argument('-s', '--host', type=str, default='localhost', + help='rabbit mq host') + parser.add_argument('-p', '--port', type=int, default=5672, + help='rabbit mq port') + parser.add_argument('-U', '--user', type=str, default='admin', + help='rabbit mq user') + parser.add_argument('-P', '--password', type=str, default='admin', + help='rabbit mq password') + parser.add_argument('-x', '--exchange', type=str, default='exchange', + help='rabbit mq exchange') + parser.add_argument('-q', '--queue', type=str, default='queue', + help='rabbit mq queue') + parser.add_argument('-r', '--routing_key', type=str, default='default', + help='rabbit mq routing_key') + args = parser.parse_args() + + import datetime + import time + + c = rpc_client(host=args.host, port=args.port, user=args.user, password=args.password, exchange=args.exchange, queue_name=args.queue, routing_key=args.routing_key) + + counter = 0 + while True: + time.sleep(10) + mission = {'method': 'get_strategy_names'} + params = {} + mission.update({'params': params}) + msg = json.dumps(mission) + print(u'[x] rpc call :{}'.format(msg)) + c.call(msg, str(uuid1()), cb_function) + counter += 1 + + if counter > 1: + break + + print('exit') + c.exit()