From 2f5d5840bc431c6c5d50e32da18cc56b545c1703 Mon Sep 17 00:00:00 2001
From: Thomas Jungblut
Date: Thu, 25 Jul 2024 15:08:25 +0200
Subject: [PATCH] Split serialization into composition

This also moves out copyall, removes the duplicated reload function,
adds a more general list func.

Signed-off-by: Thomas Jungblut
---
 db.go                              |  8 ++-
 internal/freelist/array.go         |  2 +-
 internal/freelist/array_test.go    |  8 +--
 internal/freelist/freelist.go      | 23 +++----
 internal/freelist/freelist_test.go | 32 +++++-----
 internal/freelist/hashmap.go       |  2 +-
 internal/freelist/hashmap_test.go  |  6 +-
 internal/freelist/read_writer.go   | 82 +++++++++++++++++++++++++
 internal/freelist/shared.go        | 99 +++---------------------------
 tx.go                              | 27 ++++----
 tx_check.go                        |  4 +-
 11 files changed, 149 insertions(+), 144 deletions(-)
 create mode 100644 internal/freelist/read_writer.go

diff --git a/db.go b/db.go
index 349f187ae..f862031e0 100644
--- a/db.go
+++ b/db.go
@@ -134,8 +134,9 @@ type DB struct {
 	rwtx     *Tx
 	txs      []*Tx
 
-	freelist     fl.Interface
-	freelistLoad sync.Once
+	freelist           fl.Interface
+	freelistLoad       sync.Once
+	freelistReadWriter fl.ReadWriter
 
 	pagePool sync.Pool
 
@@ -417,12 +418,13 @@ func (db *DB) getPageSizeFromSecondMeta() (int, bool, error) {
 func (db *DB) loadFreelist() {
 	db.freelistLoad.Do(func() {
 		db.freelist = newFreelist(db.FreelistType)
+		db.freelistReadWriter = fl.NewSortedSerializer()
 		if !db.hasSyncedFreelist() {
 			// Reconstruct free list by scanning the DB.
 			db.freelist.Init(db.freepages())
 		} else {
 			// Read free list from freelist page.
-			db.freelist.Read(db.page(db.meta().Freelist()))
+			db.freelistReadWriter.Read(db.freelist, db.page(db.meta().Freelist()))
 		}
 		db.stats.FreePageN = db.freelist.FreeCount()
 	})
diff --git a/internal/freelist/array.go b/internal/freelist/array.go
index 9533d9dfc..f98e45065 100644
--- a/internal/freelist/array.go
+++ b/internal/freelist/array.go
@@ -61,7 +61,7 @@ func (f *array) FreeCount() int {
 	return len(f.ids)
 }
 
-func (f *array) freePageIds() common.Pgids {
+func (f *array) FreePageIds() common.Pgids {
 	return f.ids
 }
 
diff --git a/internal/freelist/array_test.go b/internal/freelist/array_test.go
index 31b0702dc..b4ccfd9e4 100644
--- a/internal/freelist/array_test.go
+++ b/internal/freelist/array_test.go
@@ -33,8 +33,8 @@ func TestFreelistArray_allocate(t *testing.T) {
 	if id := int(f.Allocate(1, 0)); id != 0 {
 		t.Fatalf("exp=0; got=%v", id)
 	}
-	if exp := common.Pgids([]common.Pgid{9, 18}); !reflect.DeepEqual(exp, f.freePageIds()) {
-		t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
+	if exp := common.Pgids([]common.Pgid{9, 18}); !reflect.DeepEqual(exp, f.FreePageIds()) {
+		t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
 	}
 
 	if id := int(f.Allocate(1, 1)); id != 9 {
@@ -46,7 +46,7 @@ func TestFreelistArray_allocate(t *testing.T) {
 	if id := int(f.Allocate(1, 1)); id != 0 {
 		t.Fatalf("exp=0; got=%v", id)
 	}
-	if exp := common.Pgids([]common.Pgid{}); !reflect.DeepEqual(exp, f.freePageIds()) {
-		t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
+	if exp := common.Pgids([]common.Pgid{}); !reflect.DeepEqual(exp, f.FreePageIds()) {
+		t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
 	}
 }
diff --git a/internal/freelist/freelist.go b/internal/freelist/freelist.go
index fc3e2277e..160077047 100644
--- a/internal/freelist/freelist.go
+++ b/internal/freelist/freelist.go
@@ -6,14 +6,14 @@ import (
 
 type ReadWriter interface {
 	// Read calls Init with the page ids stored in the given page.
-	Read(page *common.Page)
+	Read(f Interface, page *common.Page)
 
 	// Write writes the freelist into the given page.
-	Write(page *common.Page)
+	Write(f Interface, page *common.Page)
 
 	// EstimatedWritePageSize returns the size of the freelist after serialization in Write.
 	// This should never underestimate the size.
-	EstimatedWritePageSize() int
+	EstimatedWritePageSize(f Interface) int
 }
 
 type allocator interface {
@@ -23,8 +23,8 @@ type allocator interface {
 	// FreeCount returns the number of free pages.
 	FreeCount() int
 
-	// freePageIds returns the IDs of all free pages.
-	freePageIds() common.Pgids
+	// FreePageIds returns the IDs of all free pages.
+	FreePageIds() common.Pgids
 
 	// mergeSpans is merging the given pages into the freelist
 	mergeSpans(ids common.Pgids)
@@ -54,7 +54,6 @@ type txManager interface {
 }
 
 type Interface interface {
-	ReadWriter
 	allocator
 	txManager
 
@@ -79,13 +78,9 @@ type Interface interface {
 	// Rollback removes the pages from a given pending tx.
 	Rollback(txId common.Txid)
 
-	// Copyall copies a list of all free ids and all pending ids in one sorted list.
-	// f.count returns the minimum length required for dst.
-	Copyall(dst []common.Pgid)
+	// List returns a list of all free ids and all pending ids in one sorted list.
+	List() common.Pgids
 
-	// Reload reads the freelist from a page and filters out pending items.
-	Reload(p *common.Page)
-
-	// NoSyncReload reads the freelist from Pgids and filters out pending items.
-	NoSyncReload(pgIds common.Pgids)
+	// Reload reads the freelist from Pgids and filters out pending items.
+	Reload(pgIds common.Pgids)
 }
diff --git a/internal/freelist/freelist_test.go b/internal/freelist/freelist_test.go
index df7c7697e..14f8f4487 100644
--- a/internal/freelist/freelist_test.go
+++ b/internal/freelist/freelist_test.go
@@ -40,13 +40,13 @@ func TestFreelist_release(t *testing.T) {
 	f.Free(102, common.NewPage(39, 0, 0, 0))
 	f.release(100)
 	f.release(101)
-	if exp := common.Pgids([]common.Pgid{9, 12, 13}); !reflect.DeepEqual(exp, f.freePageIds()) {
-		t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
+	if exp := common.Pgids([]common.Pgid{9, 12, 13}); !reflect.DeepEqual(exp, f.FreePageIds()) {
+		t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
 	}
 
 	f.release(102)
-	if exp := common.Pgids([]common.Pgid{9, 12, 13, 39}); !reflect.DeepEqual(exp, f.freePageIds()) {
-		t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
+	if exp := common.Pgids([]common.Pgid{9, 12, 13, 39}); !reflect.DeepEqual(exp, f.FreePageIds()) {
+		t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
 	}
 }
 
@@ -173,8 +173,8 @@ func TestFreelist_releaseRange(t *testing.T) {
 			f.releaseRange(r.begin, r.end)
 		}
 
-		if exp := common.Pgids(c.wantFree); !reflect.DeepEqual(exp, f.freePageIds()) {
-			t.Errorf("exp=%v; got=%v for %s", exp, f.freePageIds(), c.title)
+		if exp := common.Pgids(c.wantFree); !reflect.DeepEqual(exp, f.FreePageIds()) {
+			t.Errorf("exp=%v; got=%v for %s", exp, f.FreePageIds(), c.title)
 		}
 	}
 }
@@ -194,11 +194,12 @@ func TestFreelist_read(t *testing.T) {
 
 	// Deserialize page into a freelist.
 	f := newTestFreelist()
-	f.Read(page)
+	s := SortedSerializer{}
+	s.Read(f, page)
 
 	// Ensure that there are two page ids in the freelist.
-	if exp := common.Pgids([]common.Pgid{23, 50}); !reflect.DeepEqual(exp, f.freePageIds()) {
-		t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
+	if exp := common.Pgids([]common.Pgid{23, 50}); !reflect.DeepEqual(exp, f.FreePageIds()) {
+		t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
 	}
 }
 
@@ -207,21 +208,22 @@ func TestFreelist_write(t *testing.T) {
 	// Create a freelist and write it to a page.
 	var buf [4096]byte
 	f := newTestFreelist()
+	s := SortedSerializer{}
 
 	f.Init([]common.Pgid{12, 39})
 	f.pendingPageIds()[100] = &txPending{ids: []common.Pgid{28, 11}}
 	f.pendingPageIds()[101] = &txPending{ids: []common.Pgid{3}}
 	p := (*common.Page)(unsafe.Pointer(&buf[0]))
-	f.Write(p)
+	s.Write(f, p)
 
 	// Read the page back out.
 	f2 := newTestFreelist()
-	f2.Read(p)
+	s.Read(f2, p)
 
 	// Ensure that the freelist is correct.
 	// All pages should be present and in reverse order.
-	if exp := common.Pgids([]common.Pgid{3, 11, 12, 28, 39}); !reflect.DeepEqual(exp, f2.freePageIds()) {
-		t.Fatalf("exp=%v; got=%v", exp, f2.freePageIds())
+	if exp := common.Pgids([]common.Pgid{3, 11, 12, 28, 39}); !reflect.DeepEqual(exp, f2.FreePageIds()) {
+		t.Fatalf("exp=%v; got=%v", exp, f2.FreePageIds())
 	}
 }
 
@@ -258,7 +260,7 @@ func Test_freelist_ReadIDs_and_getFreePageIDs(t *testing.T) {
 
 	f.Init(exp)
 
-	if got := f.freePageIds(); !reflect.DeepEqual(exp, got) {
+	if got := f.FreePageIds(); !reflect.DeepEqual(exp, got) {
 		t.Fatalf("exp=%v; got=%v", exp, got)
 	}
 
@@ -266,7 +268,7 @@ func Test_freelist_ReadIDs_and_getFreePageIDs(t *testing.T) {
 	var exp2 []common.Pgid
 	f2.Init(exp2)
 
-	if got2 := f2.freePageIds(); !reflect.DeepEqual(got2, common.Pgids(exp2)) {
+	if got2 := f2.FreePageIds(); !reflect.DeepEqual(got2, common.Pgids(exp2)) {
 		t.Fatalf("exp2=%#v; got2=%#v", exp2, got2)
 	}
 
diff --git a/internal/freelist/hashmap.go b/internal/freelist/hashmap.go
index 83fa6d195..0cbc28875 100644
--- a/internal/freelist/hashmap.go
+++ b/internal/freelist/hashmap.go
@@ -110,7 +110,7 @@ func (f *hashMap) FreeCount() int {
 	return int(f.freePagesCount)
 }
 
-func (f *hashMap) freePageIds() common.Pgids {
+func (f *hashMap) FreePageIds() common.Pgids {
 	count := f.FreeCount()
 	if count == 0 {
 		return nil
diff --git a/internal/freelist/hashmap_test.go b/internal/freelist/hashmap_test.go
index 37f1944fb..8c92cdb79 100644
--- a/internal/freelist/hashmap_test.go
+++ b/internal/freelist/hashmap_test.go
@@ -91,7 +91,7 @@ func TestFreelistHashmap_mergeWithExist(t *testing.T) {
 
 		f.mergeWithExistingSpan(tt.pgid)
 
-		if got := f.freePageIds(); !reflect.DeepEqual(tt.want, got) {
+		if got := f.FreePageIds(); !reflect.DeepEqual(tt.want, got) {
 			t.Fatalf("name %s; exp=%v; got=%v", tt.name, tt.want, got)
 		}
 		if got := f.forwardMap; !reflect.DeepEqual(tt.wantForwardmap, got) {
@@ -121,7 +121,7 @@ func TestFreelistHashmap_GetFreePageIDs(t *testing.T) {
 	}
 
 	f.forwardMap = fm
-	res := f.freePageIds()
+	res := f.FreePageIds()
 	if !sort.SliceIsSorted(res, func(i, j int) bool { return res[i] < res[j] }) {
 		t.Fatalf("pgids not sorted")
 	}
@@ -145,6 +145,6 @@ func Benchmark_freelist_hashmapGetFreePageIDs(b *testing.B) {
 	b.ReportAllocs()
 	b.ResetTimer()
 	for n := 0; n < b.N; n++ {
-		f.freePageIds()
+		f.FreePageIds()
 	}
 }
diff --git a/internal/freelist/read_writer.go b/internal/freelist/read_writer.go
new file mode 100644
index 000000000..ad6fd9863
--- /dev/null
+++ b/internal/freelist/read_writer.go
@@ -0,0 +1,82 @@
+package freelist
+
+import (
+	"fmt"
+	"sort"
+	"unsafe"
+
+	"go.etcd.io/bbolt/internal/common"
+)
+
+type SortedSerializer struct {
+}
+
+func (s *SortedSerializer) Read(t Interface, p *common.Page) {
+	if !p.IsFreelistPage() {
+		panic(fmt.Sprintf("invalid freelist page: %d, page type is %s", p.Id(), p.Typ()))
+	}
+
+	ids := p.FreelistPageIds()
+
+	// Copy the list of page ids from the freelist.
+	if len(ids) == 0 {
+		t.Init(nil)
+	} else {
+		// copy the ids, so we don't modify on the freelist page directly
+		idsCopy := make([]common.Pgid, len(ids))
+		copy(idsCopy, ids)
+		// Make sure they're sorted.
+		sort.Sort(common.Pgids(idsCopy))
+
+		t.Init(idsCopy)
+	}
+}
+
+func (s *SortedSerializer) EstimatedWritePageSize(t Interface) int {
+	n := t.Count()
+	if n >= 0xFFFF {
+		// The first element will be used to store the count. See freelist.write.
+		n++
+	}
+	return int(common.PageHeaderSize) + (int(unsafe.Sizeof(common.Pgid(0))) * n)
+}
+
+func (s *SortedSerializer) Write(t Interface, p *common.Page) {
+	// Combine the old free pgids and pgids waiting on an open transaction.
+
+	// Update the header flag.
+	p.SetFlags(common.FreelistPageFlag)
+
+	// The page.count can only hold up to 64k elements so if we overflow that
+	// number then we handle it by putting the size in the first element.
+	l := t.Count()
+	if l == 0 {
+		p.SetCount(uint16(l))
+	} else if l < 0xFFFF {
+		p.SetCount(uint16(l))
+		data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
+		ids := unsafe.Slice((*common.Pgid)(data), l)
+		copyall(t, ids)
+	} else {
+		p.SetCount(0xFFFF)
+		data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
+		ids := unsafe.Slice((*common.Pgid)(data), l+1)
+		ids[0] = common.Pgid(l)
+		copyall(t, ids[1:])
+	}
+}
+
+// copyall copies a list of all free ids and all pending ids in one sorted list.
+// f.count returns the minimum length required for dst.
+func copyall(t Interface, dst []common.Pgid) {
+	m := make(common.Pgids, 0, t.PendingCount())
+	for _, txp := range t.pendingPageIds() {
+		m = append(m, txp.ids...)
+	}
+	sort.Sort(m)
+	common.Mergepgids(dst, t.FreePageIds(), m)
+}
+
+func NewSortedSerializer() ReadWriter {
+	return &SortedSerializer{}
+}
diff --git a/internal/freelist/shared.go b/internal/freelist/shared.go
index 64f575320..aabfddcf5 100644
--- a/internal/freelist/shared.go
+++ b/internal/freelist/shared.go
@@ -3,8 +3,8 @@ package freelist
 import (
 	"fmt"
 	"math"
+	"slices"
 	"sort"
-	"unsafe"
 
 	"go.etcd.io/bbolt/internal/common"
 )
@@ -202,20 +202,7 @@ func (t *shared) releaseRange(begin, end common.Txid) {
 	t.mergeSpans(m)
 }
 
-// Copyall copies a list of all free ids and all pending ids in one sorted list.
-// f.count returns the minimum length required for dst.
-func (t *shared) Copyall(dst []common.Pgid) {
-	m := make(common.Pgids, 0, t.PendingCount())
-	for _, txp := range t.pendingPageIds() {
-		m = append(m, txp.ids...)
-	}
-	sort.Sort(m)
-	common.Mergepgids(dst, t.freePageIds(), m)
-}
-
-func (t *shared) Reload(p *common.Page) {
-	t.Read(p)
-
+func (t *shared) Reload(pgIds common.Pgids) {
 	// Build a cache of only pending pages.
 	pcache := make(map[common.Pgid]bool)
 	for _, txp := range t.pending {
@@ -227,7 +214,7 @@ func (t *shared) Reload(p *common.Page) {
 	// Check each page in the freelist and build a new available freelist
 	// with any pages not in the pending lists.
 	var a []common.Pgid
-	for _, id := range t.freePageIds() {
+	for _, id := range pgIds {
 		if !pcache[id] {
 			a = append(a, id)
 		}
@@ -236,30 +223,19 @@ func (t *shared) Reload(p *common.Page) {
 	t.Init(a)
 }
 
-func (t *shared) NoSyncReload(pgIds common.Pgids) {
-	// Build a cache of only pending pages.
-	pcache := make(map[common.Pgid]bool)
-	for _, txp := range t.pending {
-		for _, pendingID := range txp.ids {
-			pcache[pendingID] = true
-		}
-	}
-
-	// Check each page in the freelist and build a new available freelist
-	// with any pages not in the pending lists.
-	var a []common.Pgid
-	for _, id := range pgIds {
-		if !pcache[id] {
-			a = append(a, id)
-		}
+func (t *shared) List() common.Pgids {
+	a := slices.Clone(t.FreePageIds())
+	for _, pending := range t.pendingPageIds() {
+		a = append(a, pending.ids...)
 	}
 
-	t.Init(a)
+	sort.Sort(a)
+	return a
 }
 
 // reindex rebuilds the free cache based on available and pending free lists.
 func (t *shared) reindex() {
-	free := t.freePageIds()
+	free := t.FreePageIds()
 	pending := t.pendingPageIds()
 	t.cache = make(map[common.Pgid]struct{}, len(free))
 	for _, id := range free {
@@ -271,58 +247,3 @@ func (t *shared) reindex() {
 		}
 	}
 }
-
-func (t *shared) Read(p *common.Page) {
-	if !p.IsFreelistPage() {
-		panic(fmt.Sprintf("invalid freelist page: %d, page type is %s", p.Id(), p.Typ()))
-	}
-
-	ids := p.FreelistPageIds()
-
-	// Copy the list of page ids from the freelist.
-	if len(ids) == 0 {
-		t.Init(nil)
-	} else {
-		// copy the ids, so we don't modify on the freelist page directly
-		idsCopy := make([]common.Pgid, len(ids))
-		copy(idsCopy, ids)
-		// Make sure they're sorted.
-		sort.Sort(common.Pgids(idsCopy))
-
-		t.Init(idsCopy)
-	}
-}
-
-func (t *shared) EstimatedWritePageSize() int {
-	n := t.Count()
-	if n >= 0xFFFF {
-		// The first element will be used to store the count. See freelist.write.
-		n++
-	}
-	return int(common.PageHeaderSize) + (int(unsafe.Sizeof(common.Pgid(0))) * n)
-}
-
-func (t *shared) Write(p *common.Page) {
-	// Combine the old free pgids and pgids waiting on an open transaction.
-
-	// Update the header flag.
-	p.SetFlags(common.FreelistPageFlag)
-
-	// The page.count can only hold up to 64k elements so if we overflow that
-	// number then we handle it by putting the size in the first element.
-	l := t.Count()
-	if l == 0 {
-		p.SetCount(uint16(l))
-	} else if l < 0xFFFF {
-		p.SetCount(uint16(l))
-		data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
-		ids := unsafe.Slice((*common.Pgid)(data), l)
-		t.Copyall(ids)
-	} else {
-		p.SetCount(0xFFFF)
-		data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
-		ids := unsafe.Slice((*common.Pgid)(data), l+1)
-		ids[0] = common.Pgid(l)
-		t.Copyall(ids[1:])
-	}
-}
diff --git a/tx.go b/tx.go
index 7b5db7727..fbc583479 100644
--- a/tx.go
+++ b/tx.go
@@ -285,13 +285,13 @@ func (tx *Tx) Commit() (err error) {
 func (tx *Tx) commitFreelist() error {
 	// Allocate new pages for the new free list. This will overestimate
 	// the size of the freelist but not underestimate the size (which would be bad).
-	p, err := tx.allocate((tx.db.freelist.EstimatedWritePageSize() / tx.db.pageSize) + 1)
+	p, err := tx.allocate((tx.db.freelistReadWriter.EstimatedWritePageSize(tx.db.freelist) / tx.db.pageSize) + 1)
 	if err != nil {
 		tx.rollback()
 		return err
 	}
 
-	tx.db.freelist.Write(p)
+	tx.db.freelistReadWriter.Write(tx.db.freelist, p)
 	tx.meta.SetFreelist(p.Id())
 
 	return nil
@@ -329,19 +329,24 @@ func (tx *Tx) rollback() {
 		// When mmap fails, the `data`, `dataref` and `datasz` may be reset to
 		// zero values, and there is no way to reload free page IDs in this case.
 		if tx.db.data != nil {
-			if !tx.db.hasSyncedFreelist() {
-				// Reconstruct free page list by scanning the DB to get the whole free page list.
-				// Note: scanning the whole db is heavy if your db size is large in NoSyncFreeList mode.
-				tx.db.freelist.NoSyncReload(tx.db.freepages())
-			} else {
-				// Read free page list from freelist page.
-				tx.db.freelist.Reload(tx.db.page(tx.db.meta().Freelist()))
-			}
+			tx.reloadFreelist()
 		}
 	}
 	tx.close()
 }
 
+func (tx *Tx) reloadFreelist() {
+	if !tx.db.hasSyncedFreelist() {
+		// Reconstruct free page list by scanning the DB to get the whole free page list.
+		// Note: scanning the whole db is heavy if your db size is large in NoSyncFreeList mode.
+		tx.db.freelist.Reload(tx.db.freepages())
+	} else {
+		// Read free page list from freelist page.
+		tx.db.freelistReadWriter.Read(tx.db.freelist, tx.db.page(tx.db.meta().Freelist()))
+		tx.db.freelist.Reload(tx.db.freelist.FreePageIds())
+	}
+}
+
 func (tx *Tx) close() {
 	if tx.db == nil {
 		return
@@ -350,7 +355,7 @@ func (tx *Tx) close() {
 	// Grab freelist stats.
 	var freelistFreeN = tx.db.freelist.FreeCount()
 	var freelistPendingN = tx.db.freelist.PendingCount()
-	var freelistAlloc = tx.db.freelist.EstimatedWritePageSize()
+	var freelistAlloc = tx.db.freelistReadWriter.EstimatedWritePageSize(tx.db.freelist)
 
 	// Remove transaction ref & writer lock.
 	tx.db.rwtx = nil
diff --git a/tx_check.go b/tx_check.go
index c3ecbb975..b8d3bf555 100644
--- a/tx_check.go
+++ b/tx_check.go
@@ -41,9 +41,7 @@ func (tx *Tx) check(cfg checkConfig, ch chan error) {
 
 	// Check if any pages are double freed.
 	freed := make(map[common.Pgid]bool)
-	all := make([]common.Pgid, tx.db.freelist.Count())
-	tx.db.freelist.Copyall(all)
-	for _, id := range all {
+	for _, id := range tx.db.freelist.List() {
 		if freed[id] {
 			ch <- fmt.Errorf("page %d: already freed", id)
 		}
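
A note for reviewers on how the pieces are meant to compose after this change: freelist implementations now only track page ids, while a ReadWriter owns the on-disk encoding. The sketch below mirrors the call sites in db.go and tx.go above; the roundTrip helper and the single 4096-byte page buffer are illustrative assumptions, not part of the patch.

	package freelist

	import (
		"unsafe"

		"go.etcd.io/bbolt/internal/common"
	)

	// roundTrip is an illustrative helper (not part of this patch). It persists
	// src through rw into a single page and loads the result back into dst, the
	// same way db.loadFreelist and tx.commitFreelist use the new ReadWriter.
	func roundTrip(src, dst Interface, rw ReadWriter) {
		var buf [4096]byte // one page; the size is an assumption for this sketch
		p := (*common.Page)(unsafe.Pointer(&buf[0]))

		// Serialization is now a separate concern from freelist bookkeeping.
		if rw.EstimatedWritePageSize(src) > len(buf) {
			panic("freelist does not fit into the sketch's single page")
		}
		rw.Write(src, p) // writes the sorted free + pending ids and sets the page flags
		rw.Read(dst, p)  // calls dst.Init with the sorted ids read back from the page
	}

In practice rw is the NewSortedSerializer() introduced here, and src/dst can be either the array or hashmap freelist, since both now export FreePageIds.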
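
The Copyall/NoSyncReload pair is likewise replaced by the more general List and Reload used in tx_check.go and tx.rollback above. A minimal sketch of that call pattern, assuming a hypothetical markFreed helper:

	package freelist

	import "go.etcd.io/bbolt/internal/common"

	// markFreed is illustrative only. Reload filters still-pending pages back
	// out after a re-read, and List returns the free and pending ids in one
	// sorted slice, replacing the old Copyall-into-a-presized-slice pattern.
	func markFreed(f Interface, rw ReadWriter, p *common.Page) map[common.Pgid]bool {
		rw.Read(f, p)             // load ids from the freelist page
		f.Reload(f.FreePageIds()) // drop ids still pending in open transactions

		freed := make(map[common.Pgid]bool)
		for _, id := range f.List() { // all free and pending ids, sorted
			freed[id] = true
		}
		return freed
	}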