From 57fbfebd23d49213214ab255584dfa73f7c63a30 Mon Sep 17 00:00:00 2001 From: Anna Lushnikova Date: Fri, 20 Sep 2024 15:48:33 -0400 Subject: [PATCH] [databases]: add support for Kafka advanced configuration (#727) * [databases]: add support for Kafka advanced configuration * update some data types in Kafka config to safely store their max values --- databases.go | 60 +++++++++++++++++++++++++++ databases_test.go | 102 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 160 insertions(+), 2 deletions(-) diff --git a/databases.go b/databases.go index c0c91e2..e168186 100644 --- a/databases.go +++ b/databases.go @@ -3,6 +3,7 @@ package godo import ( "context" "fmt" + "math/big" "net/http" "strings" "time" @@ -153,10 +154,12 @@ type DatabasesService interface { GetRedisConfig(context.Context, string) (*RedisConfig, *Response, error) GetMySQLConfig(context.Context, string) (*MySQLConfig, *Response, error) GetMongoDBConfig(context.Context, string) (*MongoDBConfig, *Response, error) + GetKafkaConfig(context.Context, string) (*KafkaConfig, *Response, error) UpdatePostgreSQLConfig(context.Context, string, *PostgreSQLConfig) (*Response, error) UpdateRedisConfig(context.Context, string, *RedisConfig) (*Response, error) UpdateMySQLConfig(context.Context, string, *MySQLConfig) (*Response, error) UpdateMongoDBConfig(context.Context, string, *MongoDBConfig) (*Response, error) + UpdateKafkaConfig(context.Context, string, *KafkaConfig) (*Response, error) ListOptions(todo context.Context) (*DatabaseOptions, *Response, error) UpgradeMajorVersion(context.Context, string, *UpgradeVersionRequest) (*Response, error) ListTopics(context.Context, string, *ListOptions) ([]DatabaseTopic, *Response, error) @@ -659,6 +662,27 @@ type MongoDBConfig struct { Verbosity *int `json:"verbosity,omitempty"` } +// KafkaConfig holds advanced configurations for Kafka database clusters. +type KafkaConfig struct { + GroupInitialRebalanceDelayMs *int `json:"group_initial_rebalance_delay_ms,omitempty"` + GroupMinSessionTimeoutMs *int `json:"group_min_session_timeout_ms,omitempty"` + GroupMaxSessionTimeoutMs *int `json:"group_max_session_timeout_ms,omitempty"` + MessageMaxBytes *int `json:"message_max_bytes,omitempty"` + LogCleanerDeleteRetentionMs *int64 `json:"log_cleaner_delete_retention_ms,omitempty"` + LogCleanerMinCompactionLagMs *uint64 `json:"log_cleaner_min_compaction_lag_ms,omitempty"` + LogFlushIntervalMs *uint64 `json:"log_flush_interval_ms,omitempty"` + LogIndexIntervalBytes *int `json:"log_index_interval_bytes,omitempty"` + LogMessageDownconversionEnable *bool `json:"log_message_downconversion_enable,omitempty"` + LogMessageTimestampDifferenceMaxMs *uint64 `json:"log_message_timestamp_difference_max_ms,omitempty"` + LogPreallocate *bool `json:"log_preallocate,omitempty"` + LogRetentionBytes *big.Int `json:"log_retention_bytes,omitempty"` + LogRetentionHours *int `json:"log_retention_hours,omitempty"` + LogRetentionMs *big.Int `json:"log_retention_ms,omitempty"` + LogRollJitterMs *uint64 `json:"log_roll_jitter_ms,omitempty"` + LogSegmentDeleteDelayMs *int `json:"log_segment_delete_delay_ms,omitempty"` + AutoCreateTopicsEnable *bool `json:"auto_create_topics_enable,omitempty"` +} + type databaseUserRoot struct { User *DatabaseUser `json:"user"` } @@ -703,6 +727,10 @@ type databaseMongoDBConfigRoot struct { Config *MongoDBConfig `json:"config"` } +type databaseKafkaConfigRoot struct { + Config *KafkaConfig `json:"config"` +} + type databaseBackupsRoot struct { Backups []DatabaseBackup `json:"backups"` } @@ -1546,6 +1574,38 @@ func (svc *DatabasesServiceOp) UpdateMongoDBConfig(ctx context.Context, database return resp, nil } +// GetKafkaConfig retrieves the config for a Kafka database cluster. +func (svc *DatabasesServiceOp) GetKafkaConfig(ctx context.Context, databaseID string) (*KafkaConfig, *Response, error) { + path := fmt.Sprintf(databaseConfigPath, databaseID) + req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil) + if err != nil { + return nil, nil, err + } + root := new(databaseKafkaConfigRoot) + resp, err := svc.client.Do(ctx, req, root) + if err != nil { + return nil, resp, err + } + return root.Config, resp, nil +} + +// UpdateKafkaConfig updates the config for a Kafka database cluster. +func (svc *DatabasesServiceOp) UpdateKafkaConfig(ctx context.Context, databaseID string, config *KafkaConfig) (*Response, error) { + path := fmt.Sprintf(databaseConfigPath, databaseID) + root := &databaseKafkaConfigRoot{ + Config: config, + } + req, err := svc.client.NewRequest(ctx, http.MethodPatch, path, root) + if err != nil { + return nil, err + } + resp, err := svc.client.Do(ctx, req, nil) + if err != nil { + return resp, err + } + return resp, nil +} + // ListOptions gets the database options available. func (svc *DatabasesServiceOp) ListOptions(ctx context.Context) (*DatabaseOptions, *Response, error) { root := new(databaseOptionsRoot) diff --git a/databases_test.go b/databases_test.go index 2721b9d..c5a3c07 100644 --- a/databases_test.go +++ b/databases_test.go @@ -3,6 +3,7 @@ package godo import ( "encoding/json" "fmt" + "math/big" "net/http" "testing" "time" @@ -3008,8 +3009,7 @@ func TestDatabases_GetConfigMongoDB(t *testing.T) { "slow_op_threshold_ms": 100, "verbosity": 0 } -} -` +}` mongoDBConfig = MongoDBConfig{ DefaultReadConcern: PtrTo("LOCAL"), @@ -3064,6 +3064,104 @@ func TestDatabases_UpdateConfigMongoDB(t *testing.T) { require.NoError(t, err) } +func TestDatabases_GetConfigKafka(t *testing.T) { + setup() + defer teardown() + + var ( + dbSvc = client.Databases + dbID = "da4e0206-d019-41d7-b51f-deadbeefbb8f" + path = fmt.Sprintf("/v2/databases/%s/config", dbID) + + kafkaConfigJSON = `{ + "config": { + "group_initial_rebalance_delay_ms": 3000, + "group_min_session_timeout_ms": 6000, + "group_max_session_timeout_ms": 1800000, + "message_max_bytes": 1048588, + "log_cleaner_delete_retention_ms": 86400000, + "log_cleaner_min_compaction_lag_ms": 0, + "log_flush_interval_ms": 60000, + "log_index_interval_bytes": 4096, + "log_message_downconversion_enable": true, + "log_message_timestamp_difference_max_ms": 120000, + "log_preallocate": false, + "log_retention_bytes": -1, + "log_retention_hours": 168, + "log_retention_ms": 604800000, + "log_roll_jitter_ms": 0, + "log_segment_delete_delay_ms": 60000, + "auto_create_topics_enable": true + } +}` + + kafkaConfig = KafkaConfig{ + GroupInitialRebalanceDelayMs: PtrTo(3000), + GroupMinSessionTimeoutMs: PtrTo(6000), + GroupMaxSessionTimeoutMs: PtrTo(1800000), + MessageMaxBytes: PtrTo(1048588), + LogCleanerDeleteRetentionMs: PtrTo(int64(86400000)), + LogCleanerMinCompactionLagMs: PtrTo(uint64(0)), + LogFlushIntervalMs: PtrTo(uint64(60000)), + LogIndexIntervalBytes: PtrTo(4096), + LogMessageDownconversionEnable: PtrTo(true), + LogMessageTimestampDifferenceMaxMs: PtrTo(uint64(120000)), + LogPreallocate: PtrTo(false), + LogRetentionBytes: big.NewInt(int64(-1)), + LogRetentionHours: PtrTo(168), + LogRetentionMs: big.NewInt(int64(604800000)), + LogRollJitterMs: PtrTo(uint64(0)), + LogSegmentDeleteDelayMs: PtrTo(60000), + AutoCreateTopicsEnable: PtrTo(true), + } + ) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, kafkaConfigJSON) + }) + + got, _, err := dbSvc.GetKafkaConfig(ctx, dbID) + require.NoError(t, err) + require.Equal(t, &kafkaConfig, got) +} + +func TestDatabases_UpdateConfigKafka(t *testing.T) { + setup() + defer teardown() + + var ( + dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" + path = fmt.Sprintf("/v2/databases/%s/config", dbID) + kafkaConfig = &KafkaConfig{ + GroupInitialRebalanceDelayMs: PtrTo(3000), + GroupMinSessionTimeoutMs: PtrTo(6000), + GroupMaxSessionTimeoutMs: PtrTo(1800000), + MessageMaxBytes: PtrTo(1048588), + LogCleanerDeleteRetentionMs: PtrTo(int64(86400000)), + LogCleanerMinCompactionLagMs: PtrTo(uint64(0)), + } + ) + + mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPatch) + + var b databaseKafkaConfigRoot + decoder := json.NewDecoder(r.Body) + err := decoder.Decode(&b) + require.NoError(t, err) + + assert.Equal(t, b.Config, kafkaConfig) + assert.Equal(t, uint64(0), *b.Config.LogCleanerMinCompactionLagMs, "pointers to zero value should be sent") + assert.Nil(t, b.Config.LogFlushIntervalMs, "excluded value should not be sent") + + w.WriteHeader(http.StatusNoContent) + }) + + _, err := client.Databases.UpdateKafkaConfig(ctx, dbID, kafkaConfig) + require.NoError(t, err) +} + func TestDatabases_UpgradeMajorVersion(t *testing.T) { setup() defer teardown()