Skip to content

Commit

Permalink
feat: add reload listener for apm tracing config (#13514)
Browse files Browse the repository at this point in the history
* feat: add reload listener for apm tracing config
  • Loading branch information
kyungeunni authored Jul 2, 2024
1 parent 8655204 commit dfb1735
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ https://github.com/elastic/apm-server/compare/8.14\...main[View commits]
- Upgraded bundled APM Java agent attacher CLI to version 1.50.0 {pull}13326[13326]
- Enable Kibana curated UIs to work with hostmetrics from OpenTelemetry's https://pkg.go.dev/go.opentelemetry.io/collector/receiver/hostmetricsreceiver[hostmetricsreceiver] {pull}13196[13196]
- Add require data stream to bulk index requests {pull}13398[13398]
- Support self-instrumentation when in managed mode by getting tracing configs via reloader {pull}13514[13514]
9 changes: 9 additions & 0 deletions internal/beatcmd/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func TestRunManager(t *testing.T) {
})
assert.NoError(t, err)

expectRunnerParams(t, calls)
err = reload.RegisterV2.GetReloadableAPM().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"elastic.enabled": true, "elastic.environment": "testenv"}`),
})
assert.NoError(t, err)
args := expectRunnerParams(t, calls)
var m map[string]interface{}
err = args.Config.Unpack(&m)
Expand All @@ -177,6 +182,10 @@ func TestRunManager(t *testing.T) {
"enabled": true,
},
},
"instrumentation": map[string]interface{}{
"enabled": true,
"environment": "testenv",
},
}, m)

require.NotNil(t, manager.stopCallback)
Expand Down
55 changes: 46 additions & 9 deletions internal/beatcmd/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func NewReloader(info beat.Info, newRunner NewRunnerFunc) (*Reloader, error) {
if err := reload.RegisterV2.Register(reload.OutputRegName, reload.ReloadableFunc(r.reloadOutput)); err != nil {
return nil, fmt.Errorf("failed to register output reloader: %w", err)
}
if err := reload.RegisterV2.Register(reload.APMRegName, reload.ReloadableFunc(r.reloadAPMTracing)); err != nil {
return nil, fmt.Errorf("failed to register apm tracing reloader: %w", err)
}
return r, nil
}

Expand All @@ -84,10 +87,11 @@ type Reloader struct {
runner Runner
stopRunner func() error

mu sync.Mutex
inputConfig *config.C
outputConfig *config.C
stopped chan struct{}
mu sync.Mutex
inputConfig *config.C
outputConfig *config.C
apmTracingConfig *config.C
stopped chan struct{}
}

// Run runs the Reloader, blocking until ctx is cancelled or a fatal error occurs.
Expand Down Expand Up @@ -122,7 +126,7 @@ func (r *Reloader) reloadInputs(configs []*reload.ConfigWithMeta) error {
return fmt.Errorf("failed to extract input config revision: %w", err)
}

if err := r.reload(cfg, r.outputConfig); err != nil {
if err := r.reload(cfg, r.outputConfig, r.apmTracingConfig); err != nil {
return fmt.Errorf("failed to load input config: %w", err)
}
r.inputConfig = cfg
Expand All @@ -136,15 +140,31 @@ func (r *Reloader) reloadInputs(configs []*reload.ConfigWithMeta) error {
func (r *Reloader) reloadOutput(cfg *reload.ConfigWithMeta) error {
r.mu.Lock()
defer r.mu.Unlock()
if err := r.reload(r.inputConfig, cfg.Config); err != nil {
if err := r.reload(r.inputConfig, cfg.Config, r.apmTracingConfig); err != nil {
return fmt.Errorf("failed to load output config: %w", err)
}
r.outputConfig = cfg.Config
r.logger.Info("loaded output config")
return nil
}

func (r *Reloader) reload(inputConfig, outputConfig *config.C) error {
// reloadAPMTracing (re)loads apm tracing configuration.
func (r *Reloader) reloadAPMTracing(cfg *reload.ConfigWithMeta) error {
r.mu.Lock()
defer r.mu.Unlock()
var c *config.C
if cfg != nil {
c = cfg.Config
}
if err := r.reload(r.inputConfig, r.outputConfig, c); err != nil {
return fmt.Errorf("failed to load apm tracing config: %w", err)
}
r.apmTracingConfig = c
r.logger.Info("loaded apm tracing config")
return nil
}

func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C) error {
var outputNamespace config.Namespace
if outputConfig != nil {
if err := outputConfig.Unpack(&outputNamespace); err != nil {
Expand All @@ -153,6 +173,7 @@ func (r *Reloader) reload(inputConfig, outputConfig *config.C) error {
}
if inputConfig == nil || !outputNamespace.IsSet() {
// Wait until both input and output have been received.
// apm tracing config is not mandatory so not waiting for it
return nil
}
select {
Expand All @@ -165,11 +186,27 @@ func (r *Reloader) reload(inputConfig, outputConfig *config.C) error {
wrappedOutputConfig := config.MustNewConfigFrom(map[string]interface{}{
"output": outputConfig,
})
mergedConfig, err := config.MergeConfigs(inputConfig, wrappedOutputConfig)

var wrappedApmTracingConfig *config.C
// apmTracingConfig is nil when disabled
if apmTracingConfig != nil {
c, err := apmTracingConfig.Child("elastic", -1)
if err != nil {
return fmt.Errorf("APM tracing config for elastic not found")
}
// set enabled manually as APMConfig doesn't contain it
c.SetBool("enabled", -1, true)
wrappedApmTracingConfig = config.MustNewConfigFrom(map[string]interface{}{
"instrumentation": c,
})
} else {
// empty instrumentation config
wrappedApmTracingConfig = config.NewConfig()
}
mergedConfig, err := config.MergeConfigs(inputConfig, wrappedOutputConfig, wrappedApmTracingConfig)
if err != nil {
return err
}

// Create a new runner. We separate creation from starting to
// allow the runner to perform initialisations that must run
// synchronously.
Expand Down
27 changes: 24 additions & 3 deletions internal/beatcmd/reloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestReloader(t *testing.T) {
defer func() { assert.NoError(t, g.Wait()) }()
defer cancel()

// No reload until there's input and output configuration.
// No reload until there's input, output, apm tracing configuration.
assertNoReload()

err = reload.RegisterV2.GetInputList().Reload([]*reload.ConfigWithMeta{{
Expand All @@ -104,6 +104,10 @@ func TestReloader(t *testing.T) {
assert.NoError(t, err)
assertNoReload() // an output must be set

err = reload.RegisterV2.GetReloadableAPM().Reload(nil)
assert.NoError(t, err)
assertNoReload()

err = reload.RegisterV2.GetReloadableOutput().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"console.enabled": true}`),
})
Expand All @@ -130,8 +134,17 @@ func TestReloader(t *testing.T) {
expectEvent(t, r2.running, "new runner should have been started")
expectNoEvent(t, r2.stopped, "new runner should not have been stopped")

err = reload.RegisterV2.GetReloadableAPM().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"elastic.enabled": true, "elastic.api_key": "boo"}`),
})
assert.NoError(t, err)
r3 := assertReload()
expectEvent(t, r2.stopped, "old runner should have been stopped")
expectEvent(t, r3.running, "new runner should have been started")
expectNoEvent(t, r3.stopped, "new runner should not have been stopped")

cancel()
expectEvent(t, r2.stopped, "runner should have been stopped")
expectEvent(t, r3.stopped, "runner should have been stopped")
}

func TestReloaderNewRunnerParams(t *testing.T) {
Expand Down Expand Up @@ -159,13 +172,21 @@ func TestReloaderNewRunnerParams(t *testing.T) {
reload.RegisterV2.GetInputList().Reload([]*reload.ConfigWithMeta{{
Config: config.MustNewConfigFrom(`{"revision": 1, "input": 123}`),
}})

// reloader will wait until input and output are available.
// triggering APM reload before output reload will let the params to contain
// the apm tracing config too in this test setup
reload.RegisterV2.GetReloadableAPM().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"elastic.environment": "test"}`),
})

reload.RegisterV2.GetReloadableOutput().Reload(&reload.ConfigWithMeta{
Config: config.MustNewConfigFrom(`{"console.enabled": true}`),
})
args := <-calls
assert.NotNil(t, args.Logger)
assert.Equal(t, info, args.Info)
assert.Equal(t, config.MustNewConfigFrom(`{"revision": 1, "input": 123, "output.console.enabled": true}`), args.Config)
assert.Equal(t, config.MustNewConfigFrom(`{"revision": 1, "input": 123, "output.console.enabled": true, "instrumentation.enabled":true, "instrumentation.environment":"test"}`), args.Config)
}

func expectNoEvent(t testing.TB, ch <-chan struct{}, message string) {
Expand Down

0 comments on commit dfb1735

Please sign in to comment.