From 712be82dca79fcd3ddf47e8a763047beef4fcf32 Mon Sep 17 00:00:00 2001 From: Iman Tumorang Date: Sun, 16 Jun 2024 13:46:19 +0300 Subject: [PATCH] fix: add custom RMQ option (#15) --- README.md | 12 +-- examples/rabbitmq/main.go | 12 +-- .../rabbitmq/blackbox_consumer_test.go | 12 +-- internal/consumer/rabbitmq/consumer.go | 23 ++-- options/consumer/consumer.go | 101 ++++++++++++++---- 5 files changed, 109 insertions(+), 51 deletions(-) diff --git a/README.md b/README.md index a2d7d91..cf13a1f 100644 --- a/README.md +++ b/README.md @@ -92,10 +92,12 @@ func main() { defer consumerChannel.Close() rmqConsumer := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, - consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ - ConsumerChannel: consumerChannel, - ReQueueChannel: publisherChannel, - }), + consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern( + consumerChannel, + publisherChannel, + "goqueue", // exchange name + []string{"goqueue.payments.#"}, // routing keys pattern + )), consumerOpts.WithConsumerID("consumer_id"), consumerOpts.WithMiddlewares( middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(), @@ -103,8 +105,6 @@ func main() { ), consumerOpts.WithMaxRetryFailedMessage(3), consumerOpts.WithBatchMessageSize(1), - consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"), - consumerOpts.WithTopicName("goqueue"), consumerOpts.WithQueueName("consumer_queue"), ) diff --git a/examples/rabbitmq/main.go b/examples/rabbitmq/main.go index fe8485f..1ccfa2f 100644 --- a/examples/rabbitmq/main.go +++ b/examples/rabbitmq/main.go @@ -65,10 +65,12 @@ func main() { defer consumerChannel.Close() rmqConsumer := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, - consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ - ConsumerChannel: consumerChannel, - ReQueueChannel: publisherChannel, - }), + consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern( + consumerChannel, + publisherChannel, + "goqueue", // exchange name + []string{"goqueue.payments.#"}, // routing keys pattern + )), consumerOpts.WithConsumerID("consumer_id"), consumerOpts.WithMiddlewares( middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(), @@ -76,8 +78,6 @@ func main() { ), consumerOpts.WithMaxRetryFailedMessage(3), consumerOpts.WithBatchMessageSize(1), - consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"), - consumerOpts.WithTopicName("goqueue"), consumerOpts.WithQueueName("consumer_queue"), ) diff --git a/internal/consumer/rabbitmq/blackbox_consumer_test.go b/internal/consumer/rabbitmq/blackbox_consumer_test.go index 2e8e888..815cec0 100644 --- a/internal/consumer/rabbitmq/blackbox_consumer_test.go +++ b/internal/consumer/rabbitmq/blackbox_consumer_test.go @@ -192,15 +192,15 @@ func (s *rabbitMQTestSuite) TestConsumerWithoutExchangePatternProvided() { func (s *rabbitMQTestSuite) TestConsumerWithExchangePatternProvided() { s.seedPublish(string(headerVal.ContentTypeJSON), testAction) rmqSubs := rmq.NewConsumer( - consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ - ConsumerChannel: s.consumerChannel, - ReQueueChannel: s.publishChannel, - }), + consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern( + s.consumerChannel, + s.publishChannel, + testExchange, + []string{"goqueue.action.#"}, + )), consumerOpts.WithBatchMessageSize(1), consumerOpts.WithMiddlewares(middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler()), consumerOpts.WithQueueName(rabbitMQTestQueueName), - consumerOpts.WithActionsPatternSubscribed("goqueue.action.#"), //exchange pattern provided in constructor - consumerOpts.WithTopicName(testExchange), //exchange name provided in constructor ) msgHandler := handler(s.T(), s.getMockData(testAction)) diff --git a/internal/consumer/rabbitmq/consumer.go b/internal/consumer/rabbitmq/consumer.go index 20ef869..995274b 100644 --- a/internal/consumer/rabbitmq/consumer.go +++ b/internal/consumer/rabbitmq/consumer.go @@ -41,9 +41,11 @@ func NewConsumer( requeueChannel: opt.RabbitMQConsumerConfig.ReQueueChannel, option: opt, } - if len(opt.ActionsPatternSubscribed) > 0 && opt.TopicName != "" { + if opt.RabbitMQConsumerConfig.QueueDeclareConfig != nil && + opt.RabbitMQConsumerConfig.QueueBindConfig != nil { rmqHandler.initQueue() } + rmqHandler.initConsumer() return rmqHandler } @@ -54,26 +56,25 @@ func (r *rabbitMQ) initQueue() { // Returns an instance of amqp.Queue and any error encountered. // Please refer to the RabbitMQ documentation for more information on the parameters. // https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare - // (from bxcodec: please raise a PR if you need a custom queue argument) _, err := r.consumerChannel.QueueDeclare( r.option.QueueName, - true, - false, - false, - false, - nil, + r.option.RabbitMQConsumerConfig.QueueDeclareConfig.Durable, + r.option.RabbitMQConsumerConfig.QueueDeclareConfig.AutoDelete, + r.option.RabbitMQConsumerConfig.QueueDeclareConfig.Exclusive, + r.option.RabbitMQConsumerConfig.QueueDeclareConfig.NoWait, + r.option.RabbitMQConsumerConfig.QueueDeclareConfig.Args, ) if err != nil { logrus.Fatal("error declaring the queue, ", err) } - for _, eventType := range r.option.ActionsPatternSubscribed { + for _, eventType := range r.option.RabbitMQConsumerConfig.QueueBindConfig.RoutingKeys { err = r.consumerChannel.QueueBind( r.option.QueueName, eventType, - r.option.TopicName, - false, - nil, + r.option.RabbitMQConsumerConfig.QueueBindConfig.ExchangeName, + r.option.RabbitMQConsumerConfig.QueueBindConfig.NoWait, + r.option.RabbitMQConsumerConfig.QueueBindConfig.Args, ) if err != nil { logrus.Fatal("error binding the queue, ", err) diff --git a/options/consumer/consumer.go b/options/consumer/consumer.go index e7414a4..614cb0d 100644 --- a/options/consumer/consumer.go +++ b/options/consumer/consumer.go @@ -23,12 +23,6 @@ type ConsumerOption struct { // Middlewares is a list of middleware functions to be applied to the inbound message handler. Middlewares []interfaces.InboundMessageHandlerMiddlewareFunc - // ActionsPatternSubscribed specifies the list of action patterns that the consumer is subscribed to. - ActionsPatternSubscribed []string - - // TopicName specifies the name of the topic to consume messages from. - TopicName string - // MaxRetryFailedMessage specifies the maximum number of times a failed message should be retried. MaxRetryFailedMessage int64 @@ -68,22 +62,6 @@ func WithMiddlewares(middlewares ...interfaces.InboundMessageHandlerMiddlewareFu } } -// WithActionsPatternSubscribed sets the actions that the consumer will subscribe to. -// It takes a variadic parameter `actions` which represents the actions to be subscribed. -// The actions are stored in the `ActionsPatternSubscribed` field of the `ConsumerOption` struct. -func WithActionsPatternSubscribed(actions ...string) ConsumerOptionFunc { - return func(opt *ConsumerOption) { - opt.ActionsPatternSubscribed = actions - } -} - -// WithTopicName sets the topic name for the consumer option. -func WithTopicName(name string) ConsumerOptionFunc { - return func(opt *ConsumerOption) { - opt.TopicName = name - } -} - // WithMaxRetryFailedMessage sets the maximum number of retries for failed messages. // It takes an integer parameter 'n' and returns an ConsumerOptionFunc. // The ConsumerOptionFunc updates the 'MaxRetryFailedMessage' field of the ConsumerOption struct. @@ -122,6 +100,54 @@ type RabbitMQConsumerConfig struct { ConsumerChannel *amqp.Channel // ReQueueChannel is the channel used for re-queuing messages in RabbitMQ. ReQueueChannel *amqp.Channel + + // QueueDeclareConfig specifies the configuration for declaring a RabbitMQ queue. + QueueDeclareConfig *RabbitMQQueueDeclareConfig + + // QueueBindConfig specifies the configuration for binding a queue to an exchange in RabbitMQ. + QueueBindConfig *RabbitMQQueueBindConfig +} + +// RabbitMQQueueDeclareConfig represents the configuration options for declaring a RabbitMQ queue. +// * Durable and Non-Auto-Deleted queues will survive server restarts and remain +// when there are no remaining consumers or bindings. Persistent publishings will +// be restored in this queue on server restart. These queues are only able to be +// bound to durable exchanges. +// * Non-Durable and Auto-Deleted queues will not be redeclared on server restart +// and will be deleted by the server after a short time when the last consumer is +// canceled or the last consumer's channel is closed. Queues with this lifetime +// can also be deleted normally with QueueDelete. These durable queues can only +// be bound to non-durable exchanges. +// * Non-Durable and Non-Auto-Deleted queues will remain declared as long as the +// server is running regardless of how many consumers. This lifetime is useful +// for temporary topologies that may have long delays between consumer activity. +// These queues can only be bound to non-durable exchanges. +// * Durable and Auto-Deleted queues will be restored on server restart, but without +// active consumers will not survive and be removed. This Lifetime is unlikely +// to be useful. +// * Exclusive queues are only accessible by the connection that declares them and +// will be deleted when the connection closes. Channels on other connections +// will receive an error when attempting to declare, bind, consume, purge or +// delete a queue with the same name. +// * When noWait is true, the queue will assume to be declared on the server. A +// channel exception will arrive if the conditions are met for existing queues +// or attempting to modify an existing queue from a different connection. +// When the error return value is not nil, you can assume the queue could not be +// declared with these parameters, and the channel will be closed. +type RabbitMQQueueDeclareConfig struct { + Durable bool // Whether the queue should survive a broker restart. + AutoDelete bool // Whether the queue should be deleted when there are no more consumers. + Exclusive bool // Whether the queue should be exclusive to the connection that declares it. + NoWait bool // Whether to wait for a response from the server after declaring the queue. + Args amqp.Table // Additional arguments to be passed when declaring the queue. +} + +// RabbitMQQueueBindConfig represents the configuration options for binding a queue to an exchange in RabbitMQ. +type RabbitMQQueueBindConfig struct { + RoutingKeys []string // The routing key to use for the binding. + ExchangeName string // The name of the exchange to bind to. + NoWait bool // Whether to wait for a response from the server. + Args amqp.Table // Additional arguments for the binding. } const ( @@ -129,3 +155,34 @@ const ( ConsumerPlatformGooglePubSub = options.PlatformGooglePubSub ConsumerPlatformSQS = options.PlatformSQS ) + +// RabbitMQConfigWithDefaultTopicFanOutPattern returns a RabbitMQConsumerConfig with default configuration for topic fanout pattern. +// It takes the queueName, exchangeName, and routingKeys as parameters and returns a pointer to RabbitMQConsumerConfig. +// The default configuration includes a durable queue that is not auto-deleted, exclusive, and no-wait. +// The queue is bound to the exchange with the provided routing keys. +// the exchange is must be a fanout exchange. The exchange must be declared before using this configuration. +// Read more about fanout exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-fanout +// +// the routing keys can be in pattern format. +// e.g. "a.*.b.#" will match "a.b", "a.c.b", "a.c.d.b", etc. +// For more information on pattern matching, see https://www.rabbitmq.com/tutorials/tutorial-five-go.html +func RabbitMQConfigWithDefaultTopicFanOutPattern(consumerChannel, publisherChannel *amqp.Channel, + exchangeName string, routingKeys []string) *RabbitMQConsumerConfig { + return &RabbitMQConsumerConfig{ + ConsumerChannel: consumerChannel, + ReQueueChannel: publisherChannel, + QueueDeclareConfig: &RabbitMQQueueDeclareConfig{ + Durable: true, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: nil, + }, + QueueBindConfig: &RabbitMQQueueBindConfig{ + RoutingKeys: routingKeys, + ExchangeName: exchangeName, + NoWait: false, + Args: nil, + }, + } +}