From 2fd444ba4b08beb9972690b400a98ba0479200f1 Mon Sep 17 00:00:00 2001 From: Nicolas Lara Date: Tue, 30 May 2023 13:49:18 +0200 Subject: [PATCH] new cachekv impl backported from v0.47 (#438) --- go.mod | 3 +- go.sum | 6 +- store/cachekv/README.md | 140 ++++++++++++++ store/cachekv/bench_helper_test.go | 2 +- store/cachekv/benchmark_test.go | 135 +++++++++++++ store/cachekv/internal/btree.go | 90 +++++++++ store/cachekv/internal/btree_test.go | 221 +++++++++++++++++++++ store/cachekv/internal/memiterator.go | 119 ++++++++++++ store/cachekv/internal/mergeiterator.go | 235 +++++++++++++++++++++++ store/cachekv/search_benchmark_test.go | 44 +++++ store/cachekv/store.go | 244 ++++++++++++------------ store/cachekv/store_bench_test.go | 20 +- store/cachekv/store_test.go | 159 ++++++++++++++- 13 files changed, 1278 insertions(+), 140 deletions(-) create mode 100644 store/cachekv/README.md create mode 100644 store/cachekv/benchmark_test.go create mode 100644 store/cachekv/internal/btree.go create mode 100644 store/cachekv/internal/btree_test.go create mode 100644 store/cachekv/internal/memiterator.go create mode 100644 store/cachekv/internal/mergeiterator.go create mode 100644 store/cachekv/search_benchmark_test.go diff --git a/go.mod b/go.mod index bf23b2faf555..466457162196 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/99designs/keyring v1.1.6 github.com/armon/go-metrics v0.3.11 github.com/bgentry/speakeasy v0.1.0 - github.com/btcsuite/btcd v0.22.1 + github.com/btcsuite/btcd v0.22.3 github.com/coinbase/rosetta-sdk-go v0.7.0 github.com/confio/ics23/go v0.7.0 github.com/cosmos/btcutil v1.0.4 @@ -44,6 +44,7 @@ require ( github.com/tendermint/go-amino v0.16.0 github.com/tendermint/tendermint v0.34.21 github.com/tendermint/tm-db v0.6.6 + github.com/tidwall/btree v1.6.0 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b google.golang.org/grpc v1.49.0 diff --git a/go.sum b/go.sum index 
2832e516d491..4a2561e51097 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,8 @@ github.com/btcsuite/btcd v0.0.0-20190115013929-ed77733ec07d/go.mod h1:d3C0AkH6BR github.com/btcsuite/btcd v0.0.0-20190315201642-aa6e0f35703c/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.21.0-beta/go.mod h1:ZSWyehm27aAuS9bvkATT+Xte3hjHZ+MRgMY/8NJ7K94= -github.com/btcsuite/btcd v0.22.1 h1:CnwP9LM/M9xuRrGSCGeMVs9iv09uMqwsVX7EeIpgV2c= -github.com/btcsuite/btcd v0.22.1/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y= +github.com/btcsuite/btcd v0.22.3 h1:kYNaWFvOw6xvqP0vR20RP1Zq1DVMBxEO8QN5d1/EfNg= +github.com/btcsuite/btcd v0.22.3/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= @@ -767,6 +767,8 @@ github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2l github.com/tendermint/go-amino v0.16.0/go.mod h1:TQU0M1i/ImAo+tYpZi73AU3V/dKeCoMC9Sphe2ZwGME= github.com/tendermint/tendermint v0.34.21 h1:UiGGnBFHVrZhoQVQ7EfwSOLuCtarqCSsRf8VrklqB7s= github.com/tendermint/tendermint v0.34.21/go.mod h1:XDvfg6U7grcFTDx7VkzxnhazQ/bspGJAn4DZ6DcLLjQ= +github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg= +github.com/tidwall/btree v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tidwall/gjson v1.6.7/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI= github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.2/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/store/cachekv/README.md 
b/store/cachekv/README.md new file mode 100644 index 000000000000..66f0916dea4b --- /dev/null +++ b/store/cachekv/README.md @@ -0,0 +1,140 @@ +# CacheKVStore specification + +A `CacheKVStore` is a cache wrapper for a `KVStore`. It extends the operations of the `KVStore` to work with a write-back cache, allowing for reduced I/O operations and more efficient disposal of changes (e.g. after processing a failed transaction). + +The core goals the CacheKVStore seeks to achieve are: + +* Buffer all writes to the parent store, so they can be dropped if they need to be reverted +* Allow iteration over contiguous spans of keys +* Act as a cache, improving access time for reads that have already been done (by replacing tree access with hashtable access, avoiding disk I/O) + * Note: We actually fail to achieve this for iteration right now + * Note: Need to consider this getting too large and dropping some cached reads +* Make subsequent reads account for prior buffered writes +* Write all buffered changes to the parent store + +We should revisit these goals with time (for instance it's unclear that all disk writes need to be buffered to the end of the block), but this is the current status. + +## Types and Structs + +```go +type Store struct { + mtx sync.Mutex + cache map[string]*cValue + deleted map[string]struct{} + unsortedCache map[string]struct{} + sortedCache *dbm.MemDB // always ascending sorted + parent types.KVStore +} +``` + +The Store struct wraps the underlying `KVStore` (`parent`) with additional data structures for implementing the cache. A mutex is used because IAVL trees (the `KVStore` in application) are not safe for concurrent use. + +### `cache` + +The main mapping of key-value pairs stored in cache. This map contains both keys that are cached from read operations and ‘dirty’ keys, which map to a value that is potentially different from what is in the underlying `KVStore`. 
+ +Values that are mapped to in `cache` are wrapped in a `cValue` struct, which contains the value and a boolean flag (`dirty`) representing whether the value has been written since the last write-back to `parent`. + +```go +type cValue struct { + value []byte + dirty bool +} +``` + +### `deleted` + +Key-value pairs that are to be deleted from `parent` are stored in the `deleted` map. Keys are mapped to an empty struct to implement a set. + +### `unsortedCache` + +Similar to `deleted`, this is a set of keys that are dirty and will need to be updated in the parent `KVStore` upon a write. Keys are mapped to an empty struct to implement a set. + +### `sortedCache` + +A database that will be populated by the keys in `unsortedCache` during iteration over the cache. The keys are always held in sorted order. + +## CRUD Operations and Writing + +The `Set`, `Get`, and `Delete` functions all call `setCacheValue()`, which is the only entry point to mutating `cache` (besides `Write()`, which clears it). + +`setCacheValue()` inserts a key-value pair into `cache`. Two boolean parameters, `deleted` and `dirty`, are passed in to flag whether the inserted key should also be inserted into the `deleted` and `dirty` sets. Keys will be removed from the `deleted` set if they are written to after being deleted. + +### `Get` + +`Get` first attempts to return the value from `cache`. If the key does not exist in `cache`, `parent.Get()` is called instead. This value from the parent is passed into `setCacheValue()` with `deleted=false` and `dirty=false`. + +### `Has` + +`Has` returns true if `Get` returns a non-nil value. As a result of calling `Get`, it may mutate the cache by caching the read. + +### `Set` + +New values are written by setting or updating the value of a key in `cache`. `Set` does not write to `parent`. + +Calls `setCacheValue()` with `deleted=false` and `dirty=true`. 
+ +### `Delete` + +A value being deleted from the `KVStore` is represented with a `nil` value in `cache`, and an insertion of the key into the `deleted` set. `Delete` does not write to `parent`. + +Calls `setCacheValue()` with `deleted=true` and `dirty=true`. + +### `Write` + +Key-value pairs in the cache are written to `parent` in ascending order of their keys. + +A slice of all dirty keys in `cache` is made, then sorted in increasing order. These keys are iterated over to update `parent`. + +If a key is marked for deletion (checked with `isDeleted()`), then `parent.Delete()` is called. Otherwise, `parent.Set()` is called to update the underlying `KVStore` with the value in cache. + +## Iteration + +Efficient iteration over keys in `KVStore` is important for generating Merkle range proofs. Iteration over `CacheKVStore` requires producing all key-value pairs from the underlying `KVStore` while taking into account updated values from the cache. + +In the current implementation, there is no guarantee that all values in `parent` have been cached. As a result, iteration is achieved by interleaved iteration through both `parent` and the cache (failing to actually benefit from caching). + +[cacheMergeIterator](https://github.com/cosmos/cosmos-sdk/blob/d8391cb6796d770b02448bee70b865d824e43449/store/cachekv/mergeiterator.go) implements functions to provide a single iterator with an input of iterators over `parent` and the cache. This iterator iterates over keys from both iterators in a shared lexicographic order, and overrides the value provided by the parent iterator if the same key is dirty or deleted in the cache. + +### Implementation Overview + +Iterators over `parent` and the cache are generated and passed into `cacheMergeIterator`, which returns a single, interleaved iterator. Implementation of the `parent` iterator is up to the underlying `KVStore`. The remainder of this section covers the generation of the cache iterator. 
+ +Recall that `unsortedCache` is an unordered set of dirty cache keys. Our goal is to construct an ordered iterator over cache keys that fall within the `start` and `end` bounds requested. + +Generating the cache iterator can be decomposed into four parts: + +1. Finding all keys that exist in the range we are iterating over +2. Sorting this list of keys +3. Inserting these keys into `sortedCache` and removing them from `unsortedCache` +4. Returning an iterator over `sortedCache` with the desired range + +Currently, the implementation for the first two parts is split into two cases, depending on the size of the unsorted cache. The two cases are as follows. + +If the size of `unsortedCache` is less than `minSortSize` (currently 1024), a linear time approach is taken to search over keys. + +```go +n := len(store.unsortedCache) +unsorted := make([]*kv.Pair, 0) + +if n < minSortSize { + for key := range store.unsortedCache { + if dbm.IsKeyInDomain(conv.UnsafeStrToBytes(key), start, end) { + cacheValue := store.cache[key] + unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value}) + } + } + store.clearUnsortedCacheSubset(unsorted, stateUnsorted) + return +} +``` + +Here, we iterate through all the keys in `unsortedCache` (i.e., the dirty cache keys), collecting those within the requested range in an unsorted slice called `unsorted`. + +At this point, part 3. is achieved in `clearUnsortedCacheSubset()`. This function iterates through `unsorted`, removing each key from `unsortedCache`. Afterwards, `unsorted` is sorted. Lastly, it iterates through the now sorted slice, inserting key-value pairs into `sortedCache`. Any key marked for deletion is mapped to an arbitrary value (`[]byte{}`). + +In the case that the size of `unsortedCache` is larger than `minSortSize`, a linear time approach to finding keys within the desired range is too slow to use. 
Instead, a slice of all keys in `unsortedCache` is sorted, and binary search is used to find the beginning and ending indices of the desired range. This produces an already-sorted slice that is passed into the same `clearUnsortedCacheSubset()` function. An iota identifier (`sortedState`) is used to skip the sorting step in the function. + +Finally, part 4. is achieved with `memIterator`, which implements an iterator over the items in `sortedCache`. + +As of [PR #12885](https://github.com/cosmos/cosmos-sdk/pull/12885), an optimization to the binary search case mitigates the overhead of sorting the entirety of the key set in `unsortedCache`. To avoid wasting the compute spent sorting, we should ensure that a reasonable number of values is removed from `unsortedCache`. If the length of the range for iteration is less than `minSortedCache`, we widen the range of values for removal from `unsortedCache` to be up to `minSortedCache` in length. This amortizes the cost of processing elements across multiple calls. 
\ No newline at end of file diff --git a/store/cachekv/bench_helper_test.go b/store/cachekv/bench_helper_test.go index fe5be27fabc9..be7fec4b3a7b 100644 --- a/store/cachekv/bench_helper_test.go +++ b/store/cachekv/bench_helper_test.go @@ -34,7 +34,7 @@ func generateSequentialKeys(startKey []byte, numKeys int) [][]byte { } // Generate many random, unsorted keys -func generateRandomKeys(keySize int, numKeys int) [][]byte { +func generateRandomKeys(keySize, numKeys int) [][]byte { toReturn := make([][]byte, 0, numKeys) for i := 0; i < numKeys; i++ { newKey := randSlice(keySize) diff --git a/store/cachekv/benchmark_test.go b/store/cachekv/benchmark_test.go new file mode 100644 index 000000000000..484238c14454 --- /dev/null +++ b/store/cachekv/benchmark_test.go @@ -0,0 +1,135 @@ +package cachekv_test + +import ( + "fmt" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "testing" + + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/types" + + "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" +) + +func DoBenchmarkDeepCacheStack(b *testing.B, depth int) { + db := dbm.NewMemDB() + initialStore := cachekv.NewStore(dbadapter.Store{DB: db}) + + nItems := 20 + for i := 0; i < nItems; i++ { + initialStore.Set([]byte(fmt.Sprintf("hello%03d", i)), []byte{0}) + } + + var stack CacheStack + stack.Reset(initialStore) + + for i := 0; i < depth; i++ { + stack.Snapshot() + + store := stack.CurrentStore() + store.Set([]byte(fmt.Sprintf("hello%03d", i)), []byte{byte(i)}) + } + + store := stack.CurrentStore() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + it := store.Iterator(nil, nil) + items := make([][]byte, 0, nItems) + for ; it.Valid(); it.Next() { + items = append(items, it.Key()) + it.Value() + } + it.Close() + require.Equal(b, nItems, len(items)) + } +} + +func BenchmarkDeepCacheStack1(b *testing.B) { + DoBenchmarkDeepCacheStack(b, 1) +} + +func BenchmarkDeepCacheStack3(b *testing.B) { + DoBenchmarkDeepCacheStack(b, 3) 
+} + +func BenchmarkDeepCacheStack10(b *testing.B) { + DoBenchmarkDeepCacheStack(b, 10) +} + +func BenchmarkDeepCacheStack13(b *testing.B) { + DoBenchmarkDeepCacheStack(b, 13) +} + +// CacheStack manages a stack of nested cache stores to +// support the evm `StateDB`'s `Snapshot` and `RevertToSnapshot` methods. +type CacheStack struct { + initialStore types.CacheKVStore + // initialStore (above) is the store of the initial state before transaction execution; + // cacheStores is the stack of nested cache stores layered on top of it. + cacheStores []types.CacheKVStore +} + +// CurrentStore returns the top store of the cache stack; +// if the stack is empty, it returns the initial store. +func (cs *CacheStack) CurrentStore() types.CacheKVStore { + l := len(cs.cacheStores) + if l == 0 { + return cs.initialStore + } + return cs.cacheStores[l-1] +} + +// Reset sets the initial store and clears the cache store stack. +func (cs *CacheStack) Reset(initialStore types.CacheKVStore) { + cs.initialStore = initialStore + cs.cacheStores = nil +} + +// IsEmpty returns true if the cache store stack is empty. +func (cs *CacheStack) IsEmpty() bool { + return len(cs.cacheStores) == 0 +} + +// Commit writes all the cached stores from top to bottom in order and clears the stack by resetting it to nil. +func (cs *CacheStack) Commit() { + // commit in order from top to bottom + for i := len(cs.cacheStores) - 1; i >= 0; i-- { + cs.cacheStores[i].Write() + } + cs.cacheStores = nil +} + +// CommitToRevision commits the cache stores above the target revision, +// to improve efficiency of db operations. 
+func (cs *CacheStack) CommitToRevision(target int) error { + if target < 0 || target >= len(cs.cacheStores) { + return fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cacheStores)) + } + + // commit in order from top to bottom + for i := len(cs.cacheStores) - 1; i > target; i-- { + cs.cacheStores[i].Write() + } + cs.cacheStores = cs.cacheStores[0 : target+1] + + return nil +} + +// Snapshot pushes a new cached context to the stack, +// and returns the index of it. +func (cs *CacheStack) Snapshot() int { + cs.cacheStores = append(cs.cacheStores, cachekv.NewStore(cs.CurrentStore())) + return len(cs.cacheStores) - 1 +} + +// RevertToSnapshot pops all the cached contexts after the target index (inclusive). +// the target should be snapshot index returned by `Snapshot`. +// This function panics if the index is out of bounds. +func (cs *CacheStack) RevertToSnapshot(target int) { + if target < 0 || target >= len(cs.cacheStores) { + panic(fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cacheStores))) + } + cs.cacheStores = cs.cacheStores[:target] +} diff --git a/store/cachekv/internal/btree.go b/store/cachekv/internal/btree.go new file mode 100644 index 000000000000..d0340022f2c5 --- /dev/null +++ b/store/cachekv/internal/btree.go @@ -0,0 +1,90 @@ +package internal + +import ( + "bytes" + "errors" + + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/tidwall/btree" +) + +const ( + // The approximate number of items and children per B-tree node. Tuned with benchmarks. + // copied from memdb. + bTreeDegree = 32 +) + +var errKeyEmpty = errors.New("key cannot be empty") + +// BTree implements the sorted cache for cachekv store, +// we don't use MemDB here because cachekv is used extensively in sdk core path, +// we need it to be as fast as possible, while `MemDB` is mainly used as a mocking db in unit tests. +// +// We choose tidwall/btree over google/btree here because it provides API to implement step iterator directly. 
+type BTree struct { + tree *btree.BTreeG[item] +} + +// NewBTree creates a wrapper around `btree.BTreeG`. +func NewBTree() BTree { + return BTree{ + tree: btree.NewBTreeGOptions(byKeys, btree.Options{ + Degree: bTreeDegree, + NoLocks: false, + }), + } +} + +func (bt BTree) Set(key, value []byte) { + bt.tree.Set(newItem(key, value)) +} + +func (bt BTree) Get(key []byte) []byte { + i, found := bt.tree.Get(newItem(key, nil)) + if !found { + return nil + } + return i.value +} + +func (bt BTree) Delete(key []byte) { + bt.tree.Delete(newItem(key, nil)) +} + +func (bt BTree) Iterator(start, end []byte) (types.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + return newMemIterator(start, end, bt, true), nil +} + +func (bt BTree) ReverseIterator(start, end []byte) (types.Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + return newMemIterator(start, end, bt, false), nil +} + +// Copy the tree. This is a copy-on-write operation and is very fast because +// it only performs a shadowed copy. +func (bt BTree) Copy() BTree { + return BTree{ + tree: bt.tree.Copy(), + } +} + +// item is a btree item with byte slices as keys and values +type item struct { + key []byte + value []byte +} + +// byKeys compares the items by key +func byKeys(a, b item) bool { + return bytes.Compare(a.key, b.key) == -1 +} + +// newItem creates a new pair item. 
+func newItem(key, value []byte) item { + return item{key: key, value: value} +} diff --git a/store/cachekv/internal/btree_test.go b/store/cachekv/internal/btree_test.go new file mode 100644 index 000000000000..2654e1fdeb56 --- /dev/null +++ b/store/cachekv/internal/btree_test.go @@ -0,0 +1,221 @@ +package internal + +import ( + "encoding/binary" + "testing" + + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/stretchr/testify/require" +) + +func TestGetSetDelete(t *testing.T) { + db := NewBTree() + + // A nonexistent key should return nil. + value := db.Get([]byte("a")) + require.Nil(t, value) + + // Set and get a value. + db.Set([]byte("a"), []byte{0x01}) + db.Set([]byte("b"), []byte{0x02}) + value = db.Get([]byte("a")) + require.Equal(t, []byte{0x01}, value) + + value = db.Get([]byte("b")) + require.Equal(t, []byte{0x02}, value) + + // Deleting a non-existent value is fine. + db.Delete([]byte("x")) + + // Delete a value. + db.Delete([]byte("a")) + + value = db.Get([]byte("a")) + require.Nil(t, value) + + db.Delete([]byte("b")) + + value = db.Get([]byte("b")) + require.Nil(t, value) +} + +func TestDBIterator(t *testing.T) { + db := NewBTree() + + for i := 0; i < 10; i++ { + if i != 6 { // but skip 6. 
+ db.Set(int642Bytes(int64(i)), []byte{}) + } + } + + // Blank iterator keys should error + _, err := db.ReverseIterator([]byte{}, nil) + require.Equal(t, errKeyEmpty, err) + _, err = db.ReverseIterator(nil, []byte{}) + require.Equal(t, errKeyEmpty, err) + + itr, err := db.Iterator(nil, nil) + require.NoError(t, err) + verifyIterator(t, itr, []int64{0, 1, 2, 3, 4, 5, 7, 8, 9}, "forward iterator") + + ritr, err := db.ReverseIterator(nil, nil) + require.NoError(t, err) + verifyIterator(t, ritr, []int64{9, 8, 7, 5, 4, 3, 2, 1, 0}, "reverse iterator") + + itr, err = db.Iterator(nil, int642Bytes(0)) + require.NoError(t, err) + verifyIterator(t, itr, []int64(nil), "forward iterator to 0") + + ritr, err = db.ReverseIterator(int642Bytes(10), nil) + require.NoError(t, err) + verifyIterator(t, ritr, []int64(nil), "reverse iterator from 10 (ex)") + + itr, err = db.Iterator(int642Bytes(0), nil) + require.NoError(t, err) + verifyIterator(t, itr, []int64{0, 1, 2, 3, 4, 5, 7, 8, 9}, "forward iterator from 0") + + itr, err = db.Iterator(int642Bytes(1), nil) + require.NoError(t, err) + verifyIterator(t, itr, []int64{1, 2, 3, 4, 5, 7, 8, 9}, "forward iterator from 1") + + ritr, err = db.ReverseIterator(nil, int642Bytes(10)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64{9, 8, 7, 5, 4, 3, 2, 1, 0}, "reverse iterator from 10 (ex)") + + ritr, err = db.ReverseIterator(nil, int642Bytes(9)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64{8, 7, 5, 4, 3, 2, 1, 0}, "reverse iterator from 9 (ex)") + + ritr, err = db.ReverseIterator(nil, int642Bytes(8)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64{7, 5, 4, 3, 2, 1, 0}, "reverse iterator from 8 (ex)") + + itr, err = db.Iterator(int642Bytes(5), int642Bytes(6)) + require.NoError(t, err) + verifyIterator(t, itr, []int64{5}, "forward iterator from 5 to 6") + + itr, err = db.Iterator(int642Bytes(5), int642Bytes(7)) + require.NoError(t, err) + verifyIterator(t, itr, []int64{5}, "forward iterator from 5 to 
7") + + itr, err = db.Iterator(int642Bytes(5), int642Bytes(8)) + require.NoError(t, err) + verifyIterator(t, itr, []int64{5, 7}, "forward iterator from 5 to 8") + + itr, err = db.Iterator(int642Bytes(6), int642Bytes(7)) + require.NoError(t, err) + verifyIterator(t, itr, []int64(nil), "forward iterator from 6 to 7") + + itr, err = db.Iterator(int642Bytes(6), int642Bytes(8)) + require.NoError(t, err) + verifyIterator(t, itr, []int64{7}, "forward iterator from 6 to 8") + + itr, err = db.Iterator(int642Bytes(7), int642Bytes(8)) + require.NoError(t, err) + verifyIterator(t, itr, []int64{7}, "forward iterator from 7 to 8") + + ritr, err = db.ReverseIterator(int642Bytes(4), int642Bytes(5)) + require.NoError(t, err) + verifyIterator(t, ritr, []int64{4}, "reverse iterator from 5 (ex) to 4") + + ritr, err = db.ReverseIterator(int642Bytes(4), int642Bytes(6)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64{5, 4}, "reverse iterator from 6 (ex) to 4") + + ritr, err = db.ReverseIterator(int642Bytes(4), int642Bytes(7)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64{5, 4}, "reverse iterator from 7 (ex) to 4") + + ritr, err = db.ReverseIterator(int642Bytes(5), int642Bytes(6)) + require.NoError(t, err) + verifyIterator(t, ritr, []int64{5}, "reverse iterator from 6 (ex) to 5") + + ritr, err = db.ReverseIterator(int642Bytes(5), int642Bytes(7)) + require.NoError(t, err) + verifyIterator(t, ritr, []int64{5}, "reverse iterator from 7 (ex) to 5") + + ritr, err = db.ReverseIterator(int642Bytes(6), int642Bytes(7)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64(nil), "reverse iterator from 7 (ex) to 6") + + ritr, err = db.ReverseIterator(int642Bytes(10), nil) + require.NoError(t, err) + verifyIterator(t, ritr, []int64(nil), "reverse iterator to 10") + + ritr, err = db.ReverseIterator(int642Bytes(6), nil) + require.NoError(t, err) + verifyIterator(t, ritr, []int64{9, 8, 7}, "reverse iterator to 6") + + ritr, err = db.ReverseIterator(int642Bytes(5), 
nil) + require.NoError(t, err) + verifyIterator(t, ritr, []int64{9, 8, 7, 5}, "reverse iterator to 5") + + ritr, err = db.ReverseIterator(int642Bytes(8), int642Bytes(9)) + require.NoError(t, err) + verifyIterator(t, ritr, []int64{8}, "reverse iterator from 9 (ex) to 8") + + ritr, err = db.ReverseIterator(int642Bytes(2), int642Bytes(4)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64{3, 2}, "reverse iterator from 4 (ex) to 2") + + ritr, err = db.ReverseIterator(int642Bytes(4), int642Bytes(2)) + require.NoError(t, err) + verifyIterator(t, ritr, + []int64(nil), "reverse iterator from 2 (ex) to 4") + + // Ensure that the iterators don't panic with an empty database. + db2 := NewBTree() + + itr, err = db2.Iterator(nil, nil) + require.NoError(t, err) + verifyIterator(t, itr, nil, "forward iterator with empty db") + + ritr, err = db2.ReverseIterator(nil, nil) + require.NoError(t, err) + verifyIterator(t, ritr, nil, "reverse iterator with empty db") +} + +func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg string) { + i := 0 + for itr.Valid() { + key := itr.Key() + require.Equal(t, expected[i], bytes2Int64(key), "iterator: %d mismatches", i) + itr.Next() + i++ + } + require.Equal(t, i, len(expected), "expected to have fully iterated over all the elements in iter") + require.NoError(t, itr.Close()) +} + +func int642Bytes(i int64) []byte { + return Uint64ToBigEndian(uint64(i)) +} + +func bytes2Int64(buf []byte) int64 { + return int64(BigEndianToUint64(buf)) +} + +// Backported +// Uint64ToBigEndian - marshals uint64 to a bigendian byte slice so it can be sorted +func Uint64ToBigEndian(i uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, i) + return b +} + +// BigEndianToUint64 returns an uint64 from big endian encoded bytes. If encoding +// is empty, zero is returned. 
+func BigEndianToUint64(bz []byte) uint64 { + if len(bz) == 0 { + return 0 + } + + return binary.BigEndian.Uint64(bz) +} diff --git a/store/cachekv/internal/memiterator.go b/store/cachekv/internal/memiterator.go new file mode 100644 index 000000000000..cc5a78caab7c --- /dev/null +++ b/store/cachekv/internal/memiterator.go @@ -0,0 +1,119 @@ +package internal + +import ( + "bytes" + "errors" + + "github.com/cosmos/cosmos-sdk/store/types" + "github.com/tidwall/btree" +) + +var _ types.Iterator = (*memIterator)(nil) + +// memIterator iterates over the items of a BTree. +// A nil value means the key was deleted. +// Implements types.Iterator. +type memIterator struct { + iter btree.IterG[item] + + start []byte + end []byte + ascending bool + valid bool +} + +func newMemIterator(start, end []byte, items BTree, ascending bool) *memIterator { + iter := items.tree.Iter() + var valid bool + if ascending { + if start != nil { + valid = iter.Seek(newItem(start, nil)) + } else { + valid = iter.First() + } + } else { + if end != nil { + valid = iter.Seek(newItem(end, nil)) + if !valid { + valid = iter.Last() + } else { + // end is exclusive + valid = iter.Prev() + } + } else { + valid = iter.Last() + } + } + + mi := &memIterator{ + iter: iter, + start: start, + end: end, + ascending: ascending, + valid: valid, + } + + if mi.valid { + mi.valid = mi.keyInRange(mi.Key()) + } + + return mi +} + +func (mi *memIterator) Domain() (start, end []byte) { + return mi.start, mi.end +} + +func (mi *memIterator) Close() error { + mi.iter.Release() + return nil +} + +func (mi *memIterator) Error() error { + if !mi.Valid() { + return errors.New("invalid memIterator") + } + return nil +} + +func (mi *memIterator) Valid() bool { + return mi.valid +} + +func (mi *memIterator) Next() { + mi.assertValid() + + if mi.ascending { + mi.valid = mi.iter.Next() + } else { + mi.valid = mi.iter.Prev() + } + + if mi.valid { + mi.valid = mi.keyInRange(mi.Key()) + } +} + +func (mi *memIterator) keyInRange(key []byte) 
bool { + if mi.ascending && mi.end != nil && bytes.Compare(key, mi.end) >= 0 { + return false + } + if !mi.ascending && mi.start != nil && bytes.Compare(key, mi.start) < 0 { + return false + } + return true +} + +func (mi *memIterator) Key() []byte { + return mi.iter.Item().key +} + +func (mi *memIterator) Value() []byte { + return mi.iter.Item().value +} + +func (mi *memIterator) assertValid() { + if err := mi.Error(); err != nil { + panic(err) + } +} diff --git a/store/cachekv/internal/mergeiterator.go b/store/cachekv/internal/mergeiterator.go new file mode 100644 index 000000000000..8c3d0b0ba945 --- /dev/null +++ b/store/cachekv/internal/mergeiterator.go @@ -0,0 +1,235 @@ +package internal + +import ( + "bytes" + "errors" + + "github.com/cosmos/cosmos-sdk/store/types" +) + +// cacheMergeIterator merges a parent Iterator and a cache Iterator. +// The cache iterator may return nil values to signal that an item +// had been deleted (but not deleted in the parent). +// If the cache iterator has the same key as the parent, the +// cache shadows (overrides) the parent. +// +// TODO: Optimize by memoizing. +type cacheMergeIterator struct { + parent types.Iterator + cache types.Iterator + ascending bool + + valid bool +} + +var _ types.Iterator = (*cacheMergeIterator)(nil) + +func NewCacheMergeIterator(parent, cache types.Iterator, ascending bool) types.Iterator { + iter := &cacheMergeIterator{ + parent: parent, + cache: cache, + ascending: ascending, + } + + iter.valid = iter.skipUntilExistsOrInvalid() + return iter +} + +// Domain implements Iterator. +// Returns parent domain because cache and parent domains are the same. +func (iter *cacheMergeIterator) Domain() (start, end []byte) { + return iter.parent.Domain() +} + +// Valid implements Iterator. 
+func (iter *cacheMergeIterator) Valid() bool { + return iter.valid +} + +// Next implements Iterator +func (iter *cacheMergeIterator) Next() { + iter.assertValid() + + switch { + case !iter.parent.Valid(): + // If parent is invalid, get the next cache item. + iter.cache.Next() + case !iter.cache.Valid(): + // If cache is invalid, get the next parent item. + iter.parent.Next() + default: + // Both are valid. Compare keys. + keyP, keyC := iter.parent.Key(), iter.cache.Key() + switch iter.compare(keyP, keyC) { + case -1: // parent < cache + iter.parent.Next() + case 0: // parent == cache + iter.parent.Next() + iter.cache.Next() + case 1: // parent > cache + iter.cache.Next() + } + } + iter.valid = iter.skipUntilExistsOrInvalid() +} + +// Key implements Iterator +func (iter *cacheMergeIterator) Key() []byte { + iter.assertValid() + + // If parent is invalid, get the cache key. + if !iter.parent.Valid() { + return iter.cache.Key() + } + + // If cache is invalid, get the parent key. + if !iter.cache.Valid() { + return iter.parent.Key() + } + + // Both are valid. Compare keys. + keyP, keyC := iter.parent.Key(), iter.cache.Key() + + cmp := iter.compare(keyP, keyC) + switch cmp { + case -1: // parent < cache + return keyP + case 0: // parent == cache + return keyP + case 1: // parent > cache + return keyC + default: + panic("invalid compare result") + } +} + +// Value implements Iterator +func (iter *cacheMergeIterator) Value() []byte { + iter.assertValid() + + // If parent is invalid, get the cache value. + if !iter.parent.Valid() { + return iter.cache.Value() + } + + // If cache is invalid, get the parent value. + if !iter.cache.Valid() { + return iter.parent.Value() + } + + // Both are valid. Compare keys. 
+ keyP, keyC := iter.parent.Key(), iter.cache.Key() + + cmp := iter.compare(keyP, keyC) + switch cmp { + case -1: // parent < cache + return iter.parent.Value() + case 0: // parent == cache + return iter.cache.Value() + case 1: // parent > cache + return iter.cache.Value() + default: + panic("invalid comparison result") + } +} + +// Close implements Iterator. Both iterators are closed; the parent's error takes precedence. +func (iter *cacheMergeIterator) Close() error { + err1 := iter.cache.Close() + if err := iter.parent.Close(); err != nil { + return err + } + + return err1 +} + +// Error returns an error if the cacheMergeIterator is invalid, as defined by the +// Valid method. +func (iter *cacheMergeIterator) Error() error { + if !iter.Valid() { + return errors.New("invalid cacheMergeIterator") + } + + return nil +} + +// If not valid, panics. +// NOTE: Error only consults Valid, so neither underlying iterator is advanced. +func (iter *cacheMergeIterator) assertValid() { + if err := iter.Error(); err != nil { + panic(err) + } +} + +// Like bytes.Compare but opposite if not ascending. +func (iter *cacheMergeIterator) compare(a, b []byte) int { + if iter.ascending { + return bytes.Compare(a, b) + } + + return bytes.Compare(a, b) * -1 +} + +// Skip all delete-items from the cache w/ `key < until`. After this function, +// current cache item is a non-delete-item, or `until <= key`. +// If the current cache item is not a delete item, does nothing. +// If `until` is nil, there is no limit, and cache may end up invalid. +// CONTRACT: cache is valid. +func (iter *cacheMergeIterator) skipCacheDeletes(until []byte) { + for iter.cache.Valid() && + iter.cache.Value() == nil && + (until == nil || iter.compare(iter.cache.Key(), until) < 0) { + iter.cache.Next() + } +} + +// Fast forwards cache (or parent+cache in case of deleted items) until current +// item exists, or until iterator becomes invalid. +// Returns whether the iterator is valid. +func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() bool { + for { + // If parent is invalid, fast-forward cache. 
+ if !iter.parent.Valid() { + iter.skipCacheDeletes(nil) + return iter.cache.Valid() + } + // Parent is valid. + + if !iter.cache.Valid() { + return true + } + // Parent is valid, cache is valid. + + // Compare parent and cache. + keyP := iter.parent.Key() + keyC := iter.cache.Key() + + switch iter.compare(keyP, keyC) { + case -1: // parent < cache. + return true + + case 0: // parent == cache. + // Skip over if cache item is a delete. + valueC := iter.cache.Value() + if valueC == nil { + iter.parent.Next() + iter.cache.Next() + + continue + } + // Cache is not a delete. + + return true // cache exists. + case 1: // cache < parent + // Skip over if cache item is a delete. + valueC := iter.cache.Value() + if valueC == nil { + iter.skipCacheDeletes(keyP) + continue + } + // Cache is not a delete. + + return true // cache exists. + } + } +} diff --git a/store/cachekv/search_benchmark_test.go b/store/cachekv/search_benchmark_test.go new file mode 100644 index 000000000000..4007c7cda202 --- /dev/null +++ b/store/cachekv/search_benchmark_test.go @@ -0,0 +1,44 @@ +package cachekv + +import ( + "strconv" + "testing" + + "github.com/cosmos/cosmos-sdk/store/cachekv/internal" +) + +func BenchmarkLargeUnsortedMisses(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + store := generateStore() + b.StartTimer() + + for k := 0; k < 10000; k++ { + // cache has A + Z values + // these are within range, but match nothing + store.dirtyItems([]byte("B1"), []byte("B2")) + } + } +} + +func generateStore() *Store { + cache := map[string]*cValue{} + unsorted := map[string]struct{}{} + for i := 0; i < 5000; i++ { + key := "A" + strconv.Itoa(i) + unsorted[key] = struct{}{} + cache[key] = &cValue{} + } + + for i := 0; i < 5000; i++ { + key := "Z" + strconv.Itoa(i) + unsorted[key] = struct{}{} + cache[key] = &cValue{} + } + + return &Store{ + cache: cache, + unsortedCache: unsorted, + sortedCache: internal.NewBTree(), + } +} diff --git a/store/cachekv/store.go 
b/store/cachekv/store.go index f599a01e8533..22d3e27a7670 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -2,25 +2,22 @@ package cachekv import ( "bytes" + "github.com/cosmos/cosmos-sdk/store/cachekv/internal" + "github.com/cosmos/cosmos-sdk/store/listenkv" "io" - "reflect" "sort" "sync" - "time" - "unsafe" dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/internal/conv" - "github.com/cosmos/cosmos-sdk/store/listenkv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" - "github.com/cosmos/cosmos-sdk/telemetry" "github.com/cosmos/cosmos-sdk/types/kv" ) -// If value is nil but deleted is false, it means the parent doesn't have the -// key. (No need to delete upon Write()) +// cValue represents a cached value. +// If dirty is true, it indicates the cached value is different from the underlying value. type cValue struct { value []byte dirty bool @@ -30,9 +27,8 @@ type cValue struct { type Store struct { mtx sync.Mutex cache map[string]*cValue - deleted map[string]struct{} unsortedCache map[string]struct{} - sortedCache *dbm.MemDB // always ascending sorted + sortedCache internal.BTree // always ascending sorted parent types.KVStore } @@ -42,9 +38,8 @@ var _ types.CacheKVStore = (*Store)(nil) func NewStore(parent types.KVStore) *Store { return &Store{ cache: make(map[string]*cValue), - deleted: make(map[string]struct{}), unsortedCache: make(map[string]struct{}), - sortedCache: dbm.NewMemDB(), + sortedCache: internal.NewBTree(), parent: parent, } } @@ -64,7 +59,7 @@ func (store *Store) Get(key []byte) (value []byte) { cacheValue, ok := store.cache[conv.UnsafeBytesToStr(key)] if !ok { value = store.parent.Get(key) - store.setCacheValue(key, value, false, false) + store.setCacheValue(key, value, false) } else { value = cacheValue.value } @@ -73,14 +68,13 @@ func (store *Store) Get(key []byte) (value []byte) { } // Set implements types.KVStore. 
-func (store *Store) Set(key []byte, value []byte) { - store.mtx.Lock() - defer store.mtx.Unlock() - +func (store *Store) Set(key, value []byte) { types.AssertValidKey(key) types.AssertValidValue(value) - store.setCacheValue(key, value, false, true) + store.mtx.Lock() + defer store.mtx.Unlock() + store.setCacheValue(key, value, true) } // Has implements types.KVStore. @@ -91,19 +85,23 @@ func (store *Store) Has(key []byte) bool { // Delete implements types.KVStore. func (store *Store) Delete(key []byte) { + types.AssertValidKey(key) + store.mtx.Lock() defer store.mtx.Unlock() - defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "delete") - types.AssertValidKey(key) - store.setCacheValue(key, nil, true, true) + store.setCacheValue(key, nil, true) } // Implements Cachetypes.KVStore. func (store *Store) Write() { store.mtx.Lock() defer store.mtx.Unlock() - defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "write") + + if len(store.cache) == 0 && len(store.unsortedCache) == 0 { + store.sortedCache = internal.NewBTree() + return + } // We need a copy of all of the keys. // Not the best, but probably not a bottleneck depending. @@ -120,19 +118,16 @@ func (store *Store) Write() { // TODO: Consider allowing usage of Batch, which would allow the write to // at least happen atomically. for _, key := range keys { - if store.isDeleted(key) { - // We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot - // be sure if the underlying store might do a save with the byteslice or - // not. Once we get confirmation that .Delete is guaranteed not to - // save the byteslice, then we can assume only a read-only copy is sufficient. - store.parent.Delete([]byte(key)) - continue - } - + // We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot + // be sure if the underlying store might do a save with the byteslice or + // not. 
Once we get confirmation that .Delete is guaranteed not to + // save the byteslice, then we can assume only a read-only copy is sufficient. cacheValue := store.cache[key] if cacheValue.value != nil { - // It already exists in the parent, hence delete it. + // A non-nil value is written through to the parent; nil marks a deletion. store.parent.Set([]byte(key), cacheValue.value) + } else { + store.parent.Delete([]byte(key)) } } @@ -142,13 +137,10 @@ func (store *Store) Write() { for key := range store.cache { delete(store.cache, key) } - for key := range store.deleted { - delete(store.deleted, key) - } for key := range store.unsortedCache { delete(store.unsortedCache, key) } - store.sortedCache = dbm.NewMemDB() + store.sortedCache = internal.NewBTree() } // CacheWrap implements CacheWrapper. @@ -161,11 +153,6 @@ func (store *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types return NewStore(tracekv.NewStore(store, w, tc)) } -// CacheWrapWithListeners implements the CacheWrapper interface. -func (store *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { - return NewStore(listenkv.NewStore(store, storeKey, listeners)) -} - //---------------------------------------- // Iteration @@ -183,18 +170,26 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator { store.mtx.Lock() defer store.mtx.Unlock() - var parent, cache types.Iterator + store.dirtyItems(start, end) + isoSortedCache := store.sortedCache.Copy() + + var ( + err error + parent, cache types.Iterator + ) if ascending { parent = store.parent.Iterator(start, end) + cache, err = isoSortedCache.Iterator(start, end) } else { parent = store.parent.ReverseIterator(start, end) + cache, err = isoSortedCache.ReverseIterator(start, end) + } + if err != nil { + panic(err) } - store.dirtyItems(start, end) - cache = newMemIterator(start, end, store.sortedCache, store.deleted, ascending) - - return newCacheMergeIterator(parent, cache, ascending) + return 
internal.NewCacheMergeIterator(parent, cache, ascending) } func findStartIndex(strL []string, startQ string) int { @@ -270,22 +265,6 @@ func findEndIndex(strL []string, endQ string) int { return -1 } -//nolint -func findStartEndIndex(strL []string, startStr, endStr string) (int, int) { - // Now find the values within the domain - // [start, end) - startIndex := findStartIndex(strL, startStr) - endIndex := findEndIndex(strL, endStr) - - if endIndex < 0 { - endIndex = len(strL) - 1 - } - if startIndex < 0 { - startIndex = 0 - } - return startIndex, endIndex -} - type sortState int const ( @@ -293,32 +272,12 @@ const ( stateAlreadySorted ) -// strToByte is meant to make a zero allocation conversion -// from string -> []byte to speed up operations, it is not meant -// to be used generally, but for a specific pattern to check for available -// keys within a domain. -func strToByte(s string) []byte { - var b []byte - hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - hdr.Cap = len(s) - hdr.Len = len(s) - hdr.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data - return b -} - -// byteSliceToStr is meant to make a zero allocation conversion -// from []byte -> string to speed up operations, it is not meant -// to be used generally, but for a specific pattern to delete keys -// from a map. -func byteSliceToStr(b []byte) string { - hdr := (*reflect.StringHeader)(unsafe.Pointer(&b)) - return *(*string)(unsafe.Pointer(hdr)) -} +const minSortSize = 1024 // Constructs a slice of dirty items, to use w/ memIterator. func (store *Store) dirtyItems(start, end []byte) { startStr, endStr := conv.UnsafeBytesToStr(start), conv.UnsafeBytesToStr(end) - if startStr > endStr { + if end != nil && startStr > endStr { // Nothing to do here. return } @@ -331,52 +290,100 @@ func (store *Store) dirtyItems(start, end []byte) { // O(N^2) overhead. // Even without that, too many range checks eventually becomes more expensive // than just not having the cache. 
- if n >= 256 { + if n < minSortSize { for key := range store.unsortedCache { - cacheValue := store.cache[key] - keyBz := strToByte(key) - unsorted = append(unsorted, &kv.Pair{Key: keyBz, Value: cacheValue.value}) - } - } else { - // else do a linear scan to determine if the unsorted pairs are in the pool. - for key := range store.unsortedCache { - keyBz := strToByte(key) - if dbm.IsKeyInDomain(keyBz, start, end) { + // dbm.IsKeyInDomain is nil safe and returns true iff start <= key < end (a nil bound is unbounded) + if dbm.IsKeyInDomain(conv.UnsafeStrToBytes(key), start, end) { cacheValue := store.cache[key] - unsorted = append(unsorted, &kv.Pair{Key: keyBz, Value: cacheValue.value}) + unsorted = append(unsorted, &kv.Pair{Key: []byte(key), Value: cacheValue.value}) } } + store.clearUnsortedCacheSubset(unsorted, stateUnsorted) + return + } + + // Otherwise it is large so perform a modified binary search to find + // the target ranges for the keys that we should be looking for. + strL := make([]string, 0, n) + for key := range store.unsortedCache { + strL = append(strL, key) + } + sort.Strings(strL) + + // Now find the values within the domain + // [start, end) + startIndex := findStartIndex(strL, startStr) + if startIndex < 0 { + startIndex = 0 + } + + var endIndex int + if end == nil { + endIndex = len(strL) - 1 + } else { + endIndex = findEndIndex(strL, endStr) } - store.clearUnsortedCacheSubset(unsorted) + if endIndex < 0 { + endIndex = len(strL) - 1 + } + + // Since we spent cycles to sort the values, we should process and remove a reasonable amount + // ensure start to end is at least minSortSize in size + // if below minSortSize, expand it to cover additional values + // this amortizes the cost of processing elements across multiple calls + if endIndex-startIndex < minSortSize { + endIndex = min(startIndex+minSortSize, len(strL)-1) + if endIndex-startIndex < minSortSize { + startIndex = max(endIndex-minSortSize, 0) + } + } + + kvL := make([]*kv.Pair, 0, 1+endIndex-startIndex) + 
for i := startIndex; i <= endIndex; i++ { + key := strL[i] + cacheValue := store.cache[key] + kvL = append(kvL, &kv.Pair{Key: []byte(key), Value: cacheValue.value}) + } + + // kvL was already sorted so pass it in as is. + store.clearUnsortedCacheSubset(kvL, stateAlreadySorted) } -func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair) { +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sortState) { n := len(store.unsortedCache) if len(unsorted) == n { // This pattern allows the Go compiler to emit the map clearing idiom for the entire map. for key := range store.unsortedCache { delete(store.unsortedCache, key) } - store.unsortedCache = make(map[string]struct{}, 300) } else { // Otherwise, normally delete the unsorted keys from the map. for _, kv := range unsorted { - delete(store.unsortedCache, byteSliceToStr(kv.Key)) + delete(store.unsortedCache, conv.UnsafeBytesToStr(kv.Key)) } } - sort.Slice(unsorted, func(i, j int) bool { - return bytes.Compare(unsorted[i].Key, unsorted[j].Key) < 0 - }) + + if sortState == stateUnsorted { + sort.Slice(unsorted, func(i, j int) bool { + return bytes.Compare(unsorted[i].Key, unsorted[j].Key) < 0 + }) + } for _, item := range unsorted { - if item.Value == nil { - // deleted element, tracked by store.deleted - // setting arbitrary value - store.sortedCache.Set(item.Key, []byte{}) - continue - } - err := store.sortedCache.Set(item.Key, item.Value) - if err != nil { - panic(err) - } + // sortedCache is able to store `nil` value to represent deleted items. + store.sortedCache.Set(item.Key, item.Value) } } @@ -384,23 +391,18 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair) { // etc // Only entrypoint to mutate store.cache. 
-func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) { - keyStr := byteSliceToStr(key) +// A `nil` value means a deletion. +func (store *Store) setCacheValue(key, value []byte, dirty bool) { + keyStr := conv.UnsafeBytesToStr(key) store.cache[keyStr] = &cValue{ value: value, dirty: dirty, } - if deleted { - store.deleted[keyStr] = struct{}{} - } else { - delete(store.deleted, keyStr) - } if dirty { store.unsortedCache[keyStr] = struct{}{} } } -func (store *Store) isDeleted(key string) bool { - _, ok := store.deleted[key] - return ok +func (store *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap { + return NewStore(listenkv.NewStore(store, storeKey, listeners)) } diff --git a/store/cachekv/store_bench_test.go b/store/cachekv/store_bench_test.go index 88c86eff564a..01c495614a27 100644 --- a/store/cachekv/store_bench_test.go +++ b/store/cachekv/store_bench_test.go @@ -3,10 +3,9 @@ package cachekv_test import ( "testing" - dbm "github.com/tendermint/tm-db" - "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/dbadapter" + dbm "github.com/tendermint/tm-db" ) var sink interface{} @@ -35,7 +34,8 @@ func benchmarkBlankParentIteratorNext(b *testing.B, keysize int) { iter := kvstore.Iterator(keys[0], keys[b.N]) defer iter.Close() - for _ = iter.Key(); iter.Valid(); iter.Next() { + for ; iter.Valid(); iter.Next() { + _ = iter.Key() // deadcode elimination stub sink = iter } @@ -70,7 +70,8 @@ func benchmarkRandomSet(b *testing.B, keysize int) { // Use a singleton for value, to not waste time computing it value := randSlice(defaultValueSizeBz) - keys := generateRandomKeys(keysize, b.N) + // Add 1 to avoid issues when b.N = 1 + keys := generateRandomKeys(keysize, b.N+1) b.ReportAllocs() b.ResetTimer() @@ -82,7 +83,8 @@ func benchmarkRandomSet(b *testing.B, keysize int) { iter := kvstore.Iterator(keys[0], keys[b.N]) defer iter.Close() - for _ = iter.Key(); iter.Valid(); 
iter.Next() { + for ; iter.Valid(); iter.Next() { + _ = iter.Key() // deadcode elimination stub sink = iter } @@ -100,7 +102,8 @@ func benchmarkIteratorOnParentWithManyDeletes(b *testing.B, numDeletes int) { // Use simple values for keys, pick a random start, // and take next D keys sequentially after. startKey := randSlice(32) - keys := generateSequentialKeys(startKey, numDeletes) + // Add 1 to avoid issues when numDeletes = 1 + keys := generateSequentialKeys(startKey, numDeletes+1) // setup parent db with D keys. for _, k := range keys { mem.Set(k, value) @@ -118,10 +121,11 @@ func benchmarkIteratorOnParentWithManyDeletes(b *testing.B, numDeletes int) { b.ReportAllocs() b.ResetTimer() - iter := kvstore.Iterator(keys[0], keys[b.N]) + iter := kvstore.Iterator(keys[0], keys[numDeletes]) defer iter.Close() - for _ = iter.Key(); iter.Valid(); iter.Next() { + for ; iter.Valid(); iter.Next() { + _ = iter.Key() // deadcode elimination stub sink = iter } diff --git a/store/cachekv/store_test.go b/store/cachekv/store_test.go index 0404f33f2a28..33b043e66856 100644 --- a/store/cachekv/store_test.go +++ b/store/cachekv/store_test.go @@ -2,15 +2,14 @@ package cachekv_test import ( "fmt" + "github.com/cosmos/cosmos-sdk/store/cachekv" + "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/types" + tmrand "github.com/tendermint/tendermint/libs/rand" "testing" "github.com/stretchr/testify/require" - tmrand "github.com/tendermint/tendermint/libs/rand" dbm "github.com/tendermint/tm-db" - - "github.com/cosmos/cosmos-sdk/store/cachekv" - "github.com/cosmos/cosmos-sdk/store/dbadapter" - "github.com/cosmos/cosmos-sdk/store/types" ) func newCacheKVStore() types.CacheKVStore { @@ -112,7 +111,7 @@ func TestCacheKVIteratorBounds(t *testing.T) { // iterate over all of them itr := st.Iterator(nil, nil) - var i = 0 + i := 0 for ; itr.Valid(); itr.Next() { k, v := itr.Key(), itr.Value() require.Equal(t, keyFmt(i), k) @@ -120,6 +119,7 @@ func 
TestCacheKVIteratorBounds(t *testing.T) { i++ } require.Equal(t, nItems, i) + require.NoError(t, itr.Close()) // iterate over none itr = st.Iterator(bz("money"), nil) @@ -128,6 +128,7 @@ func TestCacheKVIteratorBounds(t *testing.T) { i++ } require.Equal(t, 0, i) + require.NoError(t, itr.Close()) // iterate over lower itr = st.Iterator(keyFmt(0), keyFmt(3)) @@ -139,6 +140,7 @@ func TestCacheKVIteratorBounds(t *testing.T) { i++ } require.Equal(t, 3, i) + require.NoError(t, itr.Close()) // iterate over upper itr = st.Iterator(keyFmt(2), keyFmt(4)) @@ -150,6 +152,64 @@ func TestCacheKVIteratorBounds(t *testing.T) { i++ } require.Equal(t, 4, i) + require.NoError(t, itr.Close()) +} + +func TestCacheKVReverseIteratorBounds(t *testing.T) { + st := newCacheKVStore() + + // set some items + nItems := 5 + for i := 0; i < nItems; i++ { + st.Set(keyFmt(i), valFmt(i)) + } + + // iterate over all of them + itr := st.ReverseIterator(nil, nil) + i := 0 + for ; itr.Valid(); itr.Next() { + k, v := itr.Key(), itr.Value() + require.Equal(t, keyFmt(nItems-1-i), k) + require.Equal(t, valFmt(nItems-1-i), v) + i++ + } + require.Equal(t, nItems, i) + require.NoError(t, itr.Close()) + + // iterate over none + itr = st.ReverseIterator(bz("money"), nil) + i = 0 + for ; itr.Valid(); itr.Next() { + i++ + } + require.Equal(t, 0, i) + require.NoError(t, itr.Close()) + + // iterate over lower + end := 3 + itr = st.ReverseIterator(keyFmt(0), keyFmt(end)) + i = 0 + for ; itr.Valid(); itr.Next() { + i++ + k, v := itr.Key(), itr.Value() + require.Equal(t, keyFmt(end-i), k) + require.Equal(t, valFmt(end-i), v) + } + require.Equal(t, 3, i) + require.NoError(t, itr.Close()) + + // iterate over upper + end = 4 + itr = st.ReverseIterator(keyFmt(2), keyFmt(end)) + i = 0 + for ; itr.Valid(); itr.Next() { + i++ + k, v := itr.Key(), itr.Value() + require.Equal(t, keyFmt(end-i), k) + require.Equal(t, valFmt(end-i), v) + } + require.Equal(t, 2, i) + require.NoError(t, itr.Close()) } func 
TestCacheKVMergeIteratorBasics(t *testing.T) { @@ -291,6 +351,25 @@ func TestCacheKVMergeIteratorChunks(t *testing.T) { assertIterateDomainCheck(t, st, truth, []keyRange{{0, 15}, {25, 35}, {38, 40}, {45, 80}}) } +func TestCacheKVMergeIteratorDomain(t *testing.T) { + st := newCacheKVStore() + + itr := st.Iterator(nil, nil) + start, end := itr.Domain() + require.Equal(t, start, end) + require.NoError(t, itr.Close()) + + itr = st.Iterator(keyFmt(40), keyFmt(60)) + start, end = itr.Domain() + require.Equal(t, keyFmt(40), start) + require.Equal(t, keyFmt(60), end) + require.NoError(t, itr.Close()) + + start, end = st.ReverseIterator(keyFmt(0), keyFmt(80)).Domain() + require.Equal(t, keyFmt(0), start) + require.Equal(t, keyFmt(80), end) +} + func TestCacheKVMergeIteratorRandom(t *testing.T) { st := newCacheKVStore() truth := dbm.NewMemDB() @@ -306,6 +385,67 @@ func TestCacheKVMergeIteratorRandom(t *testing.T) { } } +func TestNilEndIterator(t *testing.T) { + const SIZE = 3000 + + tests := []struct { + name string + write bool + startIndex int + end []byte + }{ + {name: "write=false, end=nil", write: false, end: nil, startIndex: 1000}, + {name: "write=false, end=nil; full key scan", write: false, end: nil, startIndex: 2000}, + {name: "write=true, end=nil", write: true, end: nil, startIndex: 1000}, + {name: "write=false, end=non-nil", write: false, end: keyFmt(3000), startIndex: 1000}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + st := newCacheKVStore() + + for i := 0; i < SIZE; i++ { + kstr := keyFmt(i) + st.Set(kstr, valFmt(i)) + } + + if tt.write { + st.Write() + } + + itr := st.Iterator(keyFmt(tt.startIndex), tt.end) + i := tt.startIndex + j := 0 + for itr.Valid() { + require.Equal(t, keyFmt(i), itr.Key()) + require.Equal(t, valFmt(i), itr.Value()) + itr.Next() + i++ + j++ + } + + require.Equal(t, SIZE-tt.startIndex, j) + require.NoError(t, itr.Close()) + }) + } +} + +// TestIteratorDeadlock demonstrate the deadlock issue in cache store. 
+func TestIteratorDeadlock(t *testing.T) { + mem := dbadapter.Store{DB: dbm.NewMemDB()} + store := cachekv.NewStore(mem) + // the channel buffer is 64 and received once, so put at least 66 elements. + for i := 0; i < 66; i++ { + store.Set([]byte(fmt.Sprintf("key%d", i)), []byte{1}) + } + it := store.Iterator(nil, nil) + defer it.Close() + store.Set([]byte("key20"), []byte{1}) + // it'll be blocked here with previous version, or enable lock on btree. + it2 := store.Iterator(nil, nil) + defer it2.Close() +} + //------------------------------------------------------------------------------------------- // do some random ops @@ -380,7 +520,7 @@ func doRandomOp(t *testing.T, st types.CacheKVStore, truth dbm.DB, maxKey int) { // iterate over whole domain func assertIterateDomain(t *testing.T, st types.KVStore, expectedN int) { itr := st.Iterator(nil, nil) - var i = 0 + i := 0 for ; itr.Valid(); itr.Next() { k, v := itr.Key(), itr.Value() require.Equal(t, keyFmt(i), k) @@ -388,6 +528,7 @@ func assertIterateDomain(t *testing.T, st types.KVStore, expectedN int) { i++ } require.Equal(t, expectedN, i) + require.NoError(t, itr.Close()) } func assertIterateDomainCheck(t *testing.T, st types.KVStore, mem dbm.DB, r []keyRange) { @@ -419,6 +560,8 @@ func assertIterateDomainCheck(t *testing.T, st types.KVStore, mem dbm.DB, r []ke require.False(t, itr.Valid()) require.False(t, itr2.Valid()) + require.NoError(t, itr.Close()) + require.NoError(t, itr2.Close()) } func assertIterateDomainCompare(t *testing.T, st types.KVStore, mem dbm.DB) { @@ -428,6 +571,8 @@ func assertIterateDomainCompare(t *testing.T, st types.KVStore, mem dbm.DB) { require.NoError(t, err) checkIterators(t, itr, itr2) checkIterators(t, itr2, itr) + require.NoError(t, itr.Close()) + require.NoError(t, itr2.Close()) } func checkIterators(t *testing.T, itr, itr2 types.Iterator) {