Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Reduce GetIndexInfos calls #37695

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions internal/datacoord/index_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,26 +623,24 @@ func (m *indexMeta) IsUnIndexedSegment(collectionID UniqueID, segID UniqueID) bo
return false
}

func (m *indexMeta) getSegmentIndexes(segID UniqueID) map[UniqueID]*model.SegmentIndex {
func (m *indexMeta) GetSegmentsIndexes(collectionID UniqueID, segIDs []UniqueID) map[int64]map[UniqueID]*model.SegmentIndex {
m.RLock()
defer m.RUnlock()

ret := make(map[UniqueID]*model.SegmentIndex, 0)
segIndexInfos, ok := m.segmentIndexes[segID]
if !ok || len(segIndexInfos) == 0 {
return ret
segmentsIndexes := make(map[int64]map[UniqueID]*model.SegmentIndex)
for _, segmentID := range segIDs {
segmentsIndexes[segmentID] = m.getSegmentIndexes(collectionID, segmentID)
}

for _, segIdx := range segIndexInfos {
ret[segIdx.IndexID] = model.CloneSegmentIndex(segIdx)
}
return ret
return segmentsIndexes
}

// GetSegmentIndexes is the thread-safe public accessor for a single segment's
// index metadata. It acquires the meta read lock for the duration of the call
// and delegates to the lock-free internal helper getSegmentIndexes, returning
// its result map directly (keyed by index ID; presumably entries are cloned by
// the helper, as in the previous implementation — TODO confirm against the
// full helper body).
func (m *indexMeta) GetSegmentIndexes(collectionID UniqueID, segID UniqueID) map[UniqueID]*model.SegmentIndex {
	m.RLock()
	defer m.RUnlock()
	// collectionID is forwarded to the helper; segID selects the entry in
	// m.segmentIndexes.
	return m.getSegmentIndexes(collectionID, segID)
}

// Note: thread-unsafe, don't call it outside indexMeta
func (m *indexMeta) getSegmentIndexes(collectionID UniqueID, segID UniqueID) map[UniqueID]*model.SegmentIndex {
ret := make(map[UniqueID]*model.SegmentIndex, 0)
segIndexInfos, ok := m.segmentIndexes[segID]
if !ok || len(segIndexInfos) == 0 {
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/index_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,12 +870,12 @@ func TestMeta_GetSegmentIndexes(t *testing.T) {
m := createMeta(catalog, withIndexMeta(createIndexMeta(catalog)))

t.Run("success", func(t *testing.T) {
segIndexes := m.indexMeta.getSegmentIndexes(segID)
segIndexes := m.indexMeta.GetSegmentIndexes(collID, segID)
assert.Equal(t, 1, len(segIndexes))
})

t.Run("segment not exist", func(t *testing.T) {
segIndexes := m.indexMeta.getSegmentIndexes(segID + 100)
segIndexes := m.indexMeta.GetSegmentIndexes(collID, segID+100)
assert.Equal(t, 0, len(segIndexes))
})

Expand Down
3 changes: 2 additions & 1 deletion internal/datacoord/index_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,8 +862,9 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
SegmentInfo: map[int64]*indexpb.SegmentInfo{},
}

segmentsIndexes := s.meta.indexMeta.GetSegmentsIndexes(req.GetCollectionID(), req.GetSegmentIDs())
for _, segID := range req.GetSegmentIDs() {
segIdxes := s.meta.indexMeta.GetSegmentIndexes(req.GetCollectionID(), segID)
segIdxes := segmentsIndexes[segID]
ret.SegmentInfo[segID] = &indexpb.SegmentInfo{
CollectionID: req.GetCollectionID(),
SegmentID: segID,
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/task_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *taskScheduler) Stop() {
func (s *taskScheduler) reloadFromMeta() {
segments := s.meta.GetAllSegmentsUnsafe()
for _, segment := range segments {
for _, segIndex := range s.meta.indexMeta.getSegmentIndexes(segment.ID) {
for _, segIndex := range s.meta.indexMeta.GetSegmentIndexes(segment.GetCollectionID(), segment.ID) {
if segIndex.IsDeleted {
continue
}
Expand Down
23 changes: 14 additions & 9 deletions internal/querycoordv2/checkers/index_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const MaxSegmentNumPerGetIndexInfoRPC = 1024

var _ Checker = (*IndexChecker)(nil)

// IndexChecker perform segment index check.
Expand Down Expand Up @@ -132,18 +134,21 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec
}

segmentsToUpdate := typeutil.NewSet[int64]()
for segment, fields := range targets {
missingFields := typeutil.NewSet(fields...)
infos, err := c.broker.GetIndexInfo(ctx, collection.GetCollectionID(), segment)
for _, segmentIDs := range lo.Chunk(lo.Keys(idSegments), MaxSegmentNumPerGetIndexInfoRPC) {
segmentIndexInfos, err := c.broker.GetIndexInfo(ctx, collection.GetCollectionID(), segmentIDs...)
if err != nil {
log.Warn("failed to get indexInfo for segment", zap.Int64("segmentID", segment), zap.Error(err))
log.Warn("failed to get indexInfo for segments", zap.Int64s("segmentIDs", segmentIDs), zap.Error(err))
continue
}
for _, info := range infos {
if missingFields.Contain(info.GetFieldID()) &&
info.GetEnableIndex() &&
len(info.GetIndexFilePaths()) > 0 {
segmentsToUpdate.Insert(segment)
for segmentID, segmentIndexInfo := range segmentIndexInfos {
fields := targets[segmentID]
missingFields := typeutil.NewSet(fields...)
for _, fieldIndexInfo := range segmentIndexInfo {
if missingFields.Contain(fieldIndexInfo.GetFieldID()) &&
fieldIndexInfo.GetEnableIndex() &&
len(fieldIndexInfo.GetIndexFilePaths()) > 0 {
segmentsToUpdate.Insert(segmentID)
}
}
}
}
Expand Down
56 changes: 27 additions & 29 deletions internal/querycoordv2/checkers/index_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ func (suite *IndexCheckerSuite) TestLoadIndex() {

// broker
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(2)).
Return([]*querypb.FieldIndexInfo{
Return(map[int64][]*querypb.FieldIndexInfo{2: {
{
FieldID: 101,
IndexID: 1000,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
}, nil)
}}, nil)

suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
{
Expand Down Expand Up @@ -180,28 +180,28 @@ func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() {
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 3, 1, 1, "test-insert-channel"))

// broker
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).Call.
Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo {
if segmentID == 2 {
return []*querypb.FieldIndexInfo{
suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).
RunAndReturn(func(ctx context.Context, collectionID int64, segmentIDs ...int64) (map[int64][]*querypb.FieldIndexInfo, error) {
if segmentIDs[0] == 2 {
return map[int64][]*querypb.FieldIndexInfo{2: {
{
FieldID: 101,
IndexID: 1000,
EnableIndex: false,
},
}
}}, nil
}
if segmentID == 3 {
return []*querypb.FieldIndexInfo{
if segmentIDs[0] == 3 {
return map[int64][]*querypb.FieldIndexInfo{3: {
{
FieldID: 101,
IndexID: 1002,
EnableIndex: false,
},
}
}}, nil
}
return nil
}, nil)
return nil, nil
})

suite.broker.EXPECT().ListIndexes(mock.Anything, int64(1)).Return([]*indexpb.IndexInfo{
{
Expand Down Expand Up @@ -298,23 +298,21 @@ func (suite *IndexCheckerSuite) TestCreateNewIndex() {
}, nil
},
)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, mock.Anything, mock.AnythingOfType("int64")).Call.
Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo {
return []*querypb.FieldIndexInfo{
{
FieldID: 101,
IndexID: 1000,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
{
FieldID: 102,
IndexID: 1001,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
}
}, nil)
suite.broker.EXPECT().GetIndexInfo(mock.Anything, mock.Anything, mock.AnythingOfType("int64")).
Return(map[int64][]*querypb.FieldIndexInfo{2: {
{
FieldID: 101,
IndexID: 1000,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
{
FieldID: 102,
IndexID: 1001,
EnableIndex: true,
IndexFilePaths: []string{"index"},
},
}}, nil)

tasks := checker.Check(context.Background())
suite.Len(tasks, 1)
Expand Down
50 changes: 24 additions & 26 deletions internal/querycoordv2/meta/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
GetRecoveryInfo(ctx context.Context, collectionID UniqueID, partitionID UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentBinlogs, error)
ListIndexes(ctx context.Context, collectionID UniqueID) ([]*indexpb.IndexInfo, error)
GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) ([]*datapb.SegmentInfo, error)
GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error)
GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error)
GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error)
DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error)
GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error)
Expand Down Expand Up @@ -306,13 +306,13 @@
return ret, nil
}

func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentID UniqueID) ([]*querypb.FieldIndexInfo, error) {
func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error) {
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
defer cancel()

log := log.Ctx(ctx).With(
zap.Int64("collectionID", collectionID),
zap.Int64("segmentID", segmentID),
zap.Int64s("segmentIDs", segmentIDs),
)

// during rolling upgrade, query coord may connect to datacoord with version 2.2, which will return merr.ErrServiceUnimplemented
Expand All @@ -322,7 +322,7 @@
retry.Do(ctx, func() error {
resp, err = broker.dataCoord.GetIndexInfos(ctx, &indexpb.GetIndexInfoRequest{
CollectionID: collectionID,
SegmentIDs: []int64{segmentID},
SegmentIDs: segmentIDs,
})

if errors.Is(err, merr.ErrServiceUnimplemented) {
Expand All @@ -337,32 +337,30 @@
}

if resp.GetSegmentInfo() == nil {
err = merr.WrapErrIndexNotFoundForSegment(segmentID)
log.Warn("failed to get segment index info",
err = merr.WrapErrIndexNotFoundForSegments(segmentIDs)
log.Warn("failed to get segments index info",

Check warning on line 341 in internal/querycoordv2/meta/coordinator_broker.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/coordinator_broker.go#L340-L341

Added lines #L340 - L341 were not covered by tests
zap.Error(err))
return nil, err
}

segmentInfo, ok := resp.GetSegmentInfo()[segmentID]
if !ok || len(segmentInfo.GetIndexInfos()) == 0 {
return nil, merr.WrapErrIndexNotFoundForSegment(segmentID)
}

indexes := make([]*querypb.FieldIndexInfo, 0)
for _, info := range segmentInfo.GetIndexInfos() {
indexes = append(indexes, &querypb.FieldIndexInfo{
FieldID: info.GetFieldID(),
EnableIndex: true, // deprecated, but keep it for compatibility
IndexName: info.GetIndexName(),
IndexID: info.GetIndexID(),
BuildID: info.GetBuildID(),
IndexParams: info.GetIndexParams(),
IndexFilePaths: info.GetIndexFilePaths(),
IndexSize: int64(info.GetSerializedSize()),
IndexVersion: info.GetIndexVersion(),
NumRows: info.GetNumRows(),
CurrentIndexVersion: info.GetCurrentIndexVersion(),
})
indexes := make(map[int64][]*querypb.FieldIndexInfo, 0)
for segmentID, segmentInfo := range resp.GetSegmentInfo() {
indexes[segmentID] = make([]*querypb.FieldIndexInfo, 0)
for _, info := range segmentInfo.GetIndexInfos() {
indexes[segmentID] = append(indexes[segmentID], &querypb.FieldIndexInfo{
FieldID: info.GetFieldID(),
EnableIndex: true, // deprecated, but keep it for compatibility
IndexName: info.GetIndexName(),
IndexID: info.GetIndexID(),
BuildID: info.GetBuildID(),
IndexParams: info.GetIndexParams(),
IndexFilePaths: info.GetIndexFilePaths(),
IndexSize: int64(info.GetSerializedSize()),
IndexVersion: info.GetIndexVersion(),
NumRows: info.GetNumRows(),
CurrentIndexVersion: info.GetCurrentIndexVersion(),
})
}
}

return indexes, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/meta/coordinator_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (s *CoordinatorBrokerDataCoordSuite) TestGetIndexInfo() {

infos, err := s.broker.GetIndexInfo(ctx, collectionID, segmentID)
s.NoError(err)
s.ElementsMatch(indexIDs, lo.Map(infos, func(info *querypb.FieldIndexInfo, _ int) int64 {
s.ElementsMatch(indexIDs, lo.Map(infos[segmentID], func(info *querypb.FieldIndexInfo, _ int) int64 {
return info.GetIndexID()
}))
s.resetMock()
Expand Down
50 changes: 32 additions & 18 deletions internal/querycoordv2/meta/mock_broker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading