diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 8a5d688c893b0..0ee390b8a7ebb 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -251,7 +251,7 @@ message LoadPartitionsRequest { bool refresh = 8; // resource group names repeated string resource_groups = 9; - repeated index.IndexInfo index_info_list = 10; + repeated index.IndexInfo index_info_list = 10; // deprecated repeated int64 load_fields = 11; } diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index f7c75da2fc33a..641bd65450809 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -194,7 +194,7 @@ func (job *LoadCollectionJob) Execute() error { } // 3. loadPartitions on QueryNodes - err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...) + err = loadPartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), lackPartitionIDs...) if err != nil { return err } @@ -399,11 +399,9 @@ func (job *LoadPartitionJob) Execute() error { job.undo.IsReplicaCreated = true } - // 3. loadPartitions on QueryNodes - err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...) - if err != nil { - return err - } + // 3. For the load collection job, the list of partitions in the QueryNode + // is passed during the initialization of the collection object, + // so there is no need to load partitions at this point. // 4. put collection/partitions meta partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition { diff --git a/internal/querycoordv2/job/job_sync.go b/internal/querycoordv2/job/job_sync.go index 72a25b9a67e97..1b22cc68287dc 100644 --- a/internal/querycoordv2/job/job_sync.go +++ b/internal/querycoordv2/job/job_sync.go @@ -75,7 +75,7 @@ func (job *SyncNewCreatedPartitionJob) Execute() error { return nil } - err := loadPartitions(job.ctx, job.meta, job.cluster, job.broker, false, req.GetCollectionID(), req.GetPartitionID()) + err := loadPartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), req.GetPartitionID()) if err != nil { return err } diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 0fc786dbbc763..3da96babff3e3 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -133,8 +133,6 @@ func (suite *JobSuite) SetupSuite() { suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything). Return(nil, nil) - suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything). - Return(nil, nil) suite.cluster = session.NewMockCluster(suite.T()) suite.cluster.EXPECT(). @@ -1385,109 +1383,6 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() { } } -func (suite *JobSuite) TestCallLoadPartitionFailed() { - // call LoadPartitions failed at get index info - getIndexErr := fmt.Errorf("mock get index error") - suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "ListIndexes" - }) - for _, collection := range suite.collections { - suite.broker.EXPECT().ListIndexes(mock.Anything, collection).Return(nil, getIndexErr) - loadCollectionReq := &querypb.LoadCollectionRequest{ - CollectionID: collection, - } - loadCollectionJob := NewLoadCollectionJob( - context.Background(), - loadCollectionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadCollectionJob) - err := loadCollectionJob.Wait() - suite.T().Logf("%s", err) - suite.ErrorIs(err, getIndexErr) - - loadPartitionReq := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - } - loadPartitionJob := NewLoadPartitionJob( - context.Background(), - loadPartitionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadPartitionJob) - err = loadPartitionJob.Wait() - suite.ErrorIs(err, getIndexErr) - } - - // call LoadPartitions failed at get schema - getSchemaErr := fmt.Errorf("mock get schema error") - suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "DescribeCollection" - }) - for _, collection := range suite.collections { - suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(nil, getSchemaErr) - loadCollectionReq := &querypb.LoadCollectionRequest{ - CollectionID: collection, - } - loadCollectionJob := NewLoadCollectionJob( - context.Background(), - loadCollectionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadCollectionJob) - err := loadCollectionJob.Wait() - suite.ErrorIs(err, getSchemaErr) - - loadPartitionReq := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - } - loadPartitionJob := NewLoadPartitionJob( - context.Background(), - loadPartitionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadPartitionJob) - err = loadPartitionJob.Wait() - suite.ErrorIs(err, getSchemaErr) - } - - suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "ListIndexes" && call.Method != "DescribeCollection" - }) - suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil) - suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil) -} - func (suite *JobSuite) TestCallReleasePartitionFailed() { ctx := context.Background() suite.loadAll() diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go index 7f56794144480..bddf0ba119bb1 100644 --- a/internal/querycoordv2/job/utils.go +++ b/internal/querycoordv2/job/utils.go @@ -20,11 +20,12 @@ import ( "context" "time" + "go.opentelemetry.io/otel" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/checkers" "github.com/milvus-io/milvus/internal/querycoordv2/meta" @@ -71,41 +72,25 @@ func waitCollectionReleased(dist *meta.DistributionManager, checkerController *c func loadPartitions(ctx context.Context, meta *meta.Meta, cluster session.Cluster, - broker meta.Broker, - withSchema bool, collection int64, partitions ...int64, ) error { - var err error - var schema *schemapb.CollectionSchema - if withSchema { - collectionInfo, err := broker.DescribeCollection(ctx, collection) - if err != nil { - return err - } - schema = collectionInfo.GetSchema() - } - indexes, err := broker.ListIndexes(ctx, collection) - if err != nil { - return err - } + _, span := otel.Tracer(typeutil.QueryCoordRole).Start(ctx, "loadPartitions") + defer span.End() + start := time.Now() replicas := meta.ReplicaManager.GetByCollection(collection) loadReq := &querypb.LoadPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadPartitions, }, - CollectionID: collection, - PartitionIDs: partitions, - Schema: schema, - IndexInfoList: indexes, + CollectionID: collection, + PartitionIDs: partitions, } for _, replica := range replicas { for _, node := range replica.GetNodes() { status, err := cluster.LoadPartitions(ctx, node, loadReq) - // There is no need to rollback LoadPartitions as the load job will fail - // and the Delegator will not be created, - // resulting in search and query requests failing due to the absence of Delegator. + // TODO: rollback LoadPartitions if failed if err != nil { return err } @@ -114,6 +99,9 @@ func loadPartitions(ctx context.Context, } } } + + log.Ctx(ctx).Info("load partitions done", zap.Int64("collectionID", collection), + zap.Int64s("partitionIDs", partitions), zap.Duration("dur", time.Since(start))) return nil }