From 2060383cc21f428af17303552878e292ada5ef22 Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Tue, 6 Aug 2024 22:31:45 +0530 Subject: [PATCH] [metricbeat] - Allow metricsets to report their status via v2 protocol (#40400) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: initial commit * tests: add integration test cases * fix: expand testing scenarios * fix: add comments * fix: move integration tests to system/process * cleanup * fix: ci * fix: ci and typos * chore: update changelog * fix: add helper * fix: remove extra space * fix: ci * fix: move integration tests to x-pack * fix: add null check * fix: ci * fix: remove unused code * fix: lint * fix: lint and imports * fix: ci windows * linting for windows * fix lint linux * fix: go imports * fix: switch to the generic way * chore: make error descriptive * fix: move status report after fetch * fix: typo * fix: remove nolint * Squashed commit of the following: commit 18d38af46267f7fd8f7c140c6695fe62e3da7f69 Author: Vihas Makwana Date: Wed Jul 24 01:23:54 2024 +0530 fix: add comments commit 806cda4725414332ed5867c788a06289fb509512 Merge: 2e0bd28c14 b5b67a1789 Author: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Wed Jul 24 01:20:38 2024 +0530 Merge branch 'main' into metricbeat-process-multierr commit 2e0bd28c14e5568fcbd5fb82df6e1764fe9f3e5a Author: Vihas Makwana Date: Wed Jul 24 01:20:14 2024 +0530 fix: typo commit 82dc1036b51033f4596f397b9af5f801f4cbbc26 Author: Vihas Makwana Date: Wed Jul 24 01:19:35 2024 +0530 fix: typo commit b5b67a1789f30958d1b4dad3243537a4693863ba Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue Jul 23 18:13:16 2024 +0000 build(deps): bump the azure-sdks group with 2 updates (#40310) * build(deps): bump the azure-sdks group with 2 updates Bumps the azure-sdks group with 2 updates: 
[github.com/Azure/go-autorest/autorest](https://github.com/Azure/go-autorest) and [github.com/Azure/go-autorest/autorest/adal](https://github.com/Azure/go-autorest). Updates `github.com/Azure/go-autorest/autorest` from 0.11.28 to 0.11.29 - [Release notes](https://github.com/Azure/go-autorest/releases) - [Changelog](https://github.com/Azure/go-autorest/blob/main/CHANGELOG.md) - [Commits](https://github.com/Azure/go-autorest/compare/autorest/v0.11.28...autorest/v0.11.29) Updates `github.com/Azure/go-autorest/autorest/adal` from 0.9.21 to 0.9.22 - [Release notes](https://github.com/Azure/go-autorest/releases) - [Changelog](https://github.com/Azure/go-autorest/blob/main/CHANGELOG.md) - [Commits](https://github.com/Azure/go-autorest/compare/autorest/adal/v0.9.21...autorest/adal/v0.9.22) --- updated-dependencies: - dependency-name: github.com/Azure/go-autorest/autorest dependency-type: direct:production update-type: version-update:semver-patch dependency-group: azure-sdks - dependency-name: github.com/Azure/go-autorest/autorest/adal dependency-type: direct:production update-type: version-update:semver-patch dependency-group: azure-sdks ... 
Signed-off-by: dependabot[bot] * Update NOTICE.txt --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] commit 197396fa694ed6175a358b9303191bdec0eab348 Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue Jul 23 13:32:02 2024 -0400 build(deps): bump the gcp-sdks group with 9 updates (#40311) * build(deps): bump the gcp-sdks group with 9 updates Bumps the gcp-sdks group with 9 updates: | Package | From | To | | --- | --- | --- | | [cloud.google.com/go/bigquery](https://github.com/googleapis/google-cloud-go) | `1.55.0` | `1.62.0` | | [cloud.google.com/go/monitoring](https://github.com/googleapis/google-cloud-go) | `1.16.0` | `1.20.1` | | [cloud.google.com/go/pubsub](https://github.com/googleapis/google-cloud-go) | `1.33.0` | `1.40.0` | | [cloud.google.com/go/compute](https://github.com/googleapis/google-cloud-go) | `1.23.0` | `1.27.2` | | [cloud.google.com/go/redis](https://github.com/googleapis/google-cloud-go) | `1.13.1` | `1.16.2` | | [cloud.google.com/go/compute/metadata](https://github.com/googleapis/google-cloud-go) | `0.2.3` | `0.4.0` | | [cloud.google.com/go/iam](https://github.com/googleapis/google-cloud-go) | `1.1.2` | `1.1.10` | | [cloud.google.com/go/longrunning](https://github.com/googleapis/google-cloud-go) | `0.5.1` | `0.5.9` | | [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) | `1.30.1` | `1.42.0` | Updates `cloud.google.com/go/bigquery` from 1.55.0 to 1.62.0 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/spanner/v1.55.0...spanner/v1.62.0) Updates `cloud.google.com/go/monitoring` from 1.16.0 to 1.20.1 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - 
[Changelog](https://github.com/googleapis/google-cloud-go/blob/main/documentai/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/kms/v1.16.0...video/v1.20.1) Updates `cloud.google.com/go/pubsub` from 1.33.0 to 1.40.0 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/pubsub/v1.33.0...pubsub/v1.40.0) Updates `cloud.google.com/go/compute` from 1.23.0 to 1.27.2 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/documentai/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/pubsub/v1.23.0...compute/v1.27.2) Updates `cloud.google.com/go/redis` from 1.13.1 to 1.16.2 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/asset/v1.13.1...redis/v1.16.2) Updates `cloud.google.com/go/compute/metadata` from 0.2.3 to 0.4.0 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/netapp/v0.2.3...v0.4.0) Updates `cloud.google.com/go/iam` from 1.1.2 to 1.1.10 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/iam/v1.1.2...iam/v1.1.10) Updates `cloud.google.com/go/longrunning` from 0.5.1 to 0.5.9 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md) - 
[Commits](https://github.com/googleapis/google-cloud-go/compare/auth/v0.5.1...longrunning/v0.5.9) Updates `cloud.google.com/go/storage` from 1.30.1 to 1.42.0 - [Release notes](https://github.com/googleapis/google-cloud-go/releases) - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/documentai/CHANGES.md) - [Commits](https://github.com/googleapis/google-cloud-go/compare/pubsub/v1.30.1...spanner/v1.42.0) --- updated-dependencies: - dependency-name: cloud.google.com/go/bigquery dependency-type: direct:production update-type: version-update:semver-minor dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/monitoring dependency-type: direct:production update-type: version-update:semver-minor dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/pubsub dependency-type: direct:production update-type: version-update:semver-minor dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/compute dependency-type: direct:production update-type: version-update:semver-minor dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/redis dependency-type: direct:production update-type: version-update:semver-minor dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/compute/metadata dependency-type: indirect update-type: version-update:semver-minor dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/iam dependency-type: indirect update-type: version-update:semver-patch dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/longrunning dependency-type: indirect update-type: version-update:semver-patch dependency-group: gcp-sdks - dependency-name: cloud.google.com/go/storage dependency-type: direct:production update-type: version-update:semver-minor dependency-group: gcp-sdks ... 
Signed-off-by: dependabot[bot] * Update NOTICE.txt --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] commit 8940f7de7d80bf3c05eede3018ea812750e9c1e8 Author: Vihas Makwana Date: Tue Jul 23 21:02:27 2024 +0530 fix: update notice commit 58bc2ff07637fb80d5bf64da257b32ed964d610c Merge: 9433065fc8 dd671a6bce Author: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Tue Jul 23 20:59:16 2024 +0530 Merge branch 'main' into metricbeat-process-multierr commit 9433065fc8ef128bd55226f1d9be1c9838597e45 Author: Vihas Makwana Date: Tue Jul 23 20:57:58 2024 +0530 chore: update tests commit c1d4aba72a4c24764f5720f178b0797bed319c52 Author: Vihas Makwana Date: Tue Jul 23 20:55:45 2024 +0530 fix: add specific version metric-system commit dd671a6bcefbcf3b99a1e3ac863293839fa7a3cb Author: Vinit Chauhan Date: Tue Jul 23 10:20:37 2024 -0400 filebeat/decode_cef - Add option to ignore empty values (#40268) Added option to ignore empty values in the decode_cef processor. In the decode_cef processor, when there are empty values in the extensions section, we get errors during log parsing. This change provides a flag in decode_cef config to override this default behavior and ignore the fields with empty value. 
Some example errors that this helps handle are: error in field 'cn1': strconv.ParseInt: parsing "": invalid syntax error in field 'destinationTranslatedAddress': value is not a valid IP address Closes #40236 commit add7a455a13e23e58f94c87383775c11d48bd847 Author: Vihas Makwana Date: Tue Jul 23 19:13:54 2024 +0530 fix: unit test commit 029364581adbee989ed1c74f4eb42e29b98152e7 Author: Vihas Makwana Date: Tue Jul 23 16:31:49 2024 +0530 fix: remove ioutil commit e84201027015b713c99740e54dc473797e6b6a84 Author: Vihas Makwana Date: Tue Jul 23 16:14:01 2024 +0530 fix: update notice commit 246d730121ace6c80c856482e96fae00a006fa61 Author: Vihas Makwana Date: Tue Jul 23 16:13:15 2024 +0530 fix: add license, remove uuid5 commit ac018316942a9bd73f3f84e24937b25d128a7407 Author: Vihas Makwana Date: Tue Jul 23 15:03:08 2024 +0530 update: go.mod commit 42101c8f8e80f13e998a5658f13e7f0de9ed26fd Merge: 091fff8b4b 72636965a2 Author: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Tue Jul 23 15:02:20 2024 +0530 Merge branch 'main' into metricbeat-process-multierr commit 091fff8b4b98c4435f95228e24b0c87bbdf4c6a2 Author: Vihas Makwana Date: Tue Jul 23 14:58:51 2024 +0530 fix: test commit fd6d31280bf87b94d68117d847c5781c266463a8 Author: Vihas Makwana Date: Tue Jul 23 14:57:13 2024 +0530 fix: update go.mod, update uuid and metrics version commit 72636965a2e88becb475020e2b1aaac2d2476d56 Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon Jul 22 19:32:38 2024 +0000 build(deps): bump github.com/elastic/elastic-agent-libs from 0.9.13 to 0.9.15 (#40300) * build(deps): bump github.com/elastic/elastic-agent-libs Bumps [github.com/elastic/elastic-agent-libs](https://github.com/elastic/elastic-agent-libs) from 0.9.13 to 0.9.15. 
- [Release notes](https://github.com/elastic/elastic-agent-libs/releases) - [Commits](https://github.com/elastic/elastic-agent-libs/compare/v0.9.13...v0.9.15) --- updated-dependencies: - dependency-name: github.com/elastic/elastic-agent-libs dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Update NOTICE.txt --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] commit e3d8f3b7fa6d77cc470704fa66ffceb84a22832d Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon Jul 22 13:44:51 2024 -0400 build(deps): bump github.com/elastic/elastic-agent-client/v7 from 7.14.0 to 7.15.0 (#40304) * build(deps): bump github.com/elastic/elastic-agent-client/v7 Bumps [github.com/elastic/elastic-agent-client/v7](https://github.com/elastic/elastic-agent-client) from 7.14.0 to 7.15.0. - [Release notes](https://github.com/elastic/elastic-agent-client/releases) - [Commits](https://github.com/elastic/elastic-agent-client/compare/v7.14.0...v7.15.0) --- updated-dependencies: - dependency-name: github.com/elastic/elastic-agent-client/v7 dependency-type: direct:production update-type: version-update:semver-minor ... 
Signed-off-by: dependabot[bot] * Update NOTICE.txt --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] commit 3e95d530320e5e6ec605410839fcbd6a454ce2b5 Author: Maurizio Branca Date: Mon Jul 22 18:24:59 2024 +0200 Add CSP SDKs to the `allow` list (#40150) commit f3f772f6fe4700ea4584c23e5a56fed1594e39aa Author: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Fri Jul 19 17:52:19 2024 +0530 [filebeat][log] Enable status reporter for log input (#40075) * chore: initial commit, without tests * chore: tests * chore: add test cases * fix: add null check * fix: remove println * fix: lint * goimports * remove println * fix: changelog * update test for windows * fix: fix some comments * chore: add starting state in NewInput * fix: add sample output to verify the status * fix: remove println * fix: add integration tag * Update CHANGELOG.next.asciidoc Co-authored-by: Denis * fix: remove redundant bool * fix: add degraded --------- Co-authored-by: Pierre HILBERT Co-authored-by: Denis commit 463bbb4c7c00bf4715d6fff63b54260e79a35010 Author: Dan Kortschak Date: Fri Jul 19 06:32:29 2024 +0930 x-pack/filebeat/input/websocket: do minor clean-up in main loop (#40145) * remove unneeded goroutine * fix logging: The body was previously not being logged since an io.ReadCloser is not a JSON-serialisable type. 
commit 908553d797cc2541f8d7f61c333658705e324e33 Author: Vihas Makwana Date: Thu Jul 18 19:04:52 2024 +0530 chore: rename function commit 51a785484db6cbb01b65b3ac2661b59cd377a30e Author: Vihas Makwana Date: Thu Jul 18 18:42:00 2024 +0530 chore: update process summary commit 21b102baa4866c2ac2baa27372bbab8dcb5f78ac Author: Vihas Makwana Date: Thu Jul 18 16:44:19 2024 +0530 chore: add degradable error commit 942f8c78573d0162765bb8d80c7d7368ca36d58f Author: Alejandro Fernández Haro Date: Wed Jul 17 20:52:14 2024 +0200 [Metricbeat/kibana/status] Add support for v8format (#40275) commit 1bfcecb44b78ab6e578a26c2b87e5ab2e002021e Author: Vihas Makwana Date: Wed Jul 17 23:31:10 2024 +0530 fix: multierror support * fix: nits and comments * fix: fix notice, and test * fix notice * fix notice * fix: lint * fix: nits * fix: update notice, go.mod * fix: update notice, go.mod to v0.11.0 * temp * fix: use ErrorIs * fix: use ErrorIsf --------- Co-authored-by: Pierre HILBERT --- CHANGELOG.next.asciidoc | 1 + NOTICE.txt | 4 +- go.mod | 2 +- go.sum | 4 +- metricbeat/helper/http_test.go | 13 +- metricbeat/mb/mb.go | 28 ++- metricbeat/mb/module/runner.go | 5 + metricbeat/mb/module/runner_group.go | 9 + metricbeat/mb/module/wrapper.go | 8 + metricbeat/mb/testing/modules.go | 10 +- .../elasticsearch/node_stats/data_test.go | 4 + metricbeat/module/system/process/process.go | 7 +- .../module/system/process/process_test.go | 8 +- .../system/process_summary/process_summary.go | 16 +- .../process_summary/process_summary_test.go | 8 +- metricbeat/module/system/test_system.py | 39 ++++ .../mbtest/system/process_integration_test.go | 221 ++++++++++++++++++ 17 files changed, 355 insertions(+), 32 deletions(-) create mode 100644 x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8e63616d80f..9c6e506a9fc 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -61,6 +61,7 @@ 
https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553] - Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619] +- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025] - Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation - Add support for Kibana status metricset in v8 format {pull}40275[40275] diff --git a/NOTICE.txt b/NOTICE.txt index 67076b74e92..23714152db4 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -13180,11 +13180,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-system-metrics -Version: v0.10.8 +Version: v0.11.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.10.8/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.11.0/LICENSE.txt: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 24c3f6e05d6..49a19fc29fd 100644 --- a/go.mod +++ b/go.mod @@ -194,7 +194,7 @@ require ( github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.8.1 github.com/elastic/elastic-agent-libs v0.9.15 - github.com/elastic/elastic-agent-system-metrics v0.10.8 + github.com/elastic/elastic-agent-system-metrics v0.11.0 github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff github.com/elastic/mito v1.15.0 diff --git a/go.sum b/go.sum index ea4ec0dee0d..96eb13a36d2 100644 --- a/go.sum +++ b/go.sum @@ -558,8 +558,8 @@ 
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7b github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M= github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8= -github.com/elastic/elastic-agent-system-metrics v0.10.8 h1:YoX3GfWWDtL5YrBkIbl7jQ/usOxBi+0N9jHke2EzFCk= -github.com/elastic/elastic-agent-system-metrics v0.10.8/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= +github.com/elastic/elastic-agent-system-metrics v0.11.0 h1:/bWrgTsHZWLUhdT7WPNuQDFkrSfm+A4qf6QDQnZo9d8= +github.com/elastic/elastic-agent-system-metrics v0.11.0/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4= diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 2fbfea0d1ad..3fcb25578ba 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -19,7 +19,7 @@ package helper import ( "fmt" - "io/ioutil" + "io" "net" "net/http" "net/http/httptest" @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -55,7 +56,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { content := []byte(test.Content) - tmpfile, err := ioutil.TempFile("", "token") + tmpfile, err := os.CreateTemp("", "token") if err 
!= nil { t.Fatal(err) } @@ -236,14 +237,14 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - go http.Serve(l, mux) + go http.Serve(l, mux) //nolint:errcheck,gosec // Ignore the error, it's a test file return l } for title, c := range cases { t.Run(title, func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "testsocket") + tmpDir, err := os.MkdirTemp("", "testsocket") require.NoError(t, err) defer os.RemoveAll(tmpDir) @@ -262,7 +263,7 @@ func TestOverUnixSocket(t *testing.T) { r, err := h.FetchResponse() require.NoError(t, err) defer r.Body.Close() - content, err := ioutil.ReadAll(r.Body) + content, err := io.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, []byte("ehlo!"), content) }) @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig { func (*dummyModule) UnpackConfig(interface{}) error { return nil } +func (dummyModule) UpdateStatus(_ status.Status, _ string) {} +func (dummyModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 7e18dc9029d..0be1db7cef3 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -27,6 +27,7 @@ import ( "net/url" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -62,9 +63,11 @@ const ( // Module is the common interface for all Module implementations. type Module interface { - Name() string // Name returns the name of the Module. - Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. - UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. + Name() string // Name returns the name of the Module. + Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. + UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. 
+ UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent. + SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module. } // BaseModule implements the Module interface. @@ -73,9 +76,10 @@ type Module interface { // MetricSets, it can embed this type into another struct to satisfy the // Module interface requirements. type BaseModule struct { - name string - config ModuleConfig - rawConfig *conf.C + name string + config ModuleConfig + rawConfig *conf.C + statusReporter status.StatusReporter } func (m *BaseModule) String() string { @@ -95,6 +99,18 @@ func (m *BaseModule) UnpackConfig(to interface{}) error { return m.rawConfig.Unpack(to) } +// UpdateStatus updates the status of the module. Reflected on elastic-agent. +func (m *BaseModule) UpdateStatus(status status.Status, msg string) { + if m.statusReporter != nil { + m.statusReporter.UpdateStatus(status, msg) + } +} + +// SetStatusReporter sets the status reporter of the module. +func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) { + m.statusReporter = statusReporter +} + // WithConfig re-configures the module with the given raw configuration and returns a // copy of the module. // Intended to be called from module factories. 
Note that if metricsets are specified diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 1b0a621d705..aedb443e9a8 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup { func (mr *runner) String() string { return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets)) } + +func (mr *runner) SetStatusReporter(reporter status.StatusReporter) { + mr.mod.SetStatusReporter(reporter) +} diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index e020cd87d55..b4d92d29f56 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" ) type runnerGroup struct { @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner { } } +func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) { + for _, runner := range rg.runners { + if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok { + runnerWithStatus.SetStatusReporter(reporter) + } + } +} + func (rg *runnerGroup) Start() { rg.startOnce.Do(func() { for _, runner := range rg.runners { diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index d41bdf01497..5243d956365 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + 
"github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -146,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event { registry.Add(metricsPath, msw.Metrics(), monitoring.Full) monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String()) + msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name())) msw.run(done, out) }(msw) } @@ -253,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { err := fetcher.Fetch(reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } case mb.ReportingMetricSetV2WithContext: reporter.StartFetchTimer() err := fetcher.Fetch(ctx, reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 8c6e09df537..736bb1f40e6 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -53,6 +53,7 @@ that Metricbeat does it and with the same validations. 
} } */ + package testing import ( @@ -60,6 +61,7 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/go-concert/timed" "github.com/elastic/beats/v7/metricbeat/mb" @@ -72,9 +74,11 @@ type TestModule struct { RawConfig *conf.C } -func (m *TestModule) Name() string { return m.ModName } -func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } -func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) Name() string { return m.ModName } +func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } +func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) UpdateStatus(_ status.Status, _ string) {} +func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {} func NewTestModule(t testing.TB, config interface{}) *TestModule { c, err := conf.NewConfigFrom(config) diff --git a/metricbeat/module/elasticsearch/node_stats/data_test.go b/metricbeat/module/elasticsearch/node_stats/data_test.go index e6151555701..2317418eeaf 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_test.go +++ b/metricbeat/module/elasticsearch/node_stats/data_test.go @@ -22,6 +22,7 @@ package node_stats import ( "testing" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" ) @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig { func (m mockModule) UnpackConfig(to interface{}) error { return nil } + +func (m mockModule) UpdateStatus(_ status.Status, _ string) {} +func (m mockModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index ad9fa8d5ac0..684c87059c9 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -20,6 +20,7 @@ 
package process import ( + "errors" "fmt" "os" "runtime" @@ -111,7 +112,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { // monitor either a single PID, or the configured set of processes. if m.setpid == 0 { procs, roots, err := m.stats.Get() - if err != nil { + if err != nil && !errors.Is(err, process.NonFatalErr{}) { + // return only if the error is fatal in nature return fmt.Errorf("process stats: %w", err) } @@ -121,9 +123,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { RootFields: roots[evtI], }) if !isOpen { - return nil + return err } } + return err } else { proc, root, err := m.stats.GetOneRootEvent(m.setpid) if err != nil { diff --git a/metricbeat/module/system/process/process_test.go b/metricbeat/module/system/process/process_test.go index 98b48b75d6e..18841b68c09 100644 --- a/metricbeat/module/system/process/process_test.go +++ b/metricbeat/module/system/process/process_test.go @@ -37,13 +37,17 @@ func TestFetch(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) events, errs := mbtest.ReportingFetchV2Error(f) - assert.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } assert.NotEmpty(t, events) time.Sleep(2 * time.Second) events, errs = mbtest.ReportingFetchV2Error(f) - assert.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } assert.NotEmpty(t, events) t.Logf("fetched %d events, showing events[0]:", len(events)) diff --git a/metricbeat/module/system/process_summary/process_summary.go b/metricbeat/module/system/process_summary/process_summary.go index c64a0c1d3e1..cbf1c63a2fe 100644 --- a/metricbeat/module/system/process_summary/process_summary.go +++ b/metricbeat/module/system/process_summary/process_summary.go @@ -20,8 +20,9 @@ package process_summary import ( + "errors" "fmt" - "io/ioutil" + "os" "runtime" "strconv" "strings" @@ -68,9 +69,10 
@@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) error { - procList, err := process.ListStates(m.sys) - if err != nil { - return fmt.Errorf("error fetching process list: %w", err) + procList, degradeErr := process.ListStates(m.sys) + if degradeErr != nil && !errors.Is(degradeErr, process.NonFatalErr{}) { + // return only if the error is fatal in nature + return fmt.Errorf("error fetching process list: %w", degradeErr) } procStates := map[string]int{} @@ -83,7 +85,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { } outMap := mapstr.M{} - err = typeconv.Convert(&outMap, procStates) + err := typeconv.Convert(&outMap, procStates) if err != nil { return fmt.Errorf("error formatting process stats: %w", err) } @@ -101,13 +103,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { MetricSetFields: outMap, }) - return nil + return degradeErr } // threadStats returns a map of state counts for running threads on a system func threadStats(sys resolve.Resolver) (mapstr.M, error) { statPath := sys.ResolveHostFS("/proc/stat") - procData, err := ioutil.ReadFile(statPath) + procData, err := os.ReadFile(statPath) if err != nil { return nil, fmt.Errorf("error reading procfs file %s: %w", statPath, err) } diff --git a/metricbeat/module/system/process_summary/process_summary_test.go b/metricbeat/module/system/process_summary/process_summary_test.go index 7ec35634e43..042148f3713 100644 --- a/metricbeat/module/system/process_summary/process_summary_test.go +++ b/metricbeat/module/system/process_summary/process_summary_test.go @@ -46,7 +46,9 @@ func TestFetch(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) events, errs := mbtest.ReportingFetchV2Error(f) - require.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } require.NotEmpty(t, events) event := events[0].BeatEvent("system", 
"process_summary").Fields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), @@ -62,7 +64,9 @@ func TestStateNames(t *testing.T) { f := mbtest.NewReportingMetricSetV2Error(t, getConfig()) events, errs := mbtest.ReportingFetchV2Error(f) - require.Empty(t, errs) + for _, err := range errs { + assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err) + } require.NotEmpty(t, events) event := events[0].BeatEvent("system", "process_summary").Fields diff --git a/metricbeat/module/system/test_system.py b/metricbeat/module/system/test_system.py index de113e5e4b0..dda6a0a6fdd 100644 --- a/metricbeat/module/system/test_system.py +++ b/metricbeat/module/system/test_system.py @@ -385,9 +385,18 @@ def test_process_summary(self): output = self.read_output_json() self.assertGreater(len(output), 0) + only_errors_encountered = True for evt in output: self.assert_fields_are_documented(evt) + if evt.get("error", None) is not None: + # Here, we assume that the error is non-fatal and we move forward the test execution. + # If the error is non-fatal, the test should pass with assertions. + # If the error is fatal, the test should fail. + continue + + # we've encountered an event. Turn off the flag + only_errors_encountered = False summary = evt["system"]["process"]["summary"] assert isinstance(summary["total"], int) @@ -396,6 +405,10 @@ def test_process_summary(self): assert isinstance(summary["running"], int) assert isinstance(summary["total"], int) + # If the flag is true, we've only encountered errors (fatal errors) + # If the flag is false, we've encountered events and probably some non-fatal errors. 
+ assert not only_errors_encountered + @unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd", sys.platform), "os") def test_process(self): """ @@ -419,7 +432,17 @@ def test_process(self): self.assertGreater(len(output), 0) found_cmdline = False + only_errors_encountered = True for evt in output: + if evt.get("error", None) is not None: + # Here, we assume that the error is non-fatal and we move forward the test execution. + # If the error is non-fatal, the test should pass with assertions. + # If the error is fatal, the test should fail. + continue + + # we've encountered an event. Turn off the flag + only_errors_encountered = False + process = evt["system"]["process"] # Not all process will have 'cmdline' due to permission issues, # especially on Windows. Therefore we ensure at least some of @@ -442,6 +465,10 @@ def test_process(self): self.assertTrue( found_cmdline, "cmdline not found in any process events") + # If the flag is true, we've only encountered errors (fatal errors) + # If the flag is false, we've encountered events and probably some non-fatal errors. + assert not only_errors_encountered + @unittest.skipUnless(re.match("(?i)linux|darwin|freebsd", sys.platform), "os") def test_process_unix(self): """ @@ -477,7 +504,16 @@ def test_process_unix(self): found_fd = False found_env = False found_cwd = not sys.platform.startswith("linux") + only_errors_encountered = True for evt in output: + if evt.get("error", None) is not None: + # Here, we assume that the error is non-fatal and we move forward the test execution. + # If the error is non-fatal, the test should pass with assertions. + # If the error is fatal, the test should fail. + continue + # we've encountered an event. 
Turn off the flag + only_errors_encountered = False + found_cwd |= "working_directory" in evt["process"] process = evt["system"]["process"] @@ -499,6 +535,9 @@ def test_process_unix(self): if not sys.platform.startswith("darwin"): self.assertTrue(found_fd, "fd not found in any process events") + # If the flag is true, we've only encountered errors (fatal errors) + # If the flag is false, we've encountered events and probably some non-fatal errors. + assert not only_errors_encountered self.assertTrue(found_env, "env not found in any process events") self.assertTrue( found_cwd, "working_directory not found in any process events") diff --git a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go new file mode 100644 index 00000000000..660e9525558 --- /dev/null +++ b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go @@ -0,0 +1,221 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+
+//go:build integration
+
+package tests
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
+	"github.com/elastic/beats/v7/libbeat/common/reload"
+	lbmanagement "github.com/elastic/beats/v7/libbeat/management"
+	"github.com/elastic/beats/v7/x-pack/libbeat/management"
+	"github.com/elastic/beats/v7/x-pack/libbeat/management/tests"
+	"github.com/elastic/beats/v7/x-pack/metricbeat/cmd"
+	"github.com/elastic/elastic-agent-client/v7/pkg/client"
+	"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
+	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
+	conf "github.com/elastic/elastic-agent-libs/config"
+)
+
+// TestProcessStatusReporter checks that the system/process metricset reports
+// its health over the V2 control protocol: a PID that does not exist must
+// degrade the input unit, and a valid PID must bring it back to healthy.
+func TestProcessStatusReporter(t *testing.T) {
+	unitOneID := mock.NewID()
+	unitOutID := mock.NewID()
+	token := mock.NewID()
+
+	tests.InitBeatsForTest(t, cmd.RootCmd)
+
+	// t.TempDir is removed by the testing package, so no manual cleanup is needed.
+	filename := fmt.Sprintf("test-%d", time.Now().Unix())
+	outPath := filepath.Join(t.TempDir(), filename)
+	t.Logf("writing output to file %s", outPath)
+	require.NoError(t, os.Mkdir(outPath, 0775))
+
+	// process with pid=-1 doesn't exist. This should degrade the input for a while
+	inputStreamIncorrectPid := getInputStream(unitOneID, -1, 1)
+
+	// process with valid pid. This should change state to healthy
+	inputStreamCorrectPid := getInputStream(unitOneID, os.Getpid(), 2)
+
+	outputExpectedStream := proto.UnitExpected{
+		Id:             unitOutID,
+		Type:           proto.UnitType_OUTPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		Config: &proto.UnitExpectedConfig{
+			DataStream: &proto.DataStream{
+				Namespace: "default",
+			},
+			Type:     "file",
+			Revision: 1,
+			Meta: &proto.Meta{
+				Package: &proto.Package{
+					Name:    "system",
+					Version: "1.17.0",
+				},
+			},
+			Source: tests.RequireNewStruct(map[string]interface{}{
+				"type":            "file",
+				"enabled":         true,
+				"path":            outPath,
+				"filename":        "beat-out",
+				"number_of_files": 7,
+			}),
+		},
+	}
+
+	// observedStates carries every check-in to the test loop, expectedUnits carries
+	// the reply back to the handler, and done unblocks the handler at shutdown.
+	observedStates := make(chan *proto.CheckinObserved)
+	expectedUnits := make(chan []*proto.UnitExpected)
+	done := make(chan struct{})
+
+	server := &mock.StubServerV2{
+		CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
+			select {
+			case observedStates <- observed:
+				return &proto.CheckinExpected{
+					Units: <-expectedUnits,
+				}
+			case <-done:
+				return nil
+			}
+		},
+		ActionImpl: func(response *proto.ActionResponse) error {
+			return nil
+		},
+	}
+	require.NoError(t, server.Start(), "could not start V2 mock server")
+	defer server.Stop()
+	// Deferred after server.Stop so it runs first and releases any check-in
+	// handler still blocked on observedStates once the test returns.
+	defer close(done)
+
+	// create the agent client; named agentClient so the client package is not shadowed
+	agentClient := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{
+		Name: "program",
+		Meta: map[string]string{
+			"key": "value",
+		},
+	}, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))
+
+	lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) {
+		c := management.DefaultConfig()
+		if err := cfg.Unpack(&c); err != nil {
+			return nil, err
+		}
+		return management.NewV2AgentManagerWithClient(c, registry, agentClient, management.WithStopOnEmptyUnits)
+	})
+
+	go func() {
+		t.Logf("Running beats...")
+		// FailNow (used by require) is only valid on the test goroutine, so
+		// report a failure with Errorf instead.
+		if err := cmd.RootCmd.Execute(); err != nil {
+			t.Errorf("running metricbeat: %v", err)
+		}
+	}()
+
+	// Each step waits for expectedStatus and answers every check-in seen
+	// while waiting with nextInputUnit.
+	scenarios := []struct {
+		expectedStatus proto.State
+		nextInputUnit  *proto.UnitExpected
+	}{
+		{expectedStatus: proto.State_HEALTHY, nextInputUnit: &inputStreamIncorrectPid},
+		{expectedStatus: proto.State_DEGRADED, nextInputUnit: &inputStreamCorrectPid},
+		{expectedStatus: proto.State_HEALTHY, nextInputUnit: &inputStreamCorrectPid},
+		// wait for one more checkin, just to be sure it's healthy
+		{expectedStatus: proto.State_HEALTHY, nextInputUnit: &inputStreamCorrectPid},
+	}
+
+	timeout := 2 * time.Minute
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+
+	for id := 0; id < len(scenarios); {
+		// No default case: block until a check-in arrives or the timer fires
+		// instead of spinning on an empty select.
+		select {
+		case observed := <-observedStates:
+			state := extractState(observed.GetUnits(), unitOneID)
+			expectedUnits <- []*proto.UnitExpected{
+				scenarios[id].nextInputUnit,
+				&outputExpectedStream,
+			}
+			if state != scenarios[id].expectedStatus {
+				continue
+			}
+			// always ensure that output is healthy
+			outputState := extractState(observed.GetUnits(), unitOutID)
+			require.Equal(t, proto.State_HEALTHY, outputState)
+
+			timer.Reset(timeout)
+			id++
+		case <-timer.C:
+			t.Fatalf("timeout after %s waiting for checkin", timeout)
+		}
+	}
+}
+
+// extractState returns the state of the observed unit whose ID equals id,
+// or -1 when no such unit was reported.
+func extractState(units []*proto.UnitObserved, id string) proto.State {
+	for _, unit := range units {
+		if unit.Id == id {
+			return unit.GetState()
+		}
+	}
+	return -1
+}
+
+// getInputStream builds the expected input unit that runs the system/process
+// metricset against pid. id is the unit ID and stateIdx becomes the unit's
+// ConfigStateIdx.
+func getInputStream(id string, pid int, stateIdx int) proto.UnitExpected {
+	return proto.UnitExpected{
+		Id:             id,
+		Type:           proto.UnitType_INPUT,
+		ConfigStateIdx: uint64(stateIdx),
+		State:          proto.State_HEALTHY,
+		Config: &proto.UnitExpectedConfig{
+			DataStream: &proto.DataStream{
+				Namespace: "default",
+			},
+			Streams: []*proto.Stream{{
+				Id: "system/metrics-system.process-default-system",
+				DataStream: &proto.DataStream{
+					Dataset: "system.process",
+					Type:    "metrics",
+				},
+				Source: tests.RequireNewStruct(map[string]interface{}{
+					"metricsets":  []interface{}{"process"},
+					"process.pid": pid,
+				}),
+			}},
+			Type:     "system/metrics",
+			Id:       "system/metrics-system-default-system",
+			Name:     "system-1",
+			Revision: 1,
+			Meta: &proto.Meta{
+				Package: &proto.Package{
+					Name:    "system",
+					Version: "1.17.0",
+				},
+			},
+		},
+	}
+}