diff --git a/Client.py b/Client.py index 52d0c67..42560f2 100644 --- a/Client.py +++ b/Client.py @@ -9,14 +9,12 @@ class BlockingClient(object): logic of a queue. args: publish_queue : (str) - default exchange is used so this is the queue name - callback_queue : (str) - queue to check for response host : (str) - host of the rabbit instance timeout : (int) - timeout in seconds ''' def __init__(self, publish_queue='publish_lost', - callback_queue='callback_lost', host='rabbit', exclusive=False, timeout=60, @@ -27,8 +25,6 @@ def __init__(self, self.host = host self.exclusive = exclusive self.publish_queue = publish_queue - # callback_queue is the queue that is checked for replies - self.callback_queue = callback_queue # Basic rabbit connection params = pika.ConnectionParameters(host=self.host) @@ -37,10 +33,11 @@ def __init__(self, # publish_queue is used to forward to the correct queue, # since the default exchange is used - result = self.channel.queue_declare(self.callback_queue, + result = self.channel.queue_declare('amq.rabbitmq.reply-to', exclusive=self.exclusive) - self.channel.basic_consume(queue=self.callback_queue, + self.channel.basic_consume(queue='amq.rabbitmq.reply-to', + auto_ack=True, on_message_callback=self._on_response) return super().__init__(*args, **kwargs) @@ -51,8 +48,8 @@ def _on_response(self, ch, method, props, body): Internal function used to read the response. Not to be ever used explicitly. ''' - print("self is {} and props is {}".format(self.corr_id, - props.correlation_id)) + # print("self is {} and props is {}".format(self.corr_id, + # props.correlation_id)) if self.corr_id == props.correlation_id: self.response = body @@ -64,7 +61,7 @@ def call(self, message="Hello World!"): self.response = None self.corr_id = str(uuid.uuid4()) props = pika.BasicProperties( - reply_to=self.callback_queue, + reply_to='amq.rabbitmq.reply-to', correlation_id=self.corr_id, ) self.channel.basic_publish(exchange='', @@ -79,5 +76,6 @@ def call(self, message="Hello World!"): break time.sleep(0.125) else: + print("MISSED: {}".format(self.corr_id)) return "Connection timeout - 504" return self.response diff --git a/README.md b/README.md index ab3e891..9300ae6 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ This is a repo to accompany this [SO post.](https://stackoverflow.com/questions/ - pika==1.0.1 - rabbitmq:3.7.14 (run via [docker](https://hub.docker.com/_/rabbitmq)) +**NOTE!** Check the Update section at the bottom of this document for a list of things that have changed and why. + ### Code Goals Explained I followed the official [RabbitMQ Documentation](https://www.rabbitmq.com/tutorials/tutorial-six-python.html) as to how to get started with RabbitMQ and Pika. I wanted to create reuseable code that uses RPC logic with multiple Clients and Servers. The general idea is the following: Multiple Clienrs can publish messages to an exchange (the default is used for now) and only one Server picks it up, processes it and sends a response to a callback queue. The Clients meanwhile looks for responses at the same callback queue. The response is read only when the correlation_id is identified. @@ -14,7 +16,7 @@ I followed the official [RabbitMQ Documentation](https://www.rabbitmq.com/tutori Everything runs smoothly when a single Client is used. It does not matter if one or more (I have tried up until 4) Servers are used. The behavior is as expected. Messages are published, consumed once and a response is sent. -### When the code fails +### When the code fails (UPDATE: details at the end of this document) The problem occurs when (even with a single Server) more than one Clients are used. The easiest way to reproduce the issue is with 2 Clients and 1 Server. When one Client publishes a message, the Server picks it up and processes it. When the 2nd Client connects (it is actually the ```channel.basic_consume()``` command that breaks everything) then timeouts start to occur, even though the Server processes the messages correctly. @@ -67,3 +69,15 @@ while True: connection.process_data_events() ``` As the comment suggests, the `channel.basic_consume()` is when the trouble begins. +___ +___ +### UPDATE +After investigating the [direct reply-to](https://www.rabbitmq.com/direct-reply-to.html) functionality of RabbitMQ I removed the explicit decleration of a `callback_queue` and instead now always use the `amq.rabbitmq.reply-to` "queue". Necessary changes were made to both Client and Server classes and the problems described above seem to be resolved. **I do not however understand why this works and the previous logic failed to produce the results I expected.** + +### Possible Future Work +Transform this project into a robust RPC example with a detailed documentation explaining how everything works and why some decisions were made. + +### NonBlocking Connection? +Now that this version is working some other issues were discovered, regarding the stability of the system. I now understand that a Blocking Connection may not be the best choice, but it is a solid start that helps properly graps the basic concepts. I would love to either expand this example to a RPC system with reconnect capabilities and/or create a NonBlocking Connection robust example. + +#### Feel free to comment on these thoughts and changes, all feedback is greatly appreciated. \ No newline at end of file diff --git a/Server.py b/Server.py index 411a294..fa73097 100644 --- a/Server.py +++ b/Server.py @@ -5,7 +5,6 @@ class BlockingServer(object): def __init__(self, host='rabbit', consume_queue='consume_lost', - callback_queue='callback_lost', timeout=60, prefetch_count=1, process=None, @@ -17,7 +16,6 @@ def __init__(self, self.timeout = timeout self.prefetch_count = prefetch_count - self.callback_queue = callback_queue self.process = process params = pika.ConnectionParameters( @@ -40,7 +38,7 @@ def _on_request(self, ch, method, props, body): properties = pika.BasicProperties(correlation_id=props.correlation_id) print(props.correlation_id) ch.basic_publish(exchange='', - routing_key=self.callback_queue, + routing_key=props.reply_to, properties=properties, body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) @@ -51,6 +49,7 @@ def consume(self, queue=''): print(" [x] Awaiting RPC requests") self.channel.start_consuming() + # Example process function def fib(n): n = int(n)