From a5a2fa68152d00e3758ccca9e6a510d9520f5572 Mon Sep 17 00:00:00 2001 From: Iman Tumorang Date: Sun, 11 Aug 2024 18:37:01 +0300 Subject: [PATCH] chore: fix dlq --- README.md | 5 +- examples/rabbitmq/basic/docker-compose.yaml | 12 ++ examples/rabbitmq/{ => basic}/main.go | 0 .../rabbitmq/withretries/docker-compose.yaml | 12 ++ examples/rabbitmq/withretries/main.go | 121 ++++++++++++++++ headers/key/const.go | 2 + interfaces/delayfn.go | 17 ++- internal/consumer/rabbitmq/consumer.go | 129 +++++++++++++++--- options/consumer/consumer.go | 4 +- 9 files changed, 275 insertions(+), 27 deletions(-) create mode 100644 examples/rabbitmq/basic/docker-compose.yaml rename examples/rabbitmq/{ => basic}/main.go (100%) create mode 100644 examples/rabbitmq/withretries/docker-compose.yaml create mode 100644 examples/rabbitmq/withretries/main.go diff --git a/README.md b/README.md index cf13a1f..162cd1a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # goqueue -GoQueue - one library to rule them all. A golang wrapper that handles all the complexity of every Queue platforms. Extensible and easy to learn +GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn. ## Index @@ -58,12 +58,15 @@ func initExchange(ch *amqp.Channel, exchangeName string) error { } func main() { + + // Initialize the RMQ connection rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/" rmqConn, err := amqp.Dial(rmqDSN) if err != nil { panic(err) } + // Initialize the Publisher rmqPub := publisher.NewPublisher( publisherOpts.PublisherPlatformRabbitMQ, publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ diff --git a/examples/rabbitmq/basic/docker-compose.yaml b/examples/rabbitmq/basic/docker-compose.yaml new file mode 100644 index 0000000..daffce2 --- /dev/null +++ b/examples/rabbitmq/basic/docker-compose.yaml @@ -0,0 +1,12 @@ +version: "3.7" +services: + rabbitmq-test: + image: rabbitmq:3.13.3-management-alpine + container_name: "goqueue-rabbitmq-example-basic" + hostname: rabbitmq + ports: + - "15671:15672" + - "5672:5672" + volumes: + - ../../../tests/localconf/rabbitmq/rabbitmq.definition.json:/opt/definitions.json:ro + - ../../../tests/localconf/rabbitmq/rabbitmq.config:/etc/rabbitmq/rabbitmq.config:ro diff --git a/examples/rabbitmq/main.go b/examples/rabbitmq/basic/main.go similarity index 100% rename from examples/rabbitmq/main.go rename to examples/rabbitmq/basic/main.go diff --git a/examples/rabbitmq/withretries/docker-compose.yaml b/examples/rabbitmq/withretries/docker-compose.yaml new file mode 100644 index 0000000..6594354 --- /dev/null +++ b/examples/rabbitmq/withretries/docker-compose.yaml @@ -0,0 +1,12 @@ +version: "3.7" +services: + rabbitmq-test: + image: rabbitmq:3.13.3-management-alpine + container_name: "goqueue-rabbitmq-example-with-retries" + hostname: rabbitmq + ports: + - "15671:15672" + - "5672:5672" + volumes: + - ../../../tests/localconf/rabbitmq/rabbitmq.definition.json:/opt/definitions.json:ro + - ../../../tests/localconf/rabbitmq/rabbitmq.config:/etc/rabbitmq/rabbitmq.config:ro diff --git a/examples/rabbitmq/withretries/main.go b/examples/rabbitmq/withretries/main.go new file mode 100644 index 0000000..35c880d --- /dev/null +++ b/examples/rabbitmq/withretries/main.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/bxcodec/goqueue" + "github.com/bxcodec/goqueue/consumer" + "github.com/bxcodec/goqueue/interfaces" + "github.com/bxcodec/goqueue/middleware" + "github.com/bxcodec/goqueue/options" + consumerOpts "github.com/bxcodec/goqueue/options/consumer" + publisherOpts "github.com/bxcodec/goqueue/options/publisher" + "github.com/bxcodec/goqueue/publisher" +) + +func initExchange(ch *amqp.Channel, exchangeName string) error { + return ch.ExchangeDeclare( + exchangeName, + "topic", + true, + false, + false, + false, + nil, + ) +} + +func main() { + rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/" + rmqConn, err := amqp.Dial(rmqDSN) + if err != nil { + panic(err) + } + + rmqPub := publisher.NewPublisher( + publisherOpts.PublisherPlatformRabbitMQ, + publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ + Conn: rmqConn, + PublisherChannelPoolSize: 5, + }), + publisherOpts.WithPublisherID("publisher_id"), + publisherOpts.WithMiddlewares( + middleware.HelloWorldMiddlewareExecuteBeforePublisher(), + middleware.HelloWorldMiddlewareExecuteAfterPublisher(), + ), + ) + + requeueChannel, err := rmqConn.Channel() + if err != nil { + panic(err) + } + + defer requeueChannel.Close() + initExchange(requeueChannel, "goqueue") + + consumerChannel, err := rmqConn.Channel() + if err != nil { + panic(err) + } + defer consumerChannel.Close() + rmqConsumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern( + consumerChannel, + requeueChannel, + "goqueue", // exchange name + []string{"goqueue.payments.#"}, // routing keys pattern + )), + consumerOpts.WithConsumerID("consumer_id"), + consumerOpts.WithMiddlewares( + middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(), + middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(), + ), + consumerOpts.WithMaxRetryFailedMessage(5), + consumerOpts.WithBatchMessageSize(1), + consumerOpts.WithQueueName("consumer_queue"), + ) + + queueSvc := goqueue.NewQueueService( + options.WithConsumer(rmqConsumer), + options.WithPublisher(rmqPub), + options.WithMessageHandler(handler()), + ) + go func() { + for i := 0; i < 10; i++ { + data := map[string]interface{}{ + "message": fmt.Sprintf("Hello World %d", i), + } + jbyt, _ := json.Marshal(data) + err := queueSvc.Publish(context.Background(), interfaces.Message{ + Data: data, + Action: "goqueue.payments.create", + Topic: "goqueue", + }) + if err != nil { + panic(err) + } + fmt.Println("Message Sent: ", string(jbyt)) + } + }() + + // change to context.Background() if you want to run it forever + // ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + // defer cancel() + err = queueSvc.Start(context.Background()) + if err != nil { + panic(err) + } +} + +func handler() interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) (err error) { + fmt.Printf("Message: %+v\n", m) + // something happend, we need to requeue the message + return m.PutToBackOfQueueWithDelay(ctx, interfaces.ExponentialBackoffDelayFn) + } +} diff --git a/headers/key/const.go b/headers/key/const.go index 4ad7c4b..79a36ed 100644 --- a/headers/key/const.go +++ b/headers/key/const.go @@ -9,4 +9,6 @@ const ( ContentType = "goqueue-content-type" QueueServiceAgent = "goqueue-queue-service-agent" MessageID = "goqueue-message-id" + OriginalTopicName = "goqueue-original-topic-name" + OriginalActionName = "goqueue-original-action-name" ) diff --git a/interfaces/delayfn.go b/interfaces/delayfn.go index ca945a4..333f4ea 100644 --- a/interfaces/delayfn.go +++ b/interfaces/delayfn.go @@ -1,16 +1,27 @@ package interfaces +import "fmt" + type DelayFn func(retries int64) (delay int64) var ( // ExponentialBackoffDelayFn is a delay function that implements exponential backoff. - // It takes the number of retries as input and returns the delay in milliseconds. + // It takes the number of retries as input and returns the delay in seconds. ExponentialBackoffDelayFn DelayFn = func(retries int64) (delay int64) { - return 2 << retries + fmt.Println(">>>>> ExponentialBackoffDelayFn, retry: ", retries) + return 2 << (retries - 1) + } + + // LinearDelayFn is a delay function that implements linear delay. + // It takes the number of retries as input and returns the delay in seconds. + LinearDelayFn DelayFn = func(retries int64) (delay int64) { + fmt.Println(">>>>> LinearDelayFn, retry: ", retries) + return retries } + // NoDelayFn is a DelayFn implementation that returns 0 delay for retries. NoDelayFn DelayFn = func(retries int64) (delay int64) { return 0 } - DefaultDelayFn DelayFn = NoDelayFn + DefaultDelayFn DelayFn = LinearDelayFn ) diff --git a/internal/consumer/rabbitmq/consumer.go b/internal/consumer/rabbitmq/consumer.go index 995274b..680bbdc 100644 --- a/internal/consumer/rabbitmq/consumer.go +++ b/internal/consumer/rabbitmq/consumer.go @@ -21,11 +21,13 @@ import ( // rabbitMQ is the subscriber handler for rabbitmq type rabbitMQ struct { - consumerChannel *amqp.Channel - requeueChannel *amqp.Channel //if want requeue support to another queue - option *consumerOpts.ConsumerOption - tagName string - msgReceiver <-chan amqp.Delivery + consumerChannel *amqp.Channel + requeueChannel *amqp.Channel //if want requeue support to another queue + option *consumerOpts.ConsumerOption + tagName string + msgReceiver <-chan amqp.Delivery + retryExchangeName string + retryDeadLetterExchangeName string } // New will initialize the rabbitMQ subscriber @@ -37,9 +39,11 @@ func NewConsumer( } rmqHandler := &rabbitMQ{ - consumerChannel: opt.RabbitMQConsumerConfig.ConsumerChannel, - requeueChannel: opt.RabbitMQConsumerConfig.ReQueueChannel, - option: opt, + consumerChannel: opt.RabbitMQConsumerConfig.ConsumerChannel, + requeueChannel: opt.RabbitMQConsumerConfig.ReQueueChannel, + option: opt, + retryExchangeName: fmt.Sprintf("%s__retry_exchange", opt.QueueName), + retryDeadLetterExchangeName: fmt.Sprintf("%s__retry_dlx", opt.QueueName), } if opt.RabbitMQConsumerConfig.QueueDeclareConfig != nil && opt.RabbitMQConsumerConfig.QueueBindConfig != nil { @@ -47,6 +51,7 @@ func NewConsumer( } rmqHandler.initConsumer() + rmqHandler.initRetryModule() return rmqHandler } @@ -127,6 +132,80 @@ func (r *rabbitMQ) initConsumer() { r.msgReceiver = receiver } +func (r *rabbitMQ) initRetryModule() { + // declare retry exchange + err := r.consumerChannel.ExchangeDeclare( + r.retryExchangeName, + "topic", + true, + false, + false, + false, + nil, + ) + + if err != nil { + logrus.Fatal("error declaring the retry exchange, ", err) + } + + // declare dead letter exchange + err = r.consumerChannel.ExchangeDeclare( + r.retryDeadLetterExchangeName, + "fanout", + true, + false, + false, + false, + nil, + ) + + if err != nil { + logrus.Fatal("error declaring the retry dead letter exchange, ", err) + } + + // bind dead letter exchange to original queue + err = r.consumerChannel.QueueBind( + r.option.QueueName, + "", + r.retryDeadLetterExchangeName, + false, + nil, + ) + if err != nil { + logrus.Fatal("error binding the dead letter exchange to the original queue, ", err) + } + + // declare retry queue + for i := int64(1); i <= r.option.MaxRetryFailedMessage; i++ { + // declare retry queue + _, err = r.consumerChannel.QueueDeclare( + getRetryRoutingKey(r.option.QueueName, i), // queue name and routing key is the same for retry queue + true, + false, + false, + false, + amqp.Table{ + "x-dead-letter-exchange": r.retryDeadLetterExchangeName, + }, + ) + if err != nil { + logrus.Fatal("error declaring the retry queue, ", err) + } + + // bind retry queue to retry exchange + err = r.consumerChannel.QueueBind( + getRetryRoutingKey(r.option.QueueName, i), // queue name and routing key is the same for retry queue + getRetryRoutingKey(r.option.QueueName, i), // queue name and routing key is the same for retry queue + r.retryExchangeName, + false, + nil, + ) + if err != nil { + logrus.Fatal("error binding the retry queue, ", err) + } + } +} + // Consume consumes messages from a RabbitMQ queue and handles them using the provided message handler. // It takes a context, an inbound message handler, and a map of metadata as input parameters. // The function continuously listens for messages from the queue and processes them until the context is canceled. @@ -190,7 +269,6 @@ func (r *rabbitMQ) Consume(ctx context.Context, } continue } - m := interfaces.InboundMessage{ Message: msg, RetryCount: retryCount, @@ -215,18 +293,18 @@ func (r *rabbitMQ) Consume(ctx context.Context, return }, Nack: func(ctx context.Context) (err error) { - // receivedMsg.Nack(true) => will redelivered again instantly (same with receivedMsg.reject) - // receivedMsg.Nack(false) => will put the message to dead letter queue (same with receivedMsg.reject) + // receivedMsg.Nack(false, true) => will redelivered again instantly (same with receivedMsg.reject) + // receivedMsg.Nack(false, false) => will put the message to dead letter queue (same with receivedMsg.reject) err = receivedMsg.Nack(false, true) return }, MoveToDeadLetterQueue: func(ctx context.Context) (err error) { - // receivedMsg.Nack(true) => will redelivered again instantly (same with receivedMsg.reject) - // receivedMsg.Nack(false) => will put the message to dead letter queue (same with receivedMsg.reject) + // receivedMsg.Nack(false, true) => will redelivered again instantly (same with receivedMsg.reject) + // receivedMsg.Nack(false, false) => will put the message to dead letter queue (same with receivedMsg.reject) err = receivedMsg.Nack(false, false) return }, - PutToBackOfQueueWithDelay: r.requeueMessage(meta, receivedMsg), + PutToBackOfQueueWithDelay: r.requeueMessageWithDLQ(meta, msg, receivedMsg), } logrus.WithFields(logrus.Fields{ @@ -293,22 +371,25 @@ func buildMessage(consumerMeta map[string]interface{}, receivedMsg amqp.Delivery return msg, nil } -func (r *rabbitMQ) requeueMessage(consumerMeta map[string]interface{}, - receivedMsg amqp.Delivery) func(ctx context.Context, delayFn interfaces.DelayFn) (err error) { +func (r *rabbitMQ) requeueMessageWithDLQ(consumerMeta map[string]interface{}, msg interfaces.Message, receivedMsg amqp.Delivery) func(ctx context.Context, delayFn interfaces.DelayFn) (err error) { return func(ctx context.Context, delayFn interfaces.DelayFn) (err error) { if delayFn == nil { delayFn = interfaces.DefaultDelayFn } retries := extractHeaderInt(receivedMsg.Headers, headerKey.RetryCount) retries++ - delay := delayFn(retries) - time.Sleep(time.Duration(delay) * time.Second) + delayInSeconds := delayFn(retries) + routingKeyPrefixForRetryQueue := getRetryRoutingKey(r.option.QueueName, retries) + // it will publish to each retry queue with TTL is the delayInSeconds + // and there will n dead letter queues based on the limit of retries config headers := receivedMsg.Headers - headers[string(headerKey.RetryCount)] = retries + headers[headerKey.OriginalTopicName] = msg.Topic + headers[headerKey.OriginalActionName] = msg.Action + headers[headerKey.RetryCount] = retries requeueErr := r.requeueChannel.PublishWithContext( ctx, - receivedMsg.Exchange, - receivedMsg.RoutingKey, + r.retryExchangeName, + routingKeyPrefixForRetryQueue, false, false, amqp.Publishing{ @@ -317,8 +398,10 @@ func (r *rabbitMQ) requeueMessage(consumerMeta map[string]interface{}, Body: receivedMsg.Body, Timestamp: time.Now(), AppId: r.tagName, + Expiration: fmt.Sprintf("%d", delayInSeconds*10000), }, ) + // time.Sleep(5 * time.Second) if requeueErr != nil { logrus.WithFields(logrus.Fields{ "consumer_meta": consumerMeta, @@ -349,6 +432,10 @@ func (r *rabbitMQ) requeueMessage(consumerMeta map[string]interface{}, } } +func getRetryRoutingKey(queueName string, retry int64) string { + return fmt.Sprintf("%s__retry.%d", queueName, retry) +} + func extractHeaderString(headers amqp.Table, key string) string { val, ok := headers[key] if !ok { diff --git a/options/consumer/consumer.go b/options/consumer/consumer.go index 614cb0d..47e89d6 100644 --- a/options/consumer/consumer.go +++ b/options/consumer/consumer.go @@ -166,11 +166,11 @@ const ( // the routing keys can be in pattern format. // e.g. "a.*.b.#" will match "a.b", "a.c.b", "a.c.d.b", etc. // For more information on pattern matching, see https://www.rabbitmq.com/tutorials/tutorial-five-go.html -func RabbitMQConfigWithDefaultTopicFanOutPattern(consumerChannel, publisherChannel *amqp.Channel, +func RabbitMQConfigWithDefaultTopicFanOutPattern(consumerChannel, requeueChannel *amqp.Channel, exchangeName string, routingKeys []string) *RabbitMQConsumerConfig { return &RabbitMQConsumerConfig{ ConsumerChannel: consumerChannel, - ReQueueChannel: publisherChannel, + ReQueueChannel: requeueChannel, QueueDeclareConfig: &RabbitMQQueueDeclareConfig{ Durable: true, AutoDelete: false,