enhance: [2.5] Return deltadata for DeleteCodec.Deserialize (#37214) (#37264)

Cherry-pick from master
pr: #37214
Related to #35303 #30404

This PR changes the return type of `DeleteCodec.Deserialize` from
`storage.DeleteData` to `DeltaData`, which reduces the memory overhead
of the per-row interface headers.

It also refines the `storage.DeltaData` methods to make them easier to use.

Signed-off-by: Congqi Xia <[email protected]>
congqixia authored Oct 30, 2024
1 parent bf8693d commit ee6ce8c
Showing 11 changed files with 269 additions and 58 deletions.
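
Before the file-by-file diff, a minimal sketch of the call-site change (the `readDeltalog` helper is hypothetical; the `Deserialize` signature and the `DeltaData` accessors are taken from the hunks below):

```go
package example

import "github.com/milvus-io/milvus/internal/storage"

// readDeltalog is a hypothetical caller illustrating the new shape of
// DeleteCodec.Deserialize, which now returns *storage.DeltaData instead
// of *storage.DeleteData.
func readDeltalog(codec *storage.DeleteCodec, blobs []*storage.Blob) error {
	_, _, deltaData, err := codec.Deserialize(blobs)
	if err != nil {
		return err
	}
	// Primary keys live in a typed PrimaryKeys container instead of a
	// []PrimaryKey of interface values, so there is no per-row interface
	// header; timestamps and row count are reached via accessors.
	for i := 0; i < int(deltaData.DeleteRowCount()); i++ {
		pk := deltaData.DeletePks().Get(i)
		ts := deltaData.DeleteTimestamps()[i]
		_, _ = pk, ts
	}
	return nil
}
```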
3 changes: 2 additions & 1 deletion internal/datanode/importv2/pool_test.go
@@ -20,9 +20,10 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/stretchr/testify/assert"
)

func TestResizePools(t *testing.T) {
8 changes: 4 additions & 4 deletions internal/datanode/iterators/deltalog_iterator.go
@@ -17,7 +17,7 @@ type DeltalogIterator struct {
disposedOnce sync.Once
disposed atomic.Bool

data *storage.DeleteData
data *storage.DeltaData
blobs []*storage.Blob
label *Label
pos int
@@ -50,8 +50,8 @@ func (d *DeltalogIterator) Next() (*LabeledRowData, error) {
}

row := &DeltalogRow{
Pk: d.data.Pks[d.pos],
Timestamp: d.data.Tss[d.pos],
Pk: d.data.DeletePks().Get(d.pos),
Timestamp: d.data.DeleteTimestamps()[d.pos],
}
d.pos++

@@ -76,7 +76,7 @@ func (d *DeltalogIterator) hasNext() bool {
d.data = dData
d.blobs = nil
}
return int64(d.pos) < d.data.RowCount
return int64(d.pos) < d.data.DeleteRowCount()
}

func (d *DeltalogIterator) isDisposed() bool {
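
The iterator now reads through the new accessors rather than the removed `Pks`/`Tss`/`RowCount` fields. A compact sketch of that access pattern (the `forEachDelete` helper is hypothetical; the accessor names match the hunks above):

```go
package example

import "github.com/milvus-io/milvus/internal/storage"

// forEachDelete walks delete tuples through the accessor-based API:
// DeletePks().Get(i) replaces Pks[i], DeleteTimestamps()[i] replaces
// Tss[i], and DeleteRowCount() replaces the RowCount field.
func forEachDelete(data *storage.DeltaData, fn func(pk storage.PrimaryKey, ts storage.Timestamp)) {
	for i := 0; i < int(data.DeleteRowCount()); i++ {
		fn(data.DeletePks().Get(i), data.DeleteTimestamps()[i])
	}
}
```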
33 changes: 15 additions & 18 deletions internal/querynodev2/delegator/delegator_data_test.go
@@ -1509,18 +1509,15 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
func (s *DelegatorDataSuite) TestLevel0Deletions() {
delegator := s.delegator
partitionID := int64(10)
partitionDelPks := storage.NewInt64PrimaryKeys(1)
partitionDelPks.AppendRaw(1)
allPartitionDelPks := storage.NewInt64PrimaryKeys(1)
allPartitionDelPks.AppendRaw(2)
partitionDeleteData := &storage.DeltaData{
DeletePks: partitionDelPks,
DeleteTimestamps: []storage.Timestamp{100},
}
allPartitionDeleteData := &storage.DeltaData{
DeletePks: allPartitionDelPks,
DeleteTimestamps: []storage.Timestamp{101},
}
partitionDeleteData, err := storage.NewDeltaDataWithPkType(1, schemapb.DataType_Int64)
s.Require().NoError(err)
err = partitionDeleteData.Append(storage.NewInt64PrimaryKey(1), 100)
s.Require().NoError(err)

allPartitionDeleteData, err := storage.NewDeltaDataWithPkType(1, schemapb.DataType_Int64)
s.Require().NoError(err)
err = allPartitionDeleteData.Append(storage.NewInt64PrimaryKey(2), 101)
s.Require().NoError(err)

schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
@@ -1549,29 +1546,29 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)

pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(partitionDeleteData.DeletePks().Get(0)))

pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)

delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks.Get(0), allPartitionDeleteData.DeletePks.Get(0)})
s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)})

bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks.Get(0)})
bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks().Get(0)})

pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
// bf filtered segment
s.Equal(len(pks), 1)
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))

delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))

pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks.Get(0)))
s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))

delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
4 changes: 2 additions & 2 deletions internal/querynodev2/segments/segment.go
@@ -1019,8 +1019,8 @@ func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fie
}

func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error {
pks, tss := deltaData.DeletePks, deltaData.DeleteTimestamps
rowNum := deltaData.DelRowCount
pks, tss := deltaData.DeletePks(), deltaData.DeleteTimestamps()
rowNum := deltaData.DeleteRowCount()

if !s.ptrLock.RLockIf(state.IsNotReleased) {
return merr.WrapErrSegmentNotLoaded(s.ID(), "segment released")
6 changes: 3 additions & 3 deletions internal/querynodev2/segments/segment_l0.go
@@ -155,10 +155,10 @@ func (s *L0Segment) LoadDeltaData(ctx context.Context, deltaData *storage.DeltaD
s.dataGuard.Lock()
defer s.dataGuard.Unlock()

for i := 0; i < deltaData.DeletePks.Len(); i++ {
s.pks = append(s.pks, deltaData.DeletePks.Get(i))
for i := 0; i < int(deltaData.DeleteRowCount()); i++ {
s.pks = append(s.pks, deltaData.DeletePks().Get(i))
}
s.tss = append(s.tss, deltaData.DeleteTimestamps...)
s.tss = append(s.tss, deltaData.DeleteTimestamps()...)
return nil
}

24 changes: 8 additions & 16 deletions internal/querynodev2/segments/segment_loader.go
@@ -1208,22 +1208,13 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
return blob.RowNum
})

var deltaData *storage.DeltaData
collection := loader.manager.Collection.Get(segment.Collection())

helper, _ := typeutil.CreateSchemaHelper(collection.Schema())
pkField, _ := helper.GetPrimaryKeyField()
switch pkField.DataType {
case schemapb.DataType_Int64:
deltaData = &storage.DeltaData{
DeletePks: storage.NewInt64PrimaryKeys(int(rowNums)),
DeleteTimestamps: make([]uint64, 0, rowNums),
}
case schemapb.DataType_VarChar:
deltaData = &storage.DeltaData{
DeletePks: storage.NewVarcharPrimaryKeys(int(rowNums)),
DeleteTimestamps: make([]uint64, 0, rowNums),
}
deltaData, err := storage.NewDeltaDataWithPkType(rowNums, pkField.DataType)
if err != nil {
return err
}

reader, err := storage.CreateDeltalogReader(blobs)
@@ -1240,17 +1231,18 @@
return err
}
dl := reader.Value()
deltaData.DeletePks.MustAppend(dl.Pk)
deltaData.DeleteTimestamps = append(deltaData.DeleteTimestamps, dl.Ts)
deltaData.DelRowCount++
err = deltaData.Append(dl.Pk, dl.Ts)
if err != nil {
return err
}
}

err = segment.LoadDeltaData(ctx, deltaData)
if err != nil {
return err
}

log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.DelRowCount))
log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.DeleteRowCount()))
return nil
}

15 changes: 12 additions & 3 deletions internal/storage/data_codec.go
@@ -23,6 +23,8 @@ import (
"math"
"sort"

"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/common"
@@ -761,13 +763,17 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
}

// Deserialize deserializes the deltalog blobs into DeleteData
func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeltaData, err error) {
if len(blobs) == 0 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
}

rowNum := lo.SumBy(blobs, func(blob *Blob) int64 {
return blob.RowNum
})

var pid, sid UniqueID
result := &DeleteData{}
result := NewDeltaData(rowNum)

deserializeBlob := func(blob *Blob) error {
binlogReader, err := NewBinlogReader(blob.Value)
Expand Down Expand Up @@ -801,7 +807,10 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
if err != nil {
return err
}
result.Append(deleteLog.Pk, deleteLog.Ts)
err = result.Append(deleteLog.Pk, deleteLog.Ts)
if err != nil {
return err
}
}
return nil
}
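
The added `lo.SumBy` pre-pass lets `Deserialize` size the `DeltaData` once up front instead of growing it on every append. A rough equivalent of that sizing step, under the same assumptions (the wrapper function is hypothetical):

```go
package example

import (
	"github.com/samber/lo"

	"github.com/milvus-io/milvus/internal/storage"
)

// preSizedDeltaData mirrors the sizing logic above: sum RowNum across
// all blobs, then allocate the delta container with that capacity.
func preSizedDeltaData(blobs []*storage.Blob) *storage.DeltaData {
	rowNum := lo.SumBy(blobs, func(blob *storage.Blob) int64 {
		return blob.RowNum
	})
	return storage.NewDeltaData(rowNum)
}
```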
14 changes: 10 additions & 4 deletions internal/storage/data_codec_test.go
@@ -574,9 +574,11 @@ func TestDeleteCodec(t *testing.T) {

pid, sid, data, err := deleteCodec.Deserialize([]*Blob{blob})
assert.NoError(t, err)
intPks, ok := data.DeletePks().(*Int64PrimaryKeys)
require.True(t, ok)
assert.Equal(t, pid, int64(1))
assert.Equal(t, sid, int64(1))
assert.Equal(t, data, deleteData)
assert.Equal(t, []int64{1, 2}, intPks.values)
})

t.Run("string pk", func(t *testing.T) {
@@ -591,9 +593,11 @@

pid, sid, data, err := deleteCodec.Deserialize([]*Blob{blob})
assert.NoError(t, err)
strPks, ok := data.DeletePks().(*VarcharPrimaryKeys)
require.True(t, ok)
assert.Equal(t, pid, int64(1))
assert.Equal(t, sid, int64(1))
assert.Equal(t, data, deleteData)
assert.Equal(t, []string{"test1", "test2"}, strPks.values)
})
}

@@ -633,8 +637,10 @@ func TestUpgradeDeleteLog(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(1), parID)
assert.Equal(t, int64(1), segID)
assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
intPks, ok := deleteData.DeletePks().(*Int64PrimaryKeys)
require.True(t, ok)
assert.ElementsMatch(t, []int64{1, 2}, intPks.values)
assert.ElementsMatch(t, dData.Tss, deleteData.DeleteTimestamps())
})

t.Run("with split lenth error", func(t *testing.T) {
82 changes: 77 additions & 5 deletions internal/storage/delta_data.go
@@ -21,11 +21,13 @@ import (
"fmt"
"strconv"
"strings"
"sync"

"github.com/samber/lo"
"github.com/valyala/fastjson"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/merr"
)

// parserPool use object pooling to reduce fastjson.Parser allocation.
@@ -34,14 +36,84 @@ var parserPool = &fastjson.ParserPool{}
// DeltaData stores delta data
// currently only delete tuples are stored
type DeltaData struct {
PkType schemapb.DataType
pkType schemapb.DataType
// delete tuples
DeletePks PrimaryKeys
DeleteTimestamps []Timestamp
deletePks PrimaryKeys
deleteTimestamps []Timestamp

// stats
DelRowCount int64
MemSize int64
delRowCount int64

initCap int64
typeInitOnce sync.Once
}

func (dd *DeltaData) initPkType(pkType schemapb.DataType) error {
var err error
dd.typeInitOnce.Do(func() {
switch pkType {
case schemapb.DataType_Int64:
dd.deletePks = NewInt64PrimaryKeys(dd.initCap)
case schemapb.DataType_VarChar:
dd.deletePks = NewVarcharPrimaryKeys(dd.initCap)
default:
err = merr.WrapErrServiceInternal("unsupported pk type", pkType.String())
}
dd.pkType = pkType
})
return err
}

func (dd *DeltaData) PkType() schemapb.DataType {
return dd.pkType
}

func (dd *DeltaData) DeletePks() PrimaryKeys {
return dd.deletePks
}

func (dd *DeltaData) DeleteTimestamps() []Timestamp {
return dd.deleteTimestamps
}

func (dd *DeltaData) Append(pk PrimaryKey, ts Timestamp) error {
dd.initPkType(pk.Type())
err := dd.deletePks.Append(pk)
if err != nil {
return err
}
dd.deleteTimestamps = append(dd.deleteTimestamps, ts)
dd.delRowCount++
return nil
}

func (dd *DeltaData) DeleteRowCount() int64 {
return dd.delRowCount
}

func (dd *DeltaData) MemSize() int64 {
var result int64
if dd.deletePks != nil {
result += dd.deletePks.Size()
}
result += int64(len(dd.deleteTimestamps) * 8)
return result
}

func NewDeltaData(cap int64) *DeltaData {
return &DeltaData{
deleteTimestamps: make([]Timestamp, 0, cap),
initCap: cap,
}
}

func NewDeltaDataWithPkType(cap int64, pkType schemapb.DataType) (*DeltaData, error) {
result := NewDeltaData(cap)
err := result.initPkType(pkType)
if err != nil {
return nil, err
}
return result, nil
}

type DeleteLog struct {
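
Tying the new surface together, a minimal usage sketch (the `buildDeltaData` helper is hypothetical; the constructors, `Append`, and accessors are the ones added above). Plain `NewDeltaData` also works, in which case the pk container is typed lazily by `initPkType` on the first `Append`:

```go
package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/storage"
)

// buildDeltaData constructs a typed DeltaData and appends two delete
// tuples, matching the pattern used in the updated tests.
func buildDeltaData() (*storage.DeltaData, error) {
	dd, err := storage.NewDeltaDataWithPkType(2, schemapb.DataType_Int64)
	if err != nil {
		return nil, err
	}
	if err := dd.Append(storage.NewInt64PrimaryKey(1), 100); err != nil {
		return nil, err
	}
	if err := dd.Append(storage.NewInt64PrimaryKey(2), 101); err != nil {
		return nil, err
	}
	// Stats are derived rather than tracked by callers:
	_ = dd.DeleteRowCount() // 2
	_ = dd.MemSize()        // pk container size + 8 bytes per timestamp
	return dd, nil
}
```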