Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix maintaining pipeline when using AMQP #2533

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

kamil-certat
Copy link
Contributor

If RabbitMQ dropped the connection, pika can emit the StreamLostError which can be gracefully handled by reconnection attempt. In addition, consuming on BlockingConnection without the timeout can block internal maintenance operations, like sending heartbeats [1].

[1] https://pika.readthedocs.io/en/1.2.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.consume

If RabbitMQ droped the connection, pika can emit the StreamLostError
which can be gracefully handled by reconnection attempt. In addition,
consuming on BlockingConnection without the timeout can block internal
maintanence operations, like sending heartbeats [1].

[1] https://pika.readthedocs.io/en/1.2.0/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingChannel.consume
@sebix sebix added this to the 3.3.2 Bugfix release milestone Nov 4, 2024
@sebix sebix added bug Indicates an unexpected problem or unintended behavior component: core labels Nov 4, 2024
Copy link
Member

@sebix sebix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't test now if the code changes themselves but they all look reasonable.

Comment on lines +611 to +614
if reconnect and (
isinstance(exc, pika.exceptions.ConnectionClosed) or
isinstance(exc, pika.exceptions.StreamLostError)
):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if reconnect and (
isinstance(exc, pika.exceptions.ConnectionClosed) or
isinstance(exc, pika.exceptions.StreamLostError)
):
if reconnect and isinstance(exc, (pika.exceptions.ConnectionClosed, pika.exceptions.StreamLostError)):

Comment on lines +654 to +656
method, _, body = next(
self.channel.consume(self.source_queue, inactivity_timeout=self.heartbeat / 2)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add an inline comment on why there's a need for a loop and why you chose heartbeat/2? That will make our lives easier when reading the code later on.

@@ -12,6 +12,7 @@
### Configuration

### Core
- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Fix maintaining pipeline connection when using AMQP (PR# by Kamil Mankowski).
- AMQP: Fix maintaining pipeline connection when during interrupted connections (PR#2533 by Kamil Mankowski).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Indicates an unexpected problem or unintended behavior component: core
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants