diff --git a/changelogs/8.16.asciidoc b/changelogs/8.16.asciidoc index 96e1a0a0cfc..b4d50ab837f 100644 --- a/changelogs/8.16.asciidoc +++ b/changelogs/8.16.asciidoc @@ -30,3 +30,4 @@ https://github.com/elastic/apm-server/compare/v8.15.2\...v8.16.0[View commits] ==== Added - APM Server will no longer retry an HTTP request that returned 502s, 503s, 504s. It will only retry 429s. {pull}13523[13523] +- APM Server now supports emitting distributed tracing for its own operation when running under Elastic Agent, and adds support for configuring a sampling rate {pull}14231[14231] diff --git a/internal/beatcmd/beat_test.go b/internal/beatcmd/beat_test.go index 2599647fc21..4f71188a9d8 100644 --- a/internal/beatcmd/beat_test.go +++ b/internal/beatcmd/beat_test.go @@ -183,7 +183,7 @@ func TestRunManager(t *testing.T) { }, }, "instrumentation": map[string]interface{}{ - "enabled": false, + "enabled": true, "environment": "testenv", }, }, m) diff --git a/internal/beatcmd/reloader.go b/internal/beatcmd/reloader.go index 297fa328cc9..2e98a2c9f7f 100644 --- a/internal/beatcmd/reloader.go +++ b/internal/beatcmd/reloader.go @@ -195,8 +195,7 @@ func (r *Reloader) reload(inputConfig, outputConfig, apmTracingConfig *config.C) return fmt.Errorf("APM tracing config for elastic not found") } // set enabled manually as APMConfig doesn't contain it. - // TODO set "enable" to true after the issue https://github.com/elastic/elastic-agent/issues/5211 gets resolved. - c.SetBool("enabled", -1, false) + c.SetBool("enabled", -1, true) wrappedApmTracingConfig = config.MustNewConfigFrom(map[string]interface{}{ "instrumentation": c, }) diff --git a/internal/beatcmd/reloader_test.go b/internal/beatcmd/reloader_test.go index 83a7fb38b6e..0dbd88195f3 100644 --- a/internal/beatcmd/reloader_test.go +++ b/internal/beatcmd/reloader_test.go @@ -186,7 +186,7 @@ func TestReloaderNewRunnerParams(t *testing.T) { args := <-calls assert.NotNil(t, args.Logger) assert.Equal(t, info, args.Info) - assert.Equal(t, config.MustNewConfigFrom(`{"revision": 1, "input": 123, "output.console.enabled": true, "instrumentation.enabled":false, "instrumentation.environment":"test"}`), args.Config) + assert.Equal(t, config.MustNewConfigFrom(`{"revision": 1, "input": 123, "output.console.enabled": true, "instrumentation.enabled":true, "instrumentation.environment":"test"}`), args.Config) } func expectNoEvent(t testing.TB, ch <-chan struct{}, message string) { diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 05549a5a45a..d48169e38e1 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -26,6 +26,7 @@ import ( "net/http" "os" "runtime" + "strconv" "time" "github.com/dustin/go-humanize" @@ -525,6 +526,7 @@ func newInstrumentation(rawConfig *agentconfig.C) (instrumentation.Instrumentati ServerCertificate string `config:"servercert"` ServerCA string `config:"serverca"` } `config:"tls"` + SamplingRate *float32 `config:"samplingrate"` } cfg, err := rawConfig.Child("instrumentation", -1) if err != nil || !cfg.Enabled() { @@ -541,6 +543,7 @@ func newInstrumentation(rawConfig *agentconfig.C) (instrumentation.Instrumentati envServerCert = "ELASTIC_APM_SERVER_CERT" envCACert = "ELASTIC_APM_SERVER_CA_CERT_FILE" envGlobalLabels = "ELASTIC_APM_GLOBAL_LABELS" + envSamplingRate = "ELASTIC_APM_TRANSACTION_SAMPLE_RATE" ) if apmCfg.APIKey != "" { os.Setenv(envAPIKey, apmCfg.APIKey) @@ -566,6 +569,11 @@ func newInstrumentation(rawConfig *agentconfig.C) (instrumentation.Instrumentati os.Setenv(envGlobalLabels, apmCfg.GlobalLabels) defer os.Unsetenv(envGlobalLabels) } + if apmCfg.SamplingRate != nil { + r := max(min(*apmCfg.SamplingRate, 1.0), 0.0) + os.Setenv(envSamplingRate, strconv.FormatFloat(float64(r), 'f', -1, 32)) + defer os.Unsetenv(envSamplingRate) + } return instrumentation.New(rawConfig, "apm-server", version.Version) } diff --git a/internal/beater/beater_test.go b/internal/beater/beater_test.go index 289ef975303..5a52994a6b2 100644 --- a/internal/beater/beater_test.go +++ b/internal/beater/beater_test.go @@ -24,10 +24,12 @@ import ( "encoding/pem" "errors" "fmt" + "io" "net/http" "net/http/httptest" "os" "path/filepath" + "strings" "testing" "time" @@ -285,6 +287,46 @@ func TestNewInstrumentation(t *testing.T) { assert.Equal(t, "Bearer secret", auth) } +func TestNewInstrumentationWithSampling(t *testing.T) { + runSampled := func(rate float32) { + var events int + s := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/intake/v2/events" { + zr, _ := zlib.NewReader(r.Body) + b, _ := io.ReadAll(zr) + // Skip metadata and transaction keys, only count span. + events = strings.Count(string(b), "\n") - 2 + } + w.WriteHeader(http.StatusOK) + })) + defer s.Close() + cfg := agentconfig.MustNewConfigFrom(map[string]interface{}{ + "instrumentation": map[string]interface{}{ + "enabled": true, + "hosts": []string{s.URL}, + "tls": map[string]interface{}{ + "skipverify": true, + }, + "samplingrate": fmt.Sprintf("%f", rate), + }, + }) + i, err := newInstrumentation(cfg) + require.NoError(t, err) + tracer := i.Tracer() + tr := tracer.StartTransaction("name", "type") + tr.StartSpan("span", "type", nil).End() + tr.End() + tracer.Flush(nil) + assert.Equal(t, int(rate), events) + } + t.Run("100% sampling", func(t *testing.T) { + runSampled(1.0) + }) + t.Run("0% sampling", func(t *testing.T) { + runSampled(0.0) + }) +} + func TestProcessMemoryLimit(t *testing.T) { l := logp.NewLogger("test") const gb = 1 << 30