Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Sep 18, 2024
1 parent 4bf97bd commit 111541c
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 37 deletions.
36 changes: 31 additions & 5 deletions go/vt/vttablet/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,17 @@ import (
"time"
)

/*
This file contains the model for all the configuration parameters for VReplication workflows. It also provides methods to
initialize the default configuration and to override the default configuration with user-provided values. The overrides
are stored in the `config` sub-document of the `options` attribute in `_vt.vreplication` and merged with the defaults
when the workflow is initialized.
*/

// VReplicationConfig has the all the configuration parameters for VReplication workflows, both applicable on the
// target (vreplication)and the source (vstreamer) side.
type VReplicationConfig struct {
// Config parameters applicable to the target side (vreplication)
ExperimentalFlags int64
NetReadTimeout int
NetWriteTimeout int
Expand All @@ -41,26 +51,36 @@ type VReplicationConfig struct {
ParallelInsertWorkers int
TabletTypesStr string

// Config parameters applicable to the source side (vstreamer)
// The coresponding Override fields are used to determine if the user has provided a value for the parameter so
// that they can be sent in the VStreamer API calls to the source.
VStreamPacketSize int
VStreamPacketSizeOverride bool
VStreamDynamicPacketSize bool
VStreamDynamicPacketSizeOverride bool
VStreamBinlogRotationThreshold int64
VStreamBinlogRotationThresholdOverride bool

// Overrides is a map of user-provided configuration values that override the default configuration.
Overrides map[string]string
}

var configMutex sync.Mutex
var defaultVReplicationConfig *VReplicationConfig

// DefaultVReplicationConfig has the default values for VReplicationConfig initialized from the vttablet flags
// when the workflow is initialized.
var DefaultVReplicationConfig *VReplicationConfig

// GetVReplicationConfigDefaults returns the default VReplicationConfig. If `useCached` is true, it returns the previously
// loaded configuration. Otherwise it reloads the configuration from the vttablet flags. useCached is set to false
// when the vttablet flags are updated in unit tests.
func GetVReplicationConfigDefaults(useCached bool) *VReplicationConfig {
configMutex.Lock()
defer configMutex.Unlock()
if useCached && defaultVReplicationConfig != nil {
return defaultVReplicationConfig
if useCached && DefaultVReplicationConfig != nil {
return DefaultVReplicationConfig
}
defaultVReplicationConfig = &VReplicationConfig{
DefaultVReplicationConfig = &VReplicationConfig{
ExperimentalFlags: vreplicationExperimentalFlags,
NetReadTimeout: vreplicationNetReadTimeout,
NetWriteTimeout: vreplicationNetWriteTimeout,
Expand All @@ -84,9 +104,10 @@ func GetVReplicationConfigDefaults(useCached bool) *VReplicationConfig {

Overrides: make(map[string]string),
}
return defaultVReplicationConfig
return DefaultVReplicationConfig
}

// InitVReplicationConfigDefaults initializes the default VReplicationConfig in an idempotent way.
func InitVReplicationConfigDefaults() *VReplicationConfig {
return GetVReplicationConfigDefaults(true)
}
Expand All @@ -99,6 +120,8 @@ func GetDefaultVReplicationConfig() *VReplicationConfig {
return c
}

// NewVReplicationConfig creates a new VReplicationConfig by merging the default configuration with the user-provided
// overrides. It returns an error if the user-provided values are invalid.
func NewVReplicationConfig(overrides map[string]string) (*VReplicationConfig, error) {
c := GetDefaultVReplicationConfig()
c.Overrides = maps.Clone(overrides)
Expand Down Expand Up @@ -229,6 +252,9 @@ func NewVReplicationConfig(overrides map[string]string) (*VReplicationConfig, er
return c, nil
}

// Map returns a map of the VReplicationConfig: the keys are the flag names and the values are string representations.
// Used in tests to compare the expected and actual configuration values and in validations to check if the user-provided
// keys are one of those that are supported.
func (c VReplicationConfig) Map() map[string]string {
return map[string]string{
"vreplication_experimental_flags": strconv.FormatInt(c.ExperimentalFlags, 10),
Expand Down
38 changes: 19 additions & 19 deletions go/vt/vttablet/common/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestNewVReplicationConfig(t *testing.T) {
VStreamPacketSize: 1024,
VStreamDynamicPacketSize: false,
VStreamBinlogRotationThreshold: 2048,
TabletTypesStr: defaultVReplicationConfig.TabletTypesStr,
TabletTypesStr: DefaultVReplicationConfig.TabletTypesStr,
VStreamPacketSizeOverride: true,
VStreamDynamicPacketSizeOverride: true,
VStreamBinlogRotationThresholdOverride: true,
Expand Down Expand Up @@ -101,28 +101,28 @@ func TestNewVReplicationConfig(t *testing.T) {
config: map[string]string{
"vreplication_experimental_flags": "5",
"vreplication_net_read_timeout": "150",
"vstream_dynamic_packet_size": strconv.FormatBool(!defaultVReplicationConfig.VStreamDynamicPacketSize),
"vreplication_store_compressed_gtid": strconv.FormatBool(!defaultVReplicationConfig.StoreCompressedGTID),
"vstream_dynamic_packet_size": strconv.FormatBool(!DefaultVReplicationConfig.VStreamDynamicPacketSize),
"vreplication_store_compressed_gtid": strconv.FormatBool(!DefaultVReplicationConfig.StoreCompressedGTID),
},
wantErr: 0,
want: &VReplicationConfig{
ExperimentalFlags: 5,
NetReadTimeout: 150,
NetWriteTimeout: defaultVReplicationConfig.NetWriteTimeout,
CopyPhaseDuration: defaultVReplicationConfig.CopyPhaseDuration,
RetryDelay: defaultVReplicationConfig.RetryDelay,
MaxTimeToRetryError: defaultVReplicationConfig.MaxTimeToRetryError,
RelayLogMaxSize: defaultVReplicationConfig.RelayLogMaxSize,
RelayLogMaxItems: defaultVReplicationConfig.RelayLogMaxItems,
ReplicaLagTolerance: defaultVReplicationConfig.ReplicaLagTolerance,
HeartbeatUpdateInterval: defaultVReplicationConfig.HeartbeatUpdateInterval,
StoreCompressedGTID: !defaultVReplicationConfig.StoreCompressedGTID,
ParallelInsertWorkers: defaultVReplicationConfig.ParallelInsertWorkers,
VStreamPacketSize: defaultVReplicationConfig.VStreamPacketSize,
VStreamDynamicPacketSize: !defaultVReplicationConfig.VStreamDynamicPacketSize,
VStreamBinlogRotationThreshold: defaultVReplicationConfig.VStreamBinlogRotationThreshold,
NetWriteTimeout: DefaultVReplicationConfig.NetWriteTimeout,
CopyPhaseDuration: DefaultVReplicationConfig.CopyPhaseDuration,
RetryDelay: DefaultVReplicationConfig.RetryDelay,
MaxTimeToRetryError: DefaultVReplicationConfig.MaxTimeToRetryError,
RelayLogMaxSize: DefaultVReplicationConfig.RelayLogMaxSize,
RelayLogMaxItems: DefaultVReplicationConfig.RelayLogMaxItems,
ReplicaLagTolerance: DefaultVReplicationConfig.ReplicaLagTolerance,
HeartbeatUpdateInterval: DefaultVReplicationConfig.HeartbeatUpdateInterval,
StoreCompressedGTID: !DefaultVReplicationConfig.StoreCompressedGTID,
ParallelInsertWorkers: DefaultVReplicationConfig.ParallelInsertWorkers,
VStreamPacketSize: DefaultVReplicationConfig.VStreamPacketSize,
VStreamDynamicPacketSize: !DefaultVReplicationConfig.VStreamDynamicPacketSize,
VStreamBinlogRotationThreshold: DefaultVReplicationConfig.VStreamBinlogRotationThreshold,
VStreamDynamicPacketSizeOverride: true,
TabletTypesStr: defaultVReplicationConfig.TabletTypesStr,
TabletTypesStr: DefaultVReplicationConfig.TabletTypesStr,
},
},
}
Expand All @@ -146,8 +146,8 @@ func TestNewVReplicationConfig(t *testing.T) {
}
}
if tt.want == nil {
require.EqualValuesf(t, defaultVReplicationConfig.Map(), got.Map(),
"NewVReplicationConfig() Map got = %v, want %v", got.Map(), defaultVReplicationConfig.Map())
require.EqualValuesf(t, DefaultVReplicationConfig.Map(), got.Map(),
"NewVReplicationConfig() Map got = %v, want %v", got.Map(), DefaultVReplicationConfig.Map())
} else {
tt.want.Overrides = tt.config
require.EqualValues(t, tt.want.Map(), got.Map(),
Expand Down
17 changes: 5 additions & 12 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
)

var (
// Default flags.
// Default flags: currently VReplicationExperimentalFlagVPlayerBatching is not enabled by default.
vreplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
vreplicationNetReadTimeout = 300
vreplicationNetWriteTimeout = 600
Expand All @@ -58,20 +58,13 @@ var (
VStreamerUseDynamicPacketSize = true
)

func GetVReplicationExperimentalFlags() int64 {
return vreplicationExperimentalFlags
}
func GetVReplicationNetReadTimeout() int {
return vreplicationNetReadTimeout
}
func GetVReplicationNetWriteTimeout() int {
return vreplicationNetWriteTimeout
}

func GetVReplicationCopyPhaseDuration() time.Duration {
return vreplicationCopyPhaseDuration
}

func init() {
servenv.OnParseFor("vttablet", registerFlags)
servenv.OnParseFor("vtcombo", registerFlags)
Expand All @@ -93,10 +86,10 @@ func registerFlags(fs *pflag.FlagSet) {

// vreplicationHeartbeatUpdateInterval determines how often the time_updated column is updated if there are no
// real events on the source and the source vstream is only sending heartbeats for this long. Keep this low if you
//expect high QPS and are monitoring this column to alert about potential outages. Keep this high if
// * you have too many streams the extra write qps or cpu load due to these updates are unacceptable
// * you have too many streams and/or a large source field (lot of participating tables) which generates
// unacceptable increase in your binlog size
// expect high QPS and are monitoring this column to alert about potential outages. Keep this high if
// * you have too many streams the extra write qps or cpu load due to these updates are unacceptable
// * you have too many streams and/or a large source field (lot of participating tables) which generates
// unacceptable increase in your binlog size
fs.IntVar(&vreplicationHeartbeatUpdateInterval, "vreplication_heartbeat_update_interval", vreplicationHeartbeatUpdateInterval, "Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling")
fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (tmc *fakeTMClient) VReplicationExec(ctx context.Context, tablet *topodatap
}
}
}
return nil, fmt.Errorf("****** query %q not found for tablet %d", query, tablet.Alias.Uid)
return nil, fmt.Errorf("query %q not found for tablet %d", query, tablet.Alias.Uid)
}

func (tmc *fakeTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) {
Expand Down

0 comments on commit 111541c

Please sign in to comment.