diff --git a/README.md b/README.md index 21b555d..9cf3cd3 100644 --- a/README.md +++ b/README.md @@ -80,13 +80,13 @@ $ go get github.com/Trendyol/go-dcp | `dcp.group.name` | string | yes | | DCP group name for vbuckets. | | `scopeName` | string | no | _default | Couchbase scope name. | | `collectionNames` | []string | no | _default | Couchbase collection names. | -| `connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | +| `connectionBufferSize` | uint, string | no | 20mb | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | | `connectionTimeout` | time.Duration | no | 5s | Couchbase connection timeout. | | `secureConnection` | bool | no | false | Enable TLS connection of Couchbase. | | `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. | | `debug` | bool | no | false | For debugging purpose. | -| `dcp.bufferSize` | int | no | 16777216 | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. | -| `dcp.connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | +| `dcp.bufferSize` | int | no | 16mb | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. | +| `dcp.connectionBufferSize` | uint, string | no | 20mb | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. | | `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. | | `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. | | `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet` or `static`. Check examples for details. | diff --git a/config/dcp.go b/config/dcp.go index 6dabefb..7791ccf 100644 --- a/config/dcp.go +++ b/config/dcp.go @@ -6,6 +6,8 @@ import ( "strconv" "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/Trendyol/go-dcp/logger" ) @@ -47,9 +49,9 @@ type DCPListener struct { } type ExternalDcp struct { + BufferSize any `yaml:"bufferSize"` + ConnectionBufferSize any `yaml:"connectionBufferSize"` Group DCPGroup `yaml:"group"` - BufferSize int `yaml:"bufferSize"` - ConnectionBufferSize uint `yaml:"connectionBufferSize"` ConnectionTimeout time.Duration `yaml:"connectionTimeout"` Listener DCPListener `yaml:"listener"` } @@ -95,9 +97,9 @@ type RollbackMitigation struct { } type Metadata struct { - Config map[string]string `yaml:"config"` - Type string `yaml:"type"` - ReadOnly bool `json:"readOnly"` + Config map[string]any `yaml:"config"` + Type string `yaml:"type"` + ReadOnly bool `json:"readOnly"` } type Logging struct { @@ -105,15 +107,16 @@ type Logging struct { } type Dcp struct { - Logging Logging `yaml:"logging"` + ConnectionBufferSize any `yaml:"connectionBufferSize"` BucketName string `yaml:"bucketName"` ScopeName string `yaml:"scopeName"` Password string `yaml:"password"` RootCAPath string `yaml:"rootCAPath"` Username string `yaml:"username"` + Logging Logging `yaml:"logging"` Metadata Metadata `yaml:"metadata"` - Hosts []string `yaml:"hosts"` CollectionNames []string `yaml:"collectionNames"` + Hosts []string `yaml:"hosts"` Metric Metric `yaml:"metric"` Checkpoint Checkpoint `yaml:"checkpoint"` LeaderElection LeaderElection `yaml:"leaderElector"` @@ -122,7 +125,6 @@ type Dcp struct { RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"` API API `yaml:"api"` ConnectionTimeout time.Duration `yaml:"connectionTimeout"` - ConnectionBufferSize uint `yaml:"connectionBufferSize"` SecureConnection bool `yaml:"secureConnection"` Debug bool `yaml:"debug"` } @@ -143,7 +145,7 @@ func (c *Dcp) GetFileMetadata() string { var fileName string if _, ok := c.Metadata.Config[FileMetadataFileNameConfig]; ok { - fileName = c.Metadata.Config[FileMetadataFileNameConfig] + fileName = c.Metadata.Config[FileMetadataFileNameConfig].(string) } else { err := errors.New("file metadata file name is not set") logger.Log.Error("failed to get metadata file name: %v", err) @@ -247,29 +249,23 @@ func (c *Dcp) GetCouchbaseMetadata() CouchbaseMetadata { } if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig]; ok { - couchbaseMetadata.Bucket = bucket + couchbaseMetadata.Bucket = bucket.(string) } if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig]; ok { - couchbaseMetadata.Scope = scope + couchbaseMetadata.Scope = scope.(string) } if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig]; ok { - couchbaseMetadata.Collection = collection + couchbaseMetadata.Collection = collection.(string) } if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok { - parsedConnectionBufferSize, err := strconv.ParseUint(connectionBufferSize, 10, 32) - if err != nil { - logger.Log.Error("failed to parse metadata connection buffer size: %v", err) - panic(err) - } - - couchbaseMetadata.ConnectionBufferSize = uint(parsedConnectionBufferSize) + couchbaseMetadata.ConnectionBufferSize = uint(helpers.ResolveUnionIntOrStringValue(connectionBufferSize)) } if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig]; ok { - parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout) + parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout.(string)) if err != nil { logger.Log.Error("failed to parse metadata connection timeout: %v", err) panic(err) @@ -393,8 +389,8 @@ func (c *Dcp) applyDefaultScopeName() { } func (c *Dcp) applyDefaultConnectionBufferSize() { - if c.ConnectionBufferSize == 0 { - c.ConnectionBufferSize = 20971520 + if c.ConnectionBufferSize == nil { + c.ConnectionBufferSize = helpers.ResolveUnionIntOrStringValue("20mb") } } @@ -425,12 +421,12 @@ func (c *Dcp) applyDefaultLeaderElection() { } func (c *Dcp) applyDefaultDcp() { - if c.Dcp.BufferSize == 0 { - c.Dcp.BufferSize = 16777216 + if c.Dcp.BufferSize == nil { + c.Dcp.BufferSize = helpers.ResolveUnionIntOrStringValue("16mb") } - if c.Dcp.ConnectionBufferSize == 0 { - c.Dcp.ConnectionBufferSize = 20971520 + if c.Dcp.ConnectionBufferSize == nil { + c.Dcp.ConnectionBufferSize = helpers.ResolveUnionIntOrStringValue("20mb") } if c.Dcp.Listener.BufferSize == 0 { diff --git a/config/dcp_test.go b/config/dcp_test.go index db15b92..5c210ed 100644 --- a/config/dcp_test.go +++ b/config/dcp_test.go @@ -3,6 +3,8 @@ package config import ( "testing" "time" + + "github.com/Trendyol/go-dcp/helpers" ) func TestDefaultConfig(t *testing.T) { @@ -37,7 +39,7 @@ func TestDefaultConfig(t *testing.T) { func TestGetCouchbaseMetadata(t *testing.T) { dcp := &Dcp{ Metadata: Metadata{ - Config: map[string]string{ + Config: map[string]any{ CouchbaseMetadataBucketConfig: "mybucket", CouchbaseMetadataScopeConfig: "myscope", }, @@ -50,7 +52,7 @@ func TestGetCouchbaseMetadata(t *testing.T) { expectedBucket := "mybucket" expectedScope := "myscope" expectedCollection := DefaultCollectionName - expectedConnectionBufferSize := uint(5242880) + expectedConnectionBufferSize := helpers.ResolveUnionIntOrStringValue("5mb") expectedConnectionTimeout := 5 * time.Second if couchbaseMetadata.Bucket != expectedBucket { @@ -65,7 +67,7 @@ func TestGetCouchbaseMetadata(t *testing.T) { t.Errorf("Collection is not set to expected value") } - if couchbaseMetadata.ConnectionBufferSize != expectedConnectionBufferSize { + if couchbaseMetadata.ConnectionBufferSize != uint(expectedConnectionBufferSize) { t.Errorf("ConnectionBufferSize is not set to expected value") } @@ -122,7 +124,7 @@ func TestGetCouchbaseMembership(t *testing.T) { func TestDcp_GetFileMetadata(t *testing.T) { dcp := &Dcp{ Metadata: Metadata{ - Config: map[string]string{ + Config: map[string]any{ FileMetadataFileNameConfig: "testfile.json", }, }, @@ -244,7 +246,7 @@ func TestDcpApplyDefaultConnectionBufferSize(t *testing.T) { c := &Dcp{} c.applyDefaultConnectionBufferSize() - if c.ConnectionBufferSize != 20971520 { + if c.ConnectionBufferSize.(int) != 20971520 { t.Errorf("ConnectionBufferSize is not set to expected value") } } @@ -296,11 +298,11 @@ func TestDcpApplyDefaultDcp(t *testing.T) { c := &Dcp{} c.applyDefaultDcp() - if c.Dcp.BufferSize != 16777216 { + if c.Dcp.BufferSize.(int) != 16777216 { t.Errorf("Dcp.BufferSize is not set to expected value") } - if c.Dcp.ConnectionBufferSize != 20971520 { + if c.Dcp.ConnectionBufferSize.(int) != 20971520 { t.Errorf("Dcp.ConnectionBufferSize is not set to expected value") } diff --git a/couchbase/client.go b/couchbase/client.go index ee9c05d..6d15efc 100644 --- a/couchbase/client.go +++ b/couchbase/client.go @@ -8,6 +8,8 @@ import ( "os" "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/couchbase/gocbcore/v10/connstr" "github.com/Trendyol/go-dcp/config" @@ -215,7 +217,7 @@ func resolveHostsAsHTTP(hosts []string) []string { } func (s *client) Connect() error { - connectionBufferSize := s.config.ConnectionBufferSize + connectionBufferSize := uint(helpers.ResolveUnionIntOrStringValue(s.config.ConnectionBufferSize)) connectionTimeout := s.config.ConnectionTimeout if s.config.IsCouchbaseMetadata() { @@ -287,11 +289,11 @@ func (s *client) DcpConnect() error { Enabled: true, }, DCPConfig: gocbcore.DCPConfig{ - BufferSize: s.config.Dcp.BufferSize, + BufferSize: helpers.ResolveUnionIntOrStringValue(s.config.Dcp.BufferSize), UseExpiryOpcode: true, }, KVConfig: gocbcore.KVConfig{ - ConnectionBufferSize: s.config.Dcp.ConnectionBufferSize, + ConnectionBufferSize: uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize)), }, } diff --git a/helpers/data_units.go b/helpers/data_units.go new file mode 100644 index 0000000..8fa9a3c --- /dev/null +++ b/helpers/data_units.go @@ -0,0 +1,61 @@ +package helpers + +import ( + "fmt" + "strconv" + "strings" +) + +func ResolveUnionIntOrStringValue(input any) int { + switch value := input.(type) { + case int: + return value + case uint: + return int(value) + case string: + intValue, err := strconv.ParseInt(value, 10, 64) + if err == nil { + return int(intValue) + } + + result, err := convertSizeUnitToByte(value) + if err != nil { + panic(err) + } + + return result + } + + return 0 +} + +func convertSizeUnitToByte(str string) (int, error) { + if len(str) < 2 { + return 0, fmt.Errorf("invalid input: %s", str) + } + + // Extract the numeric part of the input + sizeStr := str[:len(str)-2] + sizeStr = strings.TrimSpace(sizeStr) + sizeStr = strings.ReplaceAll(sizeStr, ",", ".") + + size, err := strconv.ParseFloat(sizeStr, 64) + if err != nil { + return 0, fmt.Errorf("cannot extract numeric part for the input %s, err = %w", str, err) + } + + // Determine the unit (B, KB, MB, GB) + unit := str[len(str)-2:] + switch strings.ToUpper(unit) { + case "B": + return int(size), nil + case "KB": + return int(size * 1024), nil + case "MB": + return int(size * 1024 * 1024), nil + case "GB": + return int(size * 1024 * 1024 * 1024), nil + default: + return 0, fmt.Errorf("unsupported unit: %s, you can specify one of B, KB, MB and GB", unit) + } +} diff --git a/helpers/data_units_test.go b/helpers/data_units_test.go new file mode 100644 index 0000000..91dd7de --- /dev/null +++ b/helpers/data_units_test.go @@ -0,0 +1,85 @@ +package helpers + +import "testing" + +func TestDcp_ResolveConnectionBufferSize(t *testing.T) { + tests := []struct { + input any + name string + want int + }{ + { + name: "When_Client_Gives_Int_Value", + input: 20971520, + want: 20971520, + }, + { + name: "When_Client_Gives_UInt_Value", + input: uint(10971520), + want: 10971520, + }, + { + name: "When_Client_Gives_StringInt_Value", + input: "15971520", + want: 15971520, + }, + { + name: "When_Client_Gives_KB_Value", + input: "500kb", + want: 500 * 1024, + }, + { + name: "When_Client_Gives_MB_Value", + input: "10mb", + want: 10 * 1024 * 1024, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ResolveUnionIntOrStringValue(tt.input); got != tt.want { + t.Errorf("ResolveConnectionBufferSize() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestConvertToBytes(t *testing.T) { + testCases := []struct { + input string + expected int + err bool + }{ + {"1kb", 1024, false}, + {"5mb", 5 * 1024 * 1024, false}, + {"5,5mb", 5.5 * 1024 * 1024, false}, + {"8.5mb", 8.5 * 1024 * 1024, false}, + {"10,25 mb", 10.25 * 1024 * 1024, false}, + {"10gb", 10 * 1024 * 1024 * 1024, false}, + {"1KB", 1024, false}, + {"5MB", 5 * 1024 * 1024, false}, + {"12 MB", 12 * 1024 * 1024, false}, + {"10GB", 10 * 1024 * 1024 * 1024, false}, + {"123", 0, true}, + {"15TB", 0, true}, + {"invalid", 0, true}, + {"", 0, true}, + {"123 KB", 123 * 1024, false}, + {"1 MB", 1 * 1024 * 1024, false}, + } + + for _, tc := range testCases { + result, err := convertSizeUnitToByte(tc.input) + + if tc.err && err == nil { + t.Errorf("Expected an error for input %s, but got none", tc.input) + } + + if !tc.err && err != nil { + t.Errorf("Unexpected error for input %s: %v", tc.input, err) + } + + if result != tc.expected { + t.Errorf("For input %s, expected %d bytes, but got %d", tc.input, tc.expected, result) + } + } +}