Skip to content

Commit

Permalink
rocksdb: Fix segfault + p8s fixes(flipkart-incubator#174)
Browse files Browse the repository at this point in the history
- Fixes a crash caused by Prometheus (p8s) accessing a db that has already been closed after a snapshot load.
- Separates the Prometheus (p8s) metrics of the standalone and distributed services, since two services can't register using the same name.
  • Loading branch information
kingster authored Mar 31, 2023
1 parent 17c323e commit 4236023
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 14 deletions.
8 changes: 5 additions & 3 deletions internal/master/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,18 @@ type dkvServiceStat struct {
ResponseError *prometheus.CounterVec
}

func newDKVServiceStat(registry prometheus.Registerer) *dkvServiceStat {
func newDKVServiceStat(registry prometheus.Registerer, mode string) *dkvServiceStat {
RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: stats.Namespace,
Subsystem: mode,
Name: "latency",
Help: "Latency statistics for dkv service",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
MaxAge: 10 * time.Second,
}, []string{"Ops"})
ResponseError := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: stats.Namespace,
Subsystem: mode,
Name: "error",
Help: "Error count for storage operations",
}, []string{"Ops"})
Expand Down Expand Up @@ -113,7 +115,7 @@ func (ss *standaloneService) Watch(req *health.HealthCheckRequest, watcher healt
func NewStandaloneService(store storage.KVStore, cp storage.ChangePropagator, br storage.Backupable, regionInfo *serverpb.RegionInfo, opts *opts.ServerOpts) DKVService {
rwl := &sync.RWMutex{}
regionInfo.Status = serverpb.RegionStatus_LEADER
return &standaloneService{store, cp, br, rwl, regionInfo, false, make(chan struct{}, 1), opts, newDKVServiceStat(opts.PrometheusRegistry)}
return &standaloneService{store, cp, br, rwl, regionInfo, false, make(chan struct{}, 1), opts, newDKVServiceStat(opts.PrometheusRegistry, "standalone")}
}

func (ss *standaloneService) Put(ctx context.Context, putReq *serverpb.PutRequest) (*serverpb.PutResponse, error) {
Expand Down Expand Up @@ -403,7 +405,7 @@ func NewDistributedService(kvs storage.KVStore, cp storage.ChangePropagator, br
raftRepl: raftRepl,
shutdown: make(chan struct{}, 1),
opts: opts,
stat: newDKVServiceStat(stats.NewPromethousNoopRegistry()),
stat: newDKVServiceStat(opts.PrometheusRegistry, "distributed"),
}
}

Expand Down
12 changes: 10 additions & 2 deletions internal/storage/badger/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package badger

import (
"github.com/flipkart-incubator/dkv/internal/stats"
"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/prometheus/client_golang/prometheus"
)

// metricsCollector registers the badger storage stats, together with a prometheus Collector for Badger metrics from expvar.
func (bdb *badgerDB) metricsCollector() {
collector := prometheus.NewExpvarCollector(map[string]*prometheus.Desc{
bdb.stat = storage.NewStat("badger")
bdb.stat.StoreMetricsCollector = prometheus.NewExpvarCollector(map[string]*prometheus.Desc{
"badger_v3_disk_reads_total": prometheus.NewDesc(
prometheus.BuildFQName(stats.Namespace, "badger", "disk_reads_total"),
"Number of cumulative reads by Badger",
Expand Down Expand Up @@ -79,6 +81,12 @@ func (bdb *badgerDB) metricsCollector() {
nil, nil,
),
})
bdb.opts.promRegistry.MustRegister(bdb.stat.RequestLatency, bdb.stat.ResponseError)
bdb.opts.promRegistry.MustRegister(bdb.stat.StoreMetricsCollector)
}

bdb.opts.promRegistry.MustRegister(collector)
// unRegisterMetricsCollector removes this store's metrics from the
// prometheus registry held in bdb.opts.promRegistry: the store-level
// collector first, then the request latency and response error metrics.
// The results of the Unregister calls are not inspected.
func (bdb *badgerDB) unRegisterMetricsCollector() {
	registry, stat := bdb.opts.promRegistry, bdb.stat

	registry.Unregister(stat.StoreMetricsCollector)
	registry.Unregister(stat.RequestLatency)
	registry.Unregister(stat.ResponseError)
}
3 changes: 2 additions & 1 deletion internal/storage/badger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,13 @@ func openStore(bdbOpts *bdgrOpts) (*badgerDB, error) {
return nil, err
}

bdb := badgerDB{db, bdbOpts, storage.NewStat(bdbOpts.promRegistry, "badger"), 0}
bdb := badgerDB{db: db, opts: bdbOpts, globalMutation: 0}
bdb.metricsCollector()
return &bdb, nil
}

// Close unregisters this store's metrics and then closes the underlying
// Badger database.
//
// The metrics are unregistered before the database handle is closed, so
// the registry no longer references this store's collectors afterwards.
//
// It returns the error reported by the database close, if any; previously
// that error was discarded and nil was always returned.
func (bdb *badgerDB) Close() error {
	bdb.unRegisterMetricsCollector()
	return bdb.db.Close()
}
Expand Down
13 changes: 11 additions & 2 deletions internal/storage/rocksdb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rocksdb

import (
"github.com/flipkart-incubator/dkv/internal/stats"
"github.com/flipkart-incubator/dkv/internal/storage"
"github.com/flipkart-incubator/gorocksdb"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -65,6 +66,14 @@ func (collector *rocksDBCollector) Collect(ch chan<- prometheus.Metric) {

// metricsCollector registers the RocksDB metrics with the prometheus registry.
func (rdb *rocksDB) metricsCollector() {
collector := newRocksDBCollector(rdb)
rdb.opts.promRegistry.MustRegister(collector)
rdb.stat = storage.NewStat("rocksdb")
rdb.opts.promRegistry.MustRegister(rdb.stat.RequestLatency, rdb.stat.ResponseError)
rdb.stat.StoreMetricsCollector = newRocksDBCollector(rdb)
rdb.opts.promRegistry.MustRegister(rdb.stat.StoreMetricsCollector)
}

// unRegisterMetricsCollector removes this store's metrics from the
// prometheus registry held in rdb.opts.promRegistry: the store-level
// collector first, then the request latency and response error metrics.
// The results of the Unregister calls are not inspected.
func (rdb *rocksDB) unRegisterMetricsCollector() {
	registry, stat := rdb.opts.promRegistry, rdb.stat

	registry.Unregister(stat.StoreMetricsCollector)
	registry.Unregister(stat.RequestLatency)
	registry.Unregister(stat.ResponseError)
}
2 changes: 1 addition & 1 deletion internal/storage/rocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ func openStore(opts *rocksDBOpts) (*rocksDB, error) {
optimTrxnDB: optimTrxnDB,
opts: opts,
globalMutation: 0,
stat: storage.NewStat(opts.promRegistry, "rocksdb"),
}
rocksdb.metricsCollector()

Expand Down Expand Up @@ -266,6 +265,7 @@ func (rdb *rocksDB) Compaction() error {
}

func (rdb *rocksDB) Close() error {
rdb.unRegisterMetricsCollector()
rdb.optimTrxnDB.Close()
//rdb.opts.destroy()
return nil
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type Stat struct {
RequestLatency *prometheus.SummaryVec
ResponseError *prometheus.CounterVec
RequestLatency *prometheus.SummaryVec
ResponseError *prometheus.CounterVec
StoreMetricsCollector prometheus.Collector
}

func NewStat(registry prometheus.Registerer, engine string) *Stat {
func NewStat(engine string) *Stat {
RequestLatency := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: stats.Namespace,
Name: fmt.Sprintf("storage_latency_%s", engine),
Expand All @@ -31,8 +32,7 @@ func NewStat(registry prometheus.Registerer, engine string) *Stat {
Name: fmt.Sprintf("storage_error_%s", engine),
Help: fmt.Sprintf("Error count for %s storage operations", engine),
}, []string{stats.Ops})
registry.MustRegister(RequestLatency, ResponseError)
return &Stat{RequestLatency, ResponseError}
return &Stat{RequestLatency: RequestLatency, ResponseError: ResponseError}
}

// A KVStore represents the key value store that provides
Expand Down

0 comments on commit 4236023

Please sign in to comment.