Skip to content

Commit

Permalink
feat: add publisher test
Browse files Browse the repository at this point in the history
  • Loading branch information
bxcodec committed Jun 9, 2024
1 parent faa9393 commit 011f0d6
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 29 deletions.
10 changes: 7 additions & 3 deletions encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var (
err = json.Unmarshal(data, &m)
return
}

DefaultEncoder EncoderFn = JSONEncoder
DefaultDecoder DecoderFn = JSONDecoder
)

var (
Expand All @@ -32,11 +35,11 @@ func AddGoquEncoding(contentType headerVal.ContentType, encoding *Encoding) {
GoquEncodingMap.Store(contentType, encoding)
}

func GetGoquEncoding(contentType headerVal.ContentType) *Encoding {
func GetGoquEncoding(contentType headerVal.ContentType) (res *Encoding, ok bool) {
if encoding, ok := GoquEncodingMap.Load(contentType); ok {
return encoding.(*Encoding)
return encoding.(*Encoding), ok
}
return nil
return nil, false
}

type Encoding struct {
Expand All @@ -51,4 +54,5 @@ var (
Encode: JSONEncoder,
Decode: JSONDecoder,
}
DefaultEncoding = JSONEncoding
)
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
Expand Down
186 changes: 186 additions & 0 deletions publisher/rabbitmq/blackbox_publisher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package rabbitmq_test

import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"

"github.com/bxcodec/goqueue"
headerKey "github.com/bxcodec/goqueue/headers/key"
headerVal "github.com/bxcodec/goqueue/headers/value"
"github.com/bxcodec/goqueue/publisher"
rmq "github.com/bxcodec/goqueue/publisher/rabbitmq"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

const (
rabbitMQTestQueueName = "testqueuesubscriber"
testExchange = "goqueue-exchange-test"
testAction = "goqueue.action.test"
testActionRequeue = "goqueue.action.testrequeue"
)

type rabbitMQTestSuite struct {
suite.Suite
rmqURL string
conn *amqp.Connection
publishChannel *amqp.Channel
consumerChannel *amqp.Channel
queue *amqp.Queue
}

func TestSuiteRabbitMQPublisher(t *testing.T) {
if testing.Short() {
t.Skip("Skip the Test Suite for RabbitMQ Publisher")
}

rmqURL := os.Getenv("RABBITMQ_TEST_URL")
if rmqURL == "" {
rmqURL = "amqp://test:test@localhost:5672/test"
}

rabbitMQTestSuite := &rabbitMQTestSuite{
rmqURL: rmqURL,
}
logrus.SetLevel(logrus.DebugLevel)
logrus.SetFormatter(&logrus.JSONFormatter{})
rabbitMQTestSuite.initConnection(t)
suite.Run(t, rabbitMQTestSuite)
}

func (s *rabbitMQTestSuite) BeforeTest(_, _ string) {
}

func (s *rabbitMQTestSuite) AfterTest(_, _ string) {
_, err := s.consumerChannel.QueuePurge(rabbitMQTestQueueName, true) // force purge the queue after test
s.Require().NoError(err)
}

func (s *rabbitMQTestSuite) TearDownSuite() {
err := s.publishChannel.Close()
s.Require().NoError(err)
_, err = s.consumerChannel.QueuePurge(rabbitMQTestQueueName, true) // force purge the queue after test suite done
s.Require().NoError(err)
err = s.consumerChannel.Close()
s.Require().NoError(err)
err = s.conn.Close()
s.Require().NoError(err)
}

func (s *rabbitMQTestSuite) initConnection(t *testing.T) {
var err error
s.conn, err = amqp.Dial(s.rmqURL)
require.NoError(t, err)
s.publishChannel, err = s.conn.Channel()
require.NoError(t, err)
s.consumerChannel, err = s.conn.Channel()
require.NoError(t, err)
err = s.publishChannel.ExchangeDeclare(
testExchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
require.NoError(t, err)
}

func (s *rabbitMQTestSuite) getMockData(action string, identifier string) (res goqueue.Message) {
res = goqueue.Message{
Action: action,
ID: identifier,
Topic: testExchange,
ContentType: headerVal.ContentTypeJSON,
Data: map[string]interface{}{
"message": "hello-world-test",
},
Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
}

return res
}

func (s *rabbitMQTestSuite) TestPublisher() {
publisher := rmq.NewPublisher(s.conn,
publisher.WithPublisherID("test-publisher"),
)

queueSvc := goqueue.NewQueueService(
goqueue.WithPublisher(publisher),
)
var err error
totalPublishedMessage := 10
for i := 0; i < totalPublishedMessage; i++ {
err = queueSvc.Publish(context.Background(), s.getMockData(testAction, fmt.Sprintf("test-id-%d", i)))
s.Require().NoError(err)
}

// assertion event
// Ensure the published message is correct
msgs, err := s.consumerChannel.Consume(
rabbitMQTestQueueName, // queue
"testing-consumer", // consumer
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
s.Require().NoError(err)

done := make(chan bool)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*6) // increase this context if want to test a long running worker
defer cancel()
go func() {
totalMessages := 0
for {
select {
case <-ctx.Done():
done <- true
case d := <-msgs:
var content map[string]interface{}
inErr := json.Unmarshal(d.Body, &content)

/*
headerKey.MessageID: id,
headerKey.PublishedTimestamp: timestamp.Format(time.RFC3339),
headerKey.RetryCount: 0,
headerKey.ContentType: m.ContentType,
headerKey.QueueServiceAgent: headerVal.RabbitMQ,
headerKey.SchemaVer: headerVal.GoquMessageSchemaVersionV1,
*/
s.Require().NoError(inErr)
s.Require().Contains(content, "data")
s.Require().Equal(string(headerVal.ContentTypeJSON), d.ContentType)
s.Require().Equal(string(headerVal.ContentTypeJSON), d.Headers[headerKey.ContentType])
s.Require().Contains(d.Headers, headerKey.MessageID)
s.Require().Contains(d.Headers, headerKey.PublishedTimestamp)
s.Require().Contains(d.Headers, headerKey.RetryCount)
s.Require().Contains(d.Headers, headerKey.ContentType)
s.Require().Contains(d.Headers, headerKey.QueueServiceAgent)
s.Require().Contains(d.Headers, headerKey.SchemaVer)
s.Require().Equal("test-publisher", d.Headers[headerKey.AppID])
inErr = d.Ack(false)
s.Require().NoError(inErr)
totalMessages++
if totalMessages == totalPublishedMessage {
done <- true
}
}
}
}()

logrus.Printf("waiting for the message to be consumed")
<-done

err = publisher.Close(context.Background())
s.Require().NoError(err)
}
72 changes: 72 additions & 0 deletions publisher/rabbitmq/channel_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package rabbitmq

import (
"log"
"sync"

amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/multierr"
)

type ChannelPool struct {
conn *amqp.Connection
mutex sync.Mutex
pool chan *amqp.Channel
maxSize int
}

func NewChannelPool(conn *amqp.Connection, maxSize int) *ChannelPool {
return &ChannelPool{
conn: conn,
maxSize: maxSize,
pool: make(chan *amqp.Channel, maxSize),
}
}

func (cp *ChannelPool) Get() (*amqp.Channel, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()

select {
case ch := <-cp.pool:
return ch, nil
default:
return cp.conn.Channel()
}
}

func (cp *ChannelPool) Return(ch *amqp.Channel) {
cp.mutex.Lock()
defer cp.mutex.Unlock()

select {
case cp.pool <- ch:
// channel returned to pool
default:
// pool is full, close the channel
err := ch.Close()
if err != nil {
log.Printf("Error closing RabbitMQ channel: %s", err)
}
}
}

func (cp *ChannelPool) Close() (err error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()

var errs = make([]error, 0)
close(cp.pool)
for ch := range cp.pool {
err := ch.Close()
if err != nil {
log.Printf("Error closing channel: %s", err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
log.Printf("Error closing channels: %v", errs)
return multierr.Combine(errs...)
}
return nil
}
Loading

0 comments on commit 011f0d6

Please sign in to comment.