Skip to content

Commit

Permalink
LookupVindex: Implement internalize command for lookup vindexes (#1…
Browse files Browse the repository at this point in the history
…7429)

Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 authored Jan 22, 2025
1 parent d1aa2f4 commit 5363f03
Show file tree
Hide file tree
Showing 25 changed files with 6,748 additions and 3,047 deletions.
100 changes: 96 additions & 4 deletions go/cmd/vtctldclient/command/vreplication/lookupvindex/lookupvindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ var (

externalizeOptions = struct {
Keyspace string
Delete bool
}{}

internalizeOptions = struct {
Keyspace string
}{}

completeOptions = struct {
Keyspace string
}{}

parseAndValidateCreate = func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -142,6 +151,18 @@ var (
RunE: commandCancel,
}

// complete makes a LookupVindexComplete call to a vtctld.
complete = &cobra.Command{
Use: "complete",
Short: "Complete the LookupVindex workflow. The Vindex must have been previously externalized. If you want to delete the workflow without externalizing the Vindex then use the cancel command instead.",
Example: `vtctldclient --server localhost:15999 LookupVindex --name corder_lookup_vdx --table-keyspace customer complete`,
SilenceUsage: true,
DisableFlagsInUseLine: true,
Aliases: []string{"Complete"},
Args: cobra.NoArgs,
RunE: commandComplete,
}

// create makes a LookupVindexCreate call to a vtctld.
create = &cobra.Command{
Use: "create",
Expand All @@ -158,7 +179,7 @@ var (
// externalize makes a LookupVindexExternalize call to a vtctld.
externalize = &cobra.Command{
Use: "externalize",
Short: "Externalize the Lookup Vindex. If the Vindex has an owner the VReplication workflow will also be deleted.",
Short: "Externalize the Lookup Vindex. If the Vindex has an owner the VReplication workflow will also be stopped/deleted.",
Example: `vtctldclient --server localhost:15999 LookupVindex --name corder_lookup_vdx --table-keyspace customer externalize`,
SilenceUsage: true,
DisableFlagsInUseLine: true,
Expand All @@ -167,6 +188,18 @@ var (
RunE: commandExternalize,
}

// internalize makes a LookupVindexInternalize call to a vtctld.
internalize = &cobra.Command{
Use: "internalize",
Short: "Internalize the Vindex again to continue the backfill, making it unusable for queries again.",
Example: `vtctldclient --server localhost:15999 LookupVindex --name corder_lookup_vdx --table-keyspace customer internalize`,
SilenceUsage: true,
DisableFlagsInUseLine: true,
Aliases: []string{"Internalize"},
Args: cobra.NoArgs,
RunE: commandInternalize,
}

// show makes a GetWorkflows call to a vtctld.
show = &cobra.Command{
Use: "show",
Expand Down Expand Up @@ -199,6 +232,30 @@ func commandCancel(cmd *cobra.Command, args []string) error {
return nil
}

func commandComplete(cmd *cobra.Command, args []string) error {
if completeOptions.Keyspace == "" {
completeOptions.Keyspace = baseOptions.TableKeyspace
}
cli.FinishedParsing(cmd)

_, err := common.GetClient().LookupVindexComplete(common.GetCommandCtx(), &vtctldatapb.LookupVindexCompleteRequest{
Keyspace: completeOptions.Keyspace,
// The name of the workflow and lookup vindex.
Name: baseOptions.Name,
// Where the lookup table and VReplication workflow were created.
TableKeyspace: baseOptions.TableKeyspace,
})

if err != nil {
return err
}

output := fmt.Sprintf("LookupVindex %s has been completed and the VReplication workflow has been deleted.", baseOptions.Name)
fmt.Println(output)

return nil
}

func commandCreate(cmd *cobra.Command, args []string) error {
tsp := common.GetTabletSelectionPreference(cmd)
cli.FinishedParsing(cmd)
Expand Down Expand Up @@ -236,21 +293,49 @@ func commandExternalize(cmd *cobra.Command, args []string) error {
Name: baseOptions.Name,
// Where the lookup table and VReplication workflow were created.
TableKeyspace: baseOptions.TableKeyspace,
// Delete the workflow after externalizing, instead of stopping.
DeleteWorkflow: externalizeOptions.Delete,
})

if err != nil {
return err
}

output := fmt.Sprintf("LookupVindex %s has been externalized", baseOptions.Name)
if resp.WorkflowDeleted {
output = output + fmt.Sprintf(" and the %s VReplication workflow has been deleted", baseOptions.Name)
if resp.WorkflowStopped {
output = output + " and the VReplication workflow has been stopped."
} else if resp.WorkflowDeleted {
output = output + " and the VReplication workflow has been deleted."
}
fmt.Println(output)

return nil
}

func commandInternalize(cmd *cobra.Command, args []string) error {
if internalizeOptions.Keyspace == "" {
internalizeOptions.Keyspace = baseOptions.TableKeyspace
}
cli.FinishedParsing(cmd)

_, err := common.GetClient().LookupVindexInternalize(common.GetCommandCtx(), &vtctldatapb.LookupVindexInternalizeRequest{
Keyspace: internalizeOptions.Keyspace,
// The name of the workflow and lookup vindex.
Name: baseOptions.Name,
// Where the lookup table and VReplication workflow were created.
TableKeyspace: baseOptions.TableKeyspace,
})

if err != nil {
return err
}

output := fmt.Sprintf("LookupVindex %s has been internalized and the VReplication workflow has been started.", baseOptions.Name)
fmt.Println(output)

return nil
}

func commandShow(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

Expand Down Expand Up @@ -304,12 +389,19 @@ func registerCommands(root *cobra.Command) {
// for the VReplication workflow used.
base.AddCommand(show)

// This will also delete the VReplication workflow if the
// This will also stop the VReplication workflow if the
// vindex has an owner as the lookup vindex will then be
// managed by VTGate.
externalize.Flags().StringVar(&externalizeOptions.Keyspace, "keyspace", "", "The keyspace containing the Lookup Vindex. If no value is specified then the table-keyspace will be used.")
externalize.Flags().BoolVar(&externalizeOptions.Delete, "delete", false, "Delete the VReplication workflow after externalizing the Vindex, instead of stopping (default false).")
base.AddCommand(externalize)

internalize.Flags().StringVar(&internalizeOptions.Keyspace, "keyspace", "", "The keyspace containing the Lookup Vindex. If no value is specified then the table-keyspace will be used.")
base.AddCommand(internalize)

complete.Flags().StringVar(&completeOptions.Keyspace, "keyspace", "", "The keyspace containing the Lookup Vindex. If no value is specified then the table-keyspace will be used.")
base.AddCommand(complete)

// The cancel command deletes the VReplication workflow used
// to backfill the lookup vindex. It ends up making a
// WorkflowDelete VtctldServer call.
Expand Down
49 changes: 44 additions & 5 deletions go/test/endtoend/vreplication/lookup_vindex_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ func (lv *lookupVindex) create() {
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex create: %v", err)
waitForWorkflowState(lv.t, vc, fmt.Sprintf("%s.%s", lv.ownerTableKeyspace, lv.name), binlogdatapb.VReplicationWorkflowState_Running.String())
lv.expectWriteOnly(true)
lv.expectParamsAndOwner(true)
}

func (lv *lookupVindex) cancel() {
panic("not implemented")
args := []string{
"LookupVindex",
"--name", lv.name,
"--table-keyspace=" + lv.ownerTableKeyspace,
"cancel",
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex complete: %v", err)
}

func (lv *lookupVindex) externalize() {
Expand All @@ -83,20 +90,52 @@ func (lv *lookupVindex) externalize() {
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex externalize: %v", err)
lv.expectWriteOnly(false)
lv.expectParamsAndOwner(false)
}

func (lv *lookupVindex) internalize() {
args := []string{
"LookupVindex",
"--name", lv.name,
"--table-keyspace=" + lv.ownerTableKeyspace,
"internalize",
"--keyspace=" + lv.tableKeyspace,
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex internalize: %v", err)
lv.expectParamsAndOwner(true)
}

func (lv *lookupVindex) complete() {
args := []string{
"LookupVindex",
"--name", lv.name,
"--table-keyspace=" + lv.ownerTableKeyspace,
"complete",
"--keyspace=" + lv.tableKeyspace,
}
err := vc.VtctldClient.ExecuteCommand(args...)
require.NoError(lv.t, err, "error executing LookupVindex complete: %v", err)
lv.expectParamsAndOwner(false)
}

func (lv *lookupVindex) show() error {
return nil
}

func (lv *lookupVindex) expectWriteOnly(expected bool) {
func (lv *lookupVindex) expectParamsAndOwner(expectedWriteOnlyParam bool) {
vschema, err := vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", lv.ownerTableKeyspace)
require.NoError(lv.t, err, "error executing GetVSchema: %v", err)
vdx := gjson.Get(vschema, fmt.Sprintf("vindexes.%s", lv.name))
require.NotNil(lv.t, vdx, "lookup vindex %s not found", lv.name)

expectedOwner, expectedFrom, expectedTo := lv.ownerTable, strings.Join(lv.columns, ","), "keyspace_id"
require.Equal(lv.t, expectedOwner, vdx.Get("owner").String(), "expected 'owner' parameter to be %s", expectedOwner)
require.Equal(lv.t, expectedFrom, vdx.Get("params.from").String(), "expected 'from' parameter to be %s", expectedFrom)
require.Equal(lv.t, expectedTo, vdx.Get("params.to").String(), "expected 'to' parameter to be %s", expectedTo)

want := ""
if expected {
if expectedWriteOnlyParam {
want = "true"
}
require.Equal(lv.t, want, vdx.Get("params.write_only").String(), "expected write_only parameter to be %s", want)
Expand Down
Loading

0 comments on commit 5363f03

Please sign in to comment.