Skip to content

Commit

Permalink
refactor: hide unnecessary fns (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec authored Jun 16, 2024
1 parent dfdf7c0 commit 90f5e49
Show file tree
Hide file tree
Showing 30 changed files with 513 additions and 604 deletions.
83 changes: 0 additions & 83 deletions consumer/option.go

This file was deleted.

22 changes: 22 additions & 0 deletions consumer/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package consumer

import (
"github.com/bxcodec/goqueue/internal/consumer"
"github.com/bxcodec/goqueue/internal/consumer/rabbitmq"
"github.com/bxcodec/goqueue/options"
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
)

func NewConsumer(platform options.Platform, opts ...consumerOpts.ConsumerOptionFunc) consumer.Consumer {
switch platform {
case consumerOpts.ConsumerPlatformRabbitMQ:
return rabbitmq.NewConsumer(opts...)
case consumerOpts.ConsumerPlatformGooglePubSub:
// TODO (bxcodec): implement google pubsub publisher
case consumerOpts.ConsumerPlatformSQS:
// TODO (bxcodec): implement sns publisher
default:
panic("unknown publisher platform")
}
return nil
}
9 changes: 5 additions & 4 deletions encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,25 @@ import (
"sync"

headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/interfaces"
)

// EncoderFn is a function type that encodes a message into a byte slice.
// It takes a context and a message as input and returns the encoded data and an error (if any).
type EncoderFn func(ctx context.Context, m Message) (data []byte, err error)
type EncoderFn func(ctx context.Context, m interfaces.Message) (data []byte, err error)

// DecoderFn is a function type that decodes a byte slice into a Message.
// It takes a context and a byte slice as input and returns a Message and an error.
type DecoderFn func(ctx context.Context, data []byte) (m Message, err error)
type DecoderFn func(ctx context.Context, data []byte) (m interfaces.Message, err error)

var (
// JSONEncoder is an implementation of the EncoderFn interface
// that encodes a Message into JSON format.
JSONEncoder EncoderFn = func(ctx context.Context, m Message) (data []byte, err error) {
JSONEncoder EncoderFn = func(ctx context.Context, m interfaces.Message) (data []byte, err error) {
return json.Marshal(m)
}
// JSONDecoder is a DecoderFn implementation that decodes JSON data into a Message.
JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m Message, err error) {
JSONDecoder DecoderFn = func(ctx context.Context, data []byte) (m interfaces.Message, err error) {
err = json.Unmarshal(data, &m)
return
}
Expand Down
54 changes: 31 additions & 23 deletions examples/rabbitmq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (

"github.com/bxcodec/goqueue"
"github.com/bxcodec/goqueue/consumer"
rmqConsumer "github.com/bxcodec/goqueue/consumer/rabbitmq"
"github.com/bxcodec/goqueue/interfaces"
"github.com/bxcodec/goqueue/middleware"
"github.com/bxcodec/goqueue/options"
consumerOpts "github.com/bxcodec/goqueue/options/consumer"
publisherOpts "github.com/bxcodec/goqueue/options/publisher"
"github.com/bxcodec/goqueue/publisher"
rmqPublisher "github.com/bxcodec/goqueue/publisher/rabbitmq"
)

func initExchange(ch *amqp.Channel, exchangeName string) error {
Expand All @@ -35,9 +37,14 @@ func main() {
panic(err)
}

rmqPub := rmqPublisher.NewPublisher(rmqConn,
publisher.WithPublisherID("publisher_id"),
publisher.WithMiddlewares(
rmqPub := publisher.NewPublisher(
publisherOpts.PublisherPlatformRabbitMQ,
publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{
Conn: rmqConn,
PublisherChannelPoolSize: 5,
}),
publisherOpts.WithPublisherID("publisher_id"),
publisherOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteBeforePublisher(),
middleware.HelloWorldMiddlewareExecuteAfterPublisher(),
),
Expand All @@ -56,35 +63,36 @@ func main() {
panic(err)
}
defer consumerChannel.Close()

rmqConsumer := rmqConsumer.NewConsumer(
publisherChannel,
consumerChannel,
consumer.WithMiddlewares(
rmqConsumer := consumer.NewConsumer(
consumerOpts.ConsumerPlatformRabbitMQ,
consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{
ConsumerChannel: consumerChannel,
ReQueueChannel: publisherChannel,
}),
consumerOpts.WithConsumerID("consumer_id"),
consumerOpts.WithMiddlewares(
middleware.HelloWorldMiddlewareExecuteAfterInboundMessageHandler(),
middleware.HelloWorldMiddlewareExecuteBeforeInboundMessageHandler(),
),
consumer.WithQueueName("consumer_queue"),
consumer.WithConsumerID("consumer_id"),
consumer.WithBatchMessageSize(1),
consumer.WithMaxRetryFailedMessage(3),
consumer.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumer.WithTopicName("goqueue"),
consumerOpts.WithMaxRetryFailedMessage(3),
consumerOpts.WithBatchMessageSize(1),
consumerOpts.WithActionsPatternSubscribed("goqueue.payments.#", "goqueue.users.#"),
consumerOpts.WithTopicName("goqueue"),
consumerOpts.WithQueueName("consumer_queue"),
)

queueSvc := goqueue.NewQueueService(
goqueue.WithPublisher(rmqPub),
goqueue.WithConsumer(rmqConsumer),
goqueue.WithMessageHandler(handler()),
options.WithConsumer(rmqConsumer),
options.WithPublisher(rmqPub),
options.WithMessageHandler(handler()),
)

go func() {
for i := 0; i < 10; i++ {
data := map[string]interface{}{
"message": fmt.Sprintf("Hello World %d", i),
}
jbyt, _ := json.Marshal(data)
err := queueSvc.Publish(context.Background(), goqueue.Message{
err := queueSvc.Publish(context.Background(), interfaces.Message{
Data: data,
Action: "goqueue.payments.create",
Topic: "goqueue",
Expand All @@ -105,8 +113,8 @@ func main() {
}
}

func handler() goqueue.InboundMessageHandlerFunc {
return func(ctx context.Context, m goqueue.InboundMessage) (err error) {
func handler() interfaces.InboundMessageHandlerFunc {
return func(ctx context.Context, m interfaces.InboundMessage) (err error) {
data := m.Data
jbyt, _ := json.Marshal(data)
fmt.Println("Message Received: ", string(jbyt))
Expand Down
2 changes: 1 addition & 1 deletion delayfn.go → interfaces/delayfn.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqueue
package interfaces

type DelayFn func(retries int64) (delay int64)

Expand Down
16 changes: 1 addition & 15 deletions consumer.go → interfaces/inboundmessagehandler.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,7 @@
package goqueue
package interfaces

import "context"

// Consumer represents an entity that consumes messages from a queue.
//
//go:generate mockery --name Consumer
type Consumer interface {
// Consume consumes messages from the queue and passes them to the provided handler.
// It takes a context, an InboundMessageHandler, and a map of metadata as parameters.
// It returns an error if there was a problem consuming the messages.
Consume(ctx context.Context, handler InboundMessageHandler, meta map[string]interface{}) (err error)

// Stop stops the consumer from consuming messages.
// It takes a context as a parameter and returns an error if there was a problem stopping the consumer.
Stop(ctx context.Context) (err error)
}

//go:generate mockery --name InboundMessageHandler
type InboundMessageHandler interface {
HandleMessage(ctx context.Context, m InboundMessage) (err error)
Expand Down
2 changes: 1 addition & 1 deletion message.go → interfaces/message.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package goqueue
package interfaces

import (
"time"
Expand Down
10 changes: 1 addition & 9 deletions publisher.go → interfaces/publisher.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package goqueue
package interfaces

import "context"

// Publisher represents an interface for publishing messages.
//
//go:generate mockery --name Publisher
type Publisher interface {
PublisherHandler
Close(ctx context.Context) (err error)
}

// PublisherHandler is an interface that defines the behavior of a message publisher.
//
//go:generate mockery --name PublisherHandler
Expand Down
21 changes: 21 additions & 0 deletions internal/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package consumer

import (
"context"

"github.com/bxcodec/goqueue/interfaces"
)

// Consumer represents an entity that consumes messages from a queue.
//
//go:generate mockery --name Consumer
type Consumer interface {
// Consume consumes messages from the queue and passes them to the provided handler.
// It takes a context, an InboundMessageHandler, and a map of metadata as parameters.
// It returns an error if there was a problem consuming the messages.
Consume(ctx context.Context, handler interfaces.InboundMessageHandler, meta map[string]interface{}) (err error)

// Stop stops the consumer from consuming messages.
// It takes a context as a parameter and returns an error if there was a problem stopping the consumer.
Stop(ctx context.Context) (err error)
}
Loading

0 comments on commit 90f5e49

Please sign in to comment.