liveness and readiness probe for asynq server #479
-
I have built an Asynq server and am looking to deploy it on Kubernetes, however I am not sure what the liveness and readiness probes should be set to? I see there is a heartbeat and healthchecker processes but I'm not exactly sure which one is responsible for what and whether there is an API exposed to call them... I see there is also the |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
@jack-evans Thank you for the question. The heartbeat and healthchecker goroutines you mentioned are the internals of asynq server and not meant for external use. The package current provides this You can provide your custom function to update the health status of your server and export the health status via an HTTP endpoint, to which the probe can make request. Example: Export health status of the server via /healthz HTTP endpoint package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"github.com/hibiken/asynq"
)
type Status int
const (
StatusUnkown Status = iota
StatusOK
StatusUnavailable
// ... other statuses
)
func (s Status) String() string {
switch s {
case StatusOK:
return "ok"
case StatusUnavailable:
return "unavailable"
}
return "unknown"
}
type HealthStatus struct {
mu sync.Mutex
s Status // guarded by mu
}
func (hs *HealthStatus) Get() Status {
hs.mu.Lock()
defer hs.mu.Unlock()
return hs.s
}
func (hs *HealthStatus) Set(s Status) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.s = s
}
func main() {
var hs HealthStatus
qsrv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{
HealthCheckFunc: func(err error) {
if err == nil {
hs.Set(StatusOK)
} else {
hs.Set(StatusUnavailable)
}
},
// ... [omitted]: Other Asynq server configuration
},
)
mux := asynq.NewServeMux()
// ... [omitted] task handler registeration
qsrv.Start(mux)
// Set up HTTP server to export health status via /healthz endpoint
var httpsrv http.Server
done := make(chan struct{})
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt)
<-sigint
// We received an interrupt signal, shut down.
if err := httpsrv.Shutdown(context.Background()); err != nil {
log.Printf("HTTP server Shutdown: %v", err)
}
close(done)
}()
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
if s := hs.Get(); s != StatusOK {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("error: %v", s)))
} else {
w.WriteHeader(200)
w.Write([]byte("ok"))
}
})
if err := httpsrv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("HTTP server ListenAndServe: %v", err)
}
<-done // Wait for termination signal
qsrv.Stop()
} Let me know if you have any questions and any feedback on the current API is appreciated :) |
Beta Was this translation helpful? Give feedback.
@jack-evans Thank you for the question.
(Disclaimer: I'm not an expert on Kubernetes so please take my advice with a grain of salt 😄 )
The heartbeat and healthchecker goroutines you mentioned are the internals of asynq server and not meant for external use.
The package current provides this
HealthCheckFunc
(and associatedHealthCheckInterval
) which checks whether the connection to the redis server (which is a critical dependency of asynq server) is healthy.You can provide your custom function to update the health status of your server and export the health status via an HTTP endpoint, to which the probe can make request.
Example: Export health status of the server via /healthz HTTP endpoint