Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unit type support #74

Merged
merged 2 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ $ go get github.com/Trendyol/go-dcp
| `dcp.group.name` | string | yes | | DCP group name for vbuckets. |
| `scopeName` | string | no | _default | Couchbase scope name. |
| `collectionNames` | []string | no | _default | Couchbase collection names. |
| `connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. |
| `connectionBufferSize` | uint, string | no | 20mb | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. |
| `connectionTimeout` | time.Duration | no | 5s | Couchbase connection timeout. |
| `secureConnection` | bool | no | false | Enable TLS connection of Couchbase. |
| `rootCAPath` | string | no | *not set | if `secureConnection` set `true` this field is required. |
| `debug` | bool | no | false | For debugging purpose. |
| `dcp.bufferSize` | int | no | 16777216 | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. |
| `dcp.connectionBufferSize` | uint | no | 20971520 | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. |
| `dcp.bufferSize` | int | no | 16mb | Go DCP listener pre-allocated buffer size. `16mb` is default. Check this if you get OOM Killed. |
| `dcp.connectionBufferSize` | uint, string | no | 20mb | [gocbcore](github.com/couchbase/gocbcore) library buffer size. `20mb` is default. Check this if you get OOM Killed. |
| `dcp.connectionTimeout` | time.Duration | no | 5s | DCP connection timeout. |
| `dcp.listener.bufferSize` | uint | no | 1000 | Go DCP listener buffered channel size. |
| `dcp.group.membership.type` | string | no | | DCP membership types. `couchbase`, `kubernetesHa`, `kubernetesStatefulSet` or `static`. Check examples for details. |
Expand Down
48 changes: 22 additions & 26 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strconv"
"time"

"github.com/Trendyol/go-dcp/helpers"

"github.com/Trendyol/go-dcp/logger"
)

Expand Down Expand Up @@ -47,9 +49,9 @@ type DCPListener struct {
}

type ExternalDcp struct {
BufferSize any `yaml:"bufferSize"`
ConnectionBufferSize any `yaml:"connectionBufferSize"`
Group DCPGroup `yaml:"group"`
BufferSize int `yaml:"bufferSize"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
Listener DCPListener `yaml:"listener"`
}
Expand Down Expand Up @@ -95,25 +97,26 @@ type RollbackMitigation struct {
}

type Metadata struct {
Config map[string]string `yaml:"config"`
Type string `yaml:"type"`
ReadOnly bool `json:"readOnly"`
Config map[string]any `yaml:"config"`
Type string `yaml:"type"`
ReadOnly bool `json:"readOnly"`
}

type Logging struct {
Level string `yaml:"level"`
}

type Dcp struct {
Logging Logging `yaml:"logging"`
ConnectionBufferSize any `yaml:"connectionBufferSize"`
BucketName string `yaml:"bucketName"`
ScopeName string `yaml:"scopeName"`
Password string `yaml:"password"`
RootCAPath string `yaml:"rootCAPath"`
Username string `yaml:"username"`
Logging Logging `yaml:"logging"`
Metadata Metadata `yaml:"metadata"`
Hosts []string `yaml:"hosts"`
CollectionNames []string `yaml:"collectionNames"`
Hosts []string `yaml:"hosts"`
Metric Metric `yaml:"metric"`
Checkpoint Checkpoint `yaml:"checkpoint"`
LeaderElection LeaderElection `yaml:"leaderElector"`
Expand All @@ -122,7 +125,6 @@ type Dcp struct {
RollbackMitigation RollbackMitigation `yaml:"rollbackMitigation"`
API API `yaml:"api"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
SecureConnection bool `yaml:"secureConnection"`
Debug bool `yaml:"debug"`
}
Expand All @@ -143,7 +145,7 @@ func (c *Dcp) GetFileMetadata() string {
var fileName string

if _, ok := c.Metadata.Config[FileMetadataFileNameConfig]; ok {
fileName = c.Metadata.Config[FileMetadataFileNameConfig]
fileName = c.Metadata.Config[FileMetadataFileNameConfig].(string)
} else {
err := errors.New("file metadata file name is not set")
logger.Log.Error("failed to get metadata file name: %v", err)
Expand Down Expand Up @@ -247,29 +249,23 @@ func (c *Dcp) GetCouchbaseMetadata() CouchbaseMetadata {
}

if bucket, ok := c.Metadata.Config[CouchbaseMetadataBucketConfig]; ok {
couchbaseMetadata.Bucket = bucket
couchbaseMetadata.Bucket = bucket.(string)
}

if scope, ok := c.Metadata.Config[CouchbaseMetadataScopeConfig]; ok {
couchbaseMetadata.Scope = scope
couchbaseMetadata.Scope = scope.(string)
}

if collection, ok := c.Metadata.Config[CouchbaseMetadataCollectionConfig]; ok {
couchbaseMetadata.Collection = collection
couchbaseMetadata.Collection = collection.(string)
}

if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok {
parsedConnectionBufferSize, err := strconv.ParseUint(connectionBufferSize, 10, 32)
if err != nil {
logger.Log.Error("failed to parse metadata connection buffer size: %v", err)
panic(err)
}

couchbaseMetadata.ConnectionBufferSize = uint(parsedConnectionBufferSize)
couchbaseMetadata.ConnectionBufferSize = uint(helpers.ResolveUnionIntOrStringValue(connectionBufferSize))
}

if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig]; ok {
parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout)
parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout.(string))
if err != nil {
logger.Log.Error("failed to parse metadata connection timeout: %v", err)
panic(err)
Expand Down Expand Up @@ -393,8 +389,8 @@ func (c *Dcp) applyDefaultScopeName() {
}

func (c *Dcp) applyDefaultConnectionBufferSize() {
if c.ConnectionBufferSize == 0 {
c.ConnectionBufferSize = 20971520
if c.ConnectionBufferSize == nil {
c.ConnectionBufferSize = helpers.ResolveUnionIntOrStringValue("20mb")
}
}

Expand Down Expand Up @@ -425,12 +421,12 @@ func (c *Dcp) applyDefaultLeaderElection() {
}

func (c *Dcp) applyDefaultDcp() {
if c.Dcp.BufferSize == 0 {
c.Dcp.BufferSize = 16777216
if c.Dcp.BufferSize == nil {
c.Dcp.BufferSize = helpers.ResolveUnionIntOrStringValue("16mb")
}

if c.Dcp.ConnectionBufferSize == 0 {
c.Dcp.ConnectionBufferSize = 20971520
if c.Dcp.ConnectionBufferSize == nil {
c.Dcp.ConnectionBufferSize = helpers.ResolveUnionIntOrStringValue("20mb")
}

if c.Dcp.Listener.BufferSize == 0 {
Expand Down
16 changes: 9 additions & 7 deletions config/dcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package config
import (
"testing"
"time"

"github.com/Trendyol/go-dcp/helpers"
)

func TestDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -37,7 +39,7 @@ func TestDefaultConfig(t *testing.T) {
func TestGetCouchbaseMetadata(t *testing.T) {
dcp := &Dcp{
Metadata: Metadata{
Config: map[string]string{
Config: map[string]any{
CouchbaseMetadataBucketConfig: "mybucket",
CouchbaseMetadataScopeConfig: "myscope",
},
Expand All @@ -50,7 +52,7 @@ func TestGetCouchbaseMetadata(t *testing.T) {
expectedBucket := "mybucket"
expectedScope := "myscope"
expectedCollection := DefaultCollectionName
expectedConnectionBufferSize := uint(5242880)
expectedConnectionBufferSize := helpers.ResolveUnionIntOrStringValue("5mb")
expectedConnectionTimeout := 5 * time.Second

if couchbaseMetadata.Bucket != expectedBucket {
Expand All @@ -65,7 +67,7 @@ func TestGetCouchbaseMetadata(t *testing.T) {
t.Errorf("Collection is not set to expected value")
}

if couchbaseMetadata.ConnectionBufferSize != expectedConnectionBufferSize {
if couchbaseMetadata.ConnectionBufferSize != uint(expectedConnectionBufferSize) {
t.Errorf("ConnectionBufferSize is not set to expected value")
}

Expand Down Expand Up @@ -122,7 +124,7 @@ func TestGetCouchbaseMembership(t *testing.T) {
func TestDcp_GetFileMetadata(t *testing.T) {
dcp := &Dcp{
Metadata: Metadata{
Config: map[string]string{
Config: map[string]any{
FileMetadataFileNameConfig: "testfile.json",
},
},
Expand Down Expand Up @@ -244,7 +246,7 @@ func TestDcpApplyDefaultConnectionBufferSize(t *testing.T) {
c := &Dcp{}
c.applyDefaultConnectionBufferSize()

if c.ConnectionBufferSize != 20971520 {
if c.ConnectionBufferSize.(int) != 20971520 {
t.Errorf("ConnectionBufferSize is not set to expected value")
}
}
Expand Down Expand Up @@ -296,11 +298,11 @@ func TestDcpApplyDefaultDcp(t *testing.T) {
c := &Dcp{}
c.applyDefaultDcp()

if c.Dcp.BufferSize != 16777216 {
if c.Dcp.BufferSize.(int) != 16777216 {
t.Errorf("Dcp.BufferSize is not set to expected value")
}

if c.Dcp.ConnectionBufferSize != 20971520 {
if c.Dcp.ConnectionBufferSize.(int) != 20971520 {
t.Errorf("Dcp.ConnectionBufferSize is not set to expected value")
}

Expand Down
8 changes: 5 additions & 3 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"time"

"github.com/Trendyol/go-dcp/helpers"

"github.com/couchbase/gocbcore/v10/connstr"

"github.com/Trendyol/go-dcp/config"
Expand Down Expand Up @@ -215,7 +217,7 @@ func resolveHostsAsHTTP(hosts []string) []string {
}

func (s *client) Connect() error {
connectionBufferSize := s.config.ConnectionBufferSize
connectionBufferSize := uint(helpers.ResolveUnionIntOrStringValue(s.config.ConnectionBufferSize))
connectionTimeout := s.config.ConnectionTimeout

if s.config.IsCouchbaseMetadata() {
Expand Down Expand Up @@ -287,11 +289,11 @@ func (s *client) DcpConnect() error {
Enabled: true,
},
DCPConfig: gocbcore.DCPConfig{
BufferSize: s.config.Dcp.BufferSize,
BufferSize: helpers.ResolveUnionIntOrStringValue(s.config.Dcp.BufferSize),
UseExpiryOpcode: true,
},
KVConfig: gocbcore.KVConfig{
ConnectionBufferSize: s.config.Dcp.ConnectionBufferSize,
ConnectionBufferSize: uint(helpers.ResolveUnionIntOrStringValue(s.config.Dcp.ConnectionBufferSize)),
},
}

Expand Down
61 changes: 61 additions & 0 deletions helpers/data_units.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package helpers

import (
"fmt"
"strconv"
"strings"
)

func ResolveUnionIntOrStringValue(input any) int {
switch value := input.(type) {
case int:
return value
case uint:
return int(value)
case string:
intValue, err := strconv.ParseInt(value, 10, 64)
if err == nil {
return int(intValue)
}

result, err := convertSizeUnitToByte(value)
if err != nil {
panic(err)
}

return result
}

return 0
}

func convertSizeUnitToByte(str string) (int, error) {
if len(str) < 2 {
return 0, fmt.Errorf("invalid input: %s", str)
}

// Extract the numeric part of the input
sizeStr := str[:len(str)-2]
sizeStr = strings.TrimSpace(sizeStr)
sizeStr = strings.ReplaceAll(sizeStr, ",", ".")

size, err := strconv.ParseFloat(sizeStr, 64)
if err != nil {
return 0, fmt.Errorf("cannot extract numeric part for the input %s, err = %w", str, err)
}

// Determine the unit (B, KB, MB, GB)
unit := str[len(str)-2:]
switch strings.ToUpper(unit) {
case "B":
return int(size), nil
case "KB":
return int(size * 1024), nil
case "MB":
return int(size * 1024 * 1024), nil
case "GB":
return int(size * 1024 * 1024 * 1024), nil
default:
return 0, fmt.Errorf("unsupported unit: %s, you can specify one of B, KB, MB and GB", unit)
}
}
85 changes: 85 additions & 0 deletions helpers/data_units_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package helpers

import "testing"

func TestDcp_ResolveConnectionBufferSize(t *testing.T) {
tests := []struct {
input any
name string
want int
}{
{
name: "When_Client_Gives_Int_Value",
input: 20971520,
want: 20971520,
},
{
name: "When_Client_Gives_UInt_Value",
input: uint(10971520),
want: 10971520,
},
{
name: "When_Client_Gives_StringInt_Value",
input: "15971520",
want: 15971520,
},
{
name: "When_Client_Gives_KB_Value",
input: "500kb",
want: 500 * 1024,
},
{
name: "When_Client_Gives_MB_Value",
input: "10mb",
want: 10 * 1024 * 1024,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ResolveUnionIntOrStringValue(tt.input); got != tt.want {
t.Errorf("ResolveConnectionBufferSize() = %v, want %v", got, tt.want)
}
})
}
}

func TestConvertToBytes(t *testing.T) {
testCases := []struct {
input string
expected int
err bool
}{
{"1kb", 1024, false},
{"5mb", 5 * 1024 * 1024, false},
{"5,5mb", 5.5 * 1024 * 1024, false},
{"8.5mb", 8.5 * 1024 * 1024, false},
{"10,25 mb", 10.25 * 1024 * 1024, false},
{"10gb", 10 * 1024 * 1024 * 1024, false},
{"1KB", 1024, false},
{"5MB", 5 * 1024 * 1024, false},
{"12 MB", 12 * 1024 * 1024, false},
{"10GB", 10 * 1024 * 1024 * 1024, false},
{"123", 0, true},
{"15TB", 0, true},
{"invalid", 0, true},
{"", 0, true},
{"123 KB", 123 * 1024, false},
{"1 MB", 1 * 1024 * 1024, false},
}

for _, tc := range testCases {
result, err := convertSizeUnitToByte(tc.input)

if tc.err && err == nil {
t.Errorf("Expected an error for input %s, but got none", tc.input)
}

if !tc.err && err != nil {
t.Errorf("Unexpected error for input %s: %v", tc.input, err)
}

if result != tc.expected {
t.Errorf("For input %s, expected %d bytes, but got %d", tc.input, tc.expected, result)
}
}
}