Skip to content

Commit

Permalink
Replace remaining usage of VtctlProcess
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 2, 2025
1 parent d817b3c commit efdefd7
Show file tree
Hide file tree
Showing 17 changed files with 50 additions and 66 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/cellalias/cell_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ func TestMain(m *testing.M) {
if err != nil {
return 1, err
}
err = localCluster.VtctlProcess.AddCellInfo(cell2)
err = localCluster.VtctldClientProcess.AddCellInfo(cell2)
if err != nil {
return 1, err
}

vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory)
vtctldClientProcess := cluster.VtctldClientProcessInstance(localCluster.VtctldProcess.GrpcPort, localCluster.TopoPort, "localhost", localCluster.TmpDirectory)
_, err = vtctldClientProcess.ExecuteCommandWithOutput("CreateKeyspace", keyspaceName, "--durability-policy=semi_sync")
if err != nil {
return 1, err
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (vtctld *VtctldProcess) TearDown() error {
}
}

// VtctldProcessInstance returns a VtctlProcess handle for vtctl process
// VtctldProcessInstance returns a VtctldProcess handle
// configured with the given Config.
// The process must be manually started by calling setup()
func VtctldProcessInstance(httpPort int, grpcPort int, topoPort int, hostname string, tmpDirectory string) *VtctldProcess {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func initializeCluster(t *testing.T) (int, error) {
for _, keyspaceStr := range []string{keyspace} {
KeyspacePtr := &cluster.Keyspace{Name: keyspaceStr}
keyspace := *KeyspacePtr
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil {
if err := clusterInstance.VtctldClientProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil {
return 1, err
}
shard := &cluster.Shard{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,28 +139,28 @@ func TestSecureTransport(t *testing.T) {
}

// setup replication
var vtctlClientArgs []string
var vtctldClientArgs []string

vtctlClientTmArgs := append(vtctlClientArgs, tmclientExtraArgs("vttablet-client-1")...)
vtctldClientTmArgs := append(vtctldClientArgs, tmclientExtraArgs("vttablet-client-1")...)

// Reparenting
vtctlClientArgs = append(vtctlClientTmArgs, "InitShardPrimary", "--", "--force", "test_keyspace/0", primaryTablet.Alias)
err = clusterInstance.VtctlProcess.ExecuteCommand(vtctlClientArgs...)
vtctldClientArgs = append(vtctldClientTmArgs, "InitShardPrimary", "--", "--force", "test_keyspace/0", primaryTablet.Alias)
err = clusterInstance.VtctldClientProcess.ExecuteCommand(vtctldClientArgs...)
require.NoError(t, err)

err = clusterInstance.StartVTOrc("test_keyspace")
require.NoError(t, err)

// Apply schema
var vtctlApplySchemaArgs = append(vtctlClientTmArgs, "ApplySchema", "--", "--sql", createVtInsertTest, "test_keyspace")
err = clusterInstance.VtctlProcess.ExecuteCommand(vtctlApplySchemaArgs...)
var vtctlApplySchemaArgs = append(vtctldClientTmArgs, "ApplySchema", "--sql", createVtInsertTest, "test_keyspace")
err = clusterInstance.VtctldClientProcess.ExecuteCommand(vtctlApplySchemaArgs...)
require.NoError(t, err)

for _, tablet := range []cluster.Vttablet{primaryTablet, replicaTablet} {
var vtctlTabletArgs []string
vtctlTabletArgs = append(vtctlTabletArgs, tmclientExtraArgs("vttablet-client-1")...)
vtctlTabletArgs = append(vtctlTabletArgs, "RunHealthCheck", tablet.Alias)
_, err = clusterInstance.VtctlProcess.ExecuteCommandWithOutput(vtctlTabletArgs...)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(vtctlTabletArgs...)
require.NoError(t, err)
}

Expand Down Expand Up @@ -349,7 +349,7 @@ func clusterSetUp(t *testing.T) (int, error) {
for _, keyspaceStr := range []string{keyspace} {
KeyspacePtr := &cluster.Keyspace{Name: keyspaceStr}
keyspace := *KeyspacePtr
if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil {
if err := clusterInstance.VtctldClientProcess.CreateKeyspace(keyspace.Name, sidecar.DefaultName, ""); err != nil {
return 1, err
}
shard := &cluster.Shard{
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/messaging/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMessage(t *testing.T) {

utils.Exec(t, conn, fmt.Sprintf("use %s", lookupKeyspace))
utils.Exec(t, conn, createMessage)
clusterInstance.VtctlProcess.ExecuteCommand(fmt.Sprintf("ReloadSchemaKeyspace %s", lookupKeyspace))
clusterInstance.VtctldClientProcess.ExecuteCommand(fmt.Sprintf("ReloadSchemaKeyspace %s", lookupKeyspace))

defer utils.Exec(t, conn, "drop table vitess_message")

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/mysqlctl/mysqlctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
return 1
}

if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil {
if err := clusterInstance.VtctldClientProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil {
return 1
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/mysqlctld/mysqlctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestMain(m *testing.M) {
return 1
}

if err := clusterInstance.VtctlProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil {
if err := clusterInstance.VtctldClientProcess.CreateKeyspace(keyspaceName, sidecar.DefaultName, ""); err != nil {
return 1
}

Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/recovery/recovery_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tabl
tablet.ValidateTabletRestart(t)
replicaTabletArgs := commonTabletArg

_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("GetKeyspace", restoreKSName)
_, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("GetKeyspace", restoreKSName)

if restoreTime.IsZero() {
restoreTime = time.Now().UTC()
}

if err != nil {
_, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("CreateKeyspace", "--",
"--keyspace_type=SNAPSHOT", "--base_keyspace="+keyspaceName,
"--snapshot_time", restoreTime.Format(time.RFC3339), restoreKSName)
_, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("CreateKeyspace", restoreKSName,
"--type=SNAPSHOT", "--base-keyspace="+keyspaceName,
"--snapshot-timestamp", restoreTime.Format(time.RFC3339))
require.Nil(t, err)
}

Expand Down
10 changes: 2 additions & 8 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func setupCluster(ctx context.Context, t *testing.T, shardName string, cells []s
require.NoError(t, err, "Error managing topo")
numCell := 1
for numCell < len(cells) {
err = clusterInstance.VtctlProcess.AddCellInfo(cells[numCell])
err = clusterInstance.VtctldClientProcess.AddCellInfo(cells[numCell])
require.NoError(t, err, "Error managing topo")
numCell++
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func setupCluster(ctx context.Context, t *testing.T, shardName string, cells []s
}
}
if clusterInstance.VtctlMajorVersion >= 14 {
clusterInstance.VtctldClientProcess = *cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)
clusterInstance.VtctldClientProcess = *cluster.VtctldClientProcessInstance(clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TopoPort, "localhost", clusterInstance.TmpDirectory)
out, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", KeyspaceName, fmt.Sprintf("--durability-policy=%s", durability))
require.NoError(t, err, out)
}
Expand Down Expand Up @@ -406,12 +406,6 @@ func ErsIgnoreTablet(clusterInstance *cluster.LocalProcessCluster, tab *cluster.
return clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args...)
}

// ErsWithVtctl runs ERS via vtctl binary
func ErsWithVtctl(clusterInstance *cluster.LocalProcessCluster) (string, error) {
args := []string{"EmergencyReparentShard", "--", "--keyspace_shard", fmt.Sprintf("%s/%s", KeyspaceName, ShardName)}
return clusterInstance.VtctlProcess.ExecuteCommandWithOutput(args...)
}

// endregion

// region validations
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/topoconncache/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ func TestMain(m *testing.M) {
if err != nil {
return 1, err
}
err = clusterInstance.VtctlProcess.AddCellInfo(cell2)
err = clusterInstance.VtctldClientProcess.AddCellInfo(cell2)
if err != nil {
return 1, err
}

vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TmpDirectory)
vtctldClientProcess := cluster.VtctldClientProcessInstance(clusterInstance.VtctldProcess.GrpcPort, clusterInstance.TopoPort, "localhost", clusterInstance.TmpDirectory)
_, err = vtctldClientProcess.ExecuteCommandWithOutput("CreateKeyspace", keyspaceName, "--durability-policy=semi_sync")
if err != nil {
return 1, err
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/topoconncache/topo_conn_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ func deleteTablet(t *testing.T, tablet *cluster.Vttablet) {

func addCellback(t *testing.T) {
// creating new cell , with same name as previously deleted one but at a different root path.
clusterInstance.VtctlProcess.TopoRootPath = "/org1/obj1/"
clusterInstance.VtctldClientProcess.TopoRootPath = "/org1/obj1/"

err := clusterInstance.VtctlProcess.AddCellInfo(cell2)
err := clusterInstance.VtctldClientProcess.AddCellInfo(cell2)
require.NoError(t, err)

// create new vttablets
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestTopoRestart(t *testing.T) {
// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestShardLocking(t *testing.T) {
// TestKeyspaceLocking tests that keyspace locking works as intended.
func TestKeyspaceLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a keyspace lock.
Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestTopoDownServingQuery(t *testing.T) {
// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestShardLocking(t *testing.T) {
// TestKeyspaceLocking tests that keyspace locking works as intended.
func TestKeyspaceLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a keyspace lock.
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestKeyspaceLocking(t *testing.T) {
// TestLockingWithTTL tests that locking with the TTL override works as intended.
func TestLockingWithTTL(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

ctx := context.Background()
Expand All @@ -224,7 +224,7 @@ func TestLockingWithTTL(t *testing.T) {
// TestNamedLocking tests that named locking works as intended.
func TestNamedLocking(t *testing.T) {
// Create topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestTopoDownServingQuery(t *testing.T) {
// TestShardLocking tests that shard locking works as intended.
func TestShardLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a shard lock.
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestShardLocking(t *testing.T) {
// TestKeyspaceLocking tests that keyspace locking works as intended.
func TestKeyspaceLocking(t *testing.T) {
// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

// Acquire a keyspace lock.
Expand Down
30 changes: 10 additions & 20 deletions go/test/endtoend/vtgate/queries/reference/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,24 +155,14 @@ func TestMain(m *testing.M) {
}()

// Materialize zip_detail to sharded keyspace.
output, err := clusterInstance.VtctlProcess.ExecuteCommandWithOutput(
output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
"Materialize",
"--",
"--tablet_types",
"PRIMARY",
`{
"workflow": "copy_zip_detail",
"source_keyspace": "`+unshardedKeyspaceName+`",
"target_keyspace": "`+shardedKeyspaceName+`",
"tablet_types": "PRIMARY",
"table_settings": [
{
"target_table": "zip_detail",
"source_expression": "select * from zip_detail",
"create_ddl": "copy"
}
]
}`,
"--workflow", "copy_zip_detail",
"--target-keyspace", shardedKeyspaceName,
"create",
"--source-keyspace", unshardedKeyspaceName,
"--table-settings", `'[{"target_table": "zip_detail", "source_expression": "select * from zip_detail", "create_ddl": "copy" }]'`,
"--tablet-types", "PRIMARY",
)
fmt.Fprintf(os.Stderr, "Output from materialize: %s\n", output)
if err != nil {
Expand Down Expand Up @@ -214,10 +204,10 @@ func TestMain(m *testing.M) {
}

// Stop materialize zip_detail to sharded keyspace.
err = clusterInstance.VtctlProcess.ExecuteCommand(
err = clusterInstance.VtctldClientProcess.ExecuteCommand(
"Workflow",
"--",
shardedKeyspaceName+".copy_zip_detail",
"--keyspace", shardedKeyspaceName,
"--workflow", "copy_zip_detail",
"delete",
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {

// Change the args such that they match how we would invoke VTOrc
os.Args = []string{"vtorc",
"--topo_global_server_address", clusterInfo.ClusterInstance.VtctlProcess.TopoGlobalAddress,
"--topo_implementation", clusterInfo.ClusterInstance.VtctlProcess.TopoImplementation,
"--topo_global_root", clusterInfo.ClusterInstance.VtctlProcess.TopoGlobalRoot,
"--topo_global_server_address", clusterInfo.ClusterInstance.VtctldClientProcess.TopoGlobalAddress,
"--topo_implementation", clusterInfo.ClusterInstance.VtctldClientProcess.TopoImplementation,
"--topo_global_root", clusterInfo.ClusterInstance.VtctldClientProcess.TopoGlobalRoot,
}
servenv.ParseFlags("vtorc")
config.SetInstancePollTime(1 * time.Second)
Expand Down
8 changes: 4 additions & 4 deletions go/test/endtoend/vtorc/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func CreateClusterAndStartTopo(cellInfos []*CellInfo) (*VTOrcClusterInfo, error)
if err != nil {
return nil, err
}
err = clusterInstance.VtctlProcess.AddCellInfo(Cell2)
err = clusterInstance.VtctldClientProcess.AddCellInfo(Cell2)
if err != nil {
return nil, err
}
Expand All @@ -101,7 +101,7 @@ func CreateClusterAndStartTopo(cellInfos []*CellInfo) (*VTOrcClusterInfo, error)
}

// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
return &VTOrcClusterInfo{
ClusterInstance: clusterInstance,
Ts: ts,
Expand Down Expand Up @@ -855,7 +855,7 @@ func SetupNewClusterSemiSync(t *testing.T) *VTOrcClusterInfo {
require.NoError(t, err, out)

// create topo server connection
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctlProcess.TopoGlobalAddress, clusterInstance.VtctlProcess.TopoGlobalRoot)
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)
clusterInfo := &VTOrcClusterInfo{
ClusterInstance: clusterInstance,
Expand Down Expand Up @@ -926,7 +926,7 @@ func AddSemiSyncKeyspace(t *testing.T, clusterInfo *VTOrcClusterInfo) {
require.NoError(t, err)
}

vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", clusterInfo.ClusterInstance.VtctldProcess.GrpcPort, clusterInfo.ClusterInstance.TmpDirectory)
vtctldClientProcess := cluster.VtctldClientProcessInstance(clusterInfo.ClusterInstance.VtctldProcess.GrpcPort, clusterInfo.ClusterInstance.TopoPort, "localhost", clusterInfo.ClusterInstance.TmpDirectory)
out, err := vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceSemiSyncName, "--durability-policy=semi_sync")
require.NoError(t, err, out)
}
Expand Down

0 comments on commit efdefd7

Please sign in to comment.