diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6f049e46b06..1c2c9316cd7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -59,7 +59,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553] - Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619] -- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025] - Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation - Add support for Kibana status metricset in v8 format {pull}40275[40275] diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 2f5a43d2dc4..2fbfea0d1ad 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -19,7 +19,7 @@ package helper import ( "fmt" - "io" + "io/ioutil" "net" "net/http" "net/http/httptest" @@ -31,7 +31,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -56,7 +55,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { content := []byte(test.Content) - tmpfile, err := os.CreateTemp("", "token") + tmpfile, err := ioutil.TempFile("", "token") if err != nil { t.Fatal(err) } @@ -237,14 +236,14 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - go http.Serve(l, mux) //nolint:all // Ignore the error + go http.Serve(l, mux) return l } for title, c := range cases { t.Run(title, func(t *testing.T) { - tmpDir, err := os.MkdirTemp("", "testsocket") + tmpDir, err := ioutil.TempDir("", "testsocket") require.NoError(t, err) defer os.RemoveAll(tmpDir) @@ -263,7 +262,7 @@ func TestOverUnixSocket(t *testing.T) { r, err := h.FetchResponse() require.NoError(t, err) defer r.Body.Close() - content, err := io.ReadAll(r.Body) + content, err := ioutil.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, []byte("ehlo!"), content) }) @@ -328,5 +327,3 @@ func (*dummyModule) Config() mb.ModuleConfig { func (*dummyModule) UnpackConfig(interface{}) error { return nil } -func (dummyModule) UpdateStatus(_ status.Status, _ string) {} -func (dummyModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 0be1db7cef3..7e18dc9029d 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -27,7 +27,6 @@ import ( "net/url" "time" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -63,11 +62,9 @@ const ( // Module is the common interface for all Module implementations. type Module interface { - Name() string // Name returns the name of the Module. - Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. - UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. - UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent. - SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module. + Name() string // Name returns the name of the Module. + Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. + UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. } // BaseModule implements the Module interface. @@ -76,10 +73,9 @@ type Module interface { // MetricSets, it can embed this type into another struct to satisfy the // Module interface requirements. type BaseModule struct { - name string - config ModuleConfig - rawConfig *conf.C - statusReporter status.StatusReporter + name string + config ModuleConfig + rawConfig *conf.C } func (m *BaseModule) String() string { @@ -99,18 +95,6 @@ func (m *BaseModule) UnpackConfig(to interface{}) error { return m.rawConfig.Unpack(to) } -// UpdateStatus updates the status of the module. Reflected on elastic-agent. -func (m *BaseModule) UpdateStatus(status status.Status, msg string) { - if m.statusReporter != nil { - m.statusReporter.UpdateStatus(status, msg) - } -} - -// SetStatusReporter sets the status repoter of the module. -func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) { - m.statusReporter = statusReporter -} - // WithConfig re-configures the module with the given raw configuration and returns a // copy of the module. // Intended to be called from module factories. Note that if metricsets are specified diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index aedb443e9a8..1b0a621d705 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -124,7 +123,3 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup { func (mr *runner) String() string { return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets)) } - -func (mr *runner) SetStatusReporter(reporter status.StatusReporter) { - mr.mod.SetStatusReporter(reporter) -} diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index b4d92d29f56..e020cd87d55 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" - "github.com/elastic/beats/v7/libbeat/management/status" ) type runnerGroup struct { @@ -41,14 +40,6 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner { } } -func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) { - for _, runner := range rg.runners { - if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok { - runnerWithStatus.SetStatusReporter(reporter) - } - } -} - func (rg *runnerGroup) Start() { rg.startOnce.Do(func() { for _, runner := range rg.runners { diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index feaf11363fa..d41bdf01497 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -147,7 +146,6 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event { registry.Add(metricsPath, msw.Metrics(), monitoring.Full) monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String()) - msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name())) msw.run(done, out) }(msw) } @@ -255,20 +253,14 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { err := fetcher.Fetch(reporter.V2()) if err != nil { reporter.V2().Error(err) - msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) - } else { - msw.module.UpdateStatus(status.Running, "") } case mb.ReportingMetricSetV2WithContext: reporter.StartFetchTimer() err := fetcher.Fetch(ctx, reporter.V2()) if err != nil { reporter.V2().Error(err) - msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) - } else { - msw.module.UpdateStatus(status.Running, "") } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 736bb1f40e6..8c6e09df537 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -53,7 +53,6 @@ that Metricbeat does it and with the same validations. } } */ - package testing import ( @@ -61,7 +60,6 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/go-concert/timed" "github.com/elastic/beats/v7/metricbeat/mb" @@ -74,11 +72,9 @@ type TestModule struct { RawConfig *conf.C } -func (m *TestModule) Name() string { return m.ModName } -func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } -func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } -func (m *TestModule) UpdateStatus(_ status.Status, _ string) {} -func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {} +func (m *TestModule) Name() string { return m.ModName } +func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } +func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } func NewTestModule(t testing.TB, config interface{}) *TestModule { c, err := conf.NewConfigFrom(config) diff --git a/metricbeat/module/elasticsearch/node_stats/data_test.go b/metricbeat/module/elasticsearch/node_stats/data_test.go index 2317418eeaf..e6151555701 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_test.go +++ b/metricbeat/module/elasticsearch/node_stats/data_test.go @@ -22,7 +22,6 @@ package node_stats import ( "testing" - "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" ) @@ -61,6 +60,3 @@ func (m mockModule) Config() mb.ModuleConfig { func (m mockModule) UnpackConfig(to interface{}) error { return nil } - -func (m mockModule) UpdateStatus(_ status.Status, _ string) {} -func (m mockModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go deleted file mode 100644 index 5e22332f2e4..00000000000 --- a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go +++ /dev/null @@ -1,220 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package tests - -import ( - "fmt" - "os" - "path/filepath" - "testing" - "time" - - "github.com/elastic/beats/v7/libbeat/common/reload" - lbmanagement "github.com/elastic/beats/v7/libbeat/management" - "github.com/elastic/beats/v7/x-pack/libbeat/management" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - "github.com/stretchr/testify/require" - - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" - - "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" - "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" - - conf "github.com/elastic/elastic-agent-libs/config" -) - -func TestProcessStatusReporter(t *testing.T) { - unitOneID := mock.NewID() - unitOutID := mock.NewID() - token := mock.NewID() - - tests.InitBeatsForTest(t, cmd.RootCmd) - - filename := fmt.Sprintf("test-%d", time.Now().Unix()) - outPath := filepath.Join(os.TempDir(), filename) - t.Logf("writing output to file %s", outPath) - err := os.Mkdir(outPath, 0775) - require.NoError(t, err) - defer func() { - err := os.RemoveAll(outPath) - require.NoError(t, err) - }() - - /* - * process with pid=-1 doesn't exist. This should degrade the input for a while */ - inputStreamIncorrectPid := getInputStream(unitOneID, -1, 1) - /* - * process with valid pid. This should change state to healthy */ - inputStreamCorrectPid := getInputStream(unitOneID, os.Getpid(), 2) - outputExpectedStream := proto.UnitExpected{ - Id: unitOutID, - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - DataStream: &proto.DataStream{ - Namespace: "default", - }, - Type: "file", - Revision: 1, - Meta: &proto.Meta{ - Package: &proto.Package{ - Name: "system", - Version: "1.17.0", - }, - }, - Source: tests.RequireNewStruct(map[string]interface{}{ - "type": "file", - "enabled": true, - "path": outPath, - "filename": "beat-out", - "number_of_files": 7, - }), - }, - } - - observedStates := make(chan *proto.CheckinObserved) - expectedUnits := make(chan []*proto.UnitExpected) - done := make(chan struct{}) - // V2 mock server - server := &mock.StubServerV2{ - CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { - select { - case observedStates <- observed: - return &proto.CheckinExpected{ - Units: <-expectedUnits, - } - case <-done: - return nil - } - }, - ActionImpl: func(response *proto.ActionResponse) error { - return nil - }, - } - require.NoError(t, server.Start()) - defer server.Stop() - - // start the client - client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{ - Name: "program", - Meta: map[string]string{ - "key": "value", - }, - }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) - - lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { - c := management.DefaultConfig() - if err := cfg.Unpack(&c); err != nil { - return nil, err - } - return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits) - }) - - go func() { - t.Logf("Running beats...") - err := cmd.RootCmd.Execute() - require.NoError(t, err) - }() - - scenarios := []struct { - expectedStatus proto.State - nextInputunit *proto.UnitExpected - }{ - { - proto.State_HEALTHY, - &inputStreamIncorrectPid, - }, - { - proto.State_DEGRADED, - &inputStreamCorrectPid, - }, - { - proto.State_HEALTHY, - &inputStreamCorrectPid, - }, - // wait for one more checkin, just to be sure it's healthy - { - proto.State_HEALTHY, - &inputStreamCorrectPid, - }, - } - - timer := time.NewTimer(2 * time.Minute) - id := 0 - for id < len(scenarios) { - select { - case observed := <-observedStates: - state := extractState(observed.GetUnits(), unitOneID) - expectedUnits <- []*proto.UnitExpected{ - scenarios[id].nextInputunit, - &outputExpectedStream, - } - if state != scenarios[id].expectedStatus { - continue - } - // always ensure that output is healthy - outputState := extractState(observed.GetUnits(), unitOutID) - require.Equal(t, outputState, proto.State_HEALTHY) - - timer.Reset(2 * time.Minute) - id++ - case <-timer.C: - t.Fatal("timeout waiting for checkin") - default: - } - } -} - -func extractState(units []*proto.UnitObserved, idx string) proto.State { - for _, unit := range units { - if unit.Id == idx { - return unit.GetState() - } - } - return -1 -} - -func getInputStream(id string, pid int, stateIdx int) proto.UnitExpected { - return proto.UnitExpected{ - Id: id, - Type: proto.UnitType_INPUT, - ConfigStateIdx: uint64(stateIdx), - State: proto.State_HEALTHY, - Config: &proto.UnitExpectedConfig{ - DataStream: &proto.DataStream{ - Namespace: "default", - }, - Streams: []*proto.Stream{{ - Id: "system/metrics-system.process-default-system", - DataStream: &proto.DataStream{ - Dataset: "system.process", - Type: "metrics", - }, - Source: tests.RequireNewStruct(map[string]interface{}{ - "metricsets": []interface{}{"process"}, - "process.pid": pid, - }), - }}, - Type: "system/metrics", - Id: "system/metrics-system-default-system", - Name: "system-1", - Revision: 1, - Meta: &proto.Meta{ - Package: &proto.Package{ - Name: "system", - Version: "1.17.0", - }, - }, - }, - } -}