[dbnode] Fix the insertion of entries in shard entry list to maintain strictly increasing order of unique index #4293

Open · wants to merge 7 commits into base: master
Changes from 3 commits
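The property this change restores is that the shard's entry list stays in strictly increasing order of each entry's unique Index, even when a batch of new entries has to be spliced into the middle rather than appended. A minimal sketch of that invariant, using a hypothetical shardEntry stand-in and Go's container/list rather than the real dbShard types:

```go
package main

import (
	"container/list"
	"fmt"
)

// shardEntry is a stand-in for the real *Entry type; only the unique,
// monotonically assigned Index matters for the ordering invariant.
type shardEntry struct {
	Index uint64
}

// isStrictlyIncreasing walks the list front to back and checks that every
// entry's Index is strictly greater than the one before it.
func isStrictlyIncreasing(l *list.List) bool {
	var prev *shardEntry
	for e := l.Front(); e != nil; e = e.Next() {
		cur := e.Value.(*shardEntry)
		if prev != nil && cur.Index <= prev.Index {
			return false
		}
		prev = cur
	}
	return true
}

func main() {
	l := list.New()
	for _, idx := range []uint64{1, 2, 5, 9} {
		l.PushBack(&shardEntry{Index: idx})
	}
	fmt.Println(isStrictlyIncreasing(l)) // true
}
```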
105 changes: 103 additions & 2 deletions src/dbnode/storage/shard.go
@@ -1409,12 +1409,111 @@ func (s *dbShard) insertSeriesSync(
return newEntry, nil
}

func (s *dbShard) insertNewShardEntriesWithLock(entries []*Entry) {
if len(entries) == 0 {
return
}

// Fast Path: Check if the entire slice can be appended to the end of the list
if s.canAppendToEnd(entries[0]) {
s.appendEntriesToEnd(entries)
return
}

// If not, proceed with the standard insertion logic
elem := s.list.Back()
i := len(entries) - 1

for elem != nil && i >= 0 {
currListEntry := elem.Value.(*Entry)

insertIdx := s.findInsertionIndex(entries, currListEntry.Index, 0, i)

if insertIdx < len(entries) {
i = s.insertEntriesAfter(elem, entries, insertIdx, i)
}

elem = elem.Prev()
}

s.insertRemainingEntriesAtFront(entries, i)
}

// Helper function to check if we can append the entire slice to the end of the list
func (s *dbShard) canAppendToEnd(firstEntry *Entry) bool {
lastListElem := s.list.Back()
if lastListElem == nil {
return false
}
lastListEntry := lastListElem.Value.(*Entry)
return firstEntry.Index > lastListEntry.Index
}

// Helper function to append all entries to the end of the list
func (s *dbShard) appendEntriesToEnd(entries []*Entry) {
for _, entry := range entries {
s.insertNewShardEntryWithLock(entry)
}
}

// Helper function to find the correct insertion index using binary search
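// For illustration (hypothetical values): given entry Index values
// [3, 5, 8, 12] and targetIndex 6, a search over [0, 3] returns 2, the first
// position whose Index exceeds the target, so entries[2:] are the entries
// that belong after the current list element.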
func (s *dbShard) findInsertionIndex(entries []*Entry, targetIndex uint64, start, end int) int {
for start <= end {
mid := (start + end) / 2
if entries[mid].Index > targetIndex {
end = mid - 1
} else {
start = mid + 1
}
}
return start
}

// Helper function to insert entries after a given entry with indexes [start,end]
func (s *dbShard) insertEntriesAfter(elem *list.Element, entries []*Entry, start, end int) int {
currElem := elem
for j := start; j <= end; j++ {
entry := entries[j]
currElem = s.list.InsertAfter(entry, currElem)
s.insertInLookupMapWithLock(entry.Series.ID(), currElem)
}
return start - 1
}

// Helper function to insert any remaining entries at the front of the list
func (s *dbShard) insertRemainingEntriesAtFront(entries []*Entry, i int) {
for i >= 0 {
entry := entries[i]
elem := s.list.PushFront(entry)
s.insertInLookupMapWithLock(entry.Series.ID(), elem)
i--
}
}

func (s *dbShard) insertInLookupMapWithLock(id ident.ID, element *list.Element) {
s.lookup.SetUnsafe(id, element, shardMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
})
element.Value.(*Entry).SetInsertTime(s.nowFn())
}

func (s *dbShard) insertNewShardEntryWithLock(entry *Entry) {
// Set the lookup value, we use the copied ID and since it is GC'd
// we explicitly set it with options to not copy the key and not to
// finalize it.
copiedID := entry.Series.ID()
-listElem := s.list.PushBack(entry)
+listElem := s.list.Back()
if listElem == nil || listElem.Value.(*Entry).Index < entry.Index {
listElem = s.list.PushBack(entry)
} else if elem := s.list.Front(); elem == nil || elem.Value.(*Entry).Index > entry.Index {
listElem = s.list.PushFront(entry)
} else {
for listElem != nil && listElem.Value.(*Entry).Index > entry.Index {
listElem = listElem.Prev()
}
listElem = s.list.InsertAfter(entry, listElem)
}
s.lookup.SetUnsafe(copiedID, listElem, shardMapSetUnsafeOptions{
NoCopyKey: true,
NoFinalizeKey: true,
@@ -1426,6 +1525,7 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error {
var (
anyPendingAction = false
numPendingIndexing = 0
entriesToInsert []*Entry
)

s.Lock()
@@ -1484,8 +1584,9 @@ func (s *dbShard) insertSeriesBatch(inserts []dbShardInsert) error {

// Insert still pending, perform the insert
entry = inserts[i].entry
-s.insertNewShardEntryWithLock(entry)
+entriesToInsert = append(entriesToInsert, entry)
}
s.insertNewShardEntriesWithLock(entriesToInsert)
s.Unlock()

if !anyPendingAction {
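For readers following the new batch path above, here is a self-contained sketch of the same back-to-front merge idea. The entry type and mergeSorted function are simplified stand-ins (with sort.Search in place of the hand-rolled binary search), not the shard's actual API:

```go
package main

import (
	"container/list"
	"fmt"
	"sort"
)

type entry struct{ Index uint64 }

// mergeSorted inserts the slice entries (sorted by Index) into the list l
// (also sorted by Index), preserving strictly increasing order. It mirrors
// the back-to-front walk with one binary search per visited list element.
func mergeSorted(l *list.List, entries []*entry) {
	i := len(entries) - 1
	for elem := l.Back(); elem != nil && i >= 0; elem = elem.Prev() {
		curIdx := elem.Value.(*entry).Index
		// First position in entries[0..i] whose Index exceeds curIdx.
		pos := sort.Search(i+1, func(k int) bool {
			return entries[k].Index > curIdx
		})
		// entries[pos..i] all belong immediately after elem.
		at := elem
		for j := pos; j <= i; j++ {
			at = l.InsertAfter(entries[j], at)
		}
		i = pos - 1
	}
	// Whatever remains is smaller than every list element; push it to the
	// front starting with the largest remaining entry so order is preserved.
	for ; i >= 0; i-- {
		l.PushFront(entries[i])
	}
}

func main() {
	l := list.New()
	for _, idx := range []uint64{2, 5, 9} {
		l.PushBack(&entry{Index: idx})
	}
	mergeSorted(l, []*entry{{Index: 1}, {Index: 4}, {Index: 7}, {Index: 11}})
	for e := l.Front(); e != nil; e = e.Next() {
		fmt.Print(e.Value.(*entry).Index, " ") // 1 2 4 5 7 9 11
	}
	fmt.Println()
}
```

Since both the list and the incoming slice are already ordered, each incoming entry is inserted exactly once and each original list element is visited at most once, so the merge stays linear in the combined size plus the cost of the binary searches.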
20 changes: 19 additions & 1 deletion src/dbnode/storage/shard_insert_queue.go
@@ -22,6 +22,7 @@ package storage

import (
"errors"
"sort"
"strconv"
"sync"
"time"
@@ -51,6 +52,22 @@
errNewSeriesInsertRateLimitExceeded = errors.New("shard insert of new series exceeds rate limit")
)

type dbShardInsertByEntryIndex []dbShardInsert

func (d dbShardInsertByEntryIndex) Len() int {
return len(d)
}

func (d dbShardInsertByEntryIndex) Less(i, j int) bool {
// Sort inserts without an entry first; reporting "less" in both directions
// for nil entries would not be a valid ordering for sort.Sort.
if d[i].entry == nil {
return d[j].entry != nil
}
if d[j].entry == nil {
return false
}
return d[i].entry.Index < d[j].entry.Index
}

func (d dbShardInsertByEntryIndex) Swap(i, j int) {
d[i], d[j] = d[j], d[i]
}

type dbShardInsertQueueState int

const (
Expand Down Expand Up @@ -209,7 +226,8 @@ func (q *dbShardInsertQueue) insertLoop() {
allInserts = append(allInserts, batchByCPUCore.inserts...)
batchByCPUCore.Unlock()
}

// Sort the shard inserts by entry index so the batch insert receives them in ascending order.
sort.Sort(dbShardInsertByEntryIndex(allInserts))
err := q.insertEntryBatchFn(allInserts)
if err != nil {
q.metrics.insertsBatchErrors.Inc(1)
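A minimal usage sketch of the queue-side ordering step, with a simplified insert struct in place of dbShardInsert and sort.SliceStable in place of the sort.Interface implementation above. Inserts collected from the per-CPU batches are ordered by the entry's unique Index, with entry-less inserts first, before the batch insert function runs:

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct{ Index uint64 }

// insert is a stand-in for dbShardInsert; the entry may be nil when the
// insert only carries pending index or write state.
type insert struct{ entry *entry }

// sortByEntryIndex orders inserts without an entry first, then the rest by
// ascending unique Index, keeping the original order of equal elements.
func sortByEntryIndex(inserts []insert) {
	sort.SliceStable(inserts, func(i, j int) bool {
		if inserts[i].entry == nil {
			return inserts[j].entry != nil
		}
		if inserts[j].entry == nil {
			return false
		}
		return inserts[i].entry.Index < inserts[j].entry.Index
	})
}

func main() {
	batch := []insert{
		{entry: &entry{Index: 7}},
		{entry: nil},
		{entry: &entry{Index: 3}},
		{entry: &entry{Index: 5}},
	}
	sortByEntryIndex(batch)
	for _, in := range batch {
		if in.entry == nil {
			fmt.Print("nil ")
			continue
		}
		fmt.Print(in.entry.Index, " ")
	}
	fmt.Println() // nil 3 5 7
}
```

Keeping the comparison a consistent total order across the nil and non-nil cases matters here, because the shard-side fast path and merge both assume the incoming slice arrives sorted by Index.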