Skip to content

Commit

Permalink
chore: fix dlq
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec committed Aug 11, 2024
1 parent c736a57 commit a5a2fa6
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 27 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# goqueue

GoQueue - one library to rule them all. A golang wrapper that handles all the complexity of every Queue platforms. Extensible and easy to learn
GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.

## Index

Expand Down Expand Up @@ -58,12 +58,15 @@ func initExchange(ch *amqp.Channel, exchangeName string) error {
}

func main() {

// Initialize the RMQ connection
rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
rmqConn, err := amqp.Dial(rmqDSN)
if err != nil {
panic(err)
}

// Initialize the Publisher
rmqPub := publisher.NewPublisher(
publisherOpts.PublisherPlatformRabbitMQ,
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
Expand Down
12 changes: 12 additions & 0 deletions examples/rabbitmq/basic/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: "3.7"
services:
rabbitmq-test:
image: rabbitmq:3.13.3-management-alpine
container_name: "goqueue-rabbitmq-example-basic"
hostname: rabbitmq
ports:
- "15671:15672"
- "5672:5672"
volumes:
- ../../../tests/localconf/rabbitmq/rabbitmq.definition.json:/opt/definitions.json:ro
- ../../../tests/localconf/rabbitmq/rabbitmq.config:/etc/rabbitmq/rabbitmq.config:ro
File renamed without changes.
12 changes: 12 additions & 0 deletions examples/rabbitmq/withretries/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: "3.7"
services:
rabbitmq-test:
image: rabbitmq:3.13.3-management-alpine
container_name: "goqueue-rabbitmq-example-with-retries"
hostname: rabbitmq
ports:
- "15671:15672"
- "5672:5672"
volumes:
- ../../../tests/localconf/rabbitmq/rabbitmq.definition.json:/opt/definitions.json:ro
- ../../../tests/localconf/rabbitmq/rabbitmq.config:/etc/rabbitmq/rabbitmq.config:ro
121 changes: 121 additions & 0 deletions examples/rabbitmq/withretries/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"context"
"encoding/json"
"fmt"

amqp "github.com/rabbitmq/amqp091-go"

"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
"github.com/bxcodec/goqueue/interfaces"
"github.com/bxcodec/goqueue/middleware"
"github.com/bxcodec/goqueue/options"
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
"github.com/bxcodec/goqueue/publisher"
)

func initExchange(ch *amqp.Channel, exchangeName string) error {
return ch.ExchangeDeclare(
exchangeName,
"topic",
true,
false,
false,
false,
nil,
)
}

func main() {
rmqDSN := "amqp://rabbitmq:rabbitmq@localhost:5672/"
rmqConn, err := amqp.Dial(rmqDSN)
if err != nil {
panic(err)
}

rmqPub := publisher.NewPublisher(
publisherOpts.PublisherPlatformRabbitMQ,
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
Conn: rmqConn,
PublisherChannelPoolSize: 5,
}),
publisherOpts.WithPublisherID("publisher_id"),
publisherOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
),
)

requeueChannel, err := rmqConn.Channel()
if err != nil {
panic(err)
}

defer requeueChannel.Close()
initExchange(requeueChannel, "goqueue")

consumerChannel, err := rmqConn.Channel()
if err != nil {
panic(err)
}
defer consumerChannel.Close()
rmqConsumer := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithRabbitMQConsumerConfig(consumerOpts.RabbitMQConfigWithDefaultTopicFanOutPattern(
consumerChannel,
requeueChannel,
"goqueue", // exchange name
[]string{"goqueue.payments.#"}, // routing keys pattern
)),
consumerOpts.WithConsumerID("consumer_id"),
consumerOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
),
consumerOpts.WithMaxRetryFailedMessage(5),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithQueueName("consumer_queue"),
)

queueSvc := goqueue.NewQueueService(
options.WithConsumer(rmqConsumer),
options.WithPublisher(rmqPub),
options.WithMessageHandler(handler()),
)
go func() {
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"message": fmt.Sprintf("Hello World %d", i),
}
jbyt, _ := json.Marshal(data)
err := queueSvc.Publish(context.Background(), interfaces.Message{
Data: data,
Action: "goqueue.payments.create",
Topic: "goqueue",
})
if err != nil {
panic(err)
}
fmt.Println("Message Sent: ", string(jbyt))
}
}()

// change to context.Background() if you want to run it forever
// ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// defer cancel()
err = queueSvc.Start(context.Background())
if err != nil {
panic(err)
}
}

func handler() interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
fmt.Printf("Message: %+v\n", m)
// something happend, we need to requeue the message
return m.PutToBackOfQueueWithDelay(ctx, interfaces.ExponentialBackoffDelayFn)
}
}
2 changes: 2 additions & 0 deletions headers/key/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ const (
ContentType = "goqueue-content-type"
QueueServiceAgent = "goqueue-queue-service-agent"
MessageID = "goqueue-message-id"
OriginalTopicName = "goqueue-original-topic-name"
OriginalActionName = "goqueue-original-action-name"
)
17 changes: 14 additions & 3 deletions interfaces/delayfn.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
package interfaces

import "fmt"

type DelayFn func(retries int64) (delay int64)

var (
// ExponentialBackoffDelayFn is a delay function that implements exponential backoff.
// It takes the number of retries as input and returns the delay in milliseconds.
// It takes the number of retries as input and returns the delay in seconds.
ExponentialBackoffDelayFn DelayFn = func(retries int64) (delay int64) {
return 2 << retries
fmt.Println(">>>>> ExponentialBackoffDelayFn, retry: ", retries)
return 2 << (retries - 1)
}

// LinearDelayFn is a delay function that implements linear delay.
// It takes the number of retries as input and returns the delay in seconds.
LinearDelayFn DelayFn = func(retries int64) (delay int64) {
fmt.Println(">>>>> LinearDelayFn, retry: ", retries)
return retries
}

// NoDelayFn is a DelayFn implementation that returns 0 delay for retries.
NoDelayFn DelayFn = func(retries int64) (delay int64) {
return 0
}
DefaultDelayFn DelayFn = NoDelayFn
DefaultDelayFn DelayFn = LinearDelayFn
)
Loading

0 comments on commit a5a2fa6

Please sign in to comment.