Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Applied direct reply-to logic - next steps and feedback #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions Client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ class BlockingClient(object):
logic of a queue.
args:
publish_queue : (str) - default exchange is used so this is the queue name
callback_queue : (str) - queue to check for response
host : (str) - host of the rabbit instance
timeout : (int) - timeout in seconds
'''

def __init__(self,
publish_queue='publish_lost',
callback_queue='callback_lost',
host='rabbit',
exclusive=False,
timeout=60,
Expand All @@ -27,8 +25,6 @@ def __init__(self,
self.host = host
self.exclusive = exclusive
self.publish_queue = publish_queue
# callback_queue is the queue that is checked for replies
self.callback_queue = callback_queue

# Basic rabbit connection
params = pika.ConnectionParameters(host=self.host)
Expand All @@ -37,10 +33,11 @@ def __init__(self,

# publish_queue is used to forward to the correct queue,
# since the default exchange is used
result = self.channel.queue_declare(self.callback_queue,
result = self.channel.queue_declare('amq.rabbitmq.reply-to',
exclusive=self.exclusive)

self.channel.basic_consume(queue=self.callback_queue,
self.channel.basic_consume(queue='amq.rabbitmq.reply-to',
auto_ack=True,
on_message_callback=self._on_response)

return super().__init__(*args, **kwargs)
Expand All @@ -51,8 +48,8 @@ def _on_response(self, ch, method, props, body):
Internal function used to read the response.
Not to be ever used explicitly.
'''
print("self is {} and props is {}".format(self.corr_id,
props.correlation_id))
# print("self is {} and props is {}".format(self.corr_id,
# props.correlation_id))
if self.corr_id == props.correlation_id:
self.response = body

Expand All @@ -64,7 +61,7 @@ def call(self, message="Hello World!"):
self.response = None
self.corr_id = str(uuid.uuid4())
props = pika.BasicProperties(
reply_to=self.callback_queue,
reply_to='amq.rabbitmq.reply-to',
correlation_id=self.corr_id,
)
self.channel.basic_publish(exchange='',
Expand All @@ -79,5 +76,6 @@ def call(self, message="Hello World!"):
break
time.sleep(0.125)
else:
print("MISSED: {}".format(self.corr_id))
return "Connection timeout - 504"
return self.response
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This is a repo to accompany this [SO post.](https://stackoverflow.com/questions/
- pika==1.0.1
- rabbitmq:3.7.14 (run via [docker](https://hub.docker.com/_/rabbitmq))

**NOTE!** Check the Update section at the bottom of this document for a list of things that have changed and why.

### Code Goals Explained

I followed the official [RabbitMQ Documentation](https://www.rabbitmq.com/tutorials/tutorial-six-python.html) as to how to get started with RabbitMQ and Pika. I wanted to create reuseable code that uses RPC logic with multiple Clients and Servers. The general idea is the following: Multiple Clienrs can publish messages to an exchange (the default is used for now) and only one Server picks it up, processes it and sends a response to a callback queue. The Clients meanwhile looks for responses at the same callback queue. The response is read only when the correlation_id is identified.
Expand All @@ -14,7 +16,7 @@ I followed the official [RabbitMQ Documentation](https://www.rabbitmq.com/tutori

Everything runs smoothly when a single Client is used. It does not matter if one or more (I have tried up until 4) Servers are used. The behavior is as expected. Messages are published, consumed once and a response is sent.

### When the code fails
### When the code fails (UPDATE: details at the end of this document)

The problem occurs when (even with a single Server) more than one Clients are used. The easiest way to reproduce the issue is with 2 Clients and 1 Server. When one Client publishes a message, the Server picks it up and processes it. When the 2nd Client connects (it is actually the ```channel.basic_consume()``` command that breaks everything) then timeouts start to occur, even though the Server processes the messages correctly.

Expand Down Expand Up @@ -67,3 +69,15 @@ while True:
connection.process_data_events()
```
As the comment suggests, the `channel.basic_consume()` is when the trouble begins.
___
___
### UPDATE
After investigating the [direct reply-to](https://www.rabbitmq.com/direct-reply-to.html) functionality of RabbitMQ I removed the explicit decleration of a `callback_queue` and instead now always use the `amq.rabbitmq.reply-to` "queue". Necessary changes were made to both Client and Server classes and the problems described above seem to be resolved. **I do not however understand why this works and the previous logic failed to produce the results I expected.**

### Possible Future Work
Transform this project into a robust RPC example with a detailed documentation explaining how everything works and why some decisions were made.

### NonBlocking Connection?
Now that this version is working some other issues were discovered, regarding the stability of the system. I now understand that a Blocking Connection may not be the best choice, but it is a solid start that helps properly graps the basic concepts. I would love to either expand this example to a RPC system with reconnect capabilities and/or create a NonBlocking Connection robust example.

#### Feel free to comment on these thoughts and changes, all feedback is greatly appreciated.
5 changes: 2 additions & 3 deletions Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ class BlockingServer(object):
def __init__(self,
host='rabbit',
consume_queue='consume_lost',
callback_queue='callback_lost',
timeout=60,
prefetch_count=1,
process=None,
Expand All @@ -17,7 +16,6 @@ def __init__(self,
self.timeout = timeout
self.prefetch_count = prefetch_count

self.callback_queue = callback_queue
self.process = process

params = pika.ConnectionParameters(
Expand All @@ -40,7 +38,7 @@ def _on_request(self, ch, method, props, body):
properties = pika.BasicProperties(correlation_id=props.correlation_id)
print(props.correlation_id)
ch.basic_publish(exchange='',
routing_key=self.callback_queue,
routing_key=props.reply_to,
properties=properties,
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
Expand All @@ -51,6 +49,7 @@ def consume(self, queue=''):
print(" [x] Awaiting RPC requests")
self.channel.start_consuming()


# Example process function
def fib(n):
n = int(n)
Expand Down