From 128198df9abc88fdb80341e2c81abb58741cd3f5 Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Mon, 30 Dec 2024 01:18:17 +0530 Subject: [PATCH] test(LookupVindex): Add tests for internalize and complete Signed-off-by: Noble Mittal --- .../vreplication/lookup_vindex_helper_test.go | 13 + .../vreplication/lookup_vindex_test.go | 11 + .../vreplication/vreplication_test.go | 8 + go/vt/vtctl/workflow/server.go | 7 +- .../vttablet/tabletmanager/framework_test.go | 5 + .../tabletmanager/rpc_vreplication_test.go | 555 +++++++++++++++++- 6 files changed, 592 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vreplication/lookup_vindex_helper_test.go b/go/test/endtoend/vreplication/lookup_vindex_helper_test.go index 1c74dadc642..80677c2bdde 100644 --- a/go/test/endtoend/vreplication/lookup_vindex_helper_test.go +++ b/go/test/endtoend/vreplication/lookup_vindex_helper_test.go @@ -86,6 +86,19 @@ func (lv *lookupVindex) externalize() { lv.expectWriteOnly(false) } +func (lv *lookupVindex) internalize() { + args := []string{ + "LookupVindex", + "--name", lv.name, + "--table-keyspace=" + lv.ownerTableKeyspace, + "internalize", + "--keyspace=" + lv.tableKeyspace, + } + err := vc.VtctldClient.ExecuteCommand(args...) + require.NoError(lv.t, err, "error executing LookupVindex internalize: %v", err) + lv.expectWriteOnly(true) +} + func (lv *lookupVindex) show() error { return nil } diff --git a/go/test/endtoend/vreplication/lookup_vindex_test.go b/go/test/endtoend/vreplication/lookup_vindex_test.go index c0864b26cca..08858517104 100644 --- a/go/test/endtoend/vreplication/lookup_vindex_test.go +++ b/go/test/endtoend/vreplication/lookup_vindex_test.go @@ -84,6 +84,7 @@ type lookupTestCase struct { initQuery string runningQuery string postExternalizeQuery string + postInternalizeQuery string cleanupQuery string } @@ -106,6 +107,7 @@ func TestLookupVindex(t *testing.T) { initQuery := "insert into t1 (c1, c2, val) values (1, 1, 'val1'), (2, 2, 'val2'), (3, 3, 'val3')" runningQuery := "insert into t1 (c1, c2, val) values (4, 4, 'val4'), (5, 5, 'val5'), (6, 6, 'val6')" postExternalizeQuery := "insert into t1 (c1, c2, val) values (7, 7, 'val7'), (8, 8, 'val8'), (9, 9, 'val9')" + postInternalizeQuery := "insert into t1 (c1, c2, val) values (10, 10, 'val10'), (11, 11, 'val11'), (12, 12, 'val12')" cleanupQuery := "delete from t1" testCases := []lookupTestCase{ @@ -158,6 +160,7 @@ func TestLookupVindex(t *testing.T) { tc.initQuery = initQuery tc.runningQuery = runningQuery tc.postExternalizeQuery = postExternalizeQuery + tc.postInternalizeQuery = postInternalizeQuery tc.cleanupQuery = cleanupQuery testLookupVindex(t, &tc) }) @@ -196,6 +199,14 @@ func testLookupVindex(t *testing.T, tc *lookupTestCase) { waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) }) + t.Run("internalize", func(t *testing.T) { + tc.lv.internalize() + totalRows += getNumRowsInQuery(t, tc.postInternalizeQuery) + _, err := vtgateConn.ExecuteFetch(tc.postInternalizeQuery, 1000, false) + require.NoError(t, err) + waitForRowCount(t, vtgateConn, tc.lv.ownerTableKeyspace, lv.name, totalRows) + }) + t.Run("cleanup", func(t *testing.T) { _, err := vtgateConn.ExecuteFetch(tc.cleanupQuery, 1000, false) require.NoError(t, err) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 955afde2f18..292dd126838 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -417,6 +417,14 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string vdx = gjson.Get(customerVSchema, fmt.Sprintf("vindexes.%s", vindexName)) require.NotNil(t, vdx, "lookup vindex %s not found", vindexName) require.NotEqual(t, "true", vdx.Get("params.write_only").String(), "did not expect write_only parameter to be true") + + err = vc.VtctldClient.ExecuteCommand("LookupVindex", "--name", vindexName, "--table-keyspace=product", "internalize", "--keyspace=customer") + require.NoError(t, err, "error executing LookupVindex internalize: %v", err) + customerVSchema, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "customer") + require.NoError(t, err, "error executing GetVSchema: %v", err) + vdx = gjson.Get(customerVSchema, fmt.Sprintf("vindexes.%s", vindexName)) + require.NotNil(t, vdx, "lookup vindex %s not found", vindexName) + require.Equal(t, "true", vdx.Get("params.write_only").String(), "expected write_only parameter to be true") }) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index c319ce546e2..c03e89ab248 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -127,6 +127,9 @@ const ( lockTablesCycles = 2 // Time to wait between LOCK TABLES cycles on the sources during SwitchWrites. lockTablesCycleDelay = time.Duration(100 * time.Millisecond) + + SqlFreezeWorkflow = "update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s" + SqlUnfreezeWorkflow = "update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s" ) var ( @@ -762,7 +765,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L return vterrors.Wrapf(err, "failed to stop workflow %s on shard %s/%s", req.Name, tabletInfo.Keyspace, tabletInfo.Shard) } // Mark workflow as frozen. - query := fmt.Sprintf("update _vt.vreplication set message = '%s' where db_name=%s and workflow=%s", Frozen, + query := fmt.Sprintf(SqlFreezeWorkflow, Frozen, encodeString(tabletInfo.DbName()), encodeString(req.Name)) _, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query) return err @@ -849,7 +852,7 @@ func (s *Server) LookupVindexInternalize(ctx context.Context, req *vtctldatapb.L if err != nil { return err } - query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", + query := fmt.Sprintf(SqlUnfreezeWorkflow, encodeString(tabletInfo.DbName()), encodeString(req.Name)) _, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query) return err diff --git a/go/vt/vttablet/tabletmanager/framework_test.go b/go/vt/vttablet/tabletmanager/framework_test.go index 35aa7a08b46..8e03f73bf79 100644 --- a/go/vt/vttablet/tabletmanager/framework_test.go +++ b/go/vt/vttablet/tabletmanager/framework_test.go @@ -399,6 +399,8 @@ type fakeTMClient struct { getSchemaCounts map[string]int // Used to confirm the number of times WorkflowDelete was called. workflowDeleteCalls int + // Used to confirm the number of times UpdateVReplicationWorkflow with state as Stopped was called. + workflowStopCalls int } func newFakeTMClient() *fakeTMClient { @@ -580,5 +582,8 @@ func (tmc *fakeTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet * func (tmc *fakeTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) { tmc.mu.Lock() defer tmc.mu.Unlock() + if *req.State == binlogdatapb.VReplicationWorkflowState_Stopped { + tmc.workflowStopCalls++ + } return tmc.tablets[int(tablet.Alias.Uid)].tm.UpdateVReplicationWorkflow(ctx, req) } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 3f8bc85ac7f..f6c05272027 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -1914,7 +1914,7 @@ func TestExternalizeLookupVindex(t *testing.T) { vrResponse *sqltypes.Result err string expectedVschema *vschemapb.Keyspace - expectDelete bool + expectStopped bool }{ { request: &vtctldatapb.LookupVindexExternalizeRequest{ @@ -1936,7 +1936,7 @@ func TestExternalizeLookupVindex(t *testing.T) { }, }, }, - expectDelete: true, + expectStopped: true, }, { request: &vtctldatapb.LookupVindexExternalizeRequest{ @@ -1979,7 +1979,7 @@ func TestExternalizeLookupVindex(t *testing.T) { }, }, }, - expectDelete: true, + expectStopped: true, }, { request: &vtctldatapb.LookupVindexExternalizeRequest{ @@ -2032,6 +2032,551 @@ func TestExternalizeLookupVindex(t *testing.T) { require.NotNil(t, tcase.request, "No request provided") + bls := fmt.Sprintf("keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"}}", sourceKs, sourceShard) + + idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a", + sqltypes.Int64BindVariable(int64(vreplID))) + require.NoError(t, err) + idRes := sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id", + "int64", + ), + fmt.Sprintf("%d", vreplID), + ) + + streamsResult := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|state|cell|tablet_types|source", + "int64|varchar|varchar|varchar|varchar"), + fmt.Sprintf("%d|%s|cell1|PRIMARY|keyspace:\"%s\" shard:\"%s\"", 1, binlogdatapb.VReplicationWorkflowState_Stopped.String(), sourceKs, sourceShard), + ) + for _, targetTablet := range targetShards { + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, tcase.request.Name, tenv.dbName), tcase.vrResponse, nil) + // Update queries are required only if the Vindex is owned. + if len(tcase.expectedVschema.Vindexes) > 0 && tcase.expectedVschema.Vindexes[tcase.request.Name].Owner != "" { + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflowConfig, tcase.request.Name), sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|source|cell|tablet_types|state|message", + "int64|blob|varchar|varchar|varchar|varchar", + ), + fmt.Sprintf("%d|%s||primary|Stopped|", vreplID, bls), + ), nil) + targetTablet.vrdbClient.ExpectRequest(idQuery, idRes, nil) + targetTablet.vrdbClient.ExpectRequest(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:"sourceks" shard:"0" filter:{rules:{match:"t1" filter:"select * from t1"}}', cell = '', tablet_types = '' where id in (1)`, &sqltypes.Result{}, nil) + targetTablet.vrdbClient.ExpectRequest(`select * from _vt.vreplication where id = 1`, streamsResult, nil) + + freezeQuery := fmt.Sprintf(workflow.SqlFreezeWorkflow, workflow.Frozen, sqltypes.EncodeStringSQL("vt_targetks"), sqltypes.EncodeStringSQL(tcase.request.Name)) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, freezeQuery, &sqltypes.Result{}) + } + } + + preWorkflowStopCalls := tenv.tmc.workflowStopCalls + _, err = ws.LookupVindexExternalize(ctx, tcase.request) + if tcase.err != "" { + if err == nil || !strings.Contains(err.Error(), tcase.err) { + require.FailNow(t, "LookupVindexExternalize error", "LookupVindexExternalize(%v) err: %v, must contain %v", tcase.request, err, tcase.err) + } + return + } + require.NoError(t, err) + expectedWorkflowStopCalls := preWorkflowStopCalls + if tcase.expectStopped { + // We expect the RPC to be called on each target shard. + expectedWorkflowStopCalls = preWorkflowStopCalls + (len(targetShards)) + } + require.Equal(t, expectedWorkflowStopCalls, tenv.tmc.workflowStopCalls) + + aftervschema, err := tenv.ts.GetVSchema(ctx, ms.SourceKeyspace) + require.NoError(t, err) + vindex := aftervschema.Vindexes[tcase.request.Name] + expectedVindex := tcase.expectedVschema.Vindexes[tcase.request.Name] + require.NotNil(t, vindex, "vindex %s not found in vschema", tcase.request.Name) + require.NotContains(t, vindex.Params, "write_only", tcase.request) + require.Equal(t, expectedVindex, vindex, "vindex mismatch. expected: %+v, got: %+v", expectedVindex, vindex) + }) + } +} + +func TestInternalizeLookupVindex(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sourceKs := "sourceks" + sourceShard := "0" + sourceTabletUID := 200 + targetKs := "targetks" + targetShards := make(map[string]*fakeTabletConn) + targetTabletUID := 300 + wf := "testwf" + vreplID := 1 + vtenv := vtenv.NewTestEnv() + tenv := newTestEnv(t, ctx, sourceKs, []string{shard}) + defer tenv.close() + + sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, sourceShard) + defer tenv.deleteTablet(sourceTablet.tablet) + + targetShards["-80"] = tenv.addTablet(t, targetTabletUID, targetKs, "-80") + defer tenv.deleteTablet(targetShards["-80"].tablet) + addInvariants(targetShards["-80"].vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) + targetShards["80-"] = tenv.addTablet(t, targetTabletUID+10, targetKs, "80-") + defer tenv.deleteTablet(targetShards["80-"].tablet) + addInvariants(targetShards["80-"].vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) + + ws := workflow.NewServer(vtenv, tenv.ts, tenv.tmc) + ms := &vtctldatapb.MaterializeSettings{ + // Keyspace where the vindex is created. + SourceKeyspace: sourceKs, + // Keyspace where the lookup table and VReplication workflow is created. + TargetKeyspace: targetKs, + Cell: tenv.cells[0], + TabletTypes: topoproto.MakeStringTypeCSV([]topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_RDONLY, + }), + } + + sourceVschema := &vschemapb.Keyspace{ + Sharded: false, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + "owned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.owned_lookup", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + "unowned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.unowned_lookup", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + }, + "unqualified_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "unqualified", + "from": "c1", + "to": "c2", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "xxhash", + Column: "col1", + }, { + Name: "owned_lookup", + Column: "col2", + }}, + }, + }, + } + + trxTS := fmt.Sprintf("%d", time.Now().Unix()) + fields := sqltypes.MakeTestFields( + "id|state|message|source|workflow_type|workflow_sub_type|max_tps|max_replication_lag|time_updated|time_heartbeat|time_throttled|transaction_timestamp|rows_copied|options", + "int64|varbinary|varbinary|blob|int64|int64|int64|int64|int64|int64|int64|int64|int64|varchar", + ) + wftype := fmt.Sprintf("%d", binlogdatapb.VReplicationWorkflowType_CreateLookupIndex) + ownedSourceStopAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"owned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}} stop_after_copy:true`, + ms.SourceKeyspace, ms.SourceKeyspace) + ownedSourceKeepRunningAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"owned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}}`, + ms.SourceKeyspace, ms.SourceKeyspace) + ownedRunning := sqltypes.MakeTestResult(fields, "1|Running|msg|"+ownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + ownedStopped := sqltypes.MakeTestResult(fields, "1|Stopped|"+workflow.Frozen+"|"+ownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + unownedSourceStopAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}} stop_after_copy:true`, + ms.SourceKeyspace, ms.SourceKeyspace) + unownedSourceKeepRunningAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}}`, + ms.SourceKeyspace, ms.SourceKeyspace) + unownedRunning := sqltypes.MakeTestResult(fields, "2|Running|msg|"+unownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + unownedStopped := sqltypes.MakeTestResult(fields, "2|Stopped|Stopped after copy|"+unownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + + testcases := []struct { + request *vtctldatapb.LookupVindexInternalizeRequest + vrResponse *sqltypes.Result + err string + expectedVschema *vschemapb.Keyspace + }{ + { + request: &vtctldatapb.LookupVindexInternalizeRequest{ + Name: "owned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: ownedStopped, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "owned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.owned_lookup", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + }, + }, + { + request: &vtctldatapb.LookupVindexInternalizeRequest{ + Name: "unowned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: unownedStopped, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "unowned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.unowned_lookup", + "from": "c1", + "to": "c2", + }, + }, + }, + }, + err: "is not in Running state", + }, + { + request: &vtctldatapb.LookupVindexInternalizeRequest{ + Name: "owned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: ownedRunning, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "owned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.owned_lookup", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + Owner: "t1", + }, + }, + }, + err: "not frozen", + }, + { + request: &vtctldatapb.LookupVindexInternalizeRequest{ + Name: "unowned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: unownedRunning, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "unowned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.unowned_lookup", + "from": "c1", + "to": "c2", + "write_only": "true", + }, + }, + }, + }, + }, + { + request: &vtctldatapb.LookupVindexInternalizeRequest{ + Name: "absent_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "absent_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.absent_lookup", + "from": "c1", + "to": "c2", + }, + }, + }, + }, + err: "vindex absent_lookup not found in the sourceks keyspace", + }, + } + for _, tcase := range testcases { + t.Run(tcase.request.Name, func(t *testing.T) { + // Resave the source schema for every iteration. + err := tenv.ts.SaveVSchema(ctx, tcase.request.Keyspace, sourceVschema) + require.NoError(t, err) + err = tenv.ts.RebuildSrvVSchema(ctx, []string{tenv.cells[0]}) + require.NoError(t, err) + + require.NotNil(t, tcase.request, "No request provided") + + for _, targetTablet := range targetShards { + targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, tcase.request.Name, tenv.dbName), tcase.vrResponse, nil) + // Update queries are required only if the Vindex is owned. + if len(tcase.expectedVschema.Vindexes) > 0 && tcase.expectedVschema.Vindexes[tcase.request.Name].Owner != "" { + unfreezeQuery := fmt.Sprintf(workflow.SqlUnfreezeWorkflow, sqltypes.EncodeStringSQL("vt_targetks"), sqltypes.EncodeStringSQL(tcase.request.Name)) + tenv.tmc.setVReplicationExecResults(targetTablet.tablet, unfreezeQuery, &sqltypes.Result{}) + } + } + + _, err = ws.LookupVindexInternalize(ctx, tcase.request) + if tcase.err != "" { + if err == nil || !strings.Contains(err.Error(), tcase.err) { + require.FailNow(t, "LookupVindexInternalize error", "LookupVindexInternalize(%v) err: %v, must contain %v", tcase.request, err, tcase.err) + } + return + } + require.NoError(t, err) + aftervschema, err := tenv.ts.GetVSchema(ctx, ms.SourceKeyspace) + require.NoError(t, err) + vindex := aftervschema.Vindexes[tcase.request.Name] + expectedVindex := tcase.expectedVschema.Vindexes[tcase.request.Name] + require.NotNil(t, vindex, "vindex %s not found in vschema", tcase.request.Name) + require.Equal(t, expectedVindex, vindex, "vindex mismatch. expected: %+v, got: %+v", expectedVindex, vindex) + }) + } +} + +func TestCompleteLookupVindex(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sourceKs := "sourceks" + sourceShard := "0" + sourceTabletUID := 200 + targetKs := "targetks" + targetShards := make(map[string]*fakeTabletConn) + targetTabletUID := 300 + wf := "testwf" + vreplID := 1 + vtenv := vtenv.NewTestEnv() + tenv := newTestEnv(t, ctx, sourceKs, []string{shard}) + defer tenv.close() + + sourceTablet := tenv.addTablet(t, sourceTabletUID, sourceKs, sourceShard) + defer tenv.deleteTablet(sourceTablet.tablet) + + targetShards["-80"] = tenv.addTablet(t, targetTabletUID, targetKs, "-80") + defer tenv.deleteTablet(targetShards["-80"].tablet) + addInvariants(targetShards["-80"].vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) + targetShards["80-"] = tenv.addTablet(t, targetTabletUID+10, targetKs, "80-") + defer tenv.deleteTablet(targetShards["80-"].tablet) + addInvariants(targetShards["80-"].vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0]) + + ws := workflow.NewServer(vtenv, tenv.ts, tenv.tmc) + ms := &vtctldatapb.MaterializeSettings{ + // Keyspace where the vindex is created. + SourceKeyspace: sourceKs, + // Keyspace where the lookup table and VReplication workflow is created. + TargetKeyspace: targetKs, + Cell: tenv.cells[0], + TabletTypes: topoproto.MakeStringTypeCSV([]topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_RDONLY, + }), + } + + sourceVschema := &vschemapb.Keyspace{ + Sharded: false, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + "owned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.owned_lookup", + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + "unowned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.unowned_lookup", + "from": "c1", + "to": "c2", + }, + }, + "unqualified_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "unqualified", + "from": "c1", + "to": "c2", + }, + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "xxhash", + Column: "col1", + }, { + Name: "owned_lookup", + Column: "col2", + }}, + }, + }, + } + + trxTS := fmt.Sprintf("%d", time.Now().Unix()) + fields := sqltypes.MakeTestFields( + "id|state|message|source|workflow_type|workflow_sub_type|max_tps|max_replication_lag|time_updated|time_heartbeat|time_throttled|transaction_timestamp|rows_copied|options", + "int64|varbinary|varbinary|blob|int64|int64|int64|int64|int64|int64|int64|int64|int64|varchar", + ) + wftype := fmt.Sprintf("%d", binlogdatapb.VReplicationWorkflowType_CreateLookupIndex) + ownedSourceStopAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"owned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}} stop_after_copy:true`, + ms.SourceKeyspace, ms.SourceKeyspace) + ownedSourceKeepRunningAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"owned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}}`, + ms.SourceKeyspace, ms.SourceKeyspace) + ownedRunning := sqltypes.MakeTestResult(fields, "1|Running|msg|"+ownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + ownedStopped := sqltypes.MakeTestResult(fields, "1|Stopped|"+workflow.Frozen+"|"+ownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + unownedSourceStopAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}} stop_after_copy:true`, + ms.SourceKeyspace, ms.SourceKeyspace) + unownedSourceKeepRunningAfterCopy := fmt.Sprintf(`keyspace:"%s",shard:"0",filter:{rules:{match:"unowned_lookup" filter:"select * from t1 where in_keyrange(col1, '%s.xxhash', '-80')"}}`, + ms.SourceKeyspace, ms.SourceKeyspace) + unownedRunning := sqltypes.MakeTestResult(fields, "2|Running|msg|"+unownedSourceKeepRunningAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + unownedStopped := sqltypes.MakeTestResult(fields, "2|Stopped|Stopped after copy|"+unownedSourceStopAfterCopy+"|"+wftype+"|0|0|0|0|0|0|"+trxTS+"|5|{}") + + testcases := []struct { + request *vtctldatapb.LookupVindexCompleteRequest + vrResponse *sqltypes.Result + err string + expectedVschema *vschemapb.Keyspace + expectDelete bool + }{ + { + request: &vtctldatapb.LookupVindexCompleteRequest{ + Name: "owned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: ownedStopped, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "owned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.owned_lookup", + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + }, + expectDelete: true, + }, + { + request: &vtctldatapb.LookupVindexCompleteRequest{ + Name: "unowned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: unownedStopped, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "unowned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.unowned_lookup", + "from": "c1", + "to": "c2", + }, + }, + }, + }, + err: "is not in Running state", + }, + { + request: &vtctldatapb.LookupVindexCompleteRequest{ + Name: "owned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: ownedRunning, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "owned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.owned_lookup", + "from": "c1", + "to": "c2", + }, + Owner: "t1", + }, + }, + }, + err: "not frozen", + }, + { + request: &vtctldatapb.LookupVindexCompleteRequest{ + Name: "unowned_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + vrResponse: unownedRunning, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "unowned_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.unowned_lookup", + "from": "c1", + "to": "c2", + }, + }, + }, + }, + }, + { + request: &vtctldatapb.LookupVindexCompleteRequest{ + Name: "absent_lookup", + Keyspace: ms.SourceKeyspace, + TableKeyspace: ms.TargetKeyspace, + }, + expectedVschema: &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "absent_lookup": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "targetks.absent_lookup", + "from": "c1", + "to": "c2", + }, + }, + }, + }, + err: "vindex absent_lookup not found in the sourceks keyspace", + }, + } + for _, tcase := range testcases { + t.Run(tcase.request.Name, func(t *testing.T) { + // Resave the source schema for every iteration. + err := tenv.ts.SaveVSchema(ctx, tcase.request.Keyspace, sourceVschema) + require.NoError(t, err) + err = tenv.ts.RebuildSrvVSchema(ctx, []string{tenv.cells[0]}) + require.NoError(t, err) + + require.NotNil(t, tcase.request, "No request provided") + for _, targetTablet := range targetShards { targetTablet.vrdbClient.ExpectRequest(fmt.Sprintf(readWorkflow, tcase.request.Name, tenv.dbName), tcase.vrResponse, nil) if tcase.err == "" { @@ -2042,10 +2587,10 @@ func TestExternalizeLookupVindex(t *testing.T) { } preWorkflowDeleteCalls := tenv.tmc.workflowDeleteCalls - _, err = ws.LookupVindexExternalize(ctx, tcase.request) + _, err = ws.LookupVindexComplete(ctx, tcase.request) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { - require.FailNow(t, "LookupVindexExternalize error", "ExternalizeVindex(%v) err: %v, must contain %v", tcase.request, err, tcase.err) + require.FailNow(t, "LookupVindexComplete error", "LookupVindexComplete(%v) err: %v, must contain %v", tcase.request, err, tcase.err) } return }