diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index d95da7acd37..aec8c3e2259 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -1258,12 +1258,12 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp // a bit different than for rdonly / replica to guarantee a smooth transition. // // The order is as follows: -// - Add BlacklistedTables on the source shard map for master -// - Refresh the source master, so it stops writing on the tables -// - Get the source master position, wait until destination master reaches it -// - Clear SourceShard on the destination Shard -// - Refresh the destination master, so its stops its filtered -// replication and starts accepting writes +// - Add BlacklistedTables on the source shard map for master +// - Refresh the source master, so it stops writing on the tables +// - Get the source master position, wait until destination master reaches it +// - Clear SourceShard on the destination Shard +// - Refresh the destination master, so its stops its filtered +// replication and starts accepting writes func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, tables []string, ev *events.MigrateServedFrom, filteredReplicationWaitTime time.Duration) error { // Read the data we need ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 6e5ffc15fbb..115d0e94112 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -25,6 +25,7 @@ import ( "sync" "text/template" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/log" @@ -696,7 +697,7 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s if err != nil { return err } - p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName()))) + p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message, source from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName()))) if err != nil { return err } @@ -708,8 +709,14 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s } state := row[1].ToString() message := row[2].ToString() - if sourceVindex.Owner == "" { - // If there's no owner, all streams need to be running. + var bls binlogdatapb.BinlogSource + sourceBytes := row[3].ToBytes() + if err := prototext.Unmarshal(sourceBytes, &bls); err != nil { + return err + } + if sourceVindex.Owner == "" || !bls.StopAfterCopy { + // If there's no owner or we've requested that the workflow NOT be stopped + // after the copy phase completes, then all streams need to be running. if state != binlogplayer.BlpRunning { return fmt.Errorf("stream %d for %v.%v is not in Running state: %v", id, targetShard.Keyspace(), targetShard.ShardName(), state) } @@ -753,7 +760,6 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s return wr.ts.RebuildSrvVSchema(ctx, nil) } -// func (wr *Wrangler) collectTargetStreams(ctx context.Context, mz *materializer) ([]string, error) { var shardTablets []string var mu sync.Mutex diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 868a4104ea4..1c1cd1198f2 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -264,7 +264,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou if err != nil { wr.Logger().Printf("Error converting report to json: %v", err.Error()) } - jsonOutput += fmt.Sprintf("%s", json) + jsonOutput += string(json) wr.logger.Printf("%s", jsonOutput) } else { for table, dr := range diffReports {