HTSQS


HTSQS is a high-throughput Go library that provides an AWS SQS consumer and an SNS/SQS message publisher.

Install

go get -u github.com/bernardopericacho/htsqs

Features

  • High throughput - a subscriber has the ability to create multiple consumers that concurrently receive messages from AWS SQS and push them into a single channel for consumption
  • Late ACK - mechanism for acknowledging messages once they have been processed
  • Message visibility - ability to modify the visibility of a message
  • Error processing - decide whether to stop consuming when errors occur, with a configurable exponential backoff
  • Graceful shutdown

Getting started

Consume messages from an AWS SQS Queue

package main

import (
	"log"

	"github.com/bernardopericacho/htsqs/subscriber"
)

func main() {
	// Create a new subscriber. This assumes AWS credentials are configured through
	// environment variables or IAM roles: https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html
	// Replace the placeholder with the URL of your queue.
	subs := subscriber.New(subscriber.Config{SqsQueueURL: "<MY_SQS_QUEUE_URL>"})

	// Start consuming; messages and errors arrive on separate channels.
	messagesCh, errCh, err := subs.Consume()
	if err != nil {
		log.Fatalf("Error when trying to consume messages from the SQS queue: %v", err)
	}

	// Loop over all the messages or errors
	for {
		select {
		case msg := <-messagesCh:
			log.Println("received new message", msg)
		case err := <-errCh:
			log.Println("received new error", err)
		}
	}
}

Create a worker service to consume from an SQS queue

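package main

import (
	"context"
	"errors"
	"log"

	"github.com/bernardopericacho/htsqs/subscriber"
)

func main() {
	// Replace the placeholder with the URL of your queue.
	cfg := subscriber.WorkerConfig{
		Subscriber: subscriber.New(subscriber.Config{
			SqsQueueURL: "<MY_SQS_QUEUE_URL>",
		}),
	}

	worker := subscriber.NewWorker(cfg)
	ctx := context.Background()

	// Start returns ErrWorkerClosed after a regular stop; anything else is a failure.
	if err := worker.Start(ctx); !errors.Is(err, subscriber.ErrWorkerClosed) {
		// Try to stop the worker and report both errors if stopping also fails.
		if stopErr := worker.Stop(); stopErr != nil {
			log.Printf("Worker start failed: %v (stopping the worker also failed: %v)", err, stopErr)
		} else {
			log.Printf("Worker start failed: %v", err)
		}
	}
}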

License

This project is licensed under the MIT License.