Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding getDbSize functionality #135

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ public interface DKVClient extends Closeable {
*/
Iterator<DKVEntry> iterate(byte[] startKey, byte[] keyPref);

/**
* This methods returns the approximate count of keys in the DKV database
* @return retunrn the count of keys
*/
long getDbSize();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to add get as a prefix as its implicit that dBSize is a read-only operation


@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,12 @@ public Iterator<DKVEntry> iterate(byte[] startKey, byte[] keyPref) {
return dkvClient.iterate(startKey, keyPref);
}

@Override
public long getDbSize() {
//TODO: implement this behaviour
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably this shouldn't even be exposed to java client unless its an exact value.

return 0;
}

@Override
public void close() {
pool.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import dkv.serverpb.Api;
import dkv.serverpb.DKVGrpc;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -272,6 +273,17 @@ public Iterator<DKVEntry> iterate(byte[] startKey, byte[] keyPref) {
return iterate(copyFrom(startKey), copyFrom(keyPref));
}

@Override
public long getDbSize() {
Empty emptyReq = Empty.newBuilder().build();
Api.KeySpaceSizeResponse res = blockingStub.getKeySpaceSize(emptyReq);
Api.Status status = res.getStatus();
if (status.getCode() != 0) {
throw new DKVException(status, "getDbSize", new Object[]{});
}
return res.getDbSize();
}

@Override
public void close() {
reporter.stop();
Expand Down
10 changes: 10 additions & 0 deletions cmd/dkvctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var cmds = []*cmd{
{"removeNode", "<nexusUrl>", "Remove a master node from DKV cluster", (*cmd).removeNode, "", false},
{"listNodes", "", "Lists the various DKV nodes that are part of the Nexus cluster", (*cmd).listNodes, "", true},
{"getClusterInfo", "<dcId> <database> <vBucket>", "Gets the latest cluster info", (*cmd).getStatus, "", true},
{"getDbSize", "", "Fetched the approximate count of keys in the db", (*cmd).getDbSize, "", true},
}

func (c *cmd) usage() {
Expand Down Expand Up @@ -220,6 +221,15 @@ func (c *cmd) getStatus(client *ctl.DKVClient, args ...string) {
}
}

func (c *cmd) getDbSize(client *ctl.DKVClient, args ...string) {
dbSize, err := client.GetDbSize()
if err != nil {
fmt.Println("Unable to perform getDbSize. Error: %v\n", err)
}
fmt.Printf("%d\n", dbSize.DbSize)

}

var dkvAddr, dkvAuthority string

func init() {
Expand Down
18 changes: 18 additions & 0 deletions internal/master/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ func (ss *standaloneService) CompareAndSet(ctx context.Context, casReq *serverpb
return res, err
}

func (ss *standaloneService) GetKeySpaceSize(ctx context.Context, e *emptypb.Empty) (*serverpb.KeySpaceSizeResponse, error) {
ss.rwl.RLock()
defer ss.rwl.RUnlock()

dbSizeResults, err := ss.store.GetKeySpaceSize()
res := &serverpb.KeySpaceSizeResponse{Status: newEmptyStatus(), DbSize: dbSizeResults}
if err != nil {
ss.opts.Logger.Error("Unable to fetch DBKeySpaceSize", zap.Error(err))
res.Status = newErrorStatus(err)
}
res.DbSize = dbSizeResults
return res, err
}

func (ss *standaloneService) GetChanges(ctx context.Context, getChngsReq *serverpb.GetChangesRequest) (*serverpb.GetChangesResponse, error) {
ss.rwl.RLock()
defer ss.rwl.RUnlock()
Expand Down Expand Up @@ -417,6 +431,10 @@ func (ds *distributedService) CompareAndSet(ctx context.Context, casReq *serverp
return res, err
}

func (ds *distributedService) GetKeySpaceSize(ctx context.Context, req *emptypb.Empty) (*serverpb.KeySpaceSizeResponse, error) {
return nil, nil
}

func (ds *distributedService) Delete(ctx context.Context, delReq *serverpb.DeleteRequest) (*serverpb.DeleteResponse, error) {
reqBts, err := proto.Marshal(&raftpb.InternalRaftRequest{Delete: delReq})
res := &serverpb.DeleteResponse{Status: newEmptyStatus()}
Expand Down
5 changes: 5 additions & 0 deletions internal/slave/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (ss *slaveService) CompareAndSet(_ context.Context, _ *serverpb.CompareAndS
return nil, errors.New("DKV slave service does not support keyspace mutations")
}

func (ss *slaveService) GetKeySpaceSize(ctx context.Context, empty *emptypb.Empty) (*serverpb.KeySpaceSizeResponse, error) {
//TODO implement me
panic("implement me")
}

func (ss *slaveService) Get(ctx context.Context, getReq *serverpb.GetRequest) (*serverpb.GetResponse, error) {
readResults, err := ss.store.Get(getReq.Key)
res := &serverpb.GetResponse{Status: newEmptyStatus()}
Expand Down
4 changes: 4 additions & 0 deletions internal/storage/badger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) {
return err == nil, err
}

func (bdb *badgerDB) GetKeySpaceSize() (int64, error) {
return 0, nil
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

const (
badgerSSTPrefix = "badger-snapshot-"
)
Expand Down
12 changes: 12 additions & 0 deletions internal/storage/rocksdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -380,6 +381,17 @@ func (rdb *rocksDB) CompareAndSet(key, expect, update []byte) (bool, error) {
return err == nil, err
}

func (rdb *rocksDB) GetKeySpaceSize() (int64, error) {
defer rdb.opts.statsCli.Timing("rocksdb.property.latency.dbSize", time.Now())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metric name doesn't match function name. Let's be consistent.

keySize := rdb.db.GetProperty("rocksdb.estimate-num-keys")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be incorrect, Need to sum up sizes for multiple column families. Add test case to verify all scenarios

dbKeySize, err := strconv.ParseInt(keySize, 10, 64)
if err != nil {
rdb.opts.statsCli.Incr("rocksdb.property.latency.dbSize", 1)
return 0, err
}
return dbKeySize, nil
}

const (
sstPrefix = "rocksdb-sstfile-"
sstDefaultCF = "/default.cf"
Expand Down
2 changes: 2 additions & 0 deletions internal/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type KVStore interface {
// If the expected value is `nil`, then the key is created and
// initialized with the given value, atomically.
CompareAndSet(key, expect, update []byte) (bool, error)
//GetKeySpaceSize returns the estimated number of keys the db
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this estimated?

GetKeySpaceSize() (int64, error)
}

// A Backupable represents the capability of the underlying store
Expand Down
4 changes: 4 additions & 0 deletions internal/sync/repl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ func (ms *memStore) CompareAndSet(key, expect, update []byte) (bool, error) {
return true, nil
}

func (ms *memStore) GetKeySpaceSize() (int64, error) {
return 0, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be the count of keys in the db.

}

func (ms *memStore) Close() error {
ms.store = nil
return nil
Expand Down
8 changes: 8 additions & 0 deletions pkg/ctl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"github.com/flipkart-incubator/dkv/internal/hlc"
"github.com/flipkart-incubator/nexus/models"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"time"

Expand Down Expand Up @@ -262,6 +263,13 @@ func (dkvClnt *DKVClient) Iterate(keyPrefix, startKey []byte) (<-chan *KVPair, e
return ch, nil
}

// GetDbSize returns the approximate count of the number of the keys in the db
func (dkvClnt *DKVClient) GetDbSize() (*serverpb.KeySpaceSizeResponse, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be better named as DBSize

ctx, cancel := context.WithTimeout(context.Background(), Timeout)
defer cancel()
return dkvClnt.dkvCli.GetKeySpaceSize(ctx, &emptypb.Empty{})
}

// Close closes the underlying GRPC client connection to DKV service
func (dkvClnt *DKVClient) Close() error {
if dkvClnt.cliConn != nil {
Expand Down
Loading