Skip to content

Commit

Permalink
fix: add custom RMQ option (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec authored Jun 16, 2024
1 parent e24518b commit 712be82
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 51 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,19 @@ func main() {
defer consumerChannel.Close()
rmqConsumer := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
ConsumerChannel: consumerChannel,
ReQueueChannel: publisherChannel,
}),
consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern(
consumerChannel,
publisherChannel,
"goqueue", // exchange name
[]string{"goqueue.payments.#"}, // routing keys pattern
)),
consumerOpts.WithConsumerID("consumer_id"),
consumerOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
),
consumerOpts.WithMaxRetryFailedMessage(3),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumerOpts.WithTopicName("goqueue"),
consumerOpts.WithQueueName("consumer_queue"),
)

Expand Down
12 changes: 6 additions & 6 deletions examples/rabbitmq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,19 @@ func main() {
defer consumerChannel.Close()
rmqConsumer := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
ConsumerChannel: consumerChannel,
ReQueueChannel: publisherChannel,
}),
consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern(
consumerChannel,
publisherChannel,
"goqueue", // exchange name
[]string{"goqueue.payments.#"}, // routing keys pattern
)),
consumerOpts.WithConsumerID("consumer_id"),
consumerOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
),
consumerOpts.WithMaxRetryFailedMessage(3),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumerOpts.WithTopicName("goqueue"),
consumerOpts.WithQueueName("consumer_queue"),
)

Expand Down
12 changes: 6 additions & 6 deletions internal/consumer/rabbitmq/blackbox_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@ func (s *rabbitMQTestSuite) TestConsumerWithoutExchangePatternProvided() {
func (s *rabbitMQTestSuite) TestConsumerWithExchangePatternProvided() {
s.seedPublish(string(headerVal.ContentTypeJSON), testAction)
rmqSubs := rmq.NewConsumer(
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
ConsumerChannel: s.consumerChannel,
ReQueueChannel: s.publishChannel,
}),
consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern(
s.consumerChannel,
s.publishChannel,
testExchange,
[]string{"goqueue.action.#"},
)),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithMiddlewares(middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler()),
consumerOpts.WithQueueName(rabbitMQTestQueueName),
consumerOpts.WithActionsPatternSubscribed("goqueue.action.#"), //exchange pattern provided in constructor
consumerOpts.WithTopicName(testExchange), //exchange name provided in constructor
)

msgHandler := handler(s.T(), s.getMockData(testAction))
Expand Down
23 changes: 12 additions & 11 deletions internal/consumer/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ func NewConsumer(
requeueChannel: opt.RabbitMQConsumerConfig.ReQueueChannel,
option: opt,
}
if len(opt.ActionsPatternSubscribed) > 0 && opt.TopicName != "" {
if opt.RabbitMQConsumerConfig.QueueDeclareConfig != nil &&
opt.RabbitMQConsumerConfig.QueueBindConfig != nil {
rmqHandler.initQueue()
}

rmqHandler.initConsumer()
return rmqHandler
}
Expand All @@ -54,26 +56,25 @@ func (r *rabbitMQ) initQueue() {
// Returns an instance of amqp.Queue and any error encountered.
// Please refer to the RabbitMQ documentation for more information on the parameters.
// https://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare
// (from bxcodec: please raise a PR if you need a custom queue argument)
_, err := r.consumerChannel.QueueDeclare(
r.option.QueueName,
true,
false,
false,
false,
nil,
r.option.RabbitMQConsumerConfig.QueueDeclareConfig.Durable,
r.option.RabbitMQConsumerConfig.QueueDeclareConfig.AutoDelete,
r.option.RabbitMQConsumerConfig.QueueDeclareConfig.Exclusive,
r.option.RabbitMQConsumerConfig.QueueDeclareConfig.NoWait,
r.option.RabbitMQConsumerConfig.QueueDeclareConfig.Args,
)
if err != nil {
logrus.Fatal("error declaring the queue, ", err)
}

for _, eventType := range r.option.ActionsPatternSubscribed {
for _, eventType := range r.option.RabbitMQConsumerConfig.QueueBindConfig.RoutingKeys {
err = r.consumerChannel.QueueBind(
r.option.QueueName,
eventType,
r.option.TopicName,
false,
nil,
r.option.RabbitMQConsumerConfig.QueueBindConfig.ExchangeName,
r.option.RabbitMQConsumerConfig.QueueBindConfig.NoWait,
r.option.RabbitMQConsumerConfig.QueueBindConfig.Args,
)
if err != nil {
logrus.Fatal("error binding the queue, ", err)
Expand Down
101 changes: 79 additions & 22 deletions options/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ type ConsumerOption struct {
// Middlewares is a list of middleware functions to be applied to the inbound message handler.
Middlewares []interfaces.InboundMessageHandlerMiddlewareFunc

// ActionsPatternSubscribed specifies the list of action patterns that the consumer is subscribed to.
ActionsPatternSubscribed []string

// TopicName specifies the name of the topic to consume messages from.
TopicName string

// MaxRetryFailedMessage specifies the maximum number of times a failed message should be retried.
MaxRetryFailedMessage int64

Expand Down Expand Up @@ -68,22 +62,6 @@ func WithMiddlewares(middlewares ...interfaces.InboundMessageHandlerMiddlewareFu
}
}

// WithActionsPatternSubscribed sets the actions that the consumer will subscribe to.
// It takes a variadic parameter `actions` which represents the actions to be subscribed.
// The actions are stored in the `ActionsPatternSubscribed` field of the `ConsumerOption` struct.
func WithActionsPatternSubscribed(actions ...string) ConsumerOptionFunc {
return func(opt *ConsumerOption) {
opt.ActionsPatternSubscribed = actions
}
}

// WithTopicName sets the topic name for the consumer option.
func WithTopicName(name string) ConsumerOptionFunc {
return func(opt *ConsumerOption) {
opt.TopicName = name
}
}

// WithMaxRetryFailedMessage sets the maximum number of retries for failed messages.
// It takes an integer parameter 'n' and returns an ConsumerOptionFunc.
// The ConsumerOptionFunc updates the 'MaxRetryFailedMessage' field of the ConsumerOption struct.
Expand Down Expand Up @@ -122,10 +100,89 @@ type RabbitMQConsumerConfig struct {
ConsumerChannel *amqp.Channel
// ReQueueChannel is the channel used for re-queuing messages in RabbitMQ.
ReQueueChannel *amqp.Channel

// QueueDeclareConfig specifies the configuration for declaring a RabbitMQ queue.
QueueDeclareConfig *RabbitMQQueueDeclareConfig

// QueueBindConfig specifies the configuration for binding a queue to an exchange in RabbitMQ.
QueueBindConfig *RabbitMQQueueBindConfig
}

// RabbitMQQueueDeclareConfig represents the configuration options for declaring a RabbitMQ queue.
// * Durable and Non-Auto-Deleted queues will survive server restarts and remain
// when there are no remaining consumers or bindings. Persistent publishings will
// be restored in this queue on server restart. These queues are only able to be
// bound to durable exchanges.
// * Non-Durable and Auto-Deleted queues will not be redeclared on server restart
// and will be deleted by the server after a short time when the last consumer is
// canceled or the last consumer's channel is closed. Queues with this lifetime
// can also be deleted normally with QueueDelete. These durable queues can only
// be bound to non-durable exchanges.
// * Non-Durable and Non-Auto-Deleted queues will remain declared as long as the
// server is running regardless of how many consumers. This lifetime is useful
// for temporary topologies that may have long delays between consumer activity.
// These queues can only be bound to non-durable exchanges.
// * Durable and Auto-Deleted queues will be restored on server restart, but without
// active consumers will not survive and be removed. This Lifetime is unlikely
// to be useful.
// * Exclusive queues are only accessible by the connection that declares them and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
// * When noWait is true, the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
// When the error return value is not nil, you can assume the queue could not be
// declared with these parameters, and the channel will be closed.
type RabbitMQQueueDeclareConfig struct {
Durable bool // Whether the queue should survive a broker restart.
AutoDelete bool // Whether the queue should be deleted when there are no more consumers.
Exclusive bool // Whether the queue should be exclusive to the connection that declares it.
NoWait bool // Whether to wait for a response from the server after declaring the queue.
Args amqp.Table // Additional arguments to be passed when declaring the queue.
}

// RabbitMQQueueBindConfig represents the configuration options for binding a queue to an exchange in RabbitMQ.
type RabbitMQQueueBindConfig struct {
RoutingKeys []string // The routing key to use for the binding.
ExchangeName string // The name of the exchange to bind to.
NoWait bool // Whether to wait for a response from the server.
Args amqp.Table // Additional arguments for the binding.
}

const (
ConsumerPlatformRabbitMQ = options.PlatformRabbitMQ
ConsumerPlatformGooglePubSub = options.PlatformGooglePubSub
ConsumerPlatformSQS = options.PlatformSQS
)

// RabbitMQConfigWithDefaultTopicFanOutPattern returns a RabbitMQConsumerConfig with default configuration for topic fanout pattern.
// It takes the queueName, exchangeName, and routingKeys as parameters and returns a pointer to RabbitMQConsumerConfig.
// The default configuration includes a durable queue that is not auto-deleted, exclusive, and no-wait.
// The queue is bound to the exchange with the provided routing keys.
// the exchange is must be a fanout exchange. The exchange must be declared before using this configuration.
// Read more about fanout exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-fanout
//
// the routing keys can be in pattern format.
// e.g. "a.*.b.#" will match "a.b", "a.c.b", "a.c.d.b", etc.
// For more information on pattern matching, see https://www.rabbitmq.com/tutorials/tutorial-five-go.html
func RabbitMQConfigWithDefaultTopicFanOutPattern(consumerChannel, publisherChannel *amqp.Channel,
exchangeName string, routingKeys []string) *RabbitMQConsumerConfig {
return &RabbitMQConsumerConfig{
ConsumerChannel: consumerChannel,
ReQueueChannel: publisherChannel,
QueueDeclareConfig: &RabbitMQQueueDeclareConfig{
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
},
QueueBindConfig: &RabbitMQQueueBindConfig{
RoutingKeys: routingKeys,
ExchangeName: exchangeName,
NoWait: false,
Args: nil,
},
}
}

0 comments on commit 712be82

Please sign in to comment.