Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding continueAfterCopyWithOwner flag #7

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2280,6 +2280,7 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl
//TODO: keep -cell around for backward compatibility and remove it in a future version
cell := subFlags.String("cell", "", "Cell to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.")
continueAfterCopyWithOwner := subFlags.Bool("continue_after_copy_with_owner", false, "Vindex will continue materialization after copy when an owner is provided")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -2294,7 +2295,7 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl
if err := json2.Unmarshal([]byte(subFlags.Arg(1)), specs); err != nil {
return err
}
return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes)
return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes, *continueAfterCopyWithOwner)
}

func commandExternalizeVindex(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
12 changes: 6 additions & 6 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,12 +1258,12 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp
// a bit different than for rdonly / replica to guarantee a smooth transition.
//
// The order is as follows:
// - Add BlacklistedTables on the source shard map for master
// - Refresh the source master, so it stops writing on the tables
// - Get the source master position, wait until destination master reaches it
// - Clear SourceShard on the destination Shard
// - Refresh the destination master, so its stops its filtered
// replication and starts accepting writes
// - Add BlacklistedTables on the source shard map for master
// - Refresh the source master, so it stops writing on the tables
// - Get the source master position, wait until destination master reaches it
// - Clear SourceShard on the destination Shard
// - Refresh the destination master, so its stops its filtered
// replication and starts accepting writes
func (wr *Wrangler) masterMigrateServedFrom(ctx context.Context, ki *topo.KeyspaceInfo, sourceShard *topo.ShardInfo, destinationShard *topo.ShardInfo, tables []string, ev *events.MigrateServedFrom, filteredReplicationWaitTime time.Duration) error {
// Read the data we need
ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
Expand Down
22 changes: 14 additions & 8 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"text/template"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -343,8 +344,8 @@ func (wr *Wrangler) checkIfPreviousJournalExists(ctx context.Context, mz *materi
}

// CreateLookupVindex creates a lookup vindex and sets up the backfill.
func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, cell, tabletTypes string) error {
ms, sourceVSchema, targetVSchema, err := wr.prepareCreateLookup(ctx, keyspace, specs)
func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, cell, tabletTypes string, continueAfterCopyWithOwner bool) error {
ms, sourceVSchema, targetVSchema, err := wr.prepareCreateLookup(ctx, keyspace, specs, continueAfterCopyWithOwner)
if err != nil {
return err
}
Expand All @@ -364,7 +365,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe
}

// prepareCreateLookup performs the preparatory steps for creating a lookup vindex.
func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) {
func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) {
// Important variables are pulled out here.
var (
// lookup vindex info
Expand Down Expand Up @@ -617,7 +618,7 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
Workflow: targetTableName + "_vdx",
SourceKeyspace: keyspace,
TargetKeyspace: targetKeyspace,
StopAfterCopy: vindex.Owner != "",
StopAfterCopy: vindex.Owner != "" && !continueAfterCopyWithOwner,
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: targetTableName,
SourceExpression: materializeQuery,
Expand Down Expand Up @@ -696,7 +697,7 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s
if err != nil {
return err
}
p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName())))
p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, state, message, source from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName())))
if err != nil {
return err
}
Expand All @@ -708,8 +709,14 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s
}
state := row[1].ToString()
message := row[2].ToString()
if sourceVindex.Owner == "" {
// If there's no owner, all streams need to be running.
var bls binlogdatapb.BinlogSource
sourceBytes := row[3].ToBytes()
if err := prototext.Unmarshal(sourceBytes, &bls); err != nil {
return err
}
if sourceVindex.Owner == "" || !bls.StopAfterCopy {
// If there's no owner or we've requested that the workflow NOT be stopped
// after the copy phase completes, then all streams need to be running.
if state != binlogplayer.BlpRunning {
return fmt.Errorf("stream %d for %v.%v is not in Running state: %v", id, targetShard.Keyspace(), targetShard.ShardName(), state)
}
Expand Down Expand Up @@ -753,7 +760,6 @@ func (wr *Wrangler) ExternalizeVindex(ctx context.Context, qualifiedVindexName s
return wr.ts.RebuildSrvVSchema(ctx, nil)
}

//
func (wr *Wrangler) collectTargetStreams(ctx context.Context, mz *materializer) ([]string, error) {
var shardTablets []string
var mu sync.Mutex
Expand Down
106 changes: 92 additions & 14 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestCreateLookupVindexFull(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='lkp_vdx'", &sqltypes.Result{})

ctx := context.Background()
err := env.wr.CreateLookupVindex(ctx, ms.SourceKeyspace, specs, "cell", "MASTER")
err := env.wr.CreateLookupVindex(ctx, ms.SourceKeyspace, specs, "cell", "MASTER", false)
require.NoError(t, err)

wantvschema := &vschemapb.Keyspace{
Expand Down Expand Up @@ -566,7 +566,7 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
delete(env.tmc.schema, ms.SourceKeyspace+".t1")
}

outms, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, tcase.specs)
outms, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, tcase.specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
Expand Down Expand Up @@ -808,7 +808,7 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) {
t.Fatal(err)
}

_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs)
_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, tcase.out) {
t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out)
Expand Down Expand Up @@ -1041,7 +1041,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) {
t.Fatal(err)
}

_, _, got, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs)
_, _, got, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
Expand Down Expand Up @@ -1156,13 +1156,89 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) {
t.Fatal(err)
}

_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs)
_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, want) {
t.Errorf("same keyspace: got:\n%v, want\n%v", got, want)
}
}

func TestStopAfterCopyFlag(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
SourceKeyspace: "ks",
TargetKeyspace: "ks",
}
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()
specs := &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": "ks.lkp",
"from": "c1",
"to": "col2",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Column: "col2",
}},
},
},
}
// Dummy sourceSchema
sourceSchema := "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1"

vschema := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "col1",
}},
},
},
}
env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Fields: []*querypb.Field{{
Name: "col1",
Type: querypb.Type_INT64,
}, {
Name: "col2",
Type: querypb.Type_INT64,
}},
Schema: sourceSchema,
}},
}
if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil {
t.Fatal(err)
}

ms1, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
require.Equal(t, ms1.StopAfterCopy, true)

ms2, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, true)
require.NoError(t, err)
require.Equal(t, ms2.StopAfterCopy, false)
}

func TestCreateLookupVindexFailures(t *testing.T) {
topoServ := memorytopo.NewServer("cell")
wr := New(logutil.NewConsoleLogger(), topoServ, nil)
Expand Down Expand Up @@ -1422,7 +1498,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
err: "ColumnVindex for table t1 already exists: c1",
}}
for _, tcase := range testcases {
err := wr.CreateLookupVindex(context.Background(), "sourceks", tcase.input, "", "")
err := wr.CreateLookupVindex(context.Background(), "sourceks", tcase.input, "", "", false)
if !strings.Contains(err.Error(), tcase.err) {
t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
}
Expand Down Expand Up @@ -1484,11 +1560,13 @@ func TestExternalizeVindex(t *testing.T) {
},
}
fields := sqltypes.MakeTestFields(
"id|state|message",
"int64|varbinary|varbinary",
"id|state|message|source",
"int64|varbinary|varbinary|blob",
)
running := sqltypes.MakeTestResult(fields, "1|Running|msg")
stopped := sqltypes.MakeTestResult(fields, "1|Stopped|Stopped after copy")
sourceStopAfterCopy := `keyspace:"sourceKs",shard:"0",filter:{rules:{match:"owned" filter:"select * from t1 where in_keyrange(col1, 'sourceKs.hash', '-80')"}} stop_after_copy:true`
sourceKeepRunningAfterCopy := `keyspace:"sourceKs",shard:"0",filter:{rules:{match:"owned" filter:"select * from t1 where in_keyrange(col1, 'sourceKs.hash', '-80')"}}`
running := sqltypes.MakeTestResult(fields, "1|Running|msg|"+sourceKeepRunningAfterCopy)
stopped := sqltypes.MakeTestResult(fields, "1|Stopped|Stopped after copy|"+sourceStopAfterCopy)
testcases := []struct {
input string
vrResponse *sqltypes.Result
Expand All @@ -1511,9 +1589,9 @@ func TestExternalizeVindex(t *testing.T) {
input: "sourceks.bad",
err: "table name in vindex should be of the form keyspace.table: unqualified",
}, {
input: "sourceks.owned",
vrResponse: running,
err: "is not in Stopped after copy state",
input: "sourceks.owned",
vrResponse: running,
expectDelete: true,
}, {
input: "sourceks.unowned",
vrResponse: stopped,
Expand All @@ -1525,7 +1603,7 @@ func TestExternalizeVindex(t *testing.T) {
t.Fatal(err)
}
if tcase.vrResponse != nil {
validationQuery := "select id, state, message from _vt.vreplication where workflow='lkp_vdx' and db_name='vt_targetks'"
validationQuery := "select id, state, message, source from _vt.vreplication where workflow='lkp_vdx' and db_name='vt_targetks'"
env.tmc.expectVRQuery(200, validationQuery, tcase.vrResponse)
env.tmc.expectVRQuery(210, validationQuery, tcase.vrResponse)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou
if err != nil {
wr.Logger().Printf("Error converting report to json: %v", err.Error())
}
jsonOutput += fmt.Sprintf("%s", json)
jsonOutput += string(json)
wr.logger.Printf("%s", jsonOutput)
} else {
for table, dr := range diffReports {
Expand Down
Loading