Package jobworker provides a generic interface around message queues.
The jobworker package must be used in conjunction with some message queue connector.
List of connectors:
- go-jwdk/aws-sqs-connector/sqs
- go-jwdk/activemq-connector/activemq
- go-jwdk/db-connector/mysql
- go-jwdk/db-connector/postgres
- go-jwdk/db-connector/sqlite3
Go 1.13+
This package can be installed with the go get command:
$ go get -u github.com/go-jwdk/jobworker
Implements a worker process using go-jwdk/aws-sqs-connector.
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
_ "github.com/go-jwdk/aws-sqs-connector"
jw "github.com/go-jwdk/jobworker"
)
// main opens an "sqs" connector, builds a job worker on top of it, registers
// HelloWorker for the "test" queue, and runs the worker until SIGINT or
// SIGTERM is received, at which point it attempts a graceful shutdown bounded
// by a 30 second timeout.
func main() {
	// Open a connection through the "sqs" connector.
	sqs, err := jw.Open("sqs", map[string]interface{}{
		"Region":        os.Getenv("REGION"),
		"NumMaxRetries": 3,
	})
	if err != nil {
		log.Println("Could not open a sqs conn", err)
		return
	}
	sqs.SetLoggerFunc(log.Println)

	// Build the job worker with the SQS connection as its primary connector.
	worker, err := jw.New(&jw.Setting{
		Primary:    sqs,
		LoggerFunc: log.Println,
	})
	if err != nil {
		log.Println("Could not create a job worker", err)
		return
	}

	// Register the handler for the "test" queue together with its subscribe
	// metadata (all metadata values are given as strings).
	worker.Register("test", &HelloWorker{},
		jw.SubscribeMetadata("PollingInterval", "3"),
		jw.SubscribeMetadata("VisibilityTimeout", "20"),
		jw.SubscribeMetadata("WaitTimeSeconds", "10"),
		jw.SubscribeMetadata("MaxNumberOfJobs", "4"))

	// Run the worker in its own goroutine so that main can wait for a signal.
	go func() {
		log.Println("Start work")
		if err := worker.Work(&jw.WorkSetting{
			WorkerConcurrency: 5,
		}); err != nil {
			log.Println("Failed to work", err)
		}
	}()

	// Block until an interrupt or termination signal arrives. The channel is
	// buffered because signal.Notify does not block when sending.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	<-quit

	// Give the shutdown at most 30 seconds to finish.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	log.Println("Received a signal of graceful shutdown")
	if err := worker.Shutdown(ctx); err != nil {
		log.Println("Failed to graceful shutdown:", err)
		// Do not report completion when the shutdown did not succeed.
		return
	}
	log.Println("Completed graceful shutdown")
}
// HelloWorker is a stateless example worker; it carries no fields.
type HelloWorker struct{}
// Work logs the content of the given job, prefixed with "[HelloWorker]",
// and always returns a nil error.
func (HelloWorker) Work(job *jw.Job) error {
	content := job.Content
	log.Println("[HelloWorker]", content)

	return nil
}
Implements job enqueue.
// Open a connection through the "sqs" connector.
sqs, err := jw.Open("sqs", map[string]interface{}{
	"Region":        os.Getenv("REGION"),
	"NumMaxRetries": 3,
})
if err != nil {
	log.Println("Could not open a sqs conn", err)
	return
}
sqs.SetLoggerFunc(log.Println)

// Build the job worker with the SQS connection as its primary connector.
worker, err := jw.New(&jw.Setting{
	Primary:    sqs,
	LoggerFunc: log.Println,
})
if err != nil {
	log.Println("Could not create a job worker", err)
	return
}

// Enqueue a job on the "test" queue. err is already declared above, so it is
// assigned with "=": "_, err :=" would declare no new variables and fail to
// compile.
_, err = worker.Enqueue(context.Background(), &jw.EnqueueInput{
	Queue:   "test",
	Content: fmt.Sprintf(`{"msg":"%s"}`, uuid.NewV4().String()),
	Metadata: map[string]string{
		"MessageDelaySeconds": "3",
	},
})
if err != nil {
	log.Println("Failed to enqueue", err)
}
Set up primary and secondary connectors.
- Primary: go-jwdk/aws-sqs-connector/sqs
- Secondary: go-jwdk/db-connector/mysql
import (
jw "github.com/go-jwdk/jobworker"
_ "github.com/go-jwdk/awa-sqs-connector"
_ "github.com/go-jwdk/db-connector/mysql"
)
sqs, err := jobworker.Open("sqs", map[string]interface{}{
"Region": os.Getenv("REGION"),
})
mysql, err := jobworker.Open("mysql", map[string]interface{}{
"DSN": "test-db",
"MaxOpenConns": 3,
"MaxMaxIdleConns": 3,
"ConnMaxLifetime": time.Minute,
"NumMaxRetries": 3,
})
jw, err := jw.New(&jw.Setting{
Primary: sqs,
Secondary: mysql,
})