From e24518bcb4b00f91a79d2d730b7f2a298fe0c619 Mon Sep 17 00:00:00 2001 From: Iman Tumorang Date: Sun, 16 Jun 2024 13:10:44 +0300 Subject: [PATCH] fix: linter (#14) --- ...yaml => rabbitmq-integration-testing.yaml} | 2 +- README.md | 55 ++++++++------ consumer/service.go | 3 + encoding.go | 4 + errors/error.go | 15 +++- .../rabbitmq/blackbox_consumer_test.go | 2 + internal/consumer/rabbitmq/consumer.go | 19 +++-- .../rabbitmq/blackbox_publisher_test.go | 2 + internal/publisher/rabbitmq/channel_pool.go | 11 +++ internal/publisher/rabbitmq/publisher.go | 32 +++++++- middleware/default_errormapper.go | 61 +++++++++++++++ middleware/example.go | 65 ++++++++++++++++ middleware/middleware.go | 76 +++---------------- options/consumer/consumer.go | 29 +++++-- options/goqueue.go | 29 ++++++- options/platforms.go | 1 + options/publisher/publisher.go | 16 +++- publisher/service.go | 3 + queueservice.go | 11 ++- 19 files changed, 323 insertions(+), 113 deletions(-) rename .github/workflows/{integration-testing.yaml => rabbitmq-integration-testing.yaml} (94%) create mode 100644 middleware/default_errormapper.go create mode 100644 middleware/example.go diff --git a/.github/workflows/integration-testing.yaml b/.github/workflows/rabbitmq-integration-testing.yaml similarity index 94% rename from .github/workflows/integration-testing.yaml rename to .github/workflows/rabbitmq-integration-testing.yaml index 6114b41..dfe5ea4 100644 --- a/.github/workflows/integration-testing.yaml +++ b/.github/workflows/rabbitmq-integration-testing.yaml @@ -1,4 +1,4 @@ -name: Integration Testing +name: Integration Testing with RabbitMQ on: push: diff --git a/README.md b/README.md index b7d8caa..a2d7d91 100644 --- a/README.md +++ b/README.md @@ -37,10 +37,12 @@ import ( "github.com/bxcodec/goqueue" "github.com/bxcodec/goqueue/consumer" - rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq" + "github.com/bxcodec/goqueue/interfaces" "github.com/bxcodec/goqueue/middleware" + "github.com/bxcodec/goqueue/options" + consumerOpts "github.com/bxcodec/goqueue/options/consumer" + publisherOpts "github.com/bxcodec/goqueue/options/publisher" "github.com/bxcodec/goqueue/publisher" - rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq" ) func initExchange(ch *amqp.Channel, exchangeName string) error { @@ -62,9 +64,14 @@ func main() { panic(err) } - rmqPub := rmqPublisher.NewPublisher(rmqConn, - publisher.WithPublisherID("publisher_id"), - publisher.WithMiddlewares( + rmqPub := publisher.NewPublisher( + publisherOpts.PublisherPlatformRabbitMQ, + publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ + Conn: rmqConn, + PublisherChannelPoolSize: 5, + }), + publisherOpts.WithPublisherID("publisher_id"), + publisherOpts.WithMiddlewares( middleware.HelloWorldMiddlewareExecuteBeforePublisher(), middleware.HelloWorldMiddlewareExecuteAfterPublisher(), ), @@ -83,35 +90,36 @@ func main() { panic(err) } defer consumerChannel.Close() - - rmqConsumer := rmqConsumer.NewConsumer( - publisherChannel, - consumerChannel, - consumer.WithMiddlewares( + rmqConsumer := consumer.NewConsumer( + consumerOpts.ConsumerPlatformRabbitMQ, + consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ + ConsumerChannel: consumerChannel, + ReQueueChannel: publisherChannel, + }), + consumerOpts.WithConsumerID("consumer_id"), + consumerOpts.WithMiddlewares( middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(), middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(), ), - consumer.WithQueueName("consumer_queue"), - consumer.WithConsumerID("consumer_id"), - consumer.WithBatchMessageSize(1), - consumer.WithMaxRetryFailedMessage(3), - consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"), - consumer.WithTopicName("goqueue"), + consumerOpts.WithMaxRetryFailedMessage(3), + consumerOpts.WithBatchMessageSize(1), + consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"), + consumerOpts.WithTopicName("goqueue"), + consumerOpts.WithQueueName("consumer_queue"), ) queueSvc := goqueue.NewQueueService( - goqueue.WithPublisher(rmqPub), - goqueue.WithConsumer(rmqConsumer), - goqueue.WithMessageHandler(handler()), + options.WithConsumer(rmqConsumer), + options.WithPublisher(rmqPub), + options.WithMessageHandler(handler()), ) - go func() { for i := 0; i < 10; i++ { data := map[string]interface{}{ "message": fmt.Sprintf("Hello World %d", i), } jbyt, _ := json.Marshal(data) - err := queueSvc.Publish(context.Background(), goqueue.Message{ + err := queueSvc.Publish(context.Background(), interfaces.Message{ Data: data, Action: "goqueue.payments.create", Topic: "goqueue", @@ -132,14 +140,15 @@ func main() { } } -func handler() goqueue.InboundMessageHandlerFunc { - return func(ctx context.Context, m goqueue.InboundMessage) (err error) { +func handler() interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) (err error) { data := m.Data jbyt, _ := json.Marshal(data) fmt.Println("Message Received: ", string(jbyt)) return m.Ack(ctx) } } + ``` ## Contribution diff --git a/consumer/service.go b/consumer/service.go index a49a7b8..347d78d 100644 --- a/consumer/service.go +++ b/consumer/service.go @@ -7,6 +7,9 @@ import ( consumerOpts "github.com/bxcodec/goqueue/options/consumer" ) +// NewConsumer creates a new consumer based on the specified platform. +// It accepts a platform option and additional consumer option functions. +// It returns a consumer.Consumer interface implementation. func NewConsumer(platform options.Platform, opts ...consumerOpts.ConsumerOptionFunc) consumer.Consumer { switch platform { case consumerOpts.ConsumerPlatformRabbitMQ: diff --git a/encoding.go b/encoding.go index 51ce37a..e95f3e4 100644 --- a/encoding.go +++ b/encoding.go @@ -72,3 +72,7 @@ var ( } DefaultEncoding = JSONEncoding ) + +func init() { + AddGoQueueEncoding(JSONEncoding.ContentType, JSONEncoding) +} diff --git a/errors/error.go b/errors/error.go index d97ea27..f26d0fb 100644 --- a/errors/error.go +++ b/errors/error.go @@ -1,9 +1,12 @@ package errors const ( - InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT" + InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT" + EncodingFormatNotSupported = "ENCODING_FORMAT_NOT_SUPPORTED" + UnKnownError = "UNKNOWN_ERROR" ) +// Error represents an error with a code and a message. type Error struct { Code string `json:"code"` Message string `json:"message"` @@ -14,7 +17,17 @@ func (e Error) Error() string { } var ( + // ErrInvalidMessageFormat is an error that occurs when attempting to unmarshal a message with an invalid format. ErrInvalidMessageFormat = Error{ Code: InvalidMessageFormatCode, Message: "failed to unmarshal the message, removing the message due to wrong message format"} + // ErrEncodingFormatNotSupported is an error that indicates the encoding format is not supported. + ErrEncodingFormatNotSupported = Error{ + Code: EncodingFormatNotSupported, + Message: "encoding format not supported. Please register the encoding format before using it"} + + // ErrUnknownError is an error that indicates an unknown error occurred. + ErrUnknownError = Error{ + Code: UnKnownError, + Message: "an unknown error occurred"} ) diff --git a/internal/consumer/rabbitmq/blackbox_consumer_test.go b/internal/consumer/rabbitmq/blackbox_consumer_test.go index f31d79b..2e8e888 100644 --- a/internal/consumer/rabbitmq/blackbox_consumer_test.go +++ b/internal/consumer/rabbitmq/blackbox_consumer_test.go @@ -43,6 +43,8 @@ func TestSuiteRabbitMQConsumer(t *testing.T) { t.Skip("Skip the Test Suite for RabbitMQ Consumer") } + time.Sleep(5 * time.Second) // wait for the rabbitmq to be ready + rmqURL := os.Getenv("RABBITMQ_TEST_URL") if rmqURL == "" { rmqURL = "amqp://test:test@localhost:5672/test" diff --git a/internal/consumer/rabbitmq/consumer.go b/internal/consumer/rabbitmq/consumer.go index 45750fb..20ef869 100644 --- a/internal/consumer/rabbitmq/consumer.go +++ b/internal/consumer/rabbitmq/consumer.go @@ -126,11 +126,20 @@ func (r *rabbitMQ) initConsumer() { r.msgReceiver = receiver } -// Consume consumes messages from a RabbitMQ queue. -// It takes a context, an inbound message handler, and metadata as input parameters. -// The method continuously listens for messages from the queue and handles them using the provided handler. -// If the context is canceled, the method stops consuming messages and returns. -// The method returns an error if there was an issue consuming messages. +// Consume consumes messages from a RabbitMQ queue and handles them using the provided message handler. +// It takes a context, an inbound message handler, and a map of metadata as input parameters. +// The function continuously listens for messages from the queue and processes them until the context is canceled. +// If the context is canceled, the function stops consuming messages and returns. +// For each received message, the function builds an inbound message, extracts the retry count, and checks if the maximum retry count has been reached. +// If the maximum retry count has been reached, the message is moved to the dead letter queue. +// Otherwise, the message is passed to the message handler for processing. +// The message handler is responsible for handling the message and returning an error if any. +// If an error occurs while handling the message, it is logged. +// The function provides methods for acknowledging, rejecting, and moving messages to the dead letter queue. +// These methods can be used by the message handler to control the message processing flow. +// The function also logs information about the received message, such as the message ID, topic, action, and timestamp. +// It applies any configured middlewares to the message handler before calling it. +// The function returns an error if any occurred during message handling or if the context was canceled. func (r *rabbitMQ) Consume(ctx context.Context, h interfaces.InboundMessageHandler, meta map[string]interface{}) (err error) { diff --git a/internal/publisher/rabbitmq/blackbox_publisher_test.go b/internal/publisher/rabbitmq/blackbox_publisher_test.go index 490a014..6ad45ce 100644 --- a/internal/publisher/rabbitmq/blackbox_publisher_test.go +++ b/internal/publisher/rabbitmq/blackbox_publisher_test.go @@ -40,6 +40,8 @@ func TestSuiteRabbitMQPublisher(t *testing.T) { t.Skip("Skip the Test Suite for RabbitMQ Publisher") } + time.Sleep(5 * time.Second) // wait for the rabbitmq to be ready + rmqURL := os.Getenv("RABBITMQ_TEST_URL") if rmqURL == "" { rmqURL = "amqp://test:test@localhost:5672/test" diff --git a/internal/publisher/rabbitmq/channel_pool.go b/internal/publisher/rabbitmq/channel_pool.go index 4616dec..0250679 100644 --- a/internal/publisher/rabbitmq/channel_pool.go +++ b/internal/publisher/rabbitmq/channel_pool.go @@ -8,6 +8,8 @@ import ( "go.uber.org/multierr" ) +// ChannelPool represents a pool of AMQP channels used for publishing messages. +// It provides a way to manage and reuse AMQP channels efficiently. type ChannelPool struct { conn *amqp.Connection mutex sync.Mutex @@ -15,6 +17,9 @@ type ChannelPool struct { maxSize int } +// NewChannelPool creates a new ChannelPool instance. +// It takes an AMQP connection and the maximum size of the pool as parameters. +// It returns a pointer to the newly created ChannelPool. func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool { return &ChannelPool{ conn: conn, @@ -23,6 +28,8 @@ func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool { } } +// Get returns a channel from the pool. If there are available channels in the pool, it returns one of them. +// Otherwise, it creates a new channel from the underlying connection and returns it. func (cp *ChannelPool) Get() (*amqp.Channel, error) { cp.mutex.Lock() defer cp.mutex.Unlock() @@ -35,6 +42,8 @@ func (cp *ChannelPool) Get() (*amqp.Channel, error) { } } +// Return returns a channel back to the channel pool. +// If the pool is full, the channel is closed. func (cp *ChannelPool) Return(ch *amqp.Channel) { cp.mutex.Lock() defer cp.mutex.Unlock() @@ -51,6 +60,8 @@ func (cp *ChannelPool) Return(ch *amqp.Channel) { } } +// Close closes the ChannelPool and all its associated channels. +// It returns an error if there was an error closing any of the channels. func (cp *ChannelPool) Close() (err error) { cp.mutex.Lock() defer cp.mutex.Unlock() diff --git a/internal/publisher/rabbitmq/publisher.go b/internal/publisher/rabbitmq/publisher.go index d93857f..facc61b 100644 --- a/internal/publisher/rabbitmq/publisher.go +++ b/internal/publisher/rabbitmq/publisher.go @@ -5,6 +5,7 @@ import ( "time" "github.com/bxcodec/goqueue" + "github.com/bxcodec/goqueue/errors" headerKey "github.com/bxcodec/goqueue/headers/key" headerVal "github.com/bxcodec/goqueue/headers/value" "github.com/bxcodec/goqueue/interfaces" @@ -25,6 +26,32 @@ type rabbitMQ struct { option *publisherOpts.PublisherOption } +// NewPublisher creates a new instance of the publisher.Publisher interface +// using the provided options. It returns a publisher.Publisher implementation +// that utilizes RabbitMQ as the underlying message broker. +// +// The function accepts a variadic parameter `opts` of type +// `publisherOpts.PublisherOptionFunc`, which allows the caller to provide +// custom configuration options for the publisher. +// +// Example usage: +// +// publisher := NewPublisher( +// publisherOpts.PublisherPlatformRabbitMQ, +// publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ +// Conn: rmqConn, +// PublisherChannelPoolSize: 5, +// }), +// publisherOpts.WithPublisherID("publisher_id"), +// publisherOpts.WithMiddlewares( +// middleware.HelloWorldMiddlewareExecuteBeforePublisher(), +// middleware.HelloWorldMiddlewareExecuteAfterPublisher(), +// ), +// +// ) +// +// The returned publisher can be used to publish messages to the configured +// RabbitMQ exchange and routing key. func NewPublisher( opts ...publisherOpts.PublisherOptionFunc, ) publisher.Publisher { @@ -47,6 +74,9 @@ func NewPublisher( } } +// Publish sends a message to the RabbitMQ exchange. +// It applies the default content type if not specified in the message. +// It also applies any registered middlewares before publishing the message. func (r *rabbitMQ) Publish(ctx context.Context, m interfaces.Message) (err error) { if m.ContentType == "" { m.ContentType = publisherOpts.DefaultContentType @@ -94,7 +124,7 @@ func (r *rabbitMQ) buildPublisher() interfaces.PublisherFunc { m.ID = id encoder, ok := goqueue.GetGoQueueEncoding(m.ContentType) if !ok { - encoder = goqueue.DefaultEncoding + return errors.ErrEncodingFormatNotSupported } data, err := encoder.Encode(ctx, m) diff --git a/middleware/default_errormapper.go b/middleware/default_errormapper.go new file mode 100644 index 0000000..2150c43 --- /dev/null +++ b/middleware/default_errormapper.go @@ -0,0 +1,61 @@ +package middleware + +import ( + "context" + + "github.com/bxcodec/goqueue/errors" + "github.com/bxcodec/goqueue/interfaces" +) + +// PublisherDefaultErrorMapper returns a middleware function that maps publisher errors to specific error types. +// It takes a next PublisherFunc as input and returns a new PublisherFunc that performs error mapping. +// If an error occurs during publishing, it will be mapped to a specific error type based on the error code. +// The mapped error will be returned, or nil if no error occurred. +func PublisherDefaultErrorMapper() interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, e interfaces.Message) (err error) { + err = next(ctx, e) + if err != nil { + switch err { + case errors.ErrInvalidMessageFormat: + return errors.ErrInvalidMessageFormat + case errors.ErrEncodingFormatNotSupported: + return errors.ErrEncodingFormatNotSupported + default: + return errors.Error{ + Code: errors.UnKnownError, + Message: err.Error(), + } + } + } + return nil + } + } +} + +// InboundMessageHandlerDefaultErrorMapper returns a middleware function that maps specific errors to predefined error types. +// It takes the next inbound message handler function as input and returns a new inbound message handler function. +// The returned function checks if an error occurred during the execution of the next handler function. +// If an error is found, it maps the error to a predefined error type and returns it. +// If no error is found, it returns nil. +func InboundMessageHandlerDefaultErrorMapper() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) (err error) { + err = next(ctx, m) + if err != nil { + switch err { + case errors.ErrInvalidMessageFormat: + return errors.ErrInvalidMessageFormat + case errors.ErrEncodingFormatNotSupported: + return errors.ErrEncodingFormatNotSupported + default: + return errors.Error{ + Code: errors.UnKnownError, + Message: err.Error(), + } + } + } + return nil + } + } +} diff --git a/middleware/example.go b/middleware/example.go new file mode 100644 index 0000000..a958944 --- /dev/null +++ b/middleware/example.go @@ -0,0 +1,65 @@ +package middleware + +import ( + "context" + + "github.com/bxcodec/goqueue/interfaces" + "github.com/sirupsen/logrus" +) + +// HelloWorldMiddlewareExecuteAfterInboundMessageHandler returns an inbound message handler middleware function. +// This middleware function executes after the inbound message handler and performs additional tasks. +// It logs any errors that occur during the execution of the next handler and provides an opportunity to handle them. +// You can customize the error handling logic by adding your own error handler, such as sending errors to Sentry or other error tracking tools. +// After error handling, it logs a message indicating that the hello-world-last-middleware has been executed. +// The function signature follows the `interfaces.InboundMessageHandlerMiddlewareFunc` type. +func HelloWorldMiddlewareExecuteAfterInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) (err error) { + err = next(ctx, m) + if err != nil { + logrus.Error("Error: ", err, "add your custom error handler here, eg send to Sentry or other error tracking tools") + } + logrus.Info("hello-world-last-middleware executed") + return err + } + } +} + +// HelloWorldMiddlewareExecuteBeforeInboundMessageHandler is a function that returns an inbound message handler middleware. +// This middleware logs a message and then calls the next inbound message handler in the chain. +func HelloWorldMiddlewareExecuteBeforeInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc { + return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { + return func(ctx context.Context, m interfaces.InboundMessage) (err error) { + logrus.Info("hello-world-first-middleware executed") + return next(ctx, m) + } + } +} + +// HelloWorldMiddlewareExecuteAfterPublisher is a function that returns a PublisherMiddlewareFunc. +// It wraps the provided PublisherFunc with additional functionality to be executed after publishing a message. +func HelloWorldMiddlewareExecuteAfterPublisher() interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, m interfaces.Message) (err error) { + err = next(ctx, m) + if err != nil { + logrus.Error("got error while publishing the message: ", err) + return err + } + logrus.Info("hello-world-last-middleware executed") + return nil + } + } +} + +// HelloWorldMiddlewareExecuteBeforePublisher is a function that returns a PublisherMiddlewareFunc. +// It wraps the provided PublisherFunc with a middleware that logs a message before executing the next middleware or the actual publisher function. +func HelloWorldMiddlewareExecuteBeforePublisher() interfaces.PublisherMiddlewareFunc { + return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { + return func(ctx context.Context, e interfaces.Message) (err error) { + logrus.Info("hello-world-first-middleware executed") + return next(ctx, e) + } + } +} diff --git a/middleware/middleware.go b/middleware/middleware.go index 147deb7..d8d67c0 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -1,83 +1,29 @@ package middleware import ( - "context" - "github.com/bxcodec/goqueue/interfaces" - "github.com/sirupsen/logrus" ) -// ApplyHandlerMiddleware applies a series of middleware functions to an inbound message handler function. -// It takes an inbound message handler function `h` and a variadic list of middleware functions `middleware`. -// Each middleware function is applied to the handler function in the order they are provided. -// The resulting handler function with all the middleware applied is returned. -func ApplyHandlerMiddleware(h interfaces.InboundMessageHandlerFunc, middlewares ...interfaces.InboundMessageHandlerMiddlewareFunc) interfaces.InboundMessageHandlerFunc { +// ApplyHandlerMiddleware applies a series of middlewares to an inbound message handler function. +// It takes an inbound message handler function and a variadic list of middlewares as input. +// Each middleware is applied to the handler function in the order they are provided. +// The resulting function is returned as the final handler function with all the middlewares applied. +func ApplyHandlerMiddleware(h interfaces.InboundMessageHandlerFunc, + middlewares ...interfaces.InboundMessageHandlerMiddlewareFunc) interfaces.InboundMessageHandlerFunc { for _, middleware := range middlewares { h = middleware(h) } return h } -// ApplyPublisherMiddleware applies the given publisher middleware functions to the provided publisher function. -// It iterates over the middleware functions and applies them in the order they are provided. +// ApplyPublisherMiddleware applies a series of middlewares to a publisher function. +// It takes a publisher function and a variadic list of publisher middleware functions as input. +// Each middleware function is applied to the publisher function in the order they are provided. // The resulting publisher function is returned. -func ApplyPublisherMiddleware(p interfaces.PublisherFunc, middlewares ...interfaces.PublisherMiddlewareFunc) interfaces.PublisherFunc { +func ApplyPublisherMiddleware(p interfaces.PublisherFunc, + middlewares ...interfaces.PublisherMiddlewareFunc) interfaces.PublisherFunc { for _, middleware := range middlewares { p = middleware(p) } return p } - -// HelloWorldMiddlewareExecuteAfterInboundMessageHandler returns an inbound message handler middleware function. -// It wraps the provided `next` inbound message handler function and executes some additional logic after it. -// The additional logic includes logging any error that occurred during the execution of the `next` function -// and logging a message indicating that the middleware has been executed. -func HelloWorldMiddlewareExecuteAfterInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc { - return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { - return func(ctx context.Context, m interfaces.InboundMessage) (err error) { - err = next(ctx, m) - if err != nil { - logrus.Error("Error: ", err, "add your custom error handler here, eg send to Sentry or other error tracking tools") - } - logrus.Info("hello-world-last-middleware executed") - return err - } - } -} - -// HelloWorldMiddlewareExecuteBeforeInboundMessageHandler returns a middleware function that logs a message before executing the handler. -func HelloWorldMiddlewareExecuteBeforeInboundMessageHandler() interfaces.InboundMessageHandlerMiddlewareFunc { - return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { - return func(ctx context.Context, m interfaces.InboundMessage) (err error) { - logrus.Info("hello-world-first-middleware executed") - return next(ctx, m) - } - } -} - -// HelloWorldMiddlewareExecuteAfterPublisher returns a PublisherMiddlewareFunc that executes after the publisher function. -// It logs any error that occurs during publishing and logs a message indicating that the last middleware has been executed. -func HelloWorldMiddlewareExecuteAfterPublisher() interfaces.PublisherMiddlewareFunc { - return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { - return func(ctx context.Context, m interfaces.Message) (err error) { - err = next(ctx, m) - if err != nil { - logrus.Error("got error while publishing the message: ", err) - return err - } - logrus.Info("hello-world-last-middleware executed") - return nil - } - } -} - -// HelloWorldMiddlewareExecuteBeforePublisher is a function that returns a PublisherMiddlewareFunc. -// It wraps the provided PublisherFunc with a middleware that logs a message before executing the next function. -func HelloWorldMiddlewareExecuteBeforePublisher() interfaces.PublisherMiddlewareFunc { - return func(next interfaces.PublisherFunc) interfaces.PublisherFunc { - return func(ctx context.Context, e interfaces.Message) (err error) { - logrus.Info("hello-world-first-middleware executed") - return next(ctx, e) - } - } -} diff --git a/options/consumer/consumer.go b/options/consumer/consumer.go index 973435d..e7414a4 100644 --- a/options/consumer/consumer.go +++ b/options/consumer/consumer.go @@ -12,18 +12,31 @@ const ( ) // ConsumerOption represents the configuration options for the consumer. +// ConsumerOption represents the options for configuring a consumer. type ConsumerOption struct { // BatchMessageSize specifies the maximum number of messages to be processed in a single batch. BatchMessageSize int + // QueueName specifies the name of the queue to consume messages from. QueueName string + // Middlewares is a list of middleware functions to be applied to the inbound message handler. - Middlewares []interfaces.InboundMessageHandlerMiddlewareFunc + Middlewares []interfaces.InboundMessageHandlerMiddlewareFunc + + // ActionsPatternSubscribed specifies the list of action patterns that the consumer is subscribed to. ActionsPatternSubscribed []string - TopicName string - MaxRetryFailedMessage int64 - ConsumerID string - RabbitMQConsumerConfig *RabbitMQConsumerConfig // optional, only if using RabbitMQ + + // TopicName specifies the name of the topic to consume messages from. + TopicName string + + // MaxRetryFailedMessage specifies the maximum number of times a failed message should be retried. + MaxRetryFailedMessage int64 + + // ConsumerID specifies the unique identifier for the consumer. + ConsumerID string + + // RabbitMQConsumerConfig specifies the configuration for RabbitMQ consumer (optional, only if using RabbitMQ). + RabbitMQConsumerConfig *RabbitMQConsumerConfig } // ConsumerOptionFunc is a function type that takes an `opt` parameter of type `*ConsumerOption`. @@ -94,6 +107,7 @@ func WithRabbitMQConsumerConfig(rabbitMQOption *RabbitMQConsumerConfig) Consumer } } +// DefaultConsumerOption returns the default consumer option. var DefaultConsumerOption = func() *ConsumerOption { return &ConsumerOption{ Middlewares: []interfaces.InboundMessageHandlerMiddlewareFunc{}, @@ -102,9 +116,12 @@ var DefaultConsumerOption = func() *ConsumerOption { } } +// RabbitMQConsumerConfig represents the configuration for a RabbitMQ consumer. type RabbitMQConsumerConfig struct { + // ConsumerChannel is the channel used for consuming messages from RabbitMQ. ConsumerChannel *amqp.Channel - ReQueueChannel *amqp.Channel + // ReQueueChannel is the channel used for re-queuing messages in RabbitMQ. + ReQueueChannel *amqp.Channel } const ( diff --git a/options/goqueue.go b/options/goqueue.go index c493752..18137e3 100644 --- a/options/goqueue.go +++ b/options/goqueue.go @@ -6,12 +6,16 @@ import ( "github.com/bxcodec/goqueue/internal/publisher" ) +// GoQueueOption represents the options for configuring a GoQueue instance. type GoQueueOption struct { - // number of consumer/worker/goroutine that will be spawned in one goqueue instance + // NumberOfConsumer is the number of consumer/worker/goroutine that will be spawned in one GoQueue instance. NumberOfConsumer int - Consumer consumer.Consumer - Publisher publisher.Publisher - MessageHandler interfaces.InboundMessageHandler + // Consumer is the consumer implementation that will be used by the GoQueue instance. + Consumer consumer.Consumer + // Publisher is the publisher implementation that will be used by the GoQueue instance. + Publisher publisher.Publisher + // MessageHandler is the inbound message handler that will be used by the GoQueue instance. + MessageHandler interfaces.InboundMessageHandler } // GoQueueOptionFunc used for option chaining @@ -23,24 +27,41 @@ func DefaultGoQueueOption() *GoQueueOption { } } +// WithNumberOfConsumer sets the number of consumers for the GoQueue. +// It takes an integer value 'n' as input and updates the NumberOfConsumer field of the GoQueueOption struct. +// This option determines how many goroutines will be created to consume items from the queue concurrently. func WithNumberOfConsumer(n int) GoQueueOptionFunc { return func(opt *GoQueueOption) { opt.NumberOfConsumer = n } } +// WithConsumer sets the consumer for the GoQueueOption. +// It takes a consumer.Consumer as a parameter and returns a GoQueueOptionFunc. +// The GoQueueOptionFunc sets the Consumer field of the GoQueueOption. func WithConsumer(c consumer.Consumer) GoQueueOptionFunc { return func(opt *GoQueueOption) { opt.Consumer = c } } +// WithPublisher sets the publisher for the GoQueueOption. +// It takes a publisher.Publisher as a parameter and returns a GoQueueOptionFunc. +// The returned GoQueueOptionFunc sets the Publisher field of the GoQueueOption to the provided publisher. func WithPublisher(p publisher.Publisher) GoQueueOptionFunc { return func(opt *GoQueueOption) { opt.Publisher = p } } +// WithMessageHandler sets the inbound message handler for the GoQueueOption. +// The inbound message handler is responsible for processing incoming messages. +// It takes an instance of the interfaces.InboundMessageHandler interface as a parameter. +// Example usage: +// +// WithMessageHandler(func(message interfaces.InboundMessage) { +// // Process the incoming message +// }) func WithMessageHandler(h interfaces.InboundMessageHandler) GoQueueOptionFunc { return func(opt *GoQueueOption) { opt.MessageHandler = h diff --git a/options/platforms.go b/options/platforms.go index 2fb3270..fd976c9 100644 --- a/options/platforms.go +++ b/options/platforms.go @@ -1,5 +1,6 @@ package options +// Platform represents a platform in the system. type Platform string const ( diff --git a/options/publisher/publisher.go b/options/publisher/publisher.go index 4f49ef4..83ed0ed 100644 --- a/options/publisher/publisher.go +++ b/options/publisher/publisher.go @@ -22,18 +22,25 @@ type PublisherOption struct { // PublisherOptionFunc used for option chaining type PublisherOptionFunc func(opt *PublisherOption) +// WithPublisherID sets the publisher ID for the PublisherOption. +// It returns a PublisherOptionFunc that can be used to configure the PublisherOption. func WithPublisherID(id string) PublisherOptionFunc { return func(opt *PublisherOption) { opt.PublisherID = id } } +// WithMiddlewares sets the middlewares for the publisher. +// It accepts one or more PublisherMiddlewareFunc functions as arguments. +// These functions are used to modify the behavior of the publisher. +// The middlewares are applied in the order they are provided. func WithMiddlewares(middlewares ...interfaces.PublisherMiddlewareFunc) PublisherOptionFunc { return func(opt *PublisherOption) { opt.Middlewares = middlewares } } +// DefaultPublisherOption returns the default options for a publisher. var DefaultPublisherOption = func() *PublisherOption { return &PublisherOption{ Middlewares: []interfaces.PublisherMiddlewareFunc{}, @@ -41,11 +48,18 @@ var DefaultPublisherOption = func() *PublisherOption { } } +// RabbitMQPublisherConfig represents the configuration options for a RabbitMQ publisher. type RabbitMQPublisherConfig struct { + // PublisherChannelPoolSize specifies the size of the channel pool for publishing messages. PublisherChannelPoolSize int - Conn *amqp.Connection + + // Conn is the RabbitMQ connection to be used by the publisher. + Conn *amqp.Connection } +// WithRabbitMQPublisherConfig sets the RabbitMQ publisher configuration for the publisher option. +// It takes a RabbitMQPublisherConfig struct as input and returns a PublisherOptionFunc. +// The returned PublisherOptionFunc sets the RabbitMQPublisherConfig field of the PublisherOption struct. func WithRabbitMQPublisherConfig(rabbitMQOption *RabbitMQPublisherConfig) PublisherOptionFunc { return func(opt *PublisherOption) { opt.RabbitMQPublisherConfig = rabbitMQOption diff --git a/publisher/service.go b/publisher/service.go index 493bff6..0ce36b2 100644 --- a/publisher/service.go +++ b/publisher/service.go @@ -7,6 +7,9 @@ import ( publisherOpts "github.com/bxcodec/goqueue/options/publisher" ) +// NewPublisher creates a new publisher based on the specified platform. +// It accepts the platform type and optional publisher options. +// Returns the corresponding publisher implementation based on the platform. func NewPublisher(platform options.Platform, opts ...publisherOpts.PublisherOptionFunc) publisher.Publisher { switch platform { case publisherOpts.PublisherPlatformRabbitMQ: diff --git a/queueservice.go b/queueservice.go index 2bee415..19e3393 100644 --- a/queueservice.go +++ b/queueservice.go @@ -12,7 +12,7 @@ import ( "golang.org/x/sync/errgroup" ) -// QueueService represents a queue service that handles incoming messages. +// QueueService represents a service that handles message queuing operations. type QueueService struct { consumer consumer.Consumer // The consumer responsible for consuming messages from the queue. handler interfaces.InboundMessageHandler // The handler responsible for processing incoming messages. @@ -21,7 +21,8 @@ type QueueService struct { } // NewQueueService creates a new instance of QueueService with the provided options. -// It accepts zero or more OptionFunc functions to customize the behavior of the QueueService. +// It accepts a variadic parameter `opts` which allows configuring the QueueService. +// The options are applied in the order they are provided. // Returns a pointer to the created QueueService. func NewQueueService(opts ...options.GoQueueOptionFunc) *QueueService { opt := options.DefaultGoQueueOption() @@ -39,8 +40,6 @@ func NewQueueService(opts ...options.GoQueueOptionFunc) *QueueService { // Start starts the queue service by spawning multiple consumers to process messages. // It returns an error if the consumer or handler is not defined. // The method uses the provided context to manage the lifecycle of the consumers. -// Each consumer is assigned a unique consumer ID and the start time is recorded in the meta data. -// The method uses the errgroup package to manage the goroutines and waits for all consumers to finish. func (qs *QueueService) Start(ctx context.Context) (err error) { if qs.consumer == nil { return errors.New("consumer is not defined") @@ -64,7 +63,7 @@ func (qs *QueueService) Start(ctx context.Context) (err error) { } // Stop stops the queue service by stopping the consumer and closing the publisher. -// It returns an error if there is an issue stopping the consumer or closing the publisher. +// It returns an error if there was an issue stopping the consumer or closing the publisher. func (qs *QueueService) Stop(ctx context.Context) error { if qs.consumer == nil { return errors.New("consumer is not defined") @@ -81,7 +80,7 @@ func (qs *QueueService) Stop(ctx context.Context) error { return nil } -// Publish sends a message to the queue using the defined publisher. +// Publish publishes a message to the queue. // It returns an error if the publisher is not defined or if there was an error while publishing the message. func (qs *QueueService) Publish(ctx context.Context, m interfaces.Message) (err error) { if qs.publisher == nil {