From 615aad997ab60a8123d5660d61497fce67f16948 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 13 Dec 2023 15:59:53 -0800 Subject: [PATCH] Improve carbonexporter, add retry and queue, use standard configs (#29862) Signed-off-by: Bogdan Drutu --- .chloggen/carbonexporter.yaml | 22 ++++++++++++++ .chloggen/carbonexporter_enhance.yaml | 13 ++++++++ exporter/carbonexporter/README.md | 9 ++++-- exporter/carbonexporter/config.go | 20 ++++++------- exporter/carbonexporter/config_test.go | 31 +++++++++++++++++--- exporter/carbonexporter/exporter.go | 5 +++- exporter/carbonexporter/exporter_test.go | 4 ++- exporter/carbonexporter/factory.go | 10 +++++-- exporter/carbonexporter/go.mod | 1 + exporter/carbonexporter/go.sum | 2 ++ exporter/carbonexporter/testdata/config.yaml | 11 +++++++ testbed/datasenders/carbon.go | 10 +++++-- 12 files changed, 116 insertions(+), 22 deletions(-) create mode 100755 .chloggen/carbonexporter.yaml create mode 100755 .chloggen/carbonexporter_enhance.yaml diff --git a/.chloggen/carbonexporter.yaml b/.chloggen/carbonexporter.yaml new file mode 100755 index 000000000000..647bcda92930 --- /dev/null +++ b/.chloggen/carbonexporter.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "breaking" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "carbonexporter" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Change Config member names" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29862] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/carbonexporter_enhance.yaml b/.chloggen/carbonexporter_enhance.yaml new file mode 100755 index 000000000000..362a3e9ac281 --- /dev/null +++ b/.chloggen/carbonexporter_enhance.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "carbonexporter" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add retry and queue, use standard configs" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29862] diff --git a/exporter/carbonexporter/README.md b/exporter/carbonexporter/README.md index 327a66fda990..55f7e315b4f8 100644 --- a/exporter/carbonexporter/README.md +++ b/exporter/carbonexporter/README.md @@ -24,8 +24,6 @@ The following settings are required: - `endpoint` (default = `localhost:2003`): Address and port that the exporter should send data to. -- `timeout` (default = `5s`): Maximum duration allowed to connect - and send data to the configured `endpoint`. Example: @@ -45,3 +43,10 @@ exporters: The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). + +## Advanced Configuration + +Several helper files are leveraged to provide additional capabilities automatically: + +- [net settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confignet/README.md) +- [Queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) diff --git a/exporter/carbonexporter/config.go b/exporter/carbonexporter/config.go index ead7eee80219..59b1811a20e4 100644 --- a/exporter/carbonexporter/config.go +++ b/exporter/carbonexporter/config.go @@ -7,26 +7,26 @@ import ( "errors" "fmt" "net" - "time" + + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) // Defaults for not specified configuration settings. const ( - DefaultEndpoint = "localhost:2003" - DefaultSendTimeout = 5 * time.Second + defaultEndpoint = "localhost:2003" ) // Config defines configuration for Carbon exporter. type Config struct { - - // Endpoint specifies host and port to send metrics in the Carbon plaintext - // format. The default value is defined by the DefaultEndpoint constant. - Endpoint string `mapstructure:"endpoint"` + // Specifies the connection endpoint config. The default value is "localhost:2003". + confignet.TCPAddr `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. // Timeout is the maximum duration allowed to connecting and sending the - // data to the Carbon/Graphite backend. - // The default value is defined by the DefaultSendTimeout constant. - Timeout time.Duration `mapstructure:"timeout"` + // data to the Carbon/Graphite backend. The default value is 5s. + exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + QueueConfig exporterhelper.QueueSettings `mapstructure:"sending_queue"` + RetryConfig exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` } func (cfg *Config) Validate() error { diff --git a/exporter/carbonexporter/config_test.go b/exporter/carbonexporter/config_test.go index 07cfaf48e42b..b85392c3a448 100644 --- a/exporter/carbonexporter/config_test.go +++ b/exporter/carbonexporter/config_test.go @@ -11,7 +11,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter/internal/metadata" ) @@ -35,8 +37,25 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(metadata.Type, "allsettings"), expected: &Config{ - Endpoint: "localhost:8080", - Timeout: 10 * time.Second, + TCPAddr: confignet.TCPAddr{ + Endpoint: "localhost:8080", + }, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 10 * time.Second, + }, + RetryConfig: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 10 * time.Second, + RandomizationFactor: 0.7, + Multiplier: 3.14, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + }, + QueueConfig: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + }, }, }, } @@ -69,14 +88,18 @@ func TestValidateConfig(t *testing.T) { { name: "invalid_tcp_addr", config: &Config{ - Endpoint: "http://localhost:2003", + TCPAddr: confignet.TCPAddr{ + Endpoint: "http://localhost:2003", + }, }, wantErr: true, }, { name: "invalid_timeout", config: &Config{ - Timeout: -5 * time.Second, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: -5 * time.Second, + }, }, wantErr: true, }, diff --git a/exporter/carbonexporter/exporter.go b/exporter/carbonexporter/exporter.go index e6a8fbc62ccb..2b18fdd71461 100644 --- a/exporter/carbonexporter/exporter.go +++ b/exporter/carbonexporter/exporter.go @@ -25,6 +25,9 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri set, cfg, sender.pushMetricsData, + // We don't use exporterhelper.WithTimeout because the TCP connection does not accept writing with context. + exporterhelper.WithQueue(cfg.QueueConfig), + exporterhelper.WithRetry(cfg.RetryConfig), exporterhelper.WithShutdown(sender.Shutdown)) } @@ -57,7 +60,7 @@ func (cs *carbonSender) Shutdown(context.Context) error { // https://github.com/signalfx/gateway/blob/master/protocol/carbon/conn_pool.go // but not its implementation). // -// It keeps a unbounded "stack" of TCPConn instances always "popping" the most +// It keeps an unbounded "stack" of TCPConn instances always "popping" the most // recently returned to the pool. There is no accounting to terminating old // unused connections as that was the case on the prior art mentioned above. type connPool struct { diff --git a/exporter/carbonexporter/exporter_test.go b/exporter/carbonexporter/exporter_test.go index a97f61cf5e32..94561932e446 100644 --- a/exporter/carbonexporter/exporter_test.go +++ b/exporter/carbonexporter/exporter_test.go @@ -20,6 +20,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confignet" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -98,7 +100,7 @@ func TestConsumeMetricsData(t *testing.T) { defer ln.Close() } - config := &Config{Endpoint: addr, Timeout: 1000 * time.Millisecond} + config := &Config{TCPAddr: confignet.TCPAddr{Endpoint: addr}, TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1000 * time.Millisecond}} exp, err := newCarbonExporter(config, exportertest.NewNopCreateSettings()) require.NoError(t, err) diff --git a/exporter/carbonexporter/factory.go b/exporter/carbonexporter/factory.go index 87d5666332e4..d4bdfed0e04e 100644 --- a/exporter/carbonexporter/factory.go +++ b/exporter/carbonexporter/factory.go @@ -7,7 +7,9 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter/internal/metadata" ) @@ -22,8 +24,12 @@ func NewFactory() exporter.Factory { func createDefaultConfig() component.Config { return &Config{ - Endpoint: DefaultEndpoint, - Timeout: DefaultSendTimeout, + TCPAddr: confignet.TCPAddr{ + Endpoint: defaultEndpoint, + }, + TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), + QueueConfig: exporterhelper.NewDefaultQueueSettings(), + RetryConfig: exporterhelper.NewDefaultRetrySettings(), } } diff --git a/exporter/carbonexporter/go.mod b/exporter/carbonexporter/go.mod index 621283cb2ec1..116b2249aba3 100644 --- a/exporter/carbonexporter/go.mod +++ b/exporter/carbonexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.91.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.91.0 + go.opentelemetry.io/collector/config/confignet v0.91.0 go.opentelemetry.io/collector/confmap v0.91.0 go.opentelemetry.io/collector/exporter v0.91.0 go.opentelemetry.io/collector/pdata v1.0.0 diff --git a/exporter/carbonexporter/go.sum b/exporter/carbonexporter/go.sum index d741f02ad8c2..4895414681f0 100644 --- a/exporter/carbonexporter/go.sum +++ b/exporter/carbonexporter/go.sum @@ -99,6 +99,8 @@ go.opentelemetry.io/collector v0.91.0 h1:C7sGUJDJ5nwm+CkWpAaVP3lNsuYpwSRbkmLncFj go.opentelemetry.io/collector v0.91.0/go.mod h1:YhQpIDZsn+bICAAqgBwXk9wqK8GKZDv+aogfG52zUuE= go.opentelemetry.io/collector/component v0.91.0 h1:aBT1i2zGyfh9PalYJLfXVvQp+osHyalwyDFselI1CtA= go.opentelemetry.io/collector/component v0.91.0/go.mod h1:2KBHvjNFdU7oOjsObQeC4Ta2Ef607OISU5obznW00fw= +go.opentelemetry.io/collector/config/confignet v0.91.0 h1:3huNXh04O3wXaN4qPhmmiefyz4dYbOlNcR/OKMByqig= +go.opentelemetry.io/collector/config/confignet v0.91.0/go.mod h1:cpO8JYWGONaViOygKVw+Hd2UoBcn2cUiyi0WWeFTwJY= go.opentelemetry.io/collector/config/configtelemetry v0.91.0 h1:mEwvqrYfwUJ7LwYfpcF9M8z7LHFoYaKhEPhnERD/88E= go.opentelemetry.io/collector/config/configtelemetry v0.91.0/go.mod h1:+LAXM5WFMW/UbTlAuSs6L/W72WC+q8TBJt/6z39FPOU= go.opentelemetry.io/collector/confmap v0.91.0 h1:7U2MT+u74oEzq/WWrpXSLKB7nX5jPNC4drwtQdYfwKk= diff --git a/exporter/carbonexporter/testdata/config.yaml b/exporter/carbonexporter/testdata/config.yaml index 991b655ea415..b180460d5bf4 100644 --- a/exporter/carbonexporter/testdata/config.yaml +++ b/exporter/carbonexporter/testdata/config.yaml @@ -8,3 +8,14 @@ carbon/allsettings: # data to the Carbon/Graphite backend. # The default is 5 seconds. timeout: 10s + sending_queue: + enabled: true + num_consumers: 2 + queue_size: 10 + retry_on_failure: + enabled: true + initial_interval: 10s + randomization_factor: 0.7 + multiplier: 3.14 + max_interval: 60s + max_elapsed_time: 10m diff --git a/testbed/datasenders/carbon.go b/testbed/datasenders/carbon.go index d3bf33dce248..e6a67fc13a24 100644 --- a/testbed/datasenders/carbon.go +++ b/testbed/datasenders/carbon.go @@ -8,7 +8,9 @@ import ( "fmt" "time" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exportertest" "go.uber.org/zap" @@ -40,8 +42,12 @@ func NewCarbonDataSender(port int) *CarbonDataSender { func (cs *CarbonDataSender) Start() error { factory := carbonexporter.NewFactory() cfg := &carbonexporter.Config{ - Endpoint: cs.GetEndpoint().String(), - Timeout: 5 * time.Second, + TCPAddr: confignet.TCPAddr{ + Endpoint: cs.GetEndpoint().String(), + }, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 5 * time.Second, + }, } params := exportertest.NewNopCreateSettings() params.Logger = zap.L()