From 9b940fb5f00687be491c0800f0eb528b9c38d61f Mon Sep 17 00:00:00 2001 From: Jeffrey Koehler Date: Sun, 13 Oct 2024 04:55:54 -0500 Subject: [PATCH] remove WithoutCancle that was causing improper panics after rediss shut down. have metrics init to 0 on start --- go.mod | 4 ++ go.sum | 9 ++++ internal/redisstreams/streams.go | 5 +-- redisconsistent/metrics.go | 71 ++++++++++++++++++++++++++++++++ redisconsistent/redis.go | 4 +- 5 files changed, 88 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 7de874b..c9dc0c8 100644 --- a/go.mod +++ b/go.mod @@ -13,10 +13,14 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/k0kubun/pp/v3 v3.2.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.18.0 // indirect google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index 07a88ac..2bdf3d5 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,14 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs= +github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -34,8 +40,11 @@ github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0 github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/redisstreams/streams.go b/internal/redisstreams/streams.go index 519808b..b700224 100644 --- a/internal/redisstreams/streams.go +++ b/internal/redisstreams/streams.go @@ -34,11 +34,8 @@ func (s *StreamListener) Listen() { Count: 1, }).Result() if err != nil { - select { - case <-s.Ctx.Done(): - //context done. Able to just return. + if errors.Is(err, context.Canceled) { return - default: } s.Logger(s.Ctx).Panic("failed to read data from stream", slog.String("err.error", err.Error()), slog.String("streamlistener.stream", s.Key)) } diff --git a/redisconsistent/metrics.go b/redisconsistent/metrics.go index 07d063f..f193f57 100644 --- a/redisconsistent/metrics.go +++ b/redisconsistent/metrics.go @@ -126,3 +126,74 @@ func ObserveGauge(metric *prometheus.GaugeVec, name string, value int) { met.Set(float64(value)) } + +func InitMetrics(name string) { + + var err error + + _, err = NewWorkerEventTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = RemoveWorkerEventTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = NewWorkEventTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = RemoveWorkEventTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = MasterUpdateWorkTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = WorkerRectifyTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = MasterPingTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = WorkerPingTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = StartProcessingTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = StopProcessingTime.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = StartProcessingKeyCount.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = StopProcessingKeyCount.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + + _, err = DividerAssignedItemsGauge.GetMetricWithLabelValues(name) + if err != nil { + panic(errors.Wrap(err, "failed to get metric")) + } + +} diff --git a/redisconsistent/redis.go b/redisconsistent/redis.go index e44153d..a08a70d 100644 --- a/redisconsistent/redis.go +++ b/redisconsistent/redis.go @@ -35,7 +35,7 @@ func (d *dividerWorker) StartWorker(ctx context.Context) { panic("missing work fetcher func") } - d.ctx, d.cancel = context.WithCancel(context.WithoutCancel(ctx)) + d.ctx, d.cancel = context.WithCancel(ctx) var logger divider.LoggerGen //start tickers and listeners @@ -93,6 +93,8 @@ func (d *dividerWorker) StartWorker(ctx context.Context) { F: d.workerPingFunc, } + InitMetrics(d.conf.metricsName) + d.knownWork = set.New[string]() ObserveGauge(DividerAssignedItemsGauge, d.conf.metricsName, d.knownWork.Len())