Skip to content

Commit

Permalink
Databases: Add Logsinks CRUD support (#711)
Browse files Browse the repository at this point in the history
  • Loading branch information
danaelhe authored Aug 20, 2024
1 parent a372d43 commit 9fe130b
Show file tree
Hide file tree
Showing 2 changed files with 338 additions and 0 deletions.
128 changes: 128 additions & 0 deletions databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
databaseTopicsPath = databaseBasePath + "/%s/topics"
databaseMetricsCredentialsPath = databaseBasePath + "/metrics/credentials"
databaseEvents = databaseBasePath + "/%s/events"
databaseLogsinkPath = databaseBasePath + "/%s/logsink/%s"
databaseLogsinksPath = databaseBasePath + "/%s/logsink"
)

// SQL Mode constants allow for MySQL-specific SQL flavor configuration.
Expand Down Expand Up @@ -159,6 +161,11 @@ type DatabasesService interface {
GetMetricsCredentials(context.Context) (*DatabaseMetricsCredentials, *Response, error)
UpdateMetricsCredentials(context.Context, *DatabaseUpdateMetricsCredentialsRequest) (*Response, error)
ListDatabaseEvents(context.Context, string, *ListOptions) ([]DatabaseEvent, *Response, error)
CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error)
GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error)
ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error)
UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error)
DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error)
}

// DatabasesServiceOp handles communication with the Databases related methods
Expand Down Expand Up @@ -323,6 +330,14 @@ type DatabaseTopic struct {
Config *TopicConfig `json:"config,omitempty"`
}

// DatabaseLogsink represents a logsink
type DatabaseLogsink struct {
ID string `json:"sink_id"`
Name string `json:"sink_name,omitempty"`
Type string `json:"sink_type,omitempty"`
Config *LogsinkConfig `json:"config,omitempty"`
}

// TopicPartition represents the state of a Kafka topic partition
type TopicPartition struct {
EarliestOffset uint64 `json:"earliest_offset,omitempty"`
Expand Down Expand Up @@ -472,6 +487,35 @@ type DatabaseFirewallRule struct {
CreatedAt time.Time `json:"created_at"`
}

// DatabaseCreateLogsinkRequest is used to create logsink for a database cluster
type DatabaseCreateLogsinkRequest struct {
Name string `json:"sink_name"`
Type string `json:"sink_type"`
Config *LogsinkConfig `json:"config"`
}

// DatabaseUpdateLogsinkRequest ...
type DatabaseUpdateLogsinkRequest struct {
Config *LogsinkConfig `json:"config"`
}

// LogsinkConfig represents one of the configurable options (rsyslog_logsink, elasticsearch_logsink, or opensearch_logsink) for a logsink.
type LogsinkConfig struct {
URL string `json:"url,omitempty"`
IndexPrefix string `json:"index_prefix,omitempty"`
IndexDaysMax string `json:"index_days_max,omitempty"`
Timeout string `json:"timeout,omitempty"`
Server string `json:"server,omitempty"`
Port int `json:"port,omitempty"`
TLS bool `json:"tls,omitempty"`
Format string `json:"format,omitempty"`
Logline string `json:"logline,omitempty"`
SD string `json:"sd,omitempty"`
CA string `json:"ca,omitempty"`
Key string `json:"key,omitempty"`
Cert string `json:"cert,omitempty"`
}

// PostgreSQLConfig holds advanced configurations for PostgreSQL database clusters.
type PostgreSQLConfig struct {
AutovacuumFreezeMaxAge *int `json:"autovacuum_freeze_max_age,omitempty"`
Expand Down Expand Up @@ -680,6 +724,10 @@ type databaseTopicsRoot struct {
Topics []DatabaseTopic `json:"topics"`
}

type databaseLogsinksRoot struct {
Sinks []DatabaseLogsink `json:"sinks"`
}

type databaseMetricsCredentialsRoot struct {
Credentials *DatabaseMetricsCredentials `json:"credentials"`
}
Expand Down Expand Up @@ -1560,3 +1608,83 @@ func (svc *DatabasesServiceOp) ListDatabaseEvents(ctx context.Context, databaseI

return root.Events, resp, nil
}

// CreateLogsink creates a new logsink for a database
func (svc *DatabasesServiceOp) CreateLogsink(ctx context.Context, databaseID string, createLogsink *DatabaseCreateLogsinkRequest) (*DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPost, path, createLogsink)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// GetLogsink gets a logsink for a database
func (svc *DatabasesServiceOp) GetLogsink(ctx context.Context, databaseID string, logsinkID string) (*DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}

root := new(DatabaseLogsink)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// ListTopics returns all topics for a given kafka cluster
func (svc *DatabasesServiceOp) ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error) {
path := fmt.Sprintf(databaseLogsinksPath, databaseID)
path, err := addOptions(path, opts)
if err != nil {
return nil, nil, err
}
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}
root := new(databaseLogsinksRoot)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root.Sinks, resp, nil
}

// UpdateLogsink updates a logsink for a database cluster
func (svc *DatabasesServiceOp) UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, updateLogsink)
if err != nil {
return nil, err
}

resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}

// DeleteLogsink deletes a logsink for a database cluster
func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error) {
path := fmt.Sprintf(databaseLogsinkPath, databaseID, logsinkID)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
if err != nil {
return nil, err
}
resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}
210 changes: 210 additions & 0 deletions databases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3225,3 +3225,213 @@ func TestDatabases_ListDatabaseEvents(t *testing.T) {
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestDatabases_CreateLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)

want := &DatabaseLogsink{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &LogsinkConfig{
URL: "https://user:[email protected]:25060",
IndexPrefix: "opensearch-logs",
},
}

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:[email protected]:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink", dbID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPost)
fmt.Fprint(w, body)
})

log, _, err := client.Databases.CreateLogsink(ctx, dbID, &DatabaseCreateLogsinkRequest{
Name: "logs-sink",
Type: "opensearch",
Config: &LogsinkConfig{
URL: "https://user:[email protected]:25060",
IndexPrefix: "opensearch-logs",
},
})

require.NoError(t, err)

require.Equal(t, want, log)
}

func TestDatabases_GetLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

want := &DatabaseLogsink{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &LogsinkConfig{
URL: "https://user:[email protected]:25060",
IndexPrefix: "opensearch-logs",
},
}

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:[email protected]:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, body)
})

got, _, err := client.Databases.GetLogsink(ctx, dbID, logsinkID)
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestDatabases_UpdateLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

body := `{
"sink_id":"deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:[email protected]:25060",
"index_prefix": "opensearch-logs"
}
}`

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPut)
fmt.Fprint(w, body)
})

_, err := client.Databases.UpdateLogsink(ctx, dbID, logsinkID, &DatabaseUpdateLogsinkRequest{
Config: &LogsinkConfig{
Server: "192.168.0.1",
Port: 514,
TLS: false,
Format: "rfc3164",
},
})

require.NoError(t, err)
}

func TestDatabases_ListLogsinks(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)

want := []DatabaseLogsink{
{
ID: "deadbeef-dead-4aa5-beef-deadbeef347d",
Name: "logs-sink",
Type: "opensearch",
Config: &LogsinkConfig{
URL: "https://user:[email protected]:25060",
IndexPrefix: "opensearch-logs",
},
},
{
ID: "d6e95157-5f58-48d0-9023-8cfb409d102a",
Name: "logs-sink-2",
Type: "opensearch",
Config: &LogsinkConfig{
URL: "https://user:[email protected]:25060",
IndexPrefix: "opensearch-logs",
},
}}

body := `{
"sinks": [
{
"sink_id": "deadbeef-dead-4aa5-beef-deadbeef347d",
"sink_name": "logs-sink",
"sink_type": "opensearch",
"config": {
"url": "https://user:[email protected]:25060",
"index_prefix": "opensearch-logs"
}
},
{
"sink_id": "d6e95157-5f58-48d0-9023-8cfb409d102a",
"sink_name": "logs-sink-2",
"sink_type": "opensearch",
"config": {
"url": "https://user:[email protected]:25060",
"index_prefix": "opensearch-logs"
}
}
]
}`

path := fmt.Sprintf("/v2/databases/%s/logsink", dbID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, body)
})

got, _, err := client.Databases.ListLogsinks(ctx, dbID, &ListOptions{})
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestDatabases_DeleteLogsink(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
logsinkID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

path := fmt.Sprintf("/v2/databases/%s/logsink/%s", dbID, logsinkID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodDelete)
})

_, err := client.Databases.DeleteLogsink(ctx, dbID, logsinkID)
require.NoError(t, err)
}

0 comments on commit 9fe130b

Please sign in to comment.