Skip to content

Commit

Permalink
Correct logic
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 16, 2024
1 parent e3160a9 commit 3caa747
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 34 deletions.
36 changes: 20 additions & 16 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -61,7 +60,6 @@ var (
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = throttlerapp.VStreamerName
targetThrottlerAppName = throttlerapp.VPlayerName
testedPermissionChecks atomic.Bool
)

const (
Expand Down Expand Up @@ -801,10 +799,10 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
}
switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
var shardNames []string
for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces["product"].Shards) {
for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) {
shardNames = append(shardNames, shardName)
}
testPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow)
switchWrites(t, workflowType, ksWorkflow, false)

checkThatVDiffFails(t, targetKs, workflow)
Expand Down Expand Up @@ -1597,12 +1595,12 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
validateDryRunResults(t, output, dryRunResults)
}

// testPermissionsChecks confirms that for the SwitchTraffic command, the necessary
// permissions are checked properly.
func testPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) {
// testSwitchTrafficPermissionsChecks confirms that for the SwitchTraffic command, the
// necessary permissions are checked properly on the source keyspace's primary tablets.
// This ensures that we can create and manage the reverse vreplication workflow.
func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) {
applyPrivileges := func(query string) {
for _, shard := range sourceShards {
t.Logf("Applying privileges %q on shard %s/%s", query, sourceKeyspace, shard)
primary := vc.getPrimaryTablet(t, sourceKeyspace, shard)
_, err := primary.QueryTablet(query, primary.Keyspace, false)
require.NoError(t, err)
Expand All @@ -1611,37 +1609,43 @@ func testPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sou
runDryRunCmd := func(expectErr bool) {
_, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace,
"SwitchTraffic", "--tablet-types=primary", "--dry-run")
require.True(t, ((err == nil) != expectErr), "expected error: %t, got: %v", expectErr, err)
require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err)
}

defer func() {
// Put global privs back in place.
applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost")
}()

t.Run("test switch traffic permission checks", func(t *testing.T) {
t.Run("test without global privileges", func(t *testing.T) {
applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost")
runDryRunCmd(true)
})

t.Run("test with db level privileges", func(t *testing.T) {
applyPrivileges("grant select,insert,update,delete on _vt.* to vt_filtered@localhost")
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global or db level privileges", func(t *testing.T) {
applyPrivileges("revoke select,insert,update,delete on _vt.* from vt_filtered@localhost")
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})

t.Run("test with table level privileges", func(t *testing.T) {
applyPrivileges("grant select,insert,update,delete on _vt.vreplication to vt_filtered@localhost")
applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(false)
})

t.Run("test without global, db, or table level privileges", func(t *testing.T) {
applyPrivileges("revoke select,insert,update,delete on _vt.vreplication from vt_filtered@localhost")
applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost",
sidecarDBIdentifier))
runDryRunCmd(true)
})

// Put global privs back in place.
applyPrivileges("grant select,insert,update,delete on *.* from vt_filtered@localhost")
})
}

Expand Down
48 changes: 47 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sets"
"vitess.io/vitess/go/sqlescape"
Expand Down Expand Up @@ -3395,8 +3396,16 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}

if req.EnableReverseReplication {
// Does the source keyspace have tablets that are able to manage
// the reverse workflow?
if err := s.validatePrimaryTabletsHaveRequiredVReplicationPermissions(ctx, ts.SourceKeyspaceName(), ts.SourceShards()); err != nil {
return handleError(fmt.Sprintf("primary tablets are not able to manage reverse vreplication stream in the %s keyspace",
ts.SourceKeyspaceName()), err)
}
// Does the target keyspace have tablets available to stream from
// for the reverse workflow?
if err := areTabletsAvailableToStreamFrom(ctx, req, ts, ts.TargetKeyspaceName(), ts.TargetShards()); err != nil {
return handleError(fmt.Sprintf("no tablets were available to stream from in the %s keyspace", ts.SourceKeyspaceName()), err)
return handleError(fmt.Sprintf("no tablets were available to stream from in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}

Expand Down Expand Up @@ -4305,6 +4314,43 @@ func (s *Server) mirrorTraffic(ctx context.Context, req *vtctldatapb.WorkflowMir
return nil
}

// validatePrimaryTabletsHaveRequiredVReplicationPermissions checks that all primary tablets
// in the given keyspace shards have the required permissions necessary to perform actions on
// the workflow record.
func (s *Server) validatePrimaryTabletsHaveRequiredVReplicationPermissions(ctx context.Context, keyspace string, shards []*topo.ShardInfo) error {
var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
for _, shard := range shards {
primary := shard.PrimaryAlias
if primary == nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "%s/%s shard does not have a primary tablet", keyspace, shard.ShardName())
}
wg.Add(1)
go func() {
defer wg.Done()
tablet, err := s.ts.GetTablet(ctx, primary)
if err != nil {
allErrors.RecordError(vterrors.Wrapf(err, "failed to get primary tablet for %s/%s shard", keyspace, shard.ShardName()))
}
// Ensure the tablet has the minimum privileges required on the sidecar database
// table in order to manage the workflow.
res, err := s.tmc.ValidateVReplicationPermissions(ctx, tablet.Tablet, nil)
if err != nil {
allErrors.RecordError(vterrors.Wrapf(err, "failed to validate required vreplication metadata permissions on tablet %s", topoproto.TabletAliasString(tablet.Alias)))
}
if !res.GetOk() {
allErrors.RecordError(fmt.Errorf("user %s does not have the required set of permissions (select,insert,update,delete) on the %s.vreplication table on tablet %s",
res.GetUser(), sidecar.GetIdentifier(), topoproto.TabletAliasString(tablet.Alias)))
}
}()
}
wg.Wait()
if allErrors.HasErrors() {
return allErrors.Error()
}
return nil
}

func (s *Server) Logger() logutil.Logger {
if s.options.logger == nil {
s.options.logger = logutil.NewConsoleLogger() // Use default system logger
Expand Down
17 changes: 0 additions & 17 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sets"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -666,22 +665,6 @@ func areTabletsAvailableToStreamFrom(ctx context.Context, req *vtctldatapb.Workf
allErrors.RecordError(fmt.Errorf("no tablet found to source data in keyspace %s, shard %s", keyspace, shard.ShardName()))
return
}
// Ensure the tablet has the minimum privileges required on the sidecar database
// table in order to manage the reverse workflow as part of the traffic switch.
for _, tablet := range tablets {
wg.Add(1)
go func() {
defer wg.Done()
res, err := ts.ws.tmc.ValidateVReplicationPermissions(ctx, tablet.Tablet, nil)
if err != nil {
allErrors.RecordError(vterrors.Wrapf(err, "failed to validate required vreplication metadata permissions on tablet %s", topoproto.TabletAliasString(tablet.Alias)))
}
if !res.GetOk() {
allErrors.RecordError(fmt.Errorf("user %s does not have the required set of permissions (select,insert,update,delete) on the %s.vreplication table on tablet %s",
res.GetUser(), sidecar.GetIdentifier(), topoproto.TabletAliasString(tablet.Alias)))
}
}()
}
}(cells, keyspace, shard)
}

Expand Down

0 comments on commit 3caa747

Please sign in to comment.