Skip to content

Commit

Permalink
fix: linter (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec authored Jun 16, 2024
1 parent 90f5e49 commit e24518b
Show file tree
Hide file tree
Showing 19 changed files with 323 additions and 113 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Integration Testing
name: Integration Testing with RabbitMQ

on:
push:
Expand Down
55 changes: 32 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (

"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq"
"github.com/bxcodec/goqueue/interfaces"
"github.com/bxcodec/goqueue/middleware"
"github.com/bxcodec/goqueue/options"
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
"github.com/bxcodec/goqueue/publisher"
rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq"
)

func initExchange(ch *amqp.Channel, exchangeName string) error {
Expand All @@ -62,9 +64,14 @@ func main() {
panic(err)
}

rmqPub := rmqPublisher.NewPublisher(rmqConn,
publisher.WithPublisherID("publisher_id"),
publisher.WithMiddlewares(
rmqPub := publisher.NewPublisher(
publisherOpts.PublisherPlatformRabbitMQ,
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
Conn: rmqConn,
PublisherChannelPoolSize: 5,
}),
publisherOpts.WithPublisherID("publisher_id"),
publisherOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
),
Expand All @@ -83,35 +90,36 @@ func main() {
panic(err)
}
defer consumerChannel.Close()

rmqConsumer := rmqConsumer.NewConsumer(
publisherChannel,
consumerChannel,
consumer.WithMiddlewares(
rmqConsumer := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
ConsumerChannel: consumerChannel,
ReQueueChannel: publisherChannel,
}),
consumerOpts.WithConsumerID("consumer_id"),
consumerOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
),
consumer.WithQueueName("consumer_queue"),
consumer.WithConsumerID("consumer_id"),
consumer.WithBatchMessageSize(1),
consumer.WithMaxRetryFailedMessage(3),
consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumer.WithTopicName("goqueue"),
consumerOpts.WithMaxRetryFailedMessage(3),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumerOpts.WithTopicName("goqueue"),
consumerOpts.WithQueueName("consumer_queue"),
)

queueSvc := goqueue.NewQueueService(
goqueue.WithPublisher(rmqPub),
goqueue.WithConsumer(rmqConsumer),
goqueue.WithMessageHandler(handler()),
options.WithConsumer(rmqConsumer),
options.WithPublisher(rmqPub),
options.WithMessageHandler(handler()),
)

go func() {
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"message": fmt.Sprintf("Hello World %d", i),
}
jbyt, _ := json.Marshal(data)
err := queueSvc.Publish(context.Background(), goqueue.Message{
err := queueSvc.Publish(context.Background(), interfaces.Message{
Data: data,
Action: "goqueue.payments.create",
Topic: "goqueue",
Expand All @@ -132,14 +140,15 @@ func main() {
}
}

func handler() goqueue.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
func handler() interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
data := m.Data
jbyt, _ := json.Marshal(data)
fmt.Println("Message Received: ", string(jbyt))
return m.Ack(ctx)
}
}

```

## Contribution
Expand Down
3 changes: 3 additions & 0 deletions consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)

// NewConsumer creates a new consumer based on the specified platform.
// It accepts a platform option and additional consumer option functions.
// It returns a consumer.Consumer interface implementation.
func NewConsumer(platform options.Platform, opts ...consumerOpts.ConsumerOptionFunc) consumer.Consumer {
switch platform {
case consumerOpts.ConsumerPlatformRabbitMQ:
Expand Down
4 changes: 4 additions & 0 deletions encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ var (
}
DefaultEncoding = JSONEncoding
)

func init() {
AddGoQueueEncoding(JSONEncoding.ContentType, JSONEncoding)
}
15 changes: 14 additions & 1 deletion errors/error.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package errors

const (
InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT"
InvalidMessageFormatCode = "INVALID_MESSAGE_FORMAT"
EncodingFormatNotSupported = "ENCODING_FORMAT_NOT_SUPPORTED"
UnKnownError = "UNKNOWN_ERROR"
)

// Error represents an error with a code and a message.
type Error struct {
Code string `json:"code"`
Message string `json:"message"`
Expand All @@ -14,7 +17,17 @@ func (e Error) Error() string {
}

var (
// ErrInvalidMessageFormat is an error that occurs when attempting to unmarshal a message with an invalid format.
ErrInvalidMessageFormat = Error{
Code: InvalidMessageFormatCode,
Message: "failed to unmarshal the message, removing the message due to wrong message format"}
// ErrEncodingFormatNotSupported is an error that indicates the encoding format is not supported.
ErrEncodingFormatNotSupported = Error{
Code: EncodingFormatNotSupported,
Message: "encoding format not supported. Please register the encoding format before using it"}

// ErrUnknownError is an error that indicates an unknown error occurred.
ErrUnknownError = Error{
Code: UnKnownError,
Message: "an unknown error occurred"}
)
2 changes: 2 additions & 0 deletions internal/consumer/rabbitmq/blackbox_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestSuiteRabbitMQConsumer(t *testing.T) {
t.Skip("Skip the Test Suite for RabbitMQ Consumer")
}

time.Sleep(5 * time.Second) // wait for the rabbitmq to be ready

rmqURL := os.Getenv("RABBITMQ_TEST_URL")
if rmqURL == "" {
rmqURL = "amqp://test:test@localhost:5672/test"
Expand Down
19 changes: 14 additions & 5 deletions internal/consumer/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,20 @@ func (r *rabbitMQ) initConsumer() {
r.msgReceiver = receiver
}

// Consume consumes messages from a RabbitMQ queue.
// It takes a context, an inbound message handler, and metadata as input parameters.
// The method continuously listens for messages from the queue and handles them using the provided handler.
// If the context is canceled, the method stops consuming messages and returns.
// The method returns an error if there was an issue consuming messages.
// Consume consumes messages from a RabbitMQ queue and handles them using the provided message handler.
// It takes a context, an inbound message handler, and a map of metadata as input parameters.
// The function continuously listens for messages from the queue and processes them until the context is canceled.
// If the context is canceled, the function stops consuming messages and returns.
// For each received message, the function builds an inbound message, extracts the retry count, and checks if the maximum retry count has been reached.
// If the maximum retry count has been reached, the message is moved to the dead letter queue.
// Otherwise, the message is passed to the message handler for processing.
// The message handler is responsible for handling the message and returning an error if any.
// If an error occurs while handling the message, it is logged.
// The function provides methods for acknowledging, rejecting, and moving messages to the dead letter queue.
// These methods can be used by the message handler to control the message processing flow.
// The function also logs information about the received message, such as the message ID, topic, action, and timestamp.
// It applies any configured middlewares to the message handler before calling it.
// The function returns an error if any occurred during message handling or if the context was canceled.
func (r *rabbitMQ) Consume(ctx context.Context,
h interfaces.InboundMessageHandler,
meta map[string]interface{}) (err error) {
Expand Down
2 changes: 2 additions & 0 deletions internal/publisher/rabbitmq/blackbox_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func TestSuiteRabbitMQPublisher(t *testing.T) {
t.Skip("Skip the Test Suite for RabbitMQ Publisher")
}

time.Sleep(5 * time.Second) // wait for the rabbitmq to be ready

rmqURL := os.Getenv("RABBITMQ_TEST_URL")
if rmqURL == "" {
rmqURL = "amqp://test:test@localhost:5672/test"
Expand Down
11 changes: 11 additions & 0 deletions internal/publisher/rabbitmq/channel_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ import (
"go.uber.org/multierr"
)

// ChannelPool represents a pool of AMQP channels used for publishing messages.
// It provides a way to manage and reuse AMQP channels efficiently.
type ChannelPool struct {
conn *amqp.Connection
mutex sync.Mutex
pool chan *amqp.Channel
maxSize int
}

// NewChannelPool creates a new ChannelPool instance.
// It takes an AMQP connection and the maximum size of the pool as parameters.
// It returns a pointer to the newly created ChannelPool.
func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool {
return &ChannelPool{
conn: conn,
Expand All @@ -23,6 +28,8 @@ func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool {
}
}

// Get returns a channel from the pool. If there are available channels in the pool, it returns one of them.
// Otherwise, it creates a new channel from the underlying connection and returns it.
func (cp *ChannelPool) Get() (*amqp.Channel, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand All @@ -35,6 +42,8 @@ func (cp *ChannelPool) Get() (*amqp.Channel, error) {
}
}

// Return returns a channel back to the channel pool.
// If the pool is full, the channel is closed.
func (cp *ChannelPool) Return(ch *amqp.Channel) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand All @@ -51,6 +60,8 @@ func (cp *ChannelPool) Return(ch *amqp.Channel) {
}
}

// Close closes the ChannelPool and all its associated channels.
// It returns an error if there was an error closing any of the channels.
func (cp *ChannelPool) Close() (err error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
Expand Down
32 changes: 31 additions & 1 deletion internal/publisher/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/errors"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/interfaces"
Expand All @@ -25,6 +26,32 @@ type rabbitMQ struct {
option *publisherOpts.PublisherOption
}

// NewPublisher creates a new instance of the publisher.Publisher interface
// using the provided options. It returns a publisher.Publisher implementation
// that utilizes RabbitMQ as the underlying message broker.
//
// The function accepts a variadic parameter `opts` of type
// `publisherOpts.PublisherOptionFunc`, which allows the caller to provide
// custom configuration options for the publisher.
//
// Example usage:
//
// publisher := NewPublisher(
// publisherOpts.PublisherPlatformRabbitMQ,
// publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
// Conn: rmqConn,
// PublisherChannelPoolSize: 5,
// }),
// publisherOpts.WithPublisherID("publisher_id"),
// publisherOpts.WithMiddlewares(
// middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
// middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
// ),
//
// )
//
// The returned publisher can be used to publish messages to the configured
// RabbitMQ exchange and routing key.
func NewPublisher(
opts ...publisherOpts.PublisherOptionFunc,
) publisher.Publisher {
Expand All @@ -47,6 +74,9 @@ func NewPublisher(
}
}

// Publish sends a message to the RabbitMQ exchange.
// It applies the default content type if not specified in the message.
// It also applies any registered middlewares before publishing the message.
func (r *rabbitMQ) Publish(ctx context.Context, m interfaces.Message) (err error) {
if m.ContentType == "" {
m.ContentType = publisherOpts.DefaultContentType
Expand Down Expand Up @@ -94,7 +124,7 @@ func (r *rabbitMQ) buildPublisher() interfaces.PublisherFunc {
m.ID = id
encoder, ok := goqueue.GetGoQueueEncoding(m.ContentType)
if !ok {
encoder = goqueue.DefaultEncoding
return errors.ErrEncodingFormatNotSupported
}

data, err := encoder.Encode(ctx, m)
Expand Down
61 changes: 61 additions & 0 deletions middleware/default_errormapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package middleware

import (
"context"

"github.com/bxcodec/goqueue/errors"
"github.com/bxcodec/goqueue/interfaces"
)

// PublisherDefaultErrorMapper returns a middleware function that maps publisher errors to specific error types.
// It takes a next PublisherFunc as input and returns a new PublisherFunc that performs error mapping.
// If an error occurs during publishing, it will be mapped to a specific error type based on the error code.
// The mapped error will be returned, or nil if no error occurred.
func PublisherDefaultErrorMapper() interfaces.PublisherMiddlewareFunc {
return func(next interfaces.PublisherFunc) interfaces.PublisherFunc {
return func(ctx context.Context, e interfaces.Message) (err error) {
err = next(ctx, e)
if err != nil {
switch err {
case errors.ErrInvalidMessageFormat:
return errors.ErrInvalidMessageFormat
case errors.ErrEncodingFormatNotSupported:
return errors.ErrEncodingFormatNotSupported
default:
return errors.Error{
Code: errors.UnKnownError,
Message: err.Error(),
}
}
}
return nil
}
}
}

// InboundMessageHandlerDefaultErrorMapper returns a middleware function that maps specific errors to predefined error types.
// It takes the next inbound message handler function as input and returns a new inbound message handler function.
// The returned function checks if an error occurred during the execution of the next handler function.
// If an error is found, it maps the error to a predefined error type and returns it.
// If no error is found, it returns nil.
func InboundMessageHandlerDefaultErrorMapper() interfaces.InboundMessageHandlerMiddlewareFunc {
return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
err = next(ctx, m)
if err != nil {
switch err {
case errors.ErrInvalidMessageFormat:
return errors.ErrInvalidMessageFormat
case errors.ErrEncodingFormatNotSupported:
return errors.ErrEncodingFormatNotSupported
default:
return errors.Error{
Code: errors.UnKnownError,
Message: err.Error(),
}
}
}
return nil
}
}
}
Loading

0 comments on commit e24518b

Please sign in to comment.