From 5fd4bd399b5de35beb1f82cf04853e37ffcd2770 Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Thu, 22 Aug 2024 15:48:04 +0530 Subject: [PATCH 1/6] Changed logic to insert entries into shard list --- src/dbnode/storage/shard.go | 102 ++++++++++++++++++++++- src/dbnode/storage/shard_insert_queue.go | 17 +++- 2 files changed, 116 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 8b4ddabda7..1ac75ec041 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1409,12 +1409,109 @@ func (s *dbShard) insertSeriesSync( return newEntry, nil } +func (s *dbShard) insertNewShardEntriesWithLock(entries []*Entry) { + if len(entries) == 0 { + return + } + + // Fast Path: Check if the entire slice can be appended to the end of the list + if s.canAppendToEnd(entries[0]) { + s.appendEntriesToEnd(entries) + return + } + + // If not, proceed with the standard insertion logic + elem := s.list.Back() + i := len(entries) - 1 + + for elem != nil && i >= 0 { + currListEntry := elem.Value.(*Entry) + + insertIdx := s.findInsertionIndex(entries, currListEntry.Index, 0, i) + + if insertIdx < len(entries) { + i = s.insertEntriesAfter(elem, entries, insertIdx, i) + } + + elem = elem.Prev() + } + + s.insertRemainingEntriesAtFront(entries, i) +} + +// Helper function to check if we can append the entire slice to the end of the list +func (s *dbShard) canAppendToEnd(firstEntry *Entry) bool { + lastListElem := s.list.Back() + if lastListElem == nil { + return false + } + lastListEntry := lastListElem.Value.(*Entry) + return firstEntry.Index > lastListEntry.Index +} + +// Helper function to append all entries to the end of the list +func (s *dbShard) appendEntriesToEnd(entries []*Entry) { + for _, entry := range entries { + s.insertNewShardEntryWithLock(entry) + } +} + +// Helper function to find the correct insertion index using binary search +func (s *dbShard) findInsertionIndex(entries []*Entry, targetIndex uint64, start, end int) int { + for start <= end { + mid := (start + end) / 2 + if entries[mid].Index > targetIndex { + end = mid - 1 + } else { + start = mid + 1 + } + } + return start +} + +// Helper function to insert entries after a given entry with indexes [start,end] +func (s *dbShard) insertEntriesAfter(elem *list.Element, entries []*Entry, start, end int) int { + currElem := elem + for j := start; j <= end; j++ { + entry := entries[j] + currElem = s.list.InsertAfter(entry, currElem) + s.insertInLookupMapWithLock(entry.Series.ID(), currElem) + } + return start - 1 +} + +// Helper function to insert any remaining entries at the front of the list +func (s *dbShard) insertRemainingEntriesAtFront(entries []*Entry, i int) { + for i >= 0 { + entry := entries[i] + elem := s.list.PushFront(entry) + s.insertInLookupMapWithLock(entry.Series.ID(), elem) + i-- + } +} + +func (s *dbShard) insertInLookupMapWithLock(id ident.ID, element *list.Element) { + s.lookup.SetUnsafe(id, element, shardMapSetUnsafeOptions{ + NoCopyKey: true, + NoFinalizeKey: true, + }) + element.Value.(*Entry).SetInsertTime(s.nowFn()) +} + func (s *dbShard) insertNewShardEntryWithLock(entry *Entry) { // Set the lookup value, we use the copied ID and since it is GC'd // we explicitly set it with options to not copy the key and not to // finalize it. 
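	// A minimal sketch (not part of this patch) of the merge strategy used by
	// insertNewShardEntriesWithLock above: walk the index-sorted list from the
	// back, binary-search the index-sorted slice for the first entry whose
	// Index exceeds the current element's, splice that tail in after the
	// element, and push whatever remains onto the front. The same merge over a
	// plain []int and container/list, using a hypothetical helper name:
	//
	//	func mergeSortedIntoList(l *list.List, sorted []int) {
	//		for e := l.Back(); e != nil && len(sorted) > 0; e = e.Prev() {
	//			idx := sort.SearchInts(sorted, e.Value.(int)+1) // first value > e's value
	//			cur := e
	//			for _, v := range sorted[idx:] {
	//				cur = l.InsertAfter(v, cur) // preserves ascending order
	//			}
	//			sorted = sorted[:idx]
	//		}
	//		for i := len(sorted) - 1; i >= 0; i-- {
	//			l.PushFront(sorted[i]) // leftovers are smaller than the old front
	//		}
	//	}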
copiedID := entry.Series.ID() - listElem := s.list.PushBack(entry) + listElem := s.list.Back() + if listElem == nil || listElem.Value.(*Entry).Index < entry.Index { + listElem = s.list.PushBack(entry) + } else { + for listElem != nil && listElem.Value.(*Entry).Index > entry.Index { + listElem = listElem.Prev() + } + listElem = s.list.InsertAfter(entry, listElem) + } s.lookup.SetUnsafe(copiedID, listElem, shardMapSetUnsafeOptions{ NoCopyKey: true, NoFinalizeKey: true, @@ -1426,6 +1523,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { var ( anyPendingAction = false numPendingIndexing = 0 + entriesToInsert []*Entry ) s.Lock() @@ -1484,7 +1582,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { // Insert still pending, perform the insert entry = inserts[i].entry - s.insertNewShardEntryWithLock(entry) + entriesToInsert = append(entriesToInsert, entry) } s.Unlock() diff --git a/src/dbnode/storage/shard_insert_queue.go b/src/dbnode/storage/shard_insert_queue.go index 4440556a85..aa3608f4da 100644 --- a/src/dbnode/storage/shard_insert_queue.go +++ b/src/dbnode/storage/shard_insert_queue.go @@ -22,6 +22,7 @@ package storage import ( "errors" + "sort" "strconv" "sync" "time" @@ -51,6 +52,19 @@ var ( errNewSeriesInsertRateLimitExceeded = errors.New("shard insert of new series exceeds rate limit") ) +type dbShardInsertByEntryIndex []dbShardInsert + +func (d dbShardInsertByEntryIndex) Len() int { + return len(d) +} + +func (d dbShardInsertByEntryIndex) Less(i, j int) bool { + return d[i].entry.Index < d[j].entry.Index +} +func (d dbShardInsertByEntryIndex) Swap(i, j int) { + d[i], d[j] = d[j], d[i] +} + type dbShardInsertQueueState int const ( @@ -209,7 +223,8 @@ func (q *dbShardInsertQueue) insertLoop() { allInserts = append(allInserts, batchByCPUCore.inserts...) 
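	// Once every per-core batch has been drained and unlocked, the combined
	// slice is ordered by entry index before being handed to insertEntryBatchFn.
	// A minimal equivalent of the sort.Interface defined above, shown with
	// sort.Slice purely for illustration (not part of this patch):
	//
	//	sort.Slice(allInserts, func(i, j int) bool {
	//		return allInserts[i].entry.Index < allInserts[j].entry.Index
	//	})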
batchByCPUCore.Unlock() } - + // sort the shard inserts by entry index + sort.Sort(dbShardInsertByEntryIndex(allInserts)) err := q.insertEntryBatchFn(allInserts) if err != nil { q.metrics.insertsBatchErrors.Inc(1) From 0d5fa325191dfc11286c0f8b87882124d659a11f Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Thu, 22 Aug 2024 19:20:29 +0530 Subject: [PATCH 2/6] Fixed code --- src/dbnode/storage/shard.go | 3 +++ src/dbnode/storage/shard_insert_queue.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 1ac75ec041..28db357aec 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1506,6 +1506,8 @@ func (s *dbShard) insertNewShardEntryWithLock(entry *Entry) { listElem := s.list.Back() if listElem == nil || listElem.Value.(*Entry).Index < entry.Index { listElem = s.list.PushBack(entry) + } else if elem := s.list.Front(); elem == nil || elem.Value.(*Entry).Index > entry.Index { + listElem = s.list.PushFront(entry) } else { for listElem != nil && listElem.Value.(*Entry).Index > entry.Index { listElem = listElem.Prev() @@ -1584,6 +1586,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { entry = inserts[i].entry entriesToInsert = append(entriesToInsert, entry) } + s.insertNewShardEntriesWithLock(entriesToInsert) s.Unlock() if !anyPendingAction { diff --git a/src/dbnode/storage/shard_insert_queue.go b/src/dbnode/storage/shard_insert_queue.go index aa3608f4da..04ae94188a 100644 --- a/src/dbnode/storage/shard_insert_queue.go +++ b/src/dbnode/storage/shard_insert_queue.go @@ -59,6 +59,9 @@ func (d dbShardInsertByEntryIndex) Len() int { } func (d dbShardInsertByEntryIndex) Less(i, j int) bool { + if d[i].entry == nil || d[j].entry == nil { + return true + } return d[i].entry.Index < d[j].entry.Index } func (d dbShardInsertByEntryIndex) Swap(i, j int) { From c0d45ea0030acab6fcaf32275570d883773e6736 Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Wed, 11 Sep 2024 11:18:03 +0530 Subject: [PATCH 3/6] Refactored code --- src/dbnode/storage/shard.go | 49 ++++++++++++++----------------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 28db357aec..92be5fb184 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1377,7 +1377,7 @@ func (s *dbShard) insertSeriesSync( return existingEntry, nil } - s.insertNewShardEntryWithLock(newEntry) + s.insertNewShardEntriesWithLock([]*Entry{newEntry}) // Track unlocking. unlocked = true @@ -1409,38 +1409,40 @@ func (s *dbShard) insertSeriesSync( return newEntry, nil } +// insertNewShardEntriesWithLock inserts the entries to shard. +// The entries passed to the function ae assumed to be sorted by index. 
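// A minimal illustration of that precondition (hypothetical check, not part
// of this patch): the insert queue sorts each batch by entry index before
// calling insertSeriesBatch, so on entry the slice is expected to satisfy
//
//	sort.SliceIsSorted(entries, func(i, j int) bool {
//		return entries[i].Index < entries[j].Index
//	})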
func (s *dbShard) insertNewShardEntriesWithLock(entries []*Entry) { if len(entries) == 0 { return } // Fast Path: Check if the entire slice can be appended to the end of the list - if s.canAppendToEnd(entries[0]) { - s.appendEntriesToEnd(entries) + if s.canAppendToEndWithLock(entries[0]) { + s.appendEntriesToEndWithLock(entries) return } // If not, proceed with the standard insertion logic elem := s.list.Back() - i := len(entries) - 1 - for elem != nil && i >= 0 { + for elem != nil && len(entries) != 0 { currListEntry := elem.Value.(*Entry) - insertIdx := s.findInsertionIndex(entries, currListEntry.Index, 0, i) + insertIdx := s.findInsertionIndexWithLock(entries, currListEntry.Index) if insertIdx < len(entries) { - i = s.insertEntriesAfter(elem, entries, insertIdx, i) + s.insertEntriesAfterWithLock(elem, entries[insertIdx:]) + entries = entries[:insertIdx] } elem = elem.Prev() } - s.insertRemainingEntriesAtFront(entries, i) + s.insertEntriesAtFrontWithLock(entries) } // Helper function to check if we can append the entire slice to the end of the list -func (s *dbShard) canAppendToEnd(firstEntry *Entry) bool { +func (s *dbShard) canAppendToEndWithLock(firstEntry *Entry) bool { lastListElem := s.list.Back() if lastListElem == nil { return false @@ -1450,14 +1452,15 @@ func (s *dbShard) canAppendToEnd(firstEntry *Entry) bool { } // Helper function to append all entries to the end of the list -func (s *dbShard) appendEntriesToEnd(entries []*Entry) { +func (s *dbShard) appendEntriesToEndWithLock(entries []*Entry) { for _, entry := range entries { s.insertNewShardEntryWithLock(entry) } } // Helper function to find the correct insertion index using binary search -func (s *dbShard) findInsertionIndex(entries []*Entry, targetIndex uint64, start, end int) int { +func (s *dbShard) findInsertionIndexWithLock(entries []*Entry, targetIndex uint64) int { + start, end := 0, len(entries)-1 for start <= end { mid := (start + end) / 2 if entries[mid].Index > targetIndex { @@ -1470,23 +1473,19 @@ func (s *dbShard) findInsertionIndex(entries []*Entry, targetIndex uint64, start } // Helper function to insert entries after a given entry with indexes [start,end] -func (s *dbShard) insertEntriesAfter(elem *list.Element, entries []*Entry, start, end int) int { +func (s *dbShard) insertEntriesAfterWithLock(elem *list.Element, entries []*Entry) { currElem := elem - for j := start; j <= end; j++ { - entry := entries[j] + for _, entry := range entries { currElem = s.list.InsertAfter(entry, currElem) s.insertInLookupMapWithLock(entry.Series.ID(), currElem) } - return start - 1 } // Helper function to insert any remaining entries at the front of the list -func (s *dbShard) insertRemainingEntriesAtFront(entries []*Entry, i int) { - for i >= 0 { - entry := entries[i] +func (s *dbShard) insertEntriesAtFrontWithLock(entries []*Entry) { + for _, entry := range entries { elem := s.list.PushFront(entry) s.insertInLookupMapWithLock(entry.Series.ID(), elem) - i-- } } @@ -1503,17 +1502,7 @@ func (s *dbShard) insertNewShardEntryWithLock(entry *Entry) { // we explicitly set it with options to not copy the key and not to // finalize it. 
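	// findInsertionIndexWithLock above returns the first position in entries
	// whose Index is strictly greater than targetIndex; a minimal equivalent
	// using the standard library, shown only for illustration (not part of
	// this patch):
	//
	//	idx := sort.Search(len(entries), func(i int) bool {
	//		return entries[i].Index > targetIndex
	//	})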
copiedID := entry.Series.ID() - listElem := s.list.Back() - if listElem == nil || listElem.Value.(*Entry).Index < entry.Index { - listElem = s.list.PushBack(entry) - } else if elem := s.list.Front(); elem == nil || elem.Value.(*Entry).Index > entry.Index { - listElem = s.list.PushFront(entry) - } else { - for listElem != nil && listElem.Value.(*Entry).Index > entry.Index { - listElem = listElem.Prev() - } - listElem = s.list.InsertAfter(entry, listElem) - } + listElem := s.list.PushBack(entry) s.lookup.SetUnsafe(copiedID, listElem, shardMapSetUnsafeOptions{ NoCopyKey: true, NoFinalizeKey: true, From 4e24b471882046e61082567d5c4444e8b59d7b7a Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Wed, 18 Sep 2024 14:21:07 +0530 Subject: [PATCH 4/6] Added integration and unit test --- ...on_fetch_bootstrap_blocks_metadata_test.go | 145 ++++++++++++++++++ src/dbnode/integration/options.go | 21 +++ src/dbnode/integration/setup.go | 4 +- .../shard_fetch_blocks_metadata_test.go | 66 ++++++++ 4 files changed, 234 insertions(+), 2 deletions(-) create mode 100644 src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go diff --git a/src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go b/src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go new file mode 100644 index 0000000000..9aad10c6a1 --- /dev/null +++ b/src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go @@ -0,0 +1,145 @@ +package integration + +import ( + "fmt" + "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" + "github.com/stretchr/testify/require" + "sync" + "testing" + "time" +) + +func TestAdminSessionFetchBootstrapBlocksMetadataFromPeer(t *testing.T) { + + if testing.Short() { + t.SkipNow() + } + + numOfActiveSeries := 1000 + writeBatchSize := 100 + readBatchSize := 10 + + testOpts := NewTestOptions(t).SetUseTChannelClientForWriting(true).SetNumShards(1).SetFetchSeriesBlocksBatchSize(readBatchSize) + testSetup, err := NewTestSetup(t, testOpts, nil) + require.NoError(t, err) + defer testSetup.Close() + + // Start the server + log := testSetup.StorageOpts().InstrumentOptions().Logger() + require.NoError(t, testSetup.StartServer()) + + // Stop the server + defer func() { + require.NoError(t, testSetup.StopServer()) + log.Debug("server is now down") + }() + + start := testSetup.NowFn()() + testSetup.SetNowFn(start) + + // Write test data + writeTestData(t, testSetup, testNamespaces[0], start, numOfActiveSeries, writeBatchSize) + + testSetup.SetNowFn(testSetup.NowFn()().Add(10 * time.Minute)) + end := testSetup.NowFn()() + + // Fetch and verify metadata + observedSeries := getTestSetupBootstrapBlocksMetadata(t, testSetup, testNamespaces[0], start, end) + verifySeriesMetadata(t, numOfActiveSeries, observedSeries) +} + +func writeTestData(t *testing.T, testSetup TestSetup, namespace ident.ID, start xtime.UnixNano, numOfSeries int, batchSize int) { + var wg sync.WaitGroup + + for i := 0; i < numOfSeries; i += batchSize { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < batchSize; j++ { + id := fmt.Sprintf("foo_%d_%d", i, j) + currInput := generate.BlockConfig{IDs: []string{id}, Start: start, NumPoints: 5} + testData := generate.Block(currInput) + require.NoError(t, testSetup.WriteBatch(namespace, testData)) + } + }() + } + 
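	// Note (not part of this patch): on Go versions before 1.22 the closure
	// launched above captures the loop variable i by reference, so goroutines
	// may observe the same value of i. A sketch of the same batched write that
	// pins i by passing it as an argument:
	//
	//	go func(i int) {
	//		defer wg.Done()
	//		for j := 0; j < batchSize; j++ {
	//			id := fmt.Sprintf("foo_%d_%d", i, j)
	//			testData := generate.Block(generate.BlockConfig{IDs: []string{id}, Start: start, NumPoints: 5})
	//			require.NoError(t, testSetup.WriteBatch(namespace, testData))
	//		}
	//	}(i)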
wg.Wait() +} + +func getTestSetupBootstrapBlocksMetadata(t *testing.T, + testSetup TestSetup, + namespace ident.ID, + start xtime.UnixNano, + end xtime.UnixNano, +) map[string]int { + adminClient := testSetup.M3DBVerificationAdminClient() + metadatasByShard, err := m3dbClientFetchBootstrapBlocksMetadata(adminClient, + namespace, testSetup.ShardSet().AllIDs(), start, end) + require.NoError(t, err) + + // Setup only has one shard + return getTestsSetupSeriesMetadataMap(metadatasByShard[0]) +} + +func getTestsSetupSeriesMetadataMap(metadatas []block.ReplicaMetadata) map[string]int { + seriesMap := make(map[string]int, 100000) + for _, block := range metadatas { + idString := block.ID.String() + seriesMap[idString]++ + } + return seriesMap +} + +func m3dbClientFetchBootstrapBlocksMetadata( + c client.AdminClient, + namespace ident.ID, + shards []uint32, + start, end xtime.UnixNano, +) (map[uint32][]block.ReplicaMetadata, error) { + session, err := c.DefaultAdminSession() + if err != nil { + return nil, err + } + metadatasByShard := make(map[uint32][]block.ReplicaMetadata, 100000) + for _, shardID := range shards { + + var metadatas []block.ReplicaMetadata + iter, err := session.FetchBootstrapBlocksMetadataFromPeers(namespace, + shardID, start, end, result.NewOptions()) + if err != nil { + return nil, err + } + + for iter.Next() { + host, blockMetadata := iter.Current() + metadatas = append(metadatas, block.ReplicaMetadata{ + Metadata: blockMetadata, + Host: host, + }) + } + if err := iter.Err(); err != nil { + return nil, err + } + + if metadatas != nil { + metadatasByShard[shardID] = metadatas + } + } + return metadatasByShard, nil +} + +func verifySeriesMetadata( + t *testing.T, + expectedTotalSeries int, + observed map[string]int, +) { + require.Equal(t, expectedTotalSeries, len(observed)) + for expectedSeries, count := range observed { + require.Equal(t, 1, count, "expected series %s metadata was expected to be observed once", expectedSeries) + } +} diff --git a/src/dbnode/integration/options.go b/src/dbnode/integration/options.go index cf346fc2d2..5702ea82bd 100644 --- a/src/dbnode/integration/options.go +++ b/src/dbnode/integration/options.go @@ -70,6 +70,9 @@ const ( // defaultUseTChannelClientForWriting determines whether we use the tchannel client for writing by default. defaultUseTChannelClientForWriting = false + // defaultFetchSeriesBlocksBatchSize is the default fetch series blocks batch size + defaultFetchSeriesBlocksBatchSize = 4096 + // defaultUseTChannelClientForTruncation determines whether we use the tchannel client for truncation by default. defaultUseTChannelClientForTruncation = true @@ -335,6 +338,12 @@ type TestOptions interface { // CustomAdminOptions gets custom options to apply to the admin client connection. CustomAdminOptions() []client.CustomAdminOption + + // SetFetchSeriesBlocksBatchSize sets the batch size for fetching series blocks in batch. + SetFetchSeriesBlocksBatchSize(value int) TestOptions + + // FetchSeriesBlocksBatchSize gets the batch size for fetching series blocks in batch. 
+ FetchSeriesBlocksBatchSize() int } type options struct { @@ -370,6 +379,7 @@ type options struct { writeNewSeriesAsync bool protoEncoding bool shardLeavingAndInitializingCountsTowardConsistency bool + fetchSeriesBlocksBatchSize int assertEqual assertTestDataEqual nowFn func() time.Time reportInterval time.Duration @@ -410,6 +420,7 @@ func NewTestOptions(t *testing.T) TestOptions { useTChannelClientForTruncation: defaultUseTChannelClientForTruncation, writeNewSeriesAsync: defaultWriteNewSeriesAsync, reportInterval: defaultReportInterval, + fetchSeriesBlocksBatchSize: defaultFetchSeriesBlocksBatchSize, } } @@ -785,3 +796,13 @@ func (o *options) SetCustomAdminOptions(value []client.CustomAdminOption) TestOp func (o *options) CustomAdminOptions() []client.CustomAdminOption { return o.customAdminOpts } + +func (o *options) SetFetchSeriesBlocksBatchSize(value int) TestOptions { + opts := *o + opts.fetchSeriesBlocksBatchSize = value + return &opts +} + +func (o *options) FetchSeriesBlocksBatchSize() int { + return o.fetchSeriesBlocksBatchSize +} diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index e191199cf6..8049f50a79 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -1144,9 +1144,9 @@ func newClients( origin = newOrigin(id, tchannelNodeAddr) verificationOrigin = newOrigin(id+"-verification", tchannelNodeAddr) - adminOpts = clientOpts.(client.AdminOptions).SetOrigin(origin).SetSchemaRegistry(schemaReg) + adminOpts = clientOpts.(client.AdminOptions).SetOrigin(origin).SetSchemaRegistry(schemaReg).SetFetchSeriesBlocksBatchSize(opts.FetchSeriesBlocksBatchSize()) - verificationAdminOpts = adminOpts.SetOrigin(verificationOrigin).SetSchemaRegistry(schemaReg) + verificationAdminOpts = adminOpts.SetOrigin(verificationOrigin).SetSchemaRegistry(schemaReg).SetFetchSeriesBlocksBatchSize(opts.FetchSeriesBlocksBatchSize()) ) if opts.ProtoEncoding() { diff --git a/src/dbnode/storage/shard_fetch_blocks_metadata_test.go b/src/dbnode/storage/shard_fetch_blocks_metadata_test.go index cf609abb61..9680999034 100644 --- a/src/dbnode/storage/shard_fetch_blocks_metadata_test.go +++ b/src/dbnode/storage/shard_fetch_blocks_metadata_test.go @@ -25,6 +25,7 @@ import ( "fmt" "sort" "strconv" + "sync" "testing" "time" @@ -297,3 +298,68 @@ func TestShardFetchBlocksMetadataV2WithSeriesCachePolicyNotCacheAll(t *testing.T } } } + +func TestShardFetchBlocksMetadata(t *testing.T) { + opts := DefaultTestOptions().SetSeriesCachePolicy(series.CacheRecentlyRead) + ctx := opts.ContextPool().Get() + defer ctx.Close() + + numOfActiveSeries := 1000 + writeBatchSize := 100 + readBatchSize := 10 + + shard := testDatabaseShard(t, opts) + defer shard.Close() + + nowFn := opts.ClockOptions().NowFn() + start := nowFn() + var wg sync.WaitGroup + for i := 0; i < numOfActiveSeries; i += writeBatchSize { + wg.Add(1) + go func(i int) { + defer wg.Done() + for j := 0; j < writeBatchSize; j++ { + id := fmt.Sprintf("foo=%d_%d", i, j) + _, err := shard.Write(ctx, ident.StringID(id), xtime.ToUnixNano(nowFn()), + 1.0, xtime.Second, nil, series.WriteOptions{}) + require.NoError(t, err) + } + }(i) + } + wg.Wait() + + end := nowFn() + var ( + encodedPageToken PageToken + err error + result block.FetchBlocksMetadataResults + ) + + fetchMetadata := true + observedIds := make(map[string]int, numOfActiveSeries) + fetchOpts := block.FetchBlocksMetadataOptions{ + IncludeSizes: true, + IncludeChecksums: true, + IncludeLastRead: true, + } + + for i := 0; i < numOfActiveSeries; i += readBatchSize { 
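		// This loop pages through FetchBlocksMetadataV2 until no page token is
		// returned; a minimal sketch of the same idiom without the
		// fetchMetadata flag, shown only for illustration (not part of this
		// patch):
		//
		//	for {
		//		result, encodedPageToken, err = shard.FetchBlocksMetadataV2(ctx,
		//			xtime.ToUnixNano(start), xtime.ToUnixNano(end),
		//			int64(readBatchSize), encodedPageToken, fetchOpts)
		//		require.NoError(t, err)
		//		for _, res := range result.Results() {
		//			observedIds[res.ID.String()]++
		//		}
		//		if encodedPageToken == nil {
		//			break
		//		}
		//	}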
+ if fetchMetadata { + result, encodedPageToken, err = shard.FetchBlocksMetadataV2(ctx, xtime.ToUnixNano(start), xtime.ToUnixNano(end), int64(readBatchSize), encodedPageToken, fetchOpts) + require.NoError(t, err) + for _, res := range result.Results() { + observedIds[res.ID.String()]++ + } + } else { + break + } + if encodedPageToken == nil { + fetchMetadata = false + } + } + + require.Equal(t, numOfActiveSeries, len(observedIds)) + for id, _ := range observedIds { + require.Equal(t, 1, observedIds[id]) + } +} From 2b55faf2c0736af3b8154ad1ebbb232e47629af9 Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Fri, 20 Sep 2024 11:29:18 +0530 Subject: [PATCH 5/6] Updated the code with extra field in entry logic --- src/dbnode/storage/entry.go | 2 + src/dbnode/storage/shard.go | 133 ++++-------------- .../shard_fetch_blocks_metadata_test.go | 5 +- src/dbnode/storage/shard_insert_queue.go | 19 --- src/dbnode/storage/shard_test.go | 20 +-- 5 files changed, 40 insertions(+), 139 deletions(-) diff --git a/src/dbnode/storage/entry.go b/src/dbnode/storage/entry.go index 5e0967ad6f..882cea3041 100644 --- a/src/dbnode/storage/entry.go +++ b/src/dbnode/storage/entry.go @@ -133,6 +133,8 @@ type Entry struct { nowFn clock.NowFn metrics *EntryMetrics pendingIndexBatchSizeOne []writes.PendingIndexInsert + // Index assigned to entry while adding it to the shard + indexInShard xatomic.Uint64 } // ensure Entry satisfies the `doc.OnIndexSeries` interface. diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 92be5fb184..6547abd517 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -115,20 +115,22 @@ const ( type dbShard struct { sync.RWMutex block.DatabaseBlockRetriever - opts Options - seriesOpts series.Options - nowFn clock.NowFn - state dbShardState - namespace namespace.Metadata - seriesBlockRetriever series.QueryableBlockRetriever - seriesOnRetrieveBlock block.OnRetrieveBlock - namespaceReaderMgr databaseNamespaceReaderManager - increasingIndex increasingIndex - seriesPool series.DatabaseSeriesPool - reverseIndex NamespaceIndex - insertQueue *dbShardInsertQueue - lookup *shardMap - list *list.List + opts Options + seriesOpts series.Options + nowFn clock.NowFn + state dbShardState + namespace namespace.Metadata + seriesBlockRetriever series.QueryableBlockRetriever + seriesOnRetrieveBlock block.OnRetrieveBlock + namespaceReaderMgr databaseNamespaceReaderManager + increasingIndex increasingIndex + seriesPool series.DatabaseSeriesPool + reverseIndex NamespaceIndex + insertQueue *dbShardInsertQueue + lookup *shardMap + list *list.List + // protected by dbShard lock + lastEntryIndex uint64 bootstrapState BootstrapState newMergerFn fs.NewMergerFn newFSMergeWithMemFn newFSMergeWithMemFn @@ -1377,7 +1379,7 @@ func (s *dbShard) insertSeriesSync( return existingEntry, nil } - s.insertNewShardEntriesWithLock([]*Entry{newEntry}) + s.insertNewShardEntryWithLock(newEntry) // Track unlocking. unlocked = true @@ -1409,100 +1411,16 @@ func (s *dbShard) insertSeriesSync( return newEntry, nil } -// insertNewShardEntriesWithLock inserts the entries to shard. -// The entries passed to the function ae assumed to be sorted by index. 
-func (s *dbShard) insertNewShardEntriesWithLock(entries []*Entry) { - if len(entries) == 0 { - return - } - - // Fast Path: Check if the entire slice can be appended to the end of the list - if s.canAppendToEndWithLock(entries[0]) { - s.appendEntriesToEndWithLock(entries) - return - } - - // If not, proceed with the standard insertion logic - elem := s.list.Back() - - for elem != nil && len(entries) != 0 { - currListEntry := elem.Value.(*Entry) - - insertIdx := s.findInsertionIndexWithLock(entries, currListEntry.Index) - - if insertIdx < len(entries) { - s.insertEntriesAfterWithLock(elem, entries[insertIdx:]) - entries = entries[:insertIdx] - } - - elem = elem.Prev() - } - - s.insertEntriesAtFrontWithLock(entries) -} - -// Helper function to check if we can append the entire slice to the end of the list -func (s *dbShard) canAppendToEndWithLock(firstEntry *Entry) bool { - lastListElem := s.list.Back() - if lastListElem == nil { - return false - } - lastListEntry := lastListElem.Value.(*Entry) - return firstEntry.Index > lastListEntry.Index -} - -// Helper function to append all entries to the end of the list -func (s *dbShard) appendEntriesToEndWithLock(entries []*Entry) { - for _, entry := range entries { - s.insertNewShardEntryWithLock(entry) - } -} - -// Helper function to find the correct insertion index using binary search -func (s *dbShard) findInsertionIndexWithLock(entries []*Entry, targetIndex uint64) int { - start, end := 0, len(entries)-1 - for start <= end { - mid := (start + end) / 2 - if entries[mid].Index > targetIndex { - end = mid - 1 - } else { - start = mid + 1 - } - } - return start -} - -// Helper function to insert entries after a given entry with indexes [start,end] -func (s *dbShard) insertEntriesAfterWithLock(elem *list.Element, entries []*Entry) { - currElem := elem - for _, entry := range entries { - currElem = s.list.InsertAfter(entry, currElem) - s.insertInLookupMapWithLock(entry.Series.ID(), currElem) - } -} - -// Helper function to insert any remaining entries at the front of the list -func (s *dbShard) insertEntriesAtFrontWithLock(entries []*Entry) { - for _, entry := range entries { - elem := s.list.PushFront(entry) - s.insertInLookupMapWithLock(entry.Series.ID(), elem) - } -} - -func (s *dbShard) insertInLookupMapWithLock(id ident.ID, element *list.Element) { - s.lookup.SetUnsafe(id, element, shardMapSetUnsafeOptions{ - NoCopyKey: true, - NoFinalizeKey: true, - }) - element.Value.(*Entry).SetInsertTime(s.nowFn()) -} - func (s *dbShard) insertNewShardEntryWithLock(entry *Entry) { // Set the lookup value, we use the copied ID and since it is GC'd // we explicitly set it with options to not copy the key and not to // finalize it. copiedID := entry.Series.ID() listElem := s.list.PushBack(entry) + // It is important to keep increasing order of shard index since it's used for cursor pagination filtering + // when peers bootstrapping in memory block. 
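	// A minimal illustration of how that cursor is consumed (compare
	// fetchActiveBlocksMetadata below; hypothetical helper, not part of this
	// patch): entries whose indexInShard is below the page-token cursor are
	// skipped, and the first index past the page limit becomes the next token.
	//
	//	func pageAfter(entries []*Entry, cursor uint64, limit int) ([]*Entry, *uint64) {
	//		var page []*Entry
	//		for _, e := range entries {
	//			if e.indexInShard.Load() < cursor {
	//				continue
	//			}
	//			if len(page) >= limit {
	//				next := e.indexInShard.Load()
	//				return page, &next
	//			}
	//			page = append(page, e)
	//		}
	//		return page, nil
	//	}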
+ s.lastEntryIndex++ + entry.indexInShard.Store(s.lastEntryIndex) s.lookup.SetUnsafe(copiedID, listElem, shardMapSetUnsafeOptions{ NoCopyKey: true, NoFinalizeKey: true, @@ -1514,7 +1432,6 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { var ( anyPendingAction = false numPendingIndexing = 0 - entriesToInsert []*Entry ) s.Lock() @@ -1570,12 +1487,10 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error { s.metrics.insertAsyncInsertErrors.Inc(int64(len(inserts) - i)) return err } - // Insert still pending, perform the insert entry = inserts[i].entry - entriesToInsert = append(entriesToInsert, entry) + s.insertNewShardEntryWithLock(entry) } - s.insertNewShardEntriesWithLock(entriesToInsert) s.Unlock() if !anyPendingAction { @@ -1744,13 +1659,13 @@ func (s *dbShard) fetchActiveBlocksMetadata( s.forEachShardEntry(func(entry *Entry) bool { // Break out of the iteration loop once we've accumulated enough entries. if int64(len(res.Results())) >= limit { - next := int64(entry.Index) + next := int64(entry.indexInShard.Load()) nextIndexCursor = &next return false } // Fast forward past indexes lower than page token - if int64(entry.Index) < indexCursor { + if int64(entry.indexInShard.Load()) < indexCursor { return true } diff --git a/src/dbnode/storage/shard_fetch_blocks_metadata_test.go b/src/dbnode/storage/shard_fetch_blocks_metadata_test.go index 9680999034..5f35569411 100644 --- a/src/dbnode/storage/shard_fetch_blocks_metadata_test.go +++ b/src/dbnode/storage/shard_fetch_blocks_metadata_test.go @@ -73,6 +73,7 @@ func TestShardFetchBlocksMetadataV2WithSeriesCachePolicyCacheAll(t *testing.T) { } lastRead := xtime.Now().Add(-time.Minute) for i := int64(0); i < 10; i++ { + entryIndex := i + 1 id := ident.StringID(fmt.Sprintf("foo.%d", i)) tags := ident.NewTags( ident.StringTag("aaa", "bbb"), @@ -80,12 +81,12 @@ func TestShardFetchBlocksMetadataV2WithSeriesCachePolicyCacheAll(t *testing.T) { ) tagsIter := ident.NewTagsIterator(tags) series := addMockSeries(ctrl, shard, id, tags, uint64(i)) - if i == startCursor { + if entryIndex == startCursor { series.EXPECT(). FetchBlocksMetadata(gomock.Not(nil), start, end, seriesFetchOpts). Return(block.NewFetchBlocksMetadataResult(id, tagsIter, block.NewFetchBlockMetadataResults()), nil) - } else if i > startCursor && i <= startCursor+fetchLimit { + } else if entryIndex > startCursor && entryIndex <= startCursor+fetchLimit { ids = append(ids, id) blocks := block.NewFetchBlockMetadataResults() at := start.Add(time.Duration(i)) diff --git a/src/dbnode/storage/shard_insert_queue.go b/src/dbnode/storage/shard_insert_queue.go index 04ae94188a..f97d7ee9b5 100644 --- a/src/dbnode/storage/shard_insert_queue.go +++ b/src/dbnode/storage/shard_insert_queue.go @@ -22,7 +22,6 @@ package storage import ( "errors" - "sort" "strconv" "sync" "time" @@ -52,22 +51,6 @@ var ( errNewSeriesInsertRateLimitExceeded = errors.New("shard insert of new series exceeds rate limit") ) -type dbShardInsertByEntryIndex []dbShardInsert - -func (d dbShardInsertByEntryIndex) Len() int { - return len(d) -} - -func (d dbShardInsertByEntryIndex) Less(i, j int) bool { - if d[i].entry == nil || d[j].entry == nil { - return true - } - return d[i].entry.Index < d[j].entry.Index -} -func (d dbShardInsertByEntryIndex) Swap(i, j int) { - d[i], d[j] = d[j], d[i] -} - type dbShardInsertQueueState int const ( @@ -226,8 +209,6 @@ func (q *dbShardInsertQueue) insertLoop() { allInserts = append(allInserts, batchByCPUCore.inserts...) 
batchByCPUCore.Unlock() } - // sort the shard inserts by entry index - sort.Sort(dbShardInsertByEntryIndex(allInserts)) err := q.insertEntryBatchFn(allInserts) if err != nil { q.metrics.insertsBatchErrors.Inc(1) diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index e84cc8cdcf..74cc6a5e3c 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -64,8 +64,7 @@ type testIncreasingIndex struct { } func (i *testIncreasingIndex) nextIndex() uint64 { - created := atomic.AddUint64(&i.created, 1) - return created - 1 + return atomic.AddUint64(&i.created, 1) } func testDatabaseShard(t *testing.T, opts Options) *dbShard { @@ -924,6 +923,9 @@ func writeShardAndVerify( assert.Equal(t, id, seriesWrite.Series.ID.String()) assert.Equal(t, "testns1", seriesWrite.Series.Namespace.String()) assert.Equal(t, expectedIdx, seriesWrite.Series.UniqueIndex) + entry, err := shard.lookupEntryWithLock(ident.StringID(id)) + require.NoError(t, err) + assert.Equal(t, expectedIdx, entry.indexInShard.Load()) } func TestShardTick(t *testing.T) { @@ -1001,20 +1003,20 @@ func TestShardTick(t *testing.T) { setNow(nowFn().Add(t)) } - writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 1.0, true, 0) + writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 1.0, true, 1) // same time, different value should write - writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 2.0, true, 0) + writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 2.0, true, 1) - writeShardAndVerify(ctx, t, shard, "bar", nowFn(), 2.0, true, 1) + writeShardAndVerify(ctx, t, shard, "bar", nowFn(), 2.0, true, 2) // same tme, same value should not write - writeShardAndVerify(ctx, t, shard, "bar", nowFn(), 2.0, false, 1) + writeShardAndVerify(ctx, t, shard, "bar", nowFn(), 2.0, false, 2) - writeShardAndVerify(ctx, t, shard, "baz", nowFn(), 3.0, true, 2) + writeShardAndVerify(ctx, t, shard, "baz", nowFn(), 3.0, true, 3) // different time, same value should write - writeShardAndVerify(ctx, t, shard, "baz", nowFn().Add(1), 3.0, true, 2) + writeShardAndVerify(ctx, t, shard, "baz", nowFn().Add(1), 3.0, true, 3) // same time, same value should not write, regardless of being out of order - writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 2.0, false, 0) + writeShardAndVerify(ctx, t, shard, "foo", nowFn(), 2.0, false, 1) r, err := shard.Tick(context.NewNoOpCanncellable(), nowFn(), namespace.Context{}) require.NoError(t, err) From dc2a2fe3bd82fd1f46deec9b516837484e2060b6 Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Mon, 23 Sep 2024 12:31:38 +0530 Subject: [PATCH 6/6] Changed unit test to run multiple configurations --- ...on_fetch_bootstrap_blocks_metadata_test.go | 18 ++++--- src/dbnode/storage/entry.go | 2 +- src/dbnode/storage/shard.go | 14 +++--- .../shard_fetch_blocks_metadata_test.go | 47 ++++++++++++++----- 4 files changed, 56 insertions(+), 25 deletions(-) diff --git a/src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go b/src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go index 9aad10c6a1..d508946356 100644 --- a/src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go +++ b/src/dbnode/integration/admin_session_fetch_bootstrap_blocks_metadata_test.go @@ -21,8 +21,8 @@ func TestAdminSessionFetchBootstrapBlocksMetadataFromPeer(t *testing.T) { } numOfActiveSeries := 1000 - writeBatchSize := 100 - readBatchSize := 10 + writeBatchSize := 7 + readBatchSize := 13 testOpts := 
NewTestOptions(t).SetUseTChannelClientForWriting(true).SetNumShards(1).SetFetchSeriesBlocksBatchSize(readBatchSize) testSetup, err := NewTestSetup(t, testOpts, nil) @@ -49,7 +49,7 @@ func TestAdminSessionFetchBootstrapBlocksMetadataFromPeer(t *testing.T) { end := testSetup.NowFn()() // Fetch and verify metadata - observedSeries := getTestSetupBootstrapBlocksMetadata(t, testSetup, testNamespaces[0], start, end) + observedSeries := newTestSetupBootstrapBlocksMetadata(t, testSetup, testNamespaces[0], start, end) verifySeriesMetadata(t, numOfActiveSeries, observedSeries) } @@ -60,7 +60,11 @@ func writeTestData(t *testing.T, testSetup TestSetup, namespace ident.ID, start wg.Add(1) go func() { defer wg.Done() - for j := 0; j < batchSize; j++ { + size := batchSize + if numOfSeries-i < batchSize { + size = numOfSeries - i + } + for j := 0; j < size; j++ { id := fmt.Sprintf("foo_%d_%d", i, j) currInput := generate.BlockConfig{IDs: []string{id}, Start: start, NumPoints: 5} testData := generate.Block(currInput) @@ -71,7 +75,7 @@ func writeTestData(t *testing.T, testSetup TestSetup, namespace ident.ID, start wg.Wait() } -func getTestSetupBootstrapBlocksMetadata(t *testing.T, +func newTestSetupBootstrapBlocksMetadata(t *testing.T, testSetup TestSetup, namespace ident.ID, start xtime.UnixNano, @@ -83,10 +87,10 @@ func getTestSetupBootstrapBlocksMetadata(t *testing.T, require.NoError(t, err) // Setup only has one shard - return getTestsSetupSeriesMetadataMap(metadatasByShard[0]) + return newTestsSetupSeriesMetadataMap(metadatasByShard[0]) } -func getTestsSetupSeriesMetadataMap(metadatas []block.ReplicaMetadata) map[string]int { +func newTestsSetupSeriesMetadataMap(metadatas []block.ReplicaMetadata) map[string]int { seriesMap := make(map[string]int, 100000) for _, block := range metadatas { idString := block.ID.String() diff --git a/src/dbnode/storage/entry.go b/src/dbnode/storage/entry.go index 882cea3041..b67c3a29c7 100644 --- a/src/dbnode/storage/entry.go +++ b/src/dbnode/storage/entry.go @@ -133,7 +133,7 @@ type Entry struct { nowFn clock.NowFn metrics *EntryMetrics pendingIndexBatchSizeOne []writes.PendingIndexInsert - // Index assigned to entry while adding it to the shard + // indexInShard is assigned to entry while adding it to the shard. indexInShard xatomic.Uint64 } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 6547abd517..4636fa00d0 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -118,7 +118,6 @@ type dbShard struct { opts Options seriesOpts series.Options nowFn clock.NowFn - state dbShardState namespace namespace.Metadata seriesBlockRetriever series.QueryableBlockRetriever seriesOnRetrieveBlock block.OnRetrieveBlock @@ -127,11 +126,14 @@ type dbShard struct { seriesPool series.DatabaseSeriesPool reverseIndex NamespaceIndex insertQueue *dbShardInsertQueue - lookup *shardMap - list *list.List - // protected by dbShard lock - lastEntryIndex uint64 - bootstrapState BootstrapState + + // protected by dbShard lock. 
+ lastEntryIndex uint64 + lookup *shardMap + list *list.List + state dbShardState + bootstrapState BootstrapState + newMergerFn fs.NewMergerFn newFSMergeWithMemFn newFSMergeWithMemFn filesetsFn filesetsFn diff --git a/src/dbnode/storage/shard_fetch_blocks_metadata_test.go b/src/dbnode/storage/shard_fetch_blocks_metadata_test.go index 5f35569411..1e507bea7c 100644 --- a/src/dbnode/storage/shard_fetch_blocks_metadata_test.go +++ b/src/dbnode/storage/shard_fetch_blocks_metadata_test.go @@ -301,25 +301,48 @@ func TestShardFetchBlocksMetadataV2WithSeriesCachePolicyNotCacheAll(t *testing.T } func TestShardFetchBlocksMetadata(t *testing.T) { - opts := DefaultTestOptions().SetSeriesCachePolicy(series.CacheRecentlyRead) - ctx := opts.ContextPool().Get() - defer ctx.Close() - numOfActiveSeries := 1000 - writeBatchSize := 100 - readBatchSize := 10 + tests := []struct { + name string + numOfActiveSeries int + WriteBatchSize int + readBatchSize int + }{ + {name: "Test-case 1", numOfActiveSeries: 1000, WriteBatchSize: 10, readBatchSize: 10}, + {name: "Test-case 2", numOfActiveSeries: 1000, WriteBatchSize: 7, readBatchSize: 19}, + {name: "Test-case 3", numOfActiveSeries: 4000, WriteBatchSize: 9, readBatchSize: 7}, + {name: "Test-case 4", numOfActiveSeries: 5000, WriteBatchSize: 121, readBatchSize: 151}, + {name: "Test-case 5", numOfActiveSeries: 30000, WriteBatchSize: 1021, readBatchSize: 4096}, + } - shard := testDatabaseShard(t, opts) - defer shard.Close() + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + opts := DefaultTestOptions().SetSeriesCachePolicy(series.CacheRecentlyRead) + shard := testDatabaseShard(t, opts) + defer shard.Close() + nowFn := opts.ClockOptions().NowFn() + start := nowFn() + writeTestData(t, tc.numOfActiveSeries, tc.WriteBatchSize, shard, opts) + end := nowFn() + verifyFetchedBlockMetadata(t, tc.numOfActiveSeries, tc.readBatchSize, shard, opts, start, end) + }) + } +} +func writeTestData(t *testing.T, numOfActiveSeries int, writeBatchSize int, shard *dbShard, opts Options) { + ctx := opts.ContextPool().Get() + defer ctx.Close() nowFn := opts.ClockOptions().NowFn() - start := nowFn() var wg sync.WaitGroup for i := 0; i < numOfActiveSeries; i += writeBatchSize { wg.Add(1) go func(i int) { defer wg.Done() - for j := 0; j < writeBatchSize; j++ { + size := writeBatchSize + if numOfActiveSeries-i < writeBatchSize { + size = numOfActiveSeries - i + } + for j := 0; j < size; j++ { id := fmt.Sprintf("foo=%d_%d", i, j) _, err := shard.Write(ctx, ident.StringID(id), xtime.ToUnixNano(nowFn()), 1.0, xtime.Second, nil, series.WriteOptions{}) @@ -328,8 +351,10 @@ func TestShardFetchBlocksMetadata(t *testing.T) { }(i) } wg.Wait() +} - end := nowFn() +func verifyFetchedBlockMetadata(t *testing.T, numOfActiveSeries int, readBatchSize int, shard *dbShard, opts Options, start, end time.Time) { + ctx := opts.ContextPool().Get() var ( encodedPageToken PageToken err error