From 42360235fb1edcc1437dffbad37ad6e4473ed9c3 Mon Sep 17 00:00:00 2001 From: Kinshuk Bairagi Date: Fri, 31 Mar 2023 13:39:53 +0530 Subject: [PATCH] rocksdb: Fix segfault + p8s fixes(#174) - Fixes crash due to p8s accessing db which has been already closed post a snapshot load. - p8s separate out standalone, distributed since 2 service can't register using same name. --- internal/master/service.go | 8 +++++--- internal/storage/badger/metrics.go | 12 ++++++++++-- internal/storage/badger/store.go | 3 ++- internal/storage/rocksdb/metrics.go | 13 +++++++++++-- internal/storage/rocksdb/store.go | 2 +- internal/storage/store.go | 10 +++++----- 6 files changed, 34 insertions(+), 14 deletions(-) diff --git a/internal/master/service.go b/internal/master/service.go index ebe1c2ad..ec029e38 100644 --- a/internal/master/service.go +++ b/internal/master/service.go @@ -47,9 +47,10 @@ type dkvServiceStat struct { ResponseError *prometheus.CounterVec } -func newDKVServiceStat(registry prometheus.Registerer) *dkvServiceStat { +func newDKVServiceStat(registry prometheus.Registerer, mode string) *dkvServiceStat { RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: stats.Namespace, + Subsystem: mode, Name: "latency", Help: "Latency statistics for dkv service", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, @@ -57,6 +58,7 @@ func newDKVServiceStat(registry prometheus.Registerer) *dkvServiceStat { }, []string{"Ops"}) ResponseError := prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: stats.Namespace, + Subsystem: mode, Name: "error", Help: "Error count for storage operations", }, []string{"Ops"}) @@ -113,7 +115,7 @@ func (ss *standaloneService) Watch(req *health.HealthCheckRequest, watcher healt func NewStandaloneService(store storage.KVStore, cp storage.ChangePropagator, br storage.Backupable, regionInfo *serverpb.RegionInfo, opts *opts.ServerOpts) DKVService { rwl := &sync.RWMutex{} regionInfo.Status = serverpb.RegionStatus_LEADER - return &standaloneService{store, cp, br, rwl, regionInfo, false, make(chan struct{}, 1), opts, newDKVServiceStat(opts.PrometheusRegistry)} + return &standaloneService{store, cp, br, rwl, regionInfo, false, make(chan struct{}, 1), opts, newDKVServiceStat(opts.PrometheusRegistry, "standalone")} } func (ss *standaloneService) Put(ctx context.Context, putReq *serverpb.PutRequest) (*serverpb.PutResponse, error) { @@ -403,7 +405,7 @@ func NewDistributedService(kvs storage.KVStore, cp storage.ChangePropagator, br raftRepl: raftRepl, shutdown: make(chan struct{}, 1), opts: opts, - stat: newDKVServiceStat(stats.NewPromethousNoopRegistry()), + stat: newDKVServiceStat(opts.PrometheusRegistry, "distributed"), } } diff --git a/internal/storage/badger/metrics.go b/internal/storage/badger/metrics.go index ba6334c6..b691baa1 100644 --- a/internal/storage/badger/metrics.go +++ b/internal/storage/badger/metrics.go @@ -2,12 +2,14 @@ package badger import ( "github.com/flipkart-incubator/dkv/internal/stats" + "github.com/flipkart-incubator/dkv/internal/storage" "github.com/prometheus/client_golang/prometheus" ) // NewBadgerCollector returns a prometheus Collector for Badger metrics from expvar. func (bdb *badgerDB) metricsCollector() { - collector := prometheus.NewExpvarCollector(map[string]*prometheus.Desc{ + bdb.stat = storage.NewStat("badger") + bdb.stat.StoreMetricsCollector = prometheus.NewExpvarCollector(map[string]*prometheus.Desc{ "badger_v3_disk_reads_total": prometheus.NewDesc( prometheus.BuildFQName(stats.Namespace, "badger", "disk_reads_total"), "Number of cumulative reads by Badger", @@ -79,6 +81,12 @@ func (bdb *badgerDB) metricsCollector() { nil, nil, ), }) + bdb.opts.promRegistry.MustRegister(bdb.stat.RequestLatency, bdb.stat.ResponseError) + bdb.opts.promRegistry.MustRegister(bdb.stat.StoreMetricsCollector) +} - bdb.opts.promRegistry.MustRegister(collector) +func (bdb *badgerDB) unRegisterMetricsCollector() { + bdb.opts.promRegistry.Unregister(bdb.stat.StoreMetricsCollector) + bdb.opts.promRegistry.Unregister(bdb.stat.RequestLatency) + bdb.opts.promRegistry.Unregister(bdb.stat.ResponseError) } diff --git a/internal/storage/badger/store.go b/internal/storage/badger/store.go index f90a431f..0c3960a8 100644 --- a/internal/storage/badger/store.go +++ b/internal/storage/badger/store.go @@ -186,12 +186,13 @@ func openStore(bdbOpts *bdgrOpts) (*badgerDB, error) { return nil, err } - bdb := badgerDB{db, bdbOpts, storage.NewStat(bdbOpts.promRegistry, "badger"), 0} + bdb := badgerDB{db: db, opts: bdbOpts, globalMutation: 0} bdb.metricsCollector() return &bdb, nil } func (bdb *badgerDB) Close() error { + bdb.unRegisterMetricsCollector() bdb.db.Close() return nil } diff --git a/internal/storage/rocksdb/metrics.go b/internal/storage/rocksdb/metrics.go index 5322099e..a33b3ecc 100644 --- a/internal/storage/rocksdb/metrics.go +++ b/internal/storage/rocksdb/metrics.go @@ -2,6 +2,7 @@ package rocksdb import ( "github.com/flipkart-incubator/dkv/internal/stats" + "github.com/flipkart-incubator/dkv/internal/storage" "github.com/flipkart-incubator/gorocksdb" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -65,6 +66,14 @@ func (collector *rocksDBCollector) Collect(ch chan<- prometheus.Metric) { // metricsCollector collects rocksdB metrics. func (rdb *rocksDB) metricsCollector() { - collector := newRocksDBCollector(rdb) - rdb.opts.promRegistry.MustRegister(collector) + rdb.stat = storage.NewStat("rocksdb") + rdb.opts.promRegistry.MustRegister(rdb.stat.RequestLatency, rdb.stat.ResponseError) + rdb.stat.StoreMetricsCollector = newRocksDBCollector(rdb) + rdb.opts.promRegistry.MustRegister(rdb.stat.StoreMetricsCollector) +} + +func (rdb *rocksDB) unRegisterMetricsCollector() { + rdb.opts.promRegistry.Unregister(rdb.stat.StoreMetricsCollector) + rdb.opts.promRegistry.Unregister(rdb.stat.RequestLatency) + rdb.opts.promRegistry.Unregister(rdb.stat.ResponseError) } diff --git a/internal/storage/rocksdb/store.go b/internal/storage/rocksdb/store.go index 6c8d8872..3016dc32 100644 --- a/internal/storage/rocksdb/store.go +++ b/internal/storage/rocksdb/store.go @@ -237,7 +237,6 @@ func openStore(opts *rocksDBOpts) (*rocksDB, error) { optimTrxnDB: optimTrxnDB, opts: opts, globalMutation: 0, - stat: storage.NewStat(opts.promRegistry, "rocksdb"), } rocksdb.metricsCollector() @@ -266,6 +265,7 @@ func (rdb *rocksDB) Compaction() error { } func (rdb *rocksDB) Close() error { + rdb.unRegisterMetricsCollector() rdb.optimTrxnDB.Close() //rdb.opts.destroy() return nil diff --git a/internal/storage/store.go b/internal/storage/store.go index 6bb92885..fb199775 100644 --- a/internal/storage/store.go +++ b/internal/storage/store.go @@ -14,11 +14,12 @@ import ( ) type Stat struct { - RequestLatency *prometheus.SummaryVec - ResponseError *prometheus.CounterVec + RequestLatency *prometheus.SummaryVec + ResponseError *prometheus.CounterVec + StoreMetricsCollector prometheus.Collector } -func NewStat(registry prometheus.Registerer, engine string) *Stat { +func NewStat(engine string) *Stat { RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: stats.Namespace, Name: fmt.Sprintf("storage_latency_%s", engine), @@ -31,8 +32,7 @@ func NewStat(registry prometheus.Registerer, engine string) *Stat { Name: fmt.Sprintf("storage_error_%s", engine), Help: fmt.Sprintf("Error count for %s storage operations", engine), }, []string{stats.Ops}) - registry.MustRegister(RequestLatency, ResponseError) - return &Stat{RequestLatency, ResponseError} + return &Stat{RequestLatency: RequestLatency, ResponseError: ResponseError} } // A KVStore represents the key value store that provides