Skip to content

Commit

Permalink
Add conclude transaction command to vtctld service (#16693)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Sep 16, 2024
1 parent fd18ae4 commit 8b57e7c
Show file tree
Hide file tree
Showing 34 changed files with 7,562 additions and 4,976 deletions.
29 changes: 0 additions & 29 deletions go/cmd/vtctldclient/command/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@ var (
RunE: commandExecuteMultiFetchAsDBA,
Aliases: []string{"ExecuteMultiFetchAsDba"},
}
// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "GetUnresolvedTransactions <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,
}
)

var executeFetchAsAppOptions = struct {
Expand Down Expand Up @@ -205,26 +198,6 @@ func commandExecuteMultiFetchAsDBA(cmd *cobra.Command, args []string) error {
return nil
}

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Printf("%s\n", data)
return nil
}

func init() {
ExecuteFetchAsApp.Flags().Int64Var(&executeFetchAsAppOptions.MaxRows, "max-rows", 10_000, "The maximum number of rows to fetch from the remote tablet.")
ExecuteFetchAsApp.Flags().BoolVar(&executeFetchAsAppOptions.UsePool, "use-pool", false, "Use the tablet connection pool instead of creating a fresh connection.")
Expand All @@ -242,6 +215,4 @@ func init() {
ExecuteMultiFetchAsDBA.Flags().BoolVar(&executeMultiFetchAsDBAOptions.ReloadSchema, "reload-schema", false, "Instructs the tablet to reload its schema after executing the query.")
ExecuteMultiFetchAsDBA.Flags().BoolVarP(&executeMultiFetchAsDBAOptions.JSON, "json", "j", false, "Output the results in JSON instead of a human-readable table.")
Root.AddCommand(ExecuteMultiFetchAsDBA)

Root.AddCommand(GetUnresolvedTransactions)
}
134 changes: 134 additions & 0 deletions go/cmd/vtctldclient/command/transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package command

import (
"fmt"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
querypb "vitess.io/vitess/go/vt/proto/query"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
DistributedTransaction = &cobra.Command{
Use: "DistributedTransaction <cmd>",
Short: "Perform commands on distributed transaction",
Args: cobra.MinimumNArgs(2),

DisableFlagsInUseLine: true,
}

// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "list <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Aliases: []string{"List"},
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,

DisableFlagsInUseLine: true,
}

// ConcludeTransaction makes a ConcludeTransaction gRPC call to a vtctld.
ConcludeTransaction = &cobra.Command{
Use: "conclude <dtid> [<keyspace/shard> ...]",
Short: "Concludes the unresolved transaction by rolling back the prepared transaction on each participating shard and removing the transaction metadata record.",
Aliases: []string{"Conclude"},
Args: cobra.MinimumNArgs(1),
RunE: commandConcludeTransaction,

DisableFlagsInUseLine: true,
}
)

type ConcludeTransactionOutput struct {
Dtid string `json:"dtid"`
Message string `json:"message"`
Error string `json:"error,omitempty"`
}

const (
concludeSuccess = "Successfully concluded the distributed transaction"
concludeFailure = "Failed to conclude the distributed transaction"
)

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}

func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
allArgs := cmd.Flags().Args()
shards, err := cli.ParseKeyspaceShards(allArgs[1:])
if err != nil {
return err
}
cli.FinishedParsing(cmd)

dtid := allArgs[0]
var participants []*querypb.Target
for _, shard := range shards {
participants = append(participants, &querypb.Target{
Keyspace: shard.Keyspace,
Shard: shard.Name,
})
}
output := ConcludeTransactionOutput{
Dtid: dtid,
Message: concludeSuccess,
}

_, err = client.ConcludeTransaction(commandCtx,
&vtctldatapb.ConcludeTransactionRequest{
Dtid: dtid,
Participants: participants,
})
if err != nil {
output.Message = concludeFailure
output.Error = err.Error()
}

data, _ := cli.MarshalJSON(output)
fmt.Println(string(data))

return err
}

func init() {
DistributedTransaction.AddCommand(GetUnresolvedTransactions)
DistributedTransaction.AddCommand(ConcludeTransaction)

Root.AddCommand(DistributedTransaction)
}
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Available Commands:
DeleteShards Deletes the specified shards from the topology.
DeleteSrvVSchema Deletes the SrvVSchema object in the given cell.
DeleteTablets Deletes tablet(s) from the topology.
DistributedTransaction Perform commands on distributed transaction
EmergencyReparentShard Reparents the shard to the new primary. Assumes the old primary is dead and not responding.
ExecuteFetchAsApp Executes the given query as the App user on the remote tablet.
ExecuteFetchAsDBA Executes the given query as the DBA user on the remote tablet.
Expand Down Expand Up @@ -59,7 +60,6 @@ Available Commands:
GetTablets Looks up tablets according to filter criteria.
GetThrottlerStatus Get the throttler status for the given tablet.
GetTopologyPath Gets the value associated with the particular path (key) in the topology server.
GetUnresolvedTransactions Retrieves unresolved transactions for the given keyspace.
GetVSchema Prints a JSON representation of a keyspace's topo record.
GetWorkflows Gets all vreplication workflows (Reshard, MoveTables, etc) in the given keyspace.
LegacyVtctlCommand Invoke a legacy vtctlclient command. Flag parsing is best effort.
Expand Down
12 changes: 11 additions & 1 deletion go/test/endtoend/tabletmanager/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,19 @@ func TestTabletCommands(t *testing.T) {
})

t.Run("GetUnresolvedTransactions", func(t *testing.T) {
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetUnresolvedTransactions", keyspaceName)
_, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "list", keyspaceName)
require.NoError(t, err)
})
t.Run("ConcludeTransaction", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})
t.Run("ConcludeTransaction with participants", func(t *testing.T) {
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("DistributedTransaction", "conclude", "ks:0:1234", "ks/0")
assert.NoError(t, err)
assert.Contains(t, output, "Successfully concluded the distributed transaction")
})
// check Ping / RefreshState / RefreshStateByShard
err = clusterInstance.VtctldClientProcess.ExecuteCommand("PingTablet", primaryTablet.Alias)
require.Nil(t, err, "error should be Nil")
Expand Down
Loading

0 comments on commit 8b57e7c

Please sign in to comment.