Skip to content

Commit

Permalink
Added backoff to check indexes and better k8 readiness probe
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepone committed Feb 29, 2020
1 parent 7bdfb7f commit 1a5e258
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 20 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/deckarep/golang-set v0.0.0-20171013212420-1d4478f51bed
github.com/gleez/pkg v0.0.0-20200218023659-166e6bcfab6c // indirect
github.com/gocarina/gocsv v0.0.0-20191122093448-c6a9c812ac26
github.com/jpillora/backoff v1.0.0
github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68
github.com/kr/pretty v0.1.0
github.com/kr/text v0.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gosimple/slug v1.9.1-0.20191111214030-853565075b0c/go.mod h1:U+aVV2WqQwxelhMY5iclMspW2WKUpbEMxecdkELxBLY=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68 h1:d2hBkTvi7B89+OXY8+bBBshPlc+7JYacGrG/dFak8SQ=
github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
40 changes: 37 additions & 3 deletions river/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/jpillora/backoff"
"github.com/juju/errors"
"github.com/thejerf/suture"

Expand Down Expand Up @@ -184,11 +185,39 @@ func (r *River) run() error {
r.sphinxToken = r.sup.Add(r.sphinxService)
r.sphinxService.WaitUntilStarted()

err = r.sphinxService.LoadSyncState(r.master.syncState())
b := &backoff.Backoff{
Min: 1 * time.Second,
Max: 20 * time.Minute,
Factor: 2,
Jitter: true,
}
defer b.Reset()

// get master state - wait until get state or timeout
for {
time.Sleep(b.Duration())

err = r.sphinxService.LoadSyncState(r.master.syncState())
if err == nil {
b.Reset()
break
}
}

if err != nil {
r.l.Errorf("one or more manticore backends are not up to date: %v", err)
} else {
return errors.Trace(err)
}

// check indexes are ready - wait until ready or timeout
for {
time.Sleep(b.Duration())

err = r.checkAllIndexesReady()
if err == nil {
b.Reset()
break
}
}

if err != nil {
Expand Down Expand Up @@ -333,7 +362,12 @@ func (r *River) checkAllIndexesReady() error {
}

if len(indexes) == len(r.c.DataSource) {
r.l.Errorf("Indexes not ready")
r.l.Errorf("All indexes not ready")
return errors.Trace(errIndexesNotReady)
}

if len(indexes) > 0 {
r.l.Errorf("%d index(s) not ready", len(indexes))
return errors.Trace(errIndexesNotReady)
}

Expand Down
43 changes: 26 additions & 17 deletions river/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,29 +223,29 @@ func (s *stat) run() (err error) {
s.startedAt = time.Now()

mux := http.NewServeMux()
mux.Handle("/stat", s)
mux.Handle("/healthz", handleHealthz(s.r))
mux.Handle("/readyz", handleReadyz(s.r))

mux.Handle("/debug/vars", expvar.Handler())
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
// kubernetes probes
mux.Handle("/healthz", handleHealthz(s.r))
mux.Handle("/readyz", handleReadyz(s))

// mux.Handle("/rebuild", handleRebuildRedir())
// mux.Handle("/rebuild/sync", handleRebuild(s.r, true))
// mux.Handle("/rebuild/async", handleRebuild(s.r, false))
// endpoints
mux.Handle("/stats", s)
mux.Handle("/maint", handleMaint(s.r))
mux.Handle("/wait", handleWaitForGTID(s.r))

// syncing - start/stop
mux.Handle("/syncing/start", handleStartSync(s.r, true))
mux.Handle("/syncing/stop", handleStopSync(s.r, true))
mux.Handle("/syncing/start/async", handleStopSync(s.r, false))
mux.Handle("/syncing/start/async", handleStartSync(s.r, false))
mux.Handle("/syncing/stop/async", handleStopSync(s.r, false))

mux.Handle("/maint", handleMaint(s.r))
mux.Handle("/wait", handleWaitForGTID(s.r))
// profiling
mux.Handle("/debug/vars", expvar.Handler())
mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))

stdLogger, logWriter := util.NewStdLogger(s.r.Log)
defer logWriter.Close()
Expand Down Expand Up @@ -286,9 +286,18 @@ func handleHealthz(r *River) http.HandlerFunc {
}

// Kubernetes readiness probe - ready
func handleReadyz(r *River) http.HandlerFunc {
func handleReadyz(s *stat) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if atomic.LoadInt32(&healthy) == 1 {
d := time.Now().Sub(s.startedAt)

// allows to take leadership or rolling update.
// wait for 2 mins and inform kubernetes if unsuccessful
if !s.r.isRunning && d.Seconds() < 65 {
w.WriteHeader(http.StatusNoContent)
return
}

if s.r.isRunning && atomic.LoadInt32(&healthy) == 1 {
w.WriteHeader(http.StatusNoContent)
return
}
Expand Down

0 comments on commit 1a5e258

Please sign in to comment.