Skip to content

Commit

Permalink
remove WithoutCancle that was causing improper panics after rediss sh…
Browse files Browse the repository at this point in the history
…ut down. have metrics init to 0 on start
  • Loading branch information
Jeffrey Koehler committed Oct 13, 2024
1 parent a47c6d0 commit 9b940fb
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 5 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/k0kubun/pp/v3 v3.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/protobuf v1.35.1 // indirect
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs=
github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand All @@ -34,8 +40,11 @@ github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
5 changes: 1 addition & 4 deletions internal/redisstreams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ func (s *StreamListener) Listen() {
Count: 1,
}).Result()
if err != nil {
select {
case <-s.Ctx.Done():
//context done. Able to just return.
if errors.Is(err, context.Canceled) {
return
default:
}
s.Logger(s.Ctx).Panic("failed to read data from stream", slog.String("err.error", err.Error()), slog.String("streamlistener.stream", s.Key))
}
Expand Down
71 changes: 71 additions & 0 deletions redisconsistent/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,74 @@ func ObserveGauge(metric *prometheus.GaugeVec, name string, value int) {

met.Set(float64(value))
}

func InitMetrics(name string) {

var err error

_, err = NewWorkerEventTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = RemoveWorkerEventTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = NewWorkEventTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = RemoveWorkEventTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = MasterUpdateWorkTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = WorkerRectifyTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = MasterPingTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = WorkerPingTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = StartProcessingTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = StopProcessingTime.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = StartProcessingKeyCount.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = StopProcessingKeyCount.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

_, err = DividerAssignedItemsGauge.GetMetricWithLabelValues(name)
if err != nil {
panic(errors.Wrap(err, "failed to get metric"))
}

}
4 changes: 3 additions & 1 deletion redisconsistent/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (d *dividerWorker) StartWorker(ctx context.Context) {
panic("missing work fetcher func")
}

d.ctx, d.cancel = context.WithCancel(context.WithoutCancel(ctx))
d.ctx, d.cancel = context.WithCancel(ctx)

var logger divider.LoggerGen
//start tickers and listeners
Expand Down Expand Up @@ -93,6 +93,8 @@ func (d *dividerWorker) StartWorker(ctx context.Context) {
F: d.workerPingFunc,
}

InitMetrics(d.conf.metricsName)

d.knownWork = set.New[string]()
ObserveGauge(DividerAssignedItemsGauge, d.conf.metricsName, d.knownWork.Len())

Expand Down

0 comments on commit 9b940fb

Please sign in to comment.