Skip to content

Commit

Permalink
Improve carbonexporter, add retry and queue, use standard configs (op…
Browse files Browse the repository at this point in the history
…en-telemetry#29862)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Dec 13, 2023
1 parent 0b59407 commit 615aad9
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 22 deletions.
22 changes: 22 additions & 0 deletions .chloggen/carbonexporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "breaking"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "carbonexporter"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Change Config member names"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29862]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
13 changes: 13 additions & 0 deletions .chloggen/carbonexporter_enhance.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "carbonexporter"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add retry and queue, use standard configs"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29862]
9 changes: 7 additions & 2 deletions exporter/carbonexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ The following settings are required:

- `endpoint` (default = `localhost:2003`): Address and port that the
exporter should send data to.
- `timeout` (default = `5s`): Maximum duration allowed to connect
and send data to the configured `endpoint`.

Example:

Expand All @@ -45,3 +43,10 @@ exporters:
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
## Advanced Configuration
Several helper files are leveraged to provide additional capabilities automatically:
- [net settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confignet/README.md)
- [Queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
20 changes: 10 additions & 10 deletions exporter/carbonexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ import (
"errors"
"fmt"
"net"
"time"

"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Defaults for not specified configuration settings.
const (
DefaultEndpoint = "localhost:2003"
DefaultSendTimeout = 5 * time.Second
defaultEndpoint = "localhost:2003"
)

// Config defines configuration for Carbon exporter.
type Config struct {

// Endpoint specifies host and port to send metrics in the Carbon plaintext
// format. The default value is defined by the DefaultEndpoint constant.
Endpoint string `mapstructure:"endpoint"`
// Specifies the connection endpoint config. The default value is "localhost:2003".
confignet.TCPAddr `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// Timeout is the maximum duration allowed to connecting and sending the
// data to the Carbon/Graphite backend.
// The default value is defined by the DefaultSendTimeout constant.
Timeout time.Duration `mapstructure:"timeout"`
// data to the Carbon/Graphite backend. The default value is 5s.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueSettings `mapstructure:"sending_queue"`
RetryConfig exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
}

func (cfg *Config) Validate() error {
Expand Down
31 changes: 27 additions & 4 deletions exporter/carbonexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter/internal/metadata"
)
Expand All @@ -35,8 +37,25 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "allsettings"),
expected: &Config{
Endpoint: "localhost:8080",
Timeout: 10 * time.Second,
TCPAddr: confignet.TCPAddr{
Endpoint: "localhost:8080",
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetryConfig: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
RandomizationFactor: 0.7,
Multiplier: 3.14,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueConfig: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
},
},
}
Expand Down Expand Up @@ -69,14 +88,18 @@ func TestValidateConfig(t *testing.T) {
{
name: "invalid_tcp_addr",
config: &Config{
Endpoint: "http://localhost:2003",
TCPAddr: confignet.TCPAddr{
Endpoint: "http://localhost:2003",
},
},
wantErr: true,
},
{
name: "invalid_timeout",
config: &Config{
Timeout: -5 * time.Second,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: -5 * time.Second,
},
},
wantErr: true,
},
Expand Down
5 changes: 4 additions & 1 deletion exporter/carbonexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
set,
cfg,
sender.pushMetricsData,
// We don't use exporterhelper.WithTimeout because the TCP connection does not accept writing with context.
exporterhelper.WithQueue(cfg.QueueConfig),
exporterhelper.WithRetry(cfg.RetryConfig),
exporterhelper.WithShutdown(sender.Shutdown))
}

Expand Down Expand Up @@ -57,7 +60,7 @@ func (cs *carbonSender) Shutdown(context.Context) error {
// https://github.com/signalfx/gateway/blob/master/protocol/carbon/conn_pool.go
// but not its implementation).
//
// It keeps a unbounded "stack" of TCPConn instances always "popping" the most
// It keeps an unbounded "stack" of TCPConn instances always "popping" the most
// recently returned to the pool. There is no accounting to terminating old
// unused connections as that was the case on the prior art mentioned above.
type connPool struct {
Expand Down
4 changes: 3 additions & 1 deletion exporter/carbonexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -98,7 +100,7 @@ func TestConsumeMetricsData(t *testing.T) {
defer ln.Close()
}

config := &Config{Endpoint: addr, Timeout: 1000 * time.Millisecond}
config := &Config{TCPAddr: confignet.TCPAddr{Endpoint: addr}, TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1000 * time.Millisecond}}
exp, err := newCarbonExporter(config, exportertest.NewNopCreateSettings())
require.NoError(t, err)

Expand Down
10 changes: 8 additions & 2 deletions exporter/carbonexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/carbonexporter/internal/metadata"
)
Expand All @@ -22,8 +24,12 @@ func NewFactory() exporter.Factory {

func createDefaultConfig() component.Config {
return &Config{
Endpoint: DefaultEndpoint,
Timeout: DefaultSendTimeout,
TCPAddr: confignet.TCPAddr{
Endpoint: defaultEndpoint,
},
TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(),
QueueConfig: exporterhelper.NewDefaultQueueSettings(),
RetryConfig: exporterhelper.NewDefaultRetrySettings(),
}
}

Expand Down
1 change: 1 addition & 0 deletions exporter/carbonexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.91.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.91.0
go.opentelemetry.io/collector/config/confignet v0.91.0
go.opentelemetry.io/collector/confmap v0.91.0
go.opentelemetry.io/collector/exporter v0.91.0
go.opentelemetry.io/collector/pdata v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions exporter/carbonexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions exporter/carbonexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,14 @@ carbon/allsettings:
# data to the Carbon/Graphite backend.
# The default is 5 seconds.
timeout: 10s
sending_queue:
enabled: true
num_consumers: 2
queue_size: 10
retry_on_failure:
enabled: true
initial_interval: 10s
randomization_factor: 0.7
multiplier: 3.14
max_interval: 60s
max_elapsed_time: 10m
10 changes: 8 additions & 2 deletions testbed/datasenders/carbon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"fmt"
"time"

"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"

Expand Down Expand Up @@ -40,8 +42,12 @@ func NewCarbonDataSender(port int) *CarbonDataSender {
func (cs *CarbonDataSender) Start() error {
factory := carbonexporter.NewFactory()
cfg := &carbonexporter.Config{
Endpoint: cs.GetEndpoint().String(),
Timeout: 5 * time.Second,
TCPAddr: confignet.TCPAddr{
Endpoint: cs.GetEndpoint().String(),
},
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 5 * time.Second,
},
}
params := exportertest.NewNopCreateSettings()
params.Logger = zap.L()
Expand Down

0 comments on commit 615aad9

Please sign in to comment.