diff --git a/go/vt/vtctl/workflow/lookup_vindex.go b/go/vt/vtctl/workflow/lookup_vindex.go index 1a6e1978421..d2194d75c7d 100644 --- a/go/vt/vtctl/workflow/lookup_vindex.go +++ b/go/vt/vtctl/workflow/lookup_vindex.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -542,9 +543,25 @@ func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (stri return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines) } +// validateExternalizedVindex returns an error if the given vindex is not externalized. +// A vindex is considered externalized if it has an owner and its write_only param is not "true". +func (lv *lookupVindex) validateExternalizedVindex(vindex *vschemapb.Vindex) error { + writeOnly, ok := vindex.Params["write_only"] + if ok && writeOnly == "true" { + return fmt.Errorf("vindex is in write-only mode") + } + if vindex.Owner == "" { + return fmt.Errorf("vindex has no owner") + } + return nil +} + +// validateExternalized checks if the vindex has been externalized +// and verifies the state of the VReplication workflow on the target shards. +// It ensures that all streams in the workflow are frozen. 
func (lv *lookupVindex) validateExternalized(ctx context.Context, vindex *vschemapb.Vindex, name string, targetShards []*topo.ShardInfo) error { - if _, ok := vindex.Params["write_only"]; ok { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "write_only param found in vindex %s", name) + if err := lv.validateExternalizedVindex(vindex); err != nil { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "vindex %s has not been externalized yet: %v", name, err) } err := forAllShards(targetShards, func(targetShard *topo.ShardInfo) error { @@ -559,19 +576,12 @@ func (lv *lookupVindex) validateExternalized(ctx context.Context, vindex *vschem return err } if res == nil || res.Workflow == "" { - return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", name, targetPrimary.Alias) + return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", name, topoproto.TabletAliasString(targetPrimary.Alias)) } for _, stream := range res.Streams { - if vindex.Owner == "" { - // If there's no owner, all streams need to be running. - if stream.State != binlogdatapb.VReplicationWorkflowState_Running { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Running state: %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State) - } - } else { - // If there's an owner, all streams need to be frozen. - if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped || stream.Message != Frozen { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not frozen: %v, %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State, stream.Message) - } + // All streams need to be frozen. 
+ if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped || stream.Message != Frozen { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not frozen: %v, %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State, stream.Message) } } return nil diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 5125ef35022..9db0b54e046 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -128,7 +128,7 @@ const ( // Time to wait between LOCK TABLES cycles on the sources during SwitchWrites. lockTablesCycleDelay = time.Duration(100 * time.Millisecond) - SqlUnfreezeWorkflow = "update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s" + SqlUnfreezeWorkflow = "update _vt.vreplication set state='Running', message='' where db_name=%a and workflow=%a" ) var ( @@ -559,7 +559,7 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN // LookupVindexComplete checks if the lookup vindex has been externalized, // and if the vindex has an owner, it deletes the workflow. func (s *Server) LookupVindexComplete(ctx context.Context, req *vtctldatapb.LookupVindexCompleteRequest) (*vtctldatapb.LookupVindexCompleteResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.LookupVindexInternalize") + span, ctx := trace.NewSpan(ctx, "workflow.Server.LookupVindexComplete") defer span.Finish() span.Annotate("keyspace", req.Keyspace) @@ -581,20 +581,18 @@ func (s *Server) LookupVindexComplete(ctx context.Context, req *vtctldatapb.Look return nil, err } - // Assuming that the lookup vindex was externalized, we don't need to - // delete the write_only parameter from the vindex. + // Now that we have checked that the vindex has been externalized, + // we don't need to delete the write_only parameter from the vindex. 
resp := &vtctldatapb.LookupVindexCompleteResponse{} - if vindex.Owner != "" { - if _, derr := s.WorkflowDelete(ctx, &vtctldatapb.WorkflowDeleteRequest{ - Keyspace: req.TableKeyspace, - Workflow: req.Name, - KeepData: true, - KeepRoutingRules: true, - }); derr != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to delete workflow %s: %v", req.Name, derr) - } - resp.WorkflowDeleted = true - } + if _, derr := s.WorkflowDelete(ctx, &vtctldatapb.WorkflowDeleteRequest{ + Keyspace: req.TableKeyspace, + Workflow: req.Name, + KeepData: true, + KeepRoutingRules: true, + }); derr != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to delete workflow %s: %v", req.Name, derr) + } + resp.WorkflowDeleted = true return resp, nil } @@ -681,7 +679,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L return err } if res == nil || res.Workflow == "" { - return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", req.Name, targetPrimary.Alias) + return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", req.Name, topoproto.TabletAliasString(targetPrimary.Alias)) } for _, stream := range res.Streams { if stream.Bls.Filter == nil || len(stream.Bls.Filter.Rules) != 1 { @@ -720,7 +718,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L KeepData: true, // Not relevant KeepRoutingRules: true, // Not relevant }); derr != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to delete workflow %s: %v", req.Name, derr) + return nil, vterrors.Wrapf(derr, "failed to delete workflow %s", req.Name) } resp.WorkflowDeleted = true } else { @@ -780,6 +778,12 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L return nil, err } + // Make the vindex back to write_only and save the source vschema. 
+ vindex.Params["write_only"] = "true" + if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil { + return nil, err + } + resp := &vtctldatapb.LookupVindexInternalizeResponse{} if vindex.Owner != "" { err := forAllShards(targetShards, func(si *topo.ShardInfo) error { @@ -787,8 +791,13 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L if err != nil { return err } - query := fmt.Sprintf(SqlUnfreezeWorkflow, - encodeString(tabletInfo.DbName()), encodeString(req.Name)) + query, err := sqlparser.ParseAndBind(SqlUnfreezeWorkflow, + sqltypes.StringBindVariable(tabletInfo.DbName()), + sqltypes.StringBindVariable(req.Name), + ) + if err != nil { + return err + } _, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query) return err }) @@ -798,11 +807,6 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L resp.WorkflowStarted = true } - // Make the vindex back to write_only and save the source vschema. - vindex.Params["write_only"] = "true" - if err := s.ts.SaveVSchema(ctx, req.Keyspace, sourceVSchema); err != nil { - return nil, err - } return resp, s.ts.RebuildSrvVSchema(ctx, nil) } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 3c83f1eeb2f..522128cbf60 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -568,7 +568,7 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if !textutil.ValueIsSimulatedNull(req.TabletTypes) { tabletTypes = req.TabletTypes } - if req.Message != nil && *req.Message != sqltypes.Null.String() { + if req.Message != nil { message = *req.Message } tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index b88dbd39ee2..116abd44cd4 100644 --- 
a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -2267,12 +2267,6 @@ func TestInternalizeLookupVindex(t *testing.T) { ms.SourceKeyspace, ms.SourceKeyspace) ownedRunning := sqltypes.MakeTestResult(fields, "1|Running|msg|"+ownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") ownedStopped := sqltypes.MakeTestResult(fields, "1|Stopped|"+workflow.Frozen+"|"+ownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") - unownedSourceStopAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}} stop_after_copy:true`, - ms.SourceKeyspace, ms.SourceKeyspace) - unownedSourceKeepRunningAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}}`, - ms.SourceKeyspace, ms.SourceKeyspace) - unownedRunning := sqltypes.MakeTestResult(fields, "2|Running|msg|"+unownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") - unownedStopped := sqltypes.MakeTestResult(fields, "2|Stopped|Stopped after copy|"+unownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") testcases := []struct { request *vtctldatapb.LookupVindexInternalizeRequest @@ -2308,7 +2302,6 @@ func TestInternalizeLookupVindex(t *testing.T) { Keyspace: ms.SourceKeyspace, TableKeyspace: ms.TargetKeyspace, }, - vrResponse: unownedStopped, expectedVschema: &vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ "unowned_lookup": { @@ -2321,7 +2314,7 @@ func TestInternalizeLookupVindex(t *testing.T) { }, }, }, - err: "is not in Running state", + err: "no owner", }, { request: &vtctldatapb.LookupVindexInternalizeRequest{ @@ -2352,7 +2345,6 @@ func TestInternalizeLookupVindex(t *testing.T) { Keyspace: ms.SourceKeyspace, TableKeyspace: ms.TargetKeyspace, }, - vrResponse: unownedRunning, expectedVschema: 
&vschemapb.Keyspace{ Vindexes: map[string]*vschemapb.Vindex{ "unowned_lookup": { @@ -2366,6 +2358,7 @@ func TestInternalizeLookupVindex(t *testing.T) { }, }, }, + err: "no owner", }, { request: &vtctldatapb.LookupVindexInternalizeRequest{ @@ -2399,10 +2392,16 @@ func TestInternalizeLookupVindex(t *testing.T) { require.NotNil(t, tcase.request, "No request provided") for _, targetTablet := range targetShards { - targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, tcase.request.Name, tenv.dbName), tcase.vrResponse, nil) + if tcase.vrResponse != nil { + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, tcase.request.Name, tenv.dbName), tcase.vrResponse, nil) + } // Update queries are required only if the Vindex is owned. if len(tcase.expectedVschema.Vindexes) > 0 && tcase.expectedVschema.Vindexes[tcase.request.Name].Owner != "" { - unfreezeQuery := fmt.Sprintf(workflow.SqlUnfreezeWorkflow, sqltypes.EncodeStringSQL("vt_targetks"), sqltypes.EncodeStringSQL(tcase.request.Name)) + unfreezeQuery, err := sqlparser.ParseAndBind(workflow.SqlUnfreezeWorkflow, + sqltypes.StringBindVariable("vt_targetks"), + sqltypes.StringBindVariable(tcase.request.Name), + ) + require.NoError(t, err) tenv.tmc.setVReplicationExecResults(targetTablet.tablet, unfreezeQuery, &sqltypes.Result{}) } }