Skip to content

Commit

Permalink
Merge pull request #43 from worldcoin/B-753_configs
Browse files Browse the repository at this point in the history
[B-753] Allow excluding visibility timeout extension
  • Loading branch information
mircobordoni authored Aug 22, 2022
2 parents 01915a5 + be0d164 commit 654a5f4
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
52 changes: 27 additions & 25 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,33 @@ import (
log "github.com/sirupsen/logrus"
)

type Config struct {
QueueURL string
WorkersNum int
VisibilityTimeout int32
BatchSize int32
ExtendEnabled bool
}

type Consumer struct {
sqs *sqs.Client
handler Handler
queueURL string
workersNum int
visibilityTimeout int32
batchSize int32
wg *sync.WaitGroup
sqs *sqs.Client
handler Handler
wg *sync.WaitGroup
cfg Config
}

func NewConsumer(cfg aws.Config, queueURL string, visibilityTimeout, batchSize, workersNum int, handler Handler) *Consumer {
consumer := &Consumer{
sqs: sqs.NewFromConfig(cfg),
handler: handler,
queueURL: queueURL,
workersNum: workersNum,
visibilityTimeout: int32(visibilityTimeout),
batchSize: int32(batchSize),
wg: &sync.WaitGroup{},
func NewConsumer(awsCfg aws.Config, cfg Config, handler Handler) *Consumer {
return &Consumer{
sqs: sqs.NewFromConfig(awsCfg),
handler: handler,
wg: &sync.WaitGroup{},
cfg: cfg,
}

return consumer
}

func (c *Consumer) Consume(ctx context.Context) {
jobs := make(chan *Message)
for w := 1; w <= c.workersNum; w++ {
for w := 1; w <= c.cfg.WorkersNum; w++ {
go c.worker(ctx, jobs)
c.wg.Add(1)
}
Expand All @@ -51,8 +51,8 @@ loop:
break loop
default:
output, err := c.sqs.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: &c.queueURL,
MaxNumberOfMessages: c.batchSize,
QueueUrl: &c.cfg.QueueURL,
MaxNumberOfMessages: c.cfg.BatchSize,
WaitTimeSeconds: int32(5),
})
if err != nil {
Expand Down Expand Up @@ -81,7 +81,9 @@ func (c *Consumer) worker(ctx context.Context, messages <-chan *Message) {

func (c *Consumer) handleMsg(ctx context.Context, m *Message) error {
if c.handler != nil {
c.extend(ctx, m)
if c.cfg.ExtendEnabled {
c.extend(ctx, m)
}
if err := c.handler.Run(ctx, m); err != nil {
return m.ErrorResponse(err)
}
Expand All @@ -92,7 +94,7 @@ func (c *Consumer) handleMsg(ctx context.Context, m *Message) error {
}

func (c *Consumer) delete(ctx context.Context, m *Message) error {
_, err := c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{QueueUrl: &c.queueURL, ReceiptHandle: m.ReceiptHandle})
_, err := c.sqs.DeleteMessage(ctx, &sqs.DeleteMessageInput{QueueUrl: &c.cfg.QueueURL, ReceiptHandle: m.ReceiptHandle})
if err != nil {
log.WithError(err).Error("error removing message")
return fmt.Errorf("unable to delete message from the queue: %w", err)
Expand All @@ -103,9 +105,9 @@ func (c *Consumer) delete(ctx context.Context, m *Message) error {

func (c *Consumer) extend(ctx context.Context, m *Message) {
_, err := c.sqs.ChangeMessageVisibility(ctx, &sqs.ChangeMessageVisibilityInput{
QueueUrl: &c.queueURL,
QueueUrl: &c.cfg.QueueURL,
ReceiptHandle: m.ReceiptHandle,
VisibilityTimeout: c.visibilityTimeout,
VisibilityTimeout: c.cfg.VisibilityTimeout,
})
if err != nil {
log.WithError(err).Error("unable to extend message")
Expand Down
18 changes: 16 additions & 2 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ func TestConsume(t *testing.T) {
expectedMsg := TestMsg{Name: "TestName"}

msgHandler := handler(t, expectedMsg)
consumer := NewConsumer(awsCfg, *queueUrl, visibilityTimeout, batchSize, workersNum, msgHandler)
config := Config{
QueueURL: *queueUrl,
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
consumer := NewConsumer(awsCfg, config, msgHandler)
go consumer.Consume(ctx)

t.Cleanup(func() {
Expand Down Expand Up @@ -82,7 +89,14 @@ func TestConsume_GracefulShutdown(t *testing.T) {
queueName := strings.ToLower(t.Name())
queueUrl := createQueue(t, ctx, awsCfg, queueName)

consumer := NewConsumer(awsCfg, *queueUrl, visibilityTimeout, batchSize, workersNum, &MsgHandler{})
config := Config{
QueueURL: *queueUrl,
WorkersNum: workersNum,
VisibilityTimeout: visibilityTimeout,
BatchSize: batchSize,
ExtendEnabled: true,
}
consumer := NewConsumer(awsCfg, config, &MsgHandler{})
go func() {
time.Sleep(time.Second * 1)
// Cancel context to trigger graceful shutdown
Expand Down

0 comments on commit 654a5f4

Please sign in to comment.