Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memdb: retain old version nodes of ART to satisfy snapshot read #1503

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 56 additions & 3 deletions internal/unionstore/art/art_arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package art

import (
"sync/atomic"
"unsafe"

"github.com/tikv/client-go/v2/internal/unionstore/arena"
Expand All @@ -25,11 +26,23 @@ import (
// reusing blocks reduces the memory pieces.
type nodeArena struct {
	arena.MemdbArena

	// When an ART node grows to a higher capacity (e.g. node4 -> node16), the
	// address of the freed smaller node is stored in these per-size free lists
	// so later allocations can reuse it, reducing memory usage and fragmentation.
	freeNode4  []arena.MemdbArenaAddr
	freeNode16 []arena.MemdbArenaAddr
	freeNode48 []arena.MemdbArenaAddr

	// While a snapshot iterator is ongoing, the ART must keep old-version nodes
	// readable; reusing a freed node during that window could produce incorrect
	// snapshot read results. Freed nodes are therefore parked in the unused*
	// slices until every snapshot iterator is closed, at which point they are
	// drained into the free lists above.
	// blockedSnapshotCnt counts the ongoing snapshot iterators.
	blockedSnapshotCnt atomic.Int64
	// isUnusedNodeFreeing guards the drain of the unused* slices into the free
	// lists against racing snapshot-iterator closes.
	isUnusedNodeFreeing atomic.Bool
	unusedNode4  []arena.MemdbArenaAddr
	unusedNode16 []arena.MemdbArenaAddr
	unusedNode48 []arena.MemdbArenaAddr
}

type artAllocator struct {
Expand Down Expand Up @@ -62,7 +75,11 @@ func (f *artAllocator) allocNode4() (arena.MemdbArenaAddr, *node4) {
}

// freeNode4 releases the node4 at addr back to the allocator. While any
// snapshot iterator is active, the node may still be reachable through an old
// version of the tree, so it is parked in unusedNode4 instead of the free
// list and recycled only after the last snapshot iterator closes.
// NOTE(review): assumes freeNode4 is only invoked by the single tree writer —
// confirm; concurrent calls racing a drain would need extra synchronization.
func (f *artAllocator) freeNode4(addr arena.MemdbArenaAddr) {
	na := &f.nodeAllocator
	if na.blockedSnapshotCnt.Load() != 0 {
		na.unusedNode4 = append(na.unusedNode4, addr)
		return
	}
	na.freeNode4 = append(na.freeNode4, addr)
}

func (f *artAllocator) getNode4(addr arena.MemdbArenaAddr) *node4 {
Expand All @@ -88,7 +105,11 @@ func (f *artAllocator) allocNode16() (arena.MemdbArenaAddr, *node16) {
}

// freeNode16 releases the node16 at addr. Nodes freed while a snapshot
// iterator is active are parked in unusedNode16 until the iterator count
// drops to zero; otherwise they go straight to the free list for reuse.
func (f *artAllocator) freeNode16(addr arena.MemdbArenaAddr) {
	na := &f.nodeAllocator
	if na.blockedSnapshotCnt.Load() == 0 {
		na.freeNode16 = append(na.freeNode16, addr)
	} else {
		na.unusedNode16 = append(na.unusedNode16, addr)
	}
}

func (f *artAllocator) getNode16(addr arena.MemdbArenaAddr) *node16 {
Expand All @@ -114,7 +135,11 @@ func (f *artAllocator) allocNode48() (arena.MemdbArenaAddr, *node48) {
}

// freeNode48 releases the node48 at addr. Nodes freed while a snapshot
// iterator is active are parked in unusedNode48 until the iterator count
// drops to zero; otherwise they go straight to the free list for reuse.
func (f *artAllocator) freeNode48(addr arena.MemdbArenaAddr) {
	na := &f.nodeAllocator
	if na.blockedSnapshotCnt.Load() == 0 {
		na.freeNode48 = append(na.freeNode48, addr)
	} else {
		na.unusedNode48 = append(na.unusedNode48, addr)
	}
}

func (f *artAllocator) getNode48(addr arena.MemdbArenaAddr) *node48 {
Expand Down Expand Up @@ -156,3 +181,31 @@ func (f *artAllocator) getLeaf(addr arena.MemdbArenaAddr) *artLeaf {
data := f.nodeAllocator.GetData(addr)
return (*artLeaf)(unsafe.Pointer(&data[0]))
}

// snapshotInc registers an ongoing snapshot iterator, blocking freed ART
// nodes from being reused until snapshotDec brings the count back to zero.
func (f *artAllocator) snapshotInc() {
	f.nodeAllocator.blockedSnapshotCnt.Add(1)
}

// snapshotDec unregisters an ongoing snapshot iterator. When the last
// iterator closes, the unused old-version nodes are moved into the free
// lists, allowing them to be reused. Snapshot iterators may be closed
// concurrently, so the drain is guarded by isUnusedNodeFreeing.
func (f *artAllocator) snapshotDec() {
	na := &f.nodeAllocator
	if na.blockedSnapshotCnt.Add(-1) != 0 {
		return
	}
	// Only one closer drains the unused lists at a time.
	if !na.isUnusedNodeFreeing.CompareAndSwap(false, true) {
		return
	}
	// Release the flag on every exit path.
	defer na.isUnusedNodeFreeing.Store(false)
	// Re-check the counter: a new snapshot iterator may have started between
	// the Add above and winning the CAS. In that case the parked nodes must
	// stay unavailable; a later snapshotDec will drain them.
	// NOTE(review): an iterator starting after this check can still race the
	// drain below — confirm callers cannot open a snapshot iterator
	// concurrently with tree writes that freed these nodes.
	if na.blockedSnapshotCnt.Load() != 0 {
		return
	}
	if len(na.unusedNode4) > 0 {
		na.freeNode4 = append(na.freeNode4, na.unusedNode4...)
		na.unusedNode4 = na.unusedNode4[:0]
	}
	if len(na.unusedNode16) > 0 {
		na.freeNode16 = append(na.freeNode16, na.unusedNode16...)
		na.unusedNode16 = na.unusedNode16[:0]
	}
	if len(na.unusedNode48) > 0 {
		na.freeNode48 = append(na.freeNode48, na.unusedNode48...)
		na.unusedNode48 = na.unusedNode48[:0]
	}
}
41 changes: 36 additions & 5 deletions internal/unionstore/art/art_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,16 @@ func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
panic(err)
}
it := &SnapIter{
Iterator: inner,
cp: t.getSnapshot(),
Iterator: inner,
cp: t.getSnapshot(),
isCounting: false,
}
for !it.setValue() && it.Valid() {
_ = it.Next()
you06 marked this conversation as resolved.
Show resolved Hide resolved
}
if it.Valid() {
it.counterInc()
}
return it
}

Expand All @@ -62,9 +66,12 @@ func (t *ART) SnapshotIterReverse(k, lowerBound []byte) *SnapIter {
Iterator: inner,
cp: t.getSnapshot(),
}
for !it.setValue() && it.valid {
for !it.setValue() && it.Valid() {
_ = it.Next()
}
if it.Valid() {
it.counterInc()
}
return it
}

Expand All @@ -91,15 +98,21 @@ func (snap *SnapGetter) Get(ctx context.Context, key []byte) ([]byte, error) {

// SnapIter iterates over the checkpointed (snapshot) view of the ART,
// skipping entries written after the checkpoint cp was taken.
type SnapIter struct {
	*Iterator
	value []byte
	cp    arena.MemDBCheckpoint
	// isCounting records whether this iterator has been counted in the
	// allocator's snapshot counter, so counterDec only decrements once.
	isCounting bool
}

// Value returns the value of the current entry as of the snapshot.
func (i *SnapIter) Value() []byte {
	return i.value
}

func (i *SnapIter) Next() error {
defer func() {
if !i.Valid() {
i.counterDec()
you06 marked this conversation as resolved.
Show resolved Hide resolved
}
}()
i.value = nil
for i.Valid() {
if err := i.Iterator.Next(); err != nil {
Expand All @@ -112,6 +125,11 @@ func (i *SnapIter) Next() error {
return nil
}

func (i *SnapIter) Close() {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
i.Iterator.Close()
i.counterDec()
}

func (i *SnapIter) setValue() bool {
if !i.Valid() {
return false
Expand All @@ -122,3 +140,16 @@ func (i *SnapIter) setValue() bool {
}
return false
}

// counterInc marks this iterator as an active snapshot reader and bumps the
// allocator's snapshot counter so freed nodes are parked instead of reused.
func (i *SnapIter) counterInc() {
	i.isCounting = true
	i.tree.allocator.snapshotInc()
}

// counterDec releases this iterator's hold on old-version nodes. It is
// idempotent per iterator: only an iterator that previously called
// counterInc decrements the allocator's snapshot counter, and only once.
func (i *SnapIter) counterDec() {
	if !i.isCounting {
		return
	}
	i.isCounting = false
	i.tree.allocator.snapshotDec()
}
90 changes: 90 additions & 0 deletions internal/unionstore/art/art_snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package art

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
)

// TestSnapshotIteratorPreventFreeNode checks that a node freed while a
// snapshot iterator is open is parked in the matching unused list, and is
// drained back for reuse once the iterator is closed.
func TestSnapshotIteratorPreventFreeNode(t *testing.T) {
	check := func(num int) {
		tree := New()
		for i := 0; i < num; i++ {
			tree.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
		}
		alloc := &tree.allocator.nodeAllocator
		var unusedNodeSlice *[]arena.MemdbArenaAddr
		switch num {
		case 4:
			unusedNodeSlice = &alloc.unusedNode4
		case 16:
			unusedNodeSlice = &alloc.unusedNode16
		case 48:
			unusedNodeSlice = &alloc.unusedNode48
		default:
			panic("unsupported num")
		}
		it := tree.SnapshotIter(nil, nil)
		require.Empty(t, *unusedNodeSlice)
		// Overflow the node so it grows; the freed node must be parked.
		tree.Set([]byte{0, byte(num)}, []byte{0, byte(num)})
		require.Len(t, *unusedNodeSlice, 1)
		it.Close()
		require.Empty(t, *unusedNodeSlice)
	}

	for _, num := range []int{4, 16, 48} {
		check(num)
	}
}

// TestConcurrentSnapshotIterNoRace closes many snapshot iterators
// concurrently (meant to run with -race) and verifies that all parked
// old-version nodes are eventually drained back into the free lists.
func TestConcurrentSnapshotIterNoRace(t *testing.T) {
	check := func(num int) {
		tree := New()
		for i := 0; i < num; i++ {
			tree.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
		}

		const concurrency = 100
		it := tree.SnapshotIter(nil, nil)

		// Trigger a node grow so a freed node is parked while `it` is open.
		tree.Set([]byte{0, byte(num)}, []byte{0, byte(num)})

		var wg sync.WaitGroup
		wg.Add(concurrency)
		go func() {
			it.Close()
			wg.Done()
		}()
		for i := 1; i < concurrency; i++ {
			// Each goroutine opens and closes its own snapshot iterator; it
			// needs nothing from the loop (the original passed the outer `it`
			// as an unused parameter, which was misleading).
			go func() {
				concurrentIt := tree.SnapshotIter(nil, nil)
				concurrentIt.Close()
				wg.Done()
			}()
		}
		wg.Wait()

		require.Empty(t, tree.allocator.nodeAllocator.unusedNode4)
		require.Empty(t, tree.allocator.nodeAllocator.unusedNode16)
		require.Empty(t, tree.allocator.nodeAllocator.unusedNode48)
	}

	check(4)
	check(16)
	check(48)
}
36 changes: 36 additions & 0 deletions internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1327,3 +1327,39 @@ func TestSelectValueHistory(t *testing.T) {
check(t, newRbtDBWithContext())
check(t, newArtDBWithContext())
}

func TestSnapshotReaderWithWrite(t *testing.T) {
check := func(db MemBuffer, num int) {
for i := 0; i < num; i++ {
db.Set([]byte{0, byte(i)}, []byte{0, byte(i)})
}
h := db.Staging()
defer db.Release(h)

iter := db.SnapshotIter([]byte{0, 0}, []byte{0, 255})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to Close the iter following the requirement usage pattern in test.

assert.Equal(t, iter.Key(), []byte{0, 0})

db.Set([]byte{0, byte(num)}, []byte{0, byte(num)}) // ART: node4/node16/node48 is freed and wait to be reused.

// ART: reuse the node4/node16/node48
for i := 0; i < num; i++ {
db.Set([]byte{1, byte(i)}, []byte{1, byte(i)})
}

for i := 0; i < num; i++ {
assert.True(t, iter.Valid())
assert.Equal(t, iter.Key(), []byte{0, byte(i)})
assert.Nil(t, iter.Next())
}
assert.False(t, iter.Valid())
}

check(newRbtDBWithContext(), 4)
check(newArtDBWithContext(), 4)

check(newRbtDBWithContext(), 16)
check(newArtDBWithContext(), 16)

check(newRbtDBWithContext(), 48)
check(newArtDBWithContext(), 48)
}
Loading