Skip to content

Commit

Permalink
test: Add unit tests for LookupVindexExternalize
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Oct 21, 2024
1 parent aba477f commit b22f3a5
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 19 deletions.
1 change: 1 addition & 0 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (tmc *testMaterializerTMClient) ReadVReplicationWorkflow(ctx context.Contex
Rules: rules,
},
},
State: binlogdatapb.VReplicationWorkflowState_Running,
}
}
return &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
Expand Down
20 changes: 1 addition & 19 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,25 +1197,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
return nil, err
}

// Create a parallelizer function.
forAllTargets := func(f func(*topo.ShardInfo) error) error {
var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
for _, targetShard := range targetShards {
wg.Add(1)
go func(targetShard *topo.ShardInfo) {
defer wg.Done()

if err := f(targetShard); err != nil {
allErrors.RecordError(err)
}
}(targetShard)
}
wg.Wait()
return allErrors.AggrError(vterrors.Aggregate)
}

err = forAllTargets(func(targetShard *topo.ShardInfo) error {
err = forAllShards(targetShards, func(targetShard *topo.ShardInfo) error {
targetPrimary, err := s.ts.GetTablet(ctx, targetShard.PrimaryAlias)
if err != nil {
return err
Expand Down
122 changes: 122 additions & 0 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -1892,3 +1893,124 @@ func TestGetWorkflowsStreamLogs(t *testing.T) {
assert.Equal(t, gotLogs[0].State, "Running")
assert.Equal(t, gotLogs[0].Id, int64(3))
}

// func TestVDiffShow(t *testing.T) {
// ctx := context.Background()

// sourceKeyspace := "source_keyspace"
// targetKeyspace := "target_keyspace"
// workflow := "test_workflow"

// sourceShards := []string{"-"}
// targetShards := []string{"-"}

// te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{
// SourceKeyspace: sourceKeyspace,
// TargetKeyspace: targetKeyspace,
// Workflow: workflow,
// TableSettings: []*vtctldatapb.TableMaterializeSettings{
// {
// TargetTable: "table1",
// SourceExpression: fmt.Sprintf("select * from %s", "table1"),
// },
// {
// TargetTable: "table2",
// SourceExpression: fmt.Sprintf("select * from %s", "table2"),
// },
// },
// }, sourceShards, targetShards)

// res, err := te.ws.VDiffShow(ctx, &vtctldatapb.VDiffShowRequest{
// Workflow: workflow,
// TargetKeyspace: targetKeyspace,
// Arg: "last",
// })
// assert.NoError(t, err)
// // Expect 1 key-value pair in tablet responses as there's only a single shard.
// require.Len(t, res.TabletResponses, 1)

// // As there's no VDiff created for the workflow, VDiff UUID should be empty.
// status := res.TabletResponses["-"]
// assert.Empty(t, status.VdiffUuid)
// // copyStateResult := sqltypes.MakeTestResult(
// // sqltypes.MakeTestFields("vrepl_id|table_name|lastpk", "int64|varchar|int64"),
// // )
// te.tmc.expectVRQuery(200, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{})

// res2, err := te.ws.VDiffCreate(ctx, &vtctldatapb.VDiffCreateRequest{
// Workflow: workflow,
// TargetKeyspace: targetKeyspace,
// })
// fmt.Println(res2, err)
// require.NoError(t, err)
// assert.NotNil(t, res2)
// uuid := res2.UUID

// // te.tmc.expectVRQuery(200, "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", &sqltypes.Result{})
// res, err = te.ws.VDiffShow(ctx, &vtctldatapb.VDiffShowRequest{
// Workflow: workflow,
// TargetKeyspace: targetKeyspace,
// Arg: uuid,
// })
// fmt.Println(res, err)
// }

func TestLookupVindexExternalize(t *testing.T) {
ctx := context.Background()

sourceKeyspace := "source_keyspace"
targetKeyspace := "target_keyspace"
workflow := "test_workflow"

sourceShards := []string{"-"}
targetShards := []string{"-"}

te := newTestMaterializerEnv(t, ctx, &vtctldatapb.MaterializeSettings{
SourceKeyspace: sourceKeyspace,
TargetKeyspace: targetKeyspace,
Workflow: workflow,
}, sourceShards, targetShards)

vindex := &vschemapb.Vindex{
Type: "unicode_loose_xxhash",
Params: map[string]string{
"write_only": "",
},
}
err := te.ws.ts.SaveVSchema(ctx, sourceKeyspace, &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
workflow: vindex,
},
})
require.NoError(t, err)

res, err := te.ws.LookupVindexExternalize(ctx, &vtctldatapb.LookupVindexExternalizeRequest{
Keyspace: sourceKeyspace,
TableKeyspace: targetKeyspace,
Name: workflow,
})
assert.NoError(t, err)
assert.False(t, res.WorkflowDeleted, "WorkflowDeleted should be false as there's no vindex owner.")

vindex.Owner = "owner"
err = te.ws.ts.SaveVSchema(ctx, sourceKeyspace, &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
workflow: vindex,
},
})
require.NoError(t, err)

te.tmc.expectVRQuery(100, "delete from _vt.vreplication where db_name = 'vt_source_keyspace' and workflow = 'test_workflow_reverse'", &sqltypes.Result{})
te.tmc.expectFetchAsAllPrivsQuery(100, "optimize table _vt.copy_state", &sqltypes.Result{})
te.tmc.expectFetchAsAllPrivsQuery(100, "alter table _vt.copy_state auto_increment = 1", &sqltypes.Result{})
te.tmc.expectFetchAsAllPrivsQuery(200, "optimize table _vt.copy_state", &sqltypes.Result{})
te.tmc.expectFetchAsAllPrivsQuery(200, "alter table _vt.copy_state auto_increment = 1", &sqltypes.Result{})

res, err = te.ws.LookupVindexExternalize(ctx, &vtctldatapb.LookupVindexExternalizeRequest{
Keyspace: sourceKeyspace,
TableKeyspace: targetKeyspace,
Name: workflow,
})
assert.NoError(t, err)
assert.True(t, res.WorkflowDeleted, "WorkflowDeleted should be true as there's a vindex owner.")
}

0 comments on commit b22f3a5

Please sign in to comment.