Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DBaaS engine Kafka #633

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 170 additions & 7 deletions databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
databaseOptionsPath = databaseBasePath + "/options"
databaseUpgradeMajorVersionPath = databaseBasePath + "/%s/upgrade"
databasePromoteReplicaToPrimaryPath = databaseReplicaPath + "/promote"
databaseTopicPath = databaseBasePath + "/%s/topics/%s"
databaseTopicsPath = databaseBasePath + "/%s/topics"
)

// SQL Mode constants allow for MySQL-specific SQL flavor configuration.
Expand Down Expand Up @@ -146,6 +148,11 @@ type DatabasesService interface {
UpdateMySQLConfig(context.Context, string, *MySQLConfig) (*Response, error)
ListOptions(todo context.Context) (*DatabaseOptions, *Response, error)
UpgradeMajorVersion(context.Context, string, *UpgradeVersionRequest) (*Response, error)
ListTopics(context.Context, string, *ListOptions) ([]DatabaseTopic, *Response, error)
CreateTopic(context.Context, string, *DatabaseCreateTopicRequest) (*DatabaseTopic, *Response, error)
GetTopic(context.Context, string, string) (*DatabaseTopic, *Response, error)
DeleteTopic(context.Context, string, string) (*Response, error)
UpdateTopic(context.Context, string, string, *DatabaseUpdateTopicRequest) (*Response, error)
}

// DatabasesServiceOp handles communication with the Databases related methods
Expand Down Expand Up @@ -188,21 +195,38 @@ type DatabaseCA struct {

// DatabaseConnection represents a database connection
type DatabaseConnection struct {
URI string `json:"uri,omitempty"`
Database string `json:"database,omitempty"`
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
SSL bool `json:"ssl,omitempty"`
Protocol string `json:"protocol"`
URI string `json:"uri,omitempty"`
Database string `json:"database,omitempty"`
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
User string `json:"user,omitempty"`
Password string `json:"password,omitempty"`
SSL bool `json:"ssl,omitempty"`
ApplicationPorts map[string]uint32 `json:"application_ports,omitempty"`
}

// DatabaseUser represents a user in the database
type DatabaseUser struct {
Name string `json:"name,omitempty"`
Role string `json:"role,omitempty"`
Password string `json:"password,omitempty"`
AccessCert string `json:"access_cert,omitempty"`
AccessKey string `json:"access_key,omitempty"`
MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"`
Settings *DatabaseUserSettings `json:"settings,omitempty"`
}

// KafkaACL contains Kafka specific user access control information
type KafkaACL struct {
ID string `json:"id,omitempty"`
Permission string `json:"permission,omitempty"`
Topic string `json:"topic,omitempty"`
}

// DatabaseUserSettings contains Kafka-specific user settings
type DatabaseUserSettings struct {
ACL []*KafkaACL `json:"acl,omitempty"`
}

// DatabaseMySQLUserSettings contains MySQL-specific user settings
Expand Down Expand Up @@ -271,6 +295,56 @@ type DatabaseDB struct {
Name string `json:"name"`
}

// DatabaseTopic represents a Kafka topic
type DatabaseTopic struct {
Name string `json:"name"`
PartitionCount *uint32 `json:"partition_count,omitempty"`
ReplicationFactor *uint32 `json:"replication_factor,omitempty"`
State string `json:"state,omitempty"`
Config *TopicConfig `json:"config,omitempty"`
andrewsomething marked this conversation as resolved.
Show resolved Hide resolved
}

// TopicConfig represents all configurable options for a Kafka topic
type TopicConfig struct {
CleanupPolicy string `json:"cleanup_policy,omitempty"`
CompressionType string `json:"compression_type,omitempty"`
DeleteRetentionMS *uint64 `json:"delete_retention_ms,omitempty"`
FileDeleteDelayMS *uint64 `json:"file_delete_delay_ms,omitempty"`
FlushMessages *uint64 `json:"flush_messages,omitempty"`
FlushMS *uint64 `json:"flush_ms,omitempty"`
IndexIntervalBytes *uint64 `json:"index_interval_bytes,omitempty"`
MaxCompactionLagMS *uint64 `json:"max_compaction_lag_ms,omitempty"`
MaxMessageBytes *uint64 `json:"max_message_bytes,omitempty"`
MessageDownConversionEnable *bool `json:"message_down_conversion_enable,omitempty"`
MessageFormatVersion string `json:"message_format_version,omitempty"`
MessageTimestampDifferenceMaxMS *uint64 `json:"message_timestamp_difference_max_ms,omitempty"`
MessageTimestampType string `json:"message_timestamp_type,omitempty"`
MinCleanableDirtyRatio *float32 `json:"min_cleanable_dirty_ratio,omitempty"`
MinCompactionLagMS *uint64 `json:"min_compaction_lag_ms,omitempty"`
MinInsyncReplicas *uint32 `json:"min_insync_replicas,omitempty"`
Preallocate *bool `json:"preallocate,omitempty"`
RetentionBytes *int64 `json:"retention_bytes,omitempty"`
RetentionMS *int64 `json:"retention_ms,omitempty"`
SegmentBytes *uint64 `json:"segment_bytes,omitempty"`
SegmentIndexBytes *uint64 `json:"segment_index_bytes,omitempty"`
SegmentJitterMS *uint64 `json:"segment_jitter_ms,omitempty"`
SegmentMS *uint64 `json:"segment_ms,omitempty"`
UncleanLeaderElectionEnable *bool `json:"unclean_leader_election_enable,omitempty"`
}

// DatabaseCreateTopicRequest is used to create a new topic within a kafka cluster
type DatabaseCreateTopicRequest struct {
Name string `json:"name"`
PartitionCount *uint32 `json:"partition_count,omitempty"`
ReplicationFactor *uint32 `json:"replication_factor,omitempty"`
Config *TopicConfig `json:"config,omitempty"`
}

// DatabaseUpdateTopicRequest ...
type DatabaseUpdateTopicRequest struct {
Topic *DatabaseTopic `json:"topic"` // note: `name` field in Topic unused on update
}

// DatabaseReplica represents a read-only replica of a particular database
type DatabaseReplica struct {
ID string `json:"id"`
Expand Down Expand Up @@ -316,11 +390,13 @@ type DatabaseUpdatePoolRequest struct {
type DatabaseCreateUserRequest struct {
Name string `json:"name"`
MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"`
Settings *DatabaseUserSettings `json:"settings,omitempty"`
}

// DatabaseResetUserAuthRequest is used to reset a users DB auth
type DatabaseResetUserAuthRequest struct {
MySQLSettings *DatabaseMySQLUserSettings `json:"mysql_settings,omitempty"`
Settings *DatabaseUserSettings `json:"settings,omitempty"`
}

// DatabaseCreateDBRequest is used to create a new engine-specific database within the cluster
Expand Down Expand Up @@ -551,12 +627,21 @@ type databaseOptionsRoot struct {
Options *DatabaseOptions `json:"options"`
}

type databaseTopicRoot struct {
Topic *DatabaseTopic `json:"topic"`
}

type databaseTopicsRoot struct {
Topics []DatabaseTopic `json:"topics"`
}

// DatabaseOptions represents the available database engines
type DatabaseOptions struct {
MongoDBOptions DatabaseEngineOptions `json:"mongodb"`
MySQLOptions DatabaseEngineOptions `json:"mysql"`
PostgresSQLOptions DatabaseEngineOptions `json:"pg"`
RedisOptions DatabaseEngineOptions `json:"redis"`
KafkaOptions DatabaseEngineOptions `json:"kafka"`
}

// DatabaseEngineOptions represents the configuration options that are available for a given database engine
Expand Down Expand Up @@ -1257,3 +1342,81 @@ func (svc *DatabasesServiceOp) UpgradeMajorVersion(ctx context.Context, database

return resp, nil
}

// ListTopics returns all topics for a given kafka cluster
func (svc *DatabasesServiceOp) ListTopics(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseTopic, *Response, error) {
path := fmt.Sprintf(databaseTopicsPath, databaseID)
path, err := addOptions(path, opts)
if err != nil {
return nil, nil, err
}
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseTopicsRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Topics, resp, nil
}

// GetTopic returns a single kafka topic by name
func (svc *DatabasesServiceOp) GetTopic(ctx context.Context, databaseID, name string) (*DatabaseTopic, *Response, error) {
path := fmt.Sprintf(databaseTopicPath, databaseID, name)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseTopicRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Topic, resp, nil
}

// CreateTopic will create a new kafka topic
func (svc *DatabasesServiceOp) CreateTopic(ctx context.Context, databaseID string, createTopic *DatabaseCreateTopicRequest) (*DatabaseTopic, *Response, error) {
path := fmt.Sprintf(databaseTopicsPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createTopic)
if err != nil {
return nil, nil, err
}
root := new(databaseTopicRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Topic, resp, nil
}

// UpdateTopic updates a single kafka topic
func (svc *DatabasesServiceOp) UpdateTopic(ctx context.Context, databaseID string, name string, updateTopic *DatabaseUpdateTopicRequest) (*Response, error) {
path := fmt.Sprintf(databaseTopicPath, databaseID, name)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateTopic)
if err != nil {
return nil, err
}
root := new(databaseTopicRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return resp, err
}
return resp, nil
}

// DeleteTopic will delete an existing kafka topic
func (svc *DatabasesServiceOp) DeleteTopic(ctx context.Context, databaseID, name string) (*Response, error) {
path := fmt.Sprintf(databaseTopicPath, databaseID, name)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
if err != nil {
return nil, err
}
resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}
Loading
Loading