[databases]: add support for Kafka advanced configuration (#727)
* [databases]: add support for Kafka advanced configuration

* update some data types in Kafka config to safely store their max values
loosla authored Sep 20, 2024
1 parent 8348cdd · commit 57fbfeb
Showing 2 changed files with 160 additions and 2 deletions.
60 changes: 60 additions & 0 deletions databases.go
@@ -3,6 +3,7 @@ package godo
import (
	"context"
	"fmt"
	"math/big"
	"net/http"
	"strings"
	"time"
@@ -153,10 +154,12 @@ type DatabasesService interface {
	GetRedisConfig(context.Context, string) (*RedisConfig, *Response, error)
	GetMySQLConfig(context.Context, string) (*MySQLConfig, *Response, error)
	GetMongoDBConfig(context.Context, string) (*MongoDBConfig, *Response, error)
	GetKafkaConfig(context.Context, string) (*KafkaConfig, *Response, error)
	UpdatePostgreSQLConfig(context.Context, string, *PostgreSQLConfig) (*Response, error)
	UpdateRedisConfig(context.Context, string, *RedisConfig) (*Response, error)
	UpdateMySQLConfig(context.Context, string, *MySQLConfig) (*Response, error)
	UpdateMongoDBConfig(context.Context, string, *MongoDBConfig) (*Response, error)
	UpdateKafkaConfig(context.Context, string, *KafkaConfig) (*Response, error)
	ListOptions(todo context.Context) (*DatabaseOptions, *Response, error)
	UpgradeMajorVersion(context.Context, string, *UpgradeVersionRequest) (*Response, error)
	ListTopics(context.Context, string, *ListOptions) ([]DatabaseTopic, *Response, error)
@@ -659,6 +662,27 @@ type MongoDBConfig struct {
	Verbosity *int `json:"verbosity,omitempty"`
}

// KafkaConfig holds advanced configurations for Kafka database clusters.
type KafkaConfig struct {
	GroupInitialRebalanceDelayMs       *int     `json:"group_initial_rebalance_delay_ms,omitempty"`
	GroupMinSessionTimeoutMs           *int     `json:"group_min_session_timeout_ms,omitempty"`
	GroupMaxSessionTimeoutMs           *int     `json:"group_max_session_timeout_ms,omitempty"`
	MessageMaxBytes                    *int     `json:"message_max_bytes,omitempty"`
	LogCleanerDeleteRetentionMs        *int64   `json:"log_cleaner_delete_retention_ms,omitempty"`
	LogCleanerMinCompactionLagMs       *uint64  `json:"log_cleaner_min_compaction_lag_ms,omitempty"`
	LogFlushIntervalMs                 *uint64  `json:"log_flush_interval_ms,omitempty"`
	LogIndexIntervalBytes              *int     `json:"log_index_interval_bytes,omitempty"`
	LogMessageDownconversionEnable     *bool    `json:"log_message_downconversion_enable,omitempty"`
	LogMessageTimestampDifferenceMaxMs *uint64  `json:"log_message_timestamp_difference_max_ms,omitempty"`
	LogPreallocate                     *bool    `json:"log_preallocate,omitempty"`
	LogRetentionBytes                  *big.Int `json:"log_retention_bytes,omitempty"`
	LogRetentionHours                  *int     `json:"log_retention_hours,omitempty"`
	LogRetentionMs                     *big.Int `json:"log_retention_ms,omitempty"`
	LogRollJitterMs                    *uint64  `json:"log_roll_jitter_ms,omitempty"`
	LogSegmentDeleteDelayMs            *int     `json:"log_segment_delete_delay_ms,omitempty"`
	AutoCreateTopicsEnable             *bool    `json:"auto_create_topics_enable,omitempty"`
}
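
Every field in KafkaConfig is a pointer tagged omitempty, so an explicit zero (a non-nil pointer to 0) is serialized while nil fields are dropped from the payload entirely; the wider int64, uint64, and big.Int types are the ones the commit message calls out for safely storing each setting's maximum value. A minimal sketch of assembling a partial config — the function name and values are illustrative, not documented defaults:

package kafkaconfigexample

import (
	"math/big"

	"github.com/digitalocean/godo"
)

// buildPartialKafkaConfig sets three fields; everything left nil is
// omitted from the JSON body, so a PATCH leaves it unchanged.
func buildPartialKafkaConfig() *godo.KafkaConfig {
	return &godo.KafkaConfig{
		MessageMaxBytes:              godo.PtrTo(1048588),
		LogCleanerMinCompactionLagMs: godo.PtrTo(uint64(0)), // explicit zero: non-nil pointer, still serialized
		LogRetentionBytes:            big.NewInt(-1),        // -1 conventionally disables size-based retention in Kafka
	}
}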

type databaseUserRoot struct {
	User *DatabaseUser `json:"user"`
}
@@ -703,6 +727,10 @@ type databaseMongoDBConfigRoot struct {
	Config *MongoDBConfig `json:"config"`
}

type databaseKafkaConfigRoot struct {
	Config *KafkaConfig `json:"config"`
}

type databaseBackupsRoot struct {
	Backups []DatabaseBackup `json:"backups"`
}
@@ -1546,6 +1574,38 @@ func (svc *DatabasesServiceOp) UpdateMongoDBConfig(ctx context.Context, database
	return resp, nil
}

// GetKafkaConfig retrieves the config for a Kafka database cluster.
func (svc *DatabasesServiceOp) GetKafkaConfig(ctx context.Context, databaseID string) (*KafkaConfig, *Response, error) {
	path := fmt.Sprintf(databaseConfigPath, databaseID)
	req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
	if err != nil {
		return nil, nil, err
	}
	root := new(databaseKafkaConfigRoot)
	resp, err := svc.client.Do(ctx, req, root)
	if err != nil {
		return nil, resp, err
	}
	return root.Config, resp, nil
}

// UpdateKafkaConfig updates the config for a Kafka database cluster.
func (svc *DatabasesServiceOp) UpdateKafkaConfig(ctx context.Context, databaseID string, config *KafkaConfig) (*Response, error) {
	path := fmt.Sprintf(databaseConfigPath, databaseID)
	root := &databaseKafkaConfigRoot{
		Config: config,
	}
	req, err := svc.client.NewRequest(ctx, http.MethodPatch, path, root)
	if err != nil {
		return nil, err
	}
	resp, err := svc.client.Do(ctx, req, nil)
	if err != nil {
		return resp, err
	}
	return resp, nil
}
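
Together the two methods give a read-modify-write flow against the cluster's config endpoint: GET returns the full advanced configuration, and PATCH applies only the non-nil fields. A usage sketch, assuming a client built with godo.NewFromToken; the token and cluster ID are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/digitalocean/godo"
)

func main() {
	client := godo.NewFromToken("dop_v1_example-token") // placeholder token
	ctx := context.Background()
	dbID := "your-kafka-cluster-id" // placeholder cluster ID

	// Fetch the current advanced configuration.
	cfg, _, err := client.Databases.GetKafkaConfig(ctx, dbID)
	if err != nil {
		log.Fatal(err)
	}
	if cfg.LogRetentionHours != nil {
		fmt.Println("log_retention_hours:", *cfg.LogRetentionHours)
	}

	// Patch a single setting; nil fields are omitted from the request
	// body and should therefore be left unchanged server-side.
	update := &godo.KafkaConfig{LogRetentionHours: godo.PtrTo(72)}
	if _, err := client.Databases.UpdateKafkaConfig(ctx, dbID, update); err != nil {
		log.Fatal(err)
	}
}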

// ListOptions gets the database options available.
func (svc *DatabasesServiceOp) ListOptions(ctx context.Context) (*DatabaseOptions, *Response, error) {
	root := new(databaseOptionsRoot)
102 changes: 100 additions & 2 deletions databases_test.go
@@ -3,6 +3,7 @@ package godo
import (
	"encoding/json"
	"fmt"
	"math/big"
	"net/http"
	"testing"
	"time"
@@ -3008,8 +3009,7 @@ func TestDatabases_GetConfigMongoDB(t *testing.T) {
"slow_op_threshold_ms": 100,
"verbosity": 0
}
}
`
}`

		mongoDBConfig = MongoDBConfig{
			DefaultReadConcern: PtrTo("LOCAL"),
@@ -3064,6 +3064,104 @@ func TestDatabases_UpdateConfigMongoDB(t *testing.T) {
	require.NoError(t, err)
}

func TestDatabases_GetConfigKafka(t *testing.T) {
	setup()
	defer teardown()

	var (
		dbSvc = client.Databases
		dbID  = "da4e0206-d019-41d7-b51f-deadbeefbb8f"
		path  = fmt.Sprintf("/v2/databases/%s/config", dbID)

		kafkaConfigJSON = `{
  "config": {
    "group_initial_rebalance_delay_ms": 3000,
    "group_min_session_timeout_ms": 6000,
    "group_max_session_timeout_ms": 1800000,
    "message_max_bytes": 1048588,
    "log_cleaner_delete_retention_ms": 86400000,
    "log_cleaner_min_compaction_lag_ms": 0,
    "log_flush_interval_ms": 60000,
    "log_index_interval_bytes": 4096,
    "log_message_downconversion_enable": true,
    "log_message_timestamp_difference_max_ms": 120000,
    "log_preallocate": false,
    "log_retention_bytes": -1,
    "log_retention_hours": 168,
    "log_retention_ms": 604800000,
    "log_roll_jitter_ms": 0,
    "log_segment_delete_delay_ms": 60000,
    "auto_create_topics_enable": true
  }
}`

		kafkaConfig = KafkaConfig{
			GroupInitialRebalanceDelayMs:       PtrTo(3000),
			GroupMinSessionTimeoutMs:           PtrTo(6000),
			GroupMaxSessionTimeoutMs:           PtrTo(1800000),
			MessageMaxBytes:                    PtrTo(1048588),
			LogCleanerDeleteRetentionMs:        PtrTo(int64(86400000)),
			LogCleanerMinCompactionLagMs:       PtrTo(uint64(0)),
			LogFlushIntervalMs:                 PtrTo(uint64(60000)),
			LogIndexIntervalBytes:              PtrTo(4096),
			LogMessageDownconversionEnable:     PtrTo(true),
			LogMessageTimestampDifferenceMaxMs: PtrTo(uint64(120000)),
			LogPreallocate:                     PtrTo(false),
			LogRetentionBytes:                  big.NewInt(int64(-1)),
			LogRetentionHours:                  PtrTo(168),
			LogRetentionMs:                     big.NewInt(int64(604800000)),
			LogRollJitterMs:                    PtrTo(uint64(0)),
			LogSegmentDeleteDelayMs:            PtrTo(60000),
			AutoCreateTopicsEnable:             PtrTo(true),
		}
	)

	mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
		testMethod(t, r, http.MethodGet)
		fmt.Fprint(w, kafkaConfigJSON)
	})

	got, _, err := dbSvc.GetKafkaConfig(ctx, dbID)
	require.NoError(t, err)
	require.Equal(t, &kafkaConfig, got)
}

func TestDatabases_UpdateConfigKafka(t *testing.T) {
	setup()
	defer teardown()

	var (
		dbID        = "deadbeef-dead-4aa5-beef-deadbeef347d"
		path        = fmt.Sprintf("/v2/databases/%s/config", dbID)
		kafkaConfig = &KafkaConfig{
			GroupInitialRebalanceDelayMs: PtrTo(3000),
			GroupMinSessionTimeoutMs:     PtrTo(6000),
			GroupMaxSessionTimeoutMs:     PtrTo(1800000),
			MessageMaxBytes:              PtrTo(1048588),
			LogCleanerDeleteRetentionMs:  PtrTo(int64(86400000)),
			LogCleanerMinCompactionLagMs: PtrTo(uint64(0)),
		}
	)

	mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
		testMethod(t, r, http.MethodPatch)

		var b databaseKafkaConfigRoot
		decoder := json.NewDecoder(r.Body)
		err := decoder.Decode(&b)
		require.NoError(t, err)

		assert.Equal(t, b.Config, kafkaConfig)
		assert.Equal(t, uint64(0), *b.Config.LogCleanerMinCompactionLagMs, "pointers to zero value should be sent")
		assert.Nil(t, b.Config.LogFlushIntervalMs, "excluded value should not be sent")

		w.WriteHeader(http.StatusNoContent)
	})

	_, err := client.Databases.UpdateKafkaConfig(ctx, dbID, kafkaConfig)
	require.NoError(t, err)
}

func TestDatabases_UpgradeMajorVersion(t *testing.T) {
	setup()
	defer teardown()