Skip to content

Commit

Permalink
Externalize VIndexes properly when not stopping after copy
Browse files Browse the repository at this point in the history
We did not check for this config state at externalize.
When -stop_after_copy=false was used for the lookup vindex
materialization the workflow state will be Running when
it's healthy.

Signed-off-by: Matt Lord <[email protected]>
Signed-off-by: Vaidas Balys <[email protected]>
  • Loading branch information
mattlord authored and vbalys committed Sep 22, 2023
1 parent 4d028a5 commit b4e50ec
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
12 changes: 6 additions & 6 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,12 +1258,12 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp
// a bit different than for rdonly / replica to guarantee a smooth transition.
//
// The order is as follows:
// - Add BlacklistedTables on the source shard map for master
// - Refresh the source master, so it stops writing on the tables
// - Get the source master position, wait until destination master reaches it
// - Clear SourceShard on the destination Shard
// - Refresh the destination master, so its stops its filtered
// replication and starts accepting writes
// - Add BlacklistedTables on the source shard map for master
// - Refresh the source master, so it stops writing on the tables
// - Get the source master position, wait until destination master reaches it
// - Clear SourceShard on the destination Shard
// - Refresh the destination master, so its stops its filtered
// replication and starts accepting writes
func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, tables []string, ev *events.MigrateServedFrom, filteredReplicationWaitTime time.Duration) error {
// Read the data we need
ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
Expand Down
14 changes: 10 additions & 4 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"text/template"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -696,7 +697,7 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s
if err != nil {
return err
}
p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName())))
p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message, source from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName())))
if err != nil {
return err
}
Expand All @@ -708,8 +709,14 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s
}
state := row[1].ToString()
message := row[2].ToString()
if sourceVindex.Owner == "" {
// If there's no owner, all streams need to be running.
var bls binlogdatapb.BinlogSource
sourceBytes := row[3].ToBytes()
if err := prototext.Unmarshal(sourceBytes, &bls); err != nil {
return err
}
if sourceVindex.Owner == "" || !bls.StopAfterCopy {
// If there's no owner or we've requested that the workflow NOT be stopped
// after the copy phase completes, then all streams need to be running.
if state != binlogplayer.BlpRunning {
return fmt.Errorf("stream %d for %v.%v is not in Running state: %v", id, targetShard.Keyspace(), targetShard.ShardName(), state)
}
Expand Down Expand Up @@ -753,7 +760,6 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s
return wr.ts.RebuildSrvVSchema(ctx, nil)
}

//
func (wr *Wrangler) collectTargetStreams(ctx context.Context, mz *materializer) ([]string, error) {
var shardTablets []string
var mu sync.Mutex
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou
if err != nil {
wr.Logger().Printf("Error converting report to json: %v", err.Error())
}
jsonOutput += fmt.Sprintf("%s", json)
jsonOutput += string(json)
wr.logger.Printf("%s", jsonOutput)
} else {
for table, dr := range diffReports {
Expand Down

0 comments on commit b4e50ec

Please sign in to comment.