From 1a5e258b8f4db814df7cf2f108997052d492e7ce Mon Sep 17 00:00:00 2001 From: Gleez Technologies Date: Sat, 29 Feb 2020 11:11:48 +0000 Subject: [PATCH] Added backoff to check indexes and better k8 readiness probe --- go.mod | 1 + go.sum | 2 ++ river/river.go | 40 +++++++++++++++++++++++++++++++++++++--- river/status.go | 43 ++++++++++++++++++++++++++----------------- 4 files changed, 66 insertions(+), 20 deletions(-) diff --git a/go.mod b/go.mod index 2116d5d..99bced1 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/deckarep/golang-set v0.0.0-20171013212420-1d4478f51bed github.com/gleez/pkg v0.0.0-20200218023659-166e6bcfab6c // indirect github.com/gocarina/gocsv v0.0.0-20191122093448-c6a9c812ac26 + github.com/jpillora/backoff v1.0.0 github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68 github.com/kr/pretty v0.1.0 github.com/kr/text v0.1.0 diff --git a/go.sum b/go.sum index f3be55a..8e7ea0b 100644 --- a/go.sum +++ b/go.sum @@ -31,6 +31,8 @@ github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/gosimple/slug v1.9.1-0.20191111214030-853565075b0c/go.mod h1:U+aVV2WqQwxelhMY5iclMspW2WKUpbEMxecdkELxBLY= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68 h1:d2hBkTvi7B89+OXY8+bBBshPlc+7JYacGrG/dFak8SQ= github.com/juju/errors v0.0.0-20170703010042-c7d06af17c68/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= diff --git a/river/river.go b/river/river.go index 8b13c8a..70424b7 100644 --- a/river/river.go +++ b/river/river.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/jpillora/backoff" "github.com/juju/errors" "github.com/thejerf/suture" @@ -184,11 +185,39 @@ func (r *River) run() error { r.sphinxToken = r.sup.Add(r.sphinxService) r.sphinxService.WaitUntilStarted() - err = r.sphinxService.LoadSyncState(r.master.syncState()) + b := &backoff.Backoff{ + Min: 1 * time.Second, + Max: 20 * time.Minute, + Factor: 2, + Jitter: true, + } + defer b.Reset() + + // get master state - wait until get state or timeout + for { + time.Sleep(b.Duration()) + + err = r.sphinxService.LoadSyncState(r.master.syncState()) + if err == nil { + b.Reset() + break + } + } + if err != nil { r.l.Errorf("one or more manticore backends are not up to date: %v", err) - } else { + return errors.Trace(err) + } + + // check indexes are ready - wait until ready or timeout + for { + time.Sleep(b.Duration()) + err = r.checkAllIndexesReady() + if err == nil { + b.Reset() + break + } } if err != nil { @@ -333,7 +362,12 @@ func (r *River) checkAllIndexesReady() error { } if len(indexes) == len(r.c.DataSource) { - r.l.Errorf("Indexes not ready") + r.l.Errorf("All indexes not ready") + return errors.Trace(errIndexesNotReady) + } + + if len(indexes) > 0 { + r.l.Errorf("%d index(s) not ready", len(indexes)) return errors.Trace(errIndexesNotReady) } diff --git a/river/status.go b/river/status.go index d023fb1..c64aa5a 100644 --- a/river/status.go +++ b/river/status.go @@ -223,29 +223,29 @@ func (s *stat) run() (err error) { s.startedAt = time.Now() mux := http.NewServeMux() - mux.Handle("/stat", s) - mux.Handle("/healthz", handleHealthz(s.r)) - mux.Handle("/readyz", handleReadyz(s.r)) - mux.Handle("/debug/vars", expvar.Handler()) - mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) - mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) - mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) - mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) - mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + // kubernetes probes + mux.Handle("/healthz", handleHealthz(s.r)) + mux.Handle("/readyz", handleReadyz(s)) - // mux.Handle("/rebuild", handleRebuildRedir()) - // mux.Handle("/rebuild/sync", handleRebuild(s.r, true)) - // mux.Handle("/rebuild/async", handleRebuild(s.r, false)) + // endpoints + mux.Handle("/stats", s) + mux.Handle("/maint", handleMaint(s.r)) + mux.Handle("/wait", handleWaitForGTID(s.r)) // syncing - start/stop mux.Handle("/syncing/start", handleStartSync(s.r, true)) mux.Handle("/syncing/stop", handleStopSync(s.r, true)) - mux.Handle("/syncing/start/async", handleStopSync(s.r, false)) + mux.Handle("/syncing/start/async", handleStartSync(s.r, false)) mux.Handle("/syncing/stop/async", handleStopSync(s.r, false)) - mux.Handle("/maint", handleMaint(s.r)) - mux.Handle("/wait", handleWaitForGTID(s.r)) + // profiling + mux.Handle("/debug/vars", expvar.Handler()) + mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) stdLogger, logWriter := util.NewStdLogger(s.r.Log) defer logWriter.Close() @@ -286,9 +286,18 @@ func handleHealthz(r *River) http.HandlerFunc { } // Kubernetes readiness probe - ready -func handleReadyz(r *River) http.HandlerFunc { +func handleReadyz(s *stat) http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if atomic.LoadInt32(&healthy) == 1 { + d := time.Now().Sub(s.startedAt) + + // allows to take leadership or rolling update. + // wait for 2 mins and inform kubernetes if unsuccessful + if !s.r.isRunning && d.Seconds() < 65 { + w.WriteHeader(http.StatusNoContent) + return + } + + if s.r.isRunning && atomic.LoadInt32(&healthy) == 1 { w.WriteHeader(http.StatusNoContent) return }