Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into traffic_force
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 17, 2024
2 parents dfa8773 + 8816a2d commit da37c1a
Show file tree
Hide file tree
Showing 96 changed files with 17,745 additions and 14,758 deletions.
2 changes: 1 addition & 1 deletion examples/common/scripts/vtadmin-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ vtadmin \
--alsologtostderr \
--rbac \
--rbac-config="${script_dir}/../vtadmin/rbac.yaml" \
--cluster "id=${cluster_name},name=${cluster_name},discovery=staticfile,discovery-staticfile-path=${script_dir}/../vtadmin/discovery.json,tablet-fqdn-tmpl=http://{{ .Tablet.Hostname }}:15{{ .Tablet.Alias.Uid }}" \
--cluster "id=${cluster_name},name=${cluster_name},discovery=staticfile,discovery-staticfile-path=${script_dir}/../vtadmin/discovery.json,tablet-fqdn-tmpl=http://{{ .Tablet.Hostname }}:15{{ .Tablet.Alias.Uid }},schema-cache-default-expiration=1m" \
> "${log_dir}/vtadmin-api.out" 2>&1 &

vtadmin_api_pid=$!
Expand Down
29 changes: 0 additions & 29 deletions go/cmd/vtctldclient/command/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,6 @@ var (
RunE: commandExecuteMultiFetchAsDBA,
Aliases: []string{"ExecuteMultiFetchAsDba"},
}
// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "GetUnresolvedTransactions <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,
}
)

var executeFetchAsAppOptions = struct {
Expand Down Expand Up @@ -205,26 +198,6 @@ func commandExecuteMultiFetchAsDBA(cmd *cobra.Command, args []string) error {
return nil
}

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Printf("%s\n", data)
return nil
}

func init() {
ExecuteFetchAsApp.Flags().Int64Var(&executeFetchAsAppOptions.MaxRows, "max-rows", 10_000, "The maximum number of rows to fetch from the remote tablet.")
ExecuteFetchAsApp.Flags().BoolVar(&executeFetchAsAppOptions.UsePool, "use-pool", false, "Use the tablet connection pool instead of creating a fresh connection.")
Expand All @@ -242,6 +215,4 @@ func init() {
ExecuteMultiFetchAsDBA.Flags().BoolVar(&executeMultiFetchAsDBAOptions.ReloadSchema, "reload-schema", false, "Instructs the tablet to reload its schema after executing the query.")
ExecuteMultiFetchAsDBA.Flags().BoolVarP(&executeMultiFetchAsDBAOptions.JSON, "json", "j", false, "Output the results in JSON instead of a human-readable table.")
Root.AddCommand(ExecuteMultiFetchAsDBA)

Root.AddCommand(GetUnresolvedTransactions)
}
134 changes: 134 additions & 0 deletions go/cmd/vtctldclient/command/transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package command

import (
"fmt"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
querypb "vitess.io/vitess/go/vt/proto/query"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var (
DistributedTransaction = &cobra.Command{
Use: "DistributedTransaction <cmd>",
Short: "Perform commands on distributed transaction",
Args: cobra.MinimumNArgs(2),

DisableFlagsInUseLine: true,
}

// GetUnresolvedTransactions makes an GetUnresolvedTransactions gRPC call to a vtctld.
GetUnresolvedTransactions = &cobra.Command{
Use: "list <keyspace>",
Short: "Retrieves unresolved transactions for the given keyspace.",
Aliases: []string{"List"},
Args: cobra.ExactArgs(1),
RunE: commandGetUnresolvedTransactions,

DisableFlagsInUseLine: true,
}

// ConcludeTransaction makes a ConcludeTransaction gRPC call to a vtctld.
ConcludeTransaction = &cobra.Command{
Use: "conclude <dtid> [<keyspace/shard> ...]",
Short: "Concludes the unresolved transaction by rolling back the prepared transaction on each participating shard and removing the transaction metadata record.",
Aliases: []string{"Conclude"},
Args: cobra.MinimumNArgs(1),
RunE: commandConcludeTransaction,

DisableFlagsInUseLine: true,
}
)

type ConcludeTransactionOutput struct {
Dtid string `json:"dtid"`
Message string `json:"message"`
Error string `json:"error,omitempty"`
}

const (
concludeSuccess = "Successfully concluded the distributed transaction"
concludeFailure = "Failed to conclude the distributed transaction"
)

func commandGetUnresolvedTransactions(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

keyspace := cmd.Flags().Arg(0)
resp, err := client.GetUnresolvedTransactions(commandCtx,
&vtctldatapb.GetUnresolvedTransactionsRequest{
Keyspace: keyspace,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp.Transactions)
if err != nil {
return err
}
fmt.Println(string(data))
return nil
}

func commandConcludeTransaction(cmd *cobra.Command, args []string) error {
allArgs := cmd.Flags().Args()
shards, err := cli.ParseKeyspaceShards(allArgs[1:])
if err != nil {
return err
}
cli.FinishedParsing(cmd)

dtid := allArgs[0]
var participants []*querypb.Target
for _, shard := range shards {
participants = append(participants, &querypb.Target{
Keyspace: shard.Keyspace,
Shard: shard.Name,
})
}
output := ConcludeTransactionOutput{
Dtid: dtid,
Message: concludeSuccess,
}

_, err = client.ConcludeTransaction(commandCtx,
&vtctldatapb.ConcludeTransactionRequest{
Dtid: dtid,
Participants: participants,
})
if err != nil {
output.Message = concludeFailure
output.Error = err.Error()
}

data, _ := cli.MarshalJSON(output)
fmt.Println(string(data))

return err
}

func init() {
DistributedTransaction.AddCommand(GetUnresolvedTransactions)
DistributedTransaction.AddCommand(ConcludeTransaction)

Root.AddCommand(DistributedTransaction)
}
10 changes: 4 additions & 6 deletions go/cmd/vtctldclient/command/vreplication/common/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ import (
"sort"
"strings"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -143,9 +142,8 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: workflowUpdateOptions.Workflow,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
State: state,
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
State: &state,
},
}

Expand Down
6 changes: 2 additions & 4 deletions go/cmd/vtctldclient/command/vreplication/workflow/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

Expand Down Expand Up @@ -79,9 +78,8 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
TabletRequest: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: baseOptions.Workflow,
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
State: state,
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
State: &state,
},
}

Expand Down
25 changes: 11 additions & 14 deletions go/cmd/vtctldclient/command/vreplication/workflow/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"vitess.io/vitess/go/cmd/vtctldclient/cli"
"vitess.io/vitess/go/cmd/vtctldclient/command/vreplication/common"
"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/textutil"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -65,14 +66,14 @@ var (
}
changes = true
} else {
updateOptions.TabletTypes = []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}
updateOptions.TabletTypes = textutil.SimulatedNullTabletTypeSlice
}
if cmd.Flags().Lookup("on-ddl").Changed { // Validate the provided value
changes = true
if _, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; !ok {
return fmt.Errorf("invalid on-ddl value: %s", updateOptions.OnDDL)
}
} // Simulated NULL will need to be handled in command
}
if !changes {
return fmt.Errorf("no configuration options specified to update")
}
Expand All @@ -85,15 +86,6 @@ var (
func commandUpdate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

// We've already validated any provided value, if one WAS provided.
// Now we need to do the mapping from the string representation to
// the enum value.
onddl := int32(textutil.SimulatedNullInt) // Simulated NULL when no value provided
if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; ok {
onddl = val
}

// Simulated NULL when no value is provided.
tsp := tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN
if cmd.Flags().Lookup("tablet-types-in-order").Changed {
if updateOptions.TabletTypesInPreferenceOrder {
Expand All @@ -109,12 +101,17 @@ func commandUpdate(cmd *cobra.Command, args []string) error {
Workflow: baseOptions.Workflow,
Cells: updateOptions.Cells,
TabletTypes: updateOptions.TabletTypes,
TabletSelectionPreference: tsp,
OnDdl: binlogdatapb.OnDDLAction(onddl),
State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), // We don't allow changing this in the client command
TabletSelectionPreference: &tsp,
},
}

// We've already validated any provided value, if one WAS provided.
// Now we need to do the mapping from the string representation to
// the enum value.
if val, ok := binlogdatapb.OnDDLAction_value[strings.ToUpper(updateOptions.OnDDL)]; ok {
req.TabletRequest.OnDdl = ptr.Of(binlogdatapb.OnDDLAction(val))
}

resp, err := common.GetClient().WorkflowUpdate(common.GetCommandCtx(), req)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Available Commands:
DeleteShards Deletes the specified shards from the topology.
DeleteSrvVSchema Deletes the SrvVSchema object in the given cell.
DeleteTablets Deletes tablet(s) from the topology.
DistributedTransaction Perform commands on distributed transaction
EmergencyReparentShard Reparents the shard to the new primary. Assumes the old primary is dead and not responding.
ExecuteFetchAsApp Executes the given query as the App user on the remote tablet.
ExecuteFetchAsDBA Executes the given query as the DBA user on the remote tablet.
Expand Down Expand Up @@ -59,7 +60,6 @@ Available Commands:
GetTablets Looks up tablets according to filter criteria.
GetThrottlerStatus Get the throttler status for the given tablet.
GetTopologyPath Gets the value associated with the particular path (key) in the topology server.
GetUnresolvedTransactions Retrieves unresolved transactions for the given keyspace.
GetVSchema Prints a JSON representation of a keyspace's topo record.
GetWorkflows Gets all vreplication workflows (Reshard, MoveTables, etc) in the given keyspace.
LegacyVtctlCommand Invoke a legacy vtctlclient command. Flag parsing is best effort.
Expand Down
2 changes: 2 additions & 0 deletions go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type BinlogEvent interface {

// Timestamp returns the timestamp from the event header.
Timestamp() uint32
// ServerID returns the server ID from the event header.
ServerID() uint32

// Format returns a BinlogFormat struct based on the event data.
// This is only valid if IsFormatDescription() returns true.
Expand Down
4 changes: 4 additions & 0 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ func (ev filePosFakeEvent) Timestamp() uint32 {
return ev.timestamp
}

func (ev filePosFakeEvent) ServerID() uint32 {
return 1
}

func (ev filePosFakeEvent) Format() (BinlogFormat, error) {
return BinlogFormat{}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func TestRowsEvent(t *testing.T) {
require.True(t, reflect.DeepEqual(gotRows, rows), "NewRowsEvent().Rows() got Rows:\n%v\nexpected:\n%v", gotRows, rows)

assert.NotZero(t, event.Timestamp())
assert.NotZero(t, event.ServerID())
}

func TestHeartbeatEvent(t *testing.T) {
Expand All @@ -384,6 +385,7 @@ func TestHeartbeatEvent(t *testing.T) {
require.NotNil(t, event)
assert.True(t, event.IsHeartbeat())
assert.Zero(t, event.Timestamp())
assert.NotZero(t, event.ServerID())
}

func TestRotateRotateEvent(t *testing.T) {
Expand Down
10 changes: 8 additions & 2 deletions go/test/endtoend/onlineddl/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func CreateTempScript(t *testing.T, content string) (fileName string) {
func MysqlClientExecFile(t *testing.T, mysqlParams *mysql.ConnParams, testDataPath, testName string, fileName string) (output string) {
t.Helper()

errorFile, err := os.CreateTemp("", "onlineddl-test-")
require.NoError(t, err)
defer os.Remove(errorFile.Name())

bashPath, err := exec.LookPath("bash")
require.NoError(t, err)
mysqlPath, err := exec.LookPath("mysql")
Expand All @@ -55,13 +59,15 @@ func MysqlClientExecFile(t *testing.T, mysqlParams *mysql.ConnParams, testDataPa
if !filepath.IsAbs(fileName) {
filePath, _ = filepath.Abs(path.Join(testDataPath, testName, fileName))
}
bashCommand := fmt.Sprintf(`%s -u%s --socket=%s --database=%s -s -s < %s 2> /tmp/error.log`, mysqlPath, mysqlParams.Uname, mysqlParams.UnixSocket, mysqlParams.DbName, filePath)
bashCommand := fmt.Sprintf(`%s -u%s --socket=%s --database=%s -s -s < %s 2> %s`, mysqlPath, mysqlParams.Uname, mysqlParams.UnixSocket, mysqlParams.DbName, filePath, errorFile.Name())
cmd, err := exec.Command(
bashPath,
"-c",
bashCommand,
).Output()

require.NoError(t, err)
errorContent, readerr := os.ReadFile(errorFile.Name())
require.NoError(t, readerr)
require.NoError(t, err, "error details: %s", errorContent)
return string(cmd)
}
Loading

0 comments on commit da37c1a

Please sign in to comment.