Skip to content

Commit

Permalink
Update to rabbitmq:3.12.13 & remove pika redundant code, in favour …
Browse files Browse the repository at this point in the history
…of `rabbitmq-definitions.json` file
  • Loading branch information
pantunes committed Feb 22, 2024
1 parent d0f0d0c commit 6629590
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 15 deletions.
2 changes: 1 addition & 1 deletion compose/rabbitmq/rabbitmq-definitions.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"vhost": "/",
"destination": "trades",
"destination_type": "queue",
"routing_key": "trades",
"routing_key": "trades.all",
"arguments": {}
},
{
Expand Down
16 changes: 5 additions & 11 deletions exchange_radar/producer/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
ConnectionClosedByBroker,
StreamLostError,
)
from pika.exchange_type import ExchangeType

from exchange_radar.producer.serializers.base import BaseSerializer
from exchange_radar.producer.settings import base as settings
Expand Down Expand Up @@ -44,26 +43,21 @@ def get_connection(self) -> pika.BlockingConnection:

return self.connection

def get_channel(self, queue_name) -> BlockingChannel:
def get_channel(self) -> BlockingChannel:
if self.channel and self.channel.is_open:
logger.info("Reusing channel...")
return self.channel

self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange=settings.RABBITMQ_EXCHANGE, exchange_type=ExchangeType.fanout, durable=True
)

return self.channel


class ProducerChannel:
def __init__(self, queue_name):
def __init__(self):
self.connection = producer_connection.get_connection()
self.queue_name = queue_name

def __enter__(self) -> BlockingChannel:
return producer_connection.get_channel(self.queue_name)
return producer_connection.get_channel()

def __exit__(self, exc_type, exc_val, exc_tb):
pass
Expand All @@ -83,15 +77,15 @@ def publish(data: BaseSerializer) -> None:
body = data.model_dump_json().encode()

try:
with ProducerChannel(queue_name=settings.RABBITMQ_TRADES_ROUTING_KEY) as channel:
with ProducerChannel() as channel:
channel.basic_publish(routing_key=settings.RABBITMQ_TRADES_ROUTING_KEY, body=body, **params)

try:
queue_name = ROUTING_KEYS[get_ranking(data)]
except KeyError:
logger.info("No specific extra Queue")
else:
with ProducerChannel(queue_name=queue_name) as channel:
with ProducerChannel() as channel:
channel.basic_publish(routing_key=queue_name, body=body, **params)
except (
StreamLostError,
Expand Down
2 changes: 1 addition & 1 deletion exchange_radar/producer/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
RABBITMQ_CONNECTION_HEARTBEAT = env.int("RABBITMQ_CONNECTION_HEARTBEAT")
RABBITMQ_BLOCKED_CONNECTION_TIMEOUT = env.int("RABBITMQ_BLOCKED_CONNECTION_TIMEOUT")

RABBITMQ_TRADES_ROUTING_KEY = "trades"
RABBITMQ_TRADES_ROUTING_KEY = "trades.all"
RABBITMQ_TRADES_WHALES_ROUTING_KEY = "trades.whales"
RABBITMQ_TRADES_DOLPHIN_ROUTING_KEY = "trades.dolphins"
RABBITMQ_TRADES_OCTOPUS_ROUTING_KEY = "trades.octopuses"
Expand Down
2 changes: 1 addition & 1 deletion local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
command: /start

rabbitmq:
image: rabbitmq:3.11.15-management-alpine
image: rabbitmq:3.12.13-management-alpine
container_name: exchange-radar-rabbitmq
env_file:
- ./.envs/.local/.rabbitmq
Expand Down
2 changes: 1 addition & 1 deletion production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ services:
command: /start

rabbitmq:
image: rabbitmq:3.11.15-management-alpine
image: rabbitmq:3.12.13-management-alpine
container_name: exchange-radar-rabbitmq
env_file:
- ./.envs/.production/.rabbitmq
Expand Down

0 comments on commit 6629590

Please sign in to comment.