Skip to content

Commit

Permalink
[metricbeat] - Allow metricsets to report their status via v2 protocol (
Browse files Browse the repository at this point in the history
#40400)

* fix: initial commit

* tests: add integration test cases

* fix: expand testing scenarios

* fix: add comments

* fix: move integration tests to system/process

* cleanup

* fix: ci

* fix: ci and typos

* chore: update changelog

* fix: add helper

* fix: remove extra space

* fix: ci

* fix: move integration tests to x-pack

* fix: add null check

* fix: ci

* fix: remove unused code

* fix: lint

* fix: lint and imports

* fix: ci windows

* inting for windows

* fix lint linux

* fix: go imports

* fix: switch to the generic way

* chore: make error descriptive

* fix: move status report after fetch

* fix: typo

* fix: remove nolint

* Squashed commit of the following:

commit 18d38af
Author: Vihas Makwana <[email protected]>
Date:   Wed Jul 24 01:23:54 2024 +0530

    fix: add comments

commit 806cda4
Merge: 2e0bd28 b5b67a1
Author: VihasMakwana <[email protected]>
Date:   Wed Jul 24 01:20:38 2024 +0530

    Merge branch 'main' into metricbeat-process-multierr

commit 2e0bd28
Author: Vihas Makwana <[email protected]>
Date:   Wed Jul 24 01:20:14 2024 +0530

    fix: typo

commit 82dc103
Author: Vihas Makwana <[email protected]>
Date:   Wed Jul 24 01:19:35 2024 +0530

    fix: typo

commit b5b67a1
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Tue Jul 23 18:13:16 2024 +0000

    build(deps): bump the azure-sdks group with 2 updates (#40310)

    * build(deps): bump the azure-sdks group with 2 updates

    Bumps the azure-sdks group with 2 updates: [github.com/Azure/go-autorest/autorest](https://github.com/Azure/go-autorest) and [github.com/Azure/go-autorest/autorest/adal](https://github.com/Azure/go-autorest).

    Updates `github.com/Azure/go-autorest/autorest` from 0.11.28 to 0.11.29
    - [Release notes](https://github.com/Azure/go-autorest/releases)
    - [Changelog](https://github.com/Azure/go-autorest/blob/main/CHANGELOG.md)
    - [Commits](Azure/go-autorest@autorest/v0.11.28...autorest/v0.11.29)

    Updates `github.com/Azure/go-autorest/autorest/adal` from 0.9.21 to 0.9.22
    - [Release notes](https://github.com/Azure/go-autorest/releases)
    - [Changelog](https://github.com/Azure/go-autorest/blob/main/CHANGELOG.md)
    - [Commits](Azure/go-autorest@autorest/adal/v0.9.21...autorest/adal/v0.9.22)

    ---
    updated-dependencies:
    - dependency-name: github.com/Azure/go-autorest/autorest
      dependency-type: direct:production
      update-type: version-update:semver-patch
      dependency-group: azure-sdks
    - dependency-name: github.com/Azure/go-autorest/autorest/adal
      dependency-type: direct:production
      update-type: version-update:semver-patch
      dependency-group: azure-sdks
    ...

    Signed-off-by: dependabot[bot] <[email protected]>

    * Update NOTICE.txt

    ---------

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    Co-authored-by: dependabot[bot] <dependabot[bot]@users.noreply.github.com>

commit 197396f
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Tue Jul 23 13:32:02 2024 -0400

    build(deps): bump the gcp-sdks group with 9 updates (#40311)

    * build(deps): bump the gcp-sdks group with 9 updates

    Bumps the gcp-sdks group with 9 updates:

    | Package | From | To |
    | --- | --- | --- |
    | [cloud.google.com/go/bigquery](https://github.com/googleapis/google-cloud-go) | `1.55.0` | `1.62.0` |
    | [cloud.google.com/go/monitoring](https://github.com/googleapis/google-cloud-go) | `1.16.0` | `1.20.1` |
    | [cloud.google.com/go/pubsub](https://github.com/googleapis/google-cloud-go) | `1.33.0` | `1.40.0` |
    | [cloud.google.com/go/compute](https://github.com/googleapis/google-cloud-go) | `1.23.0` | `1.27.2` |
    | [cloud.google.com/go/redis](https://github.com/googleapis/google-cloud-go) | `1.13.1` | `1.16.2` |
    | [cloud.google.com/go/compute/metadata](https://github.com/googleapis/google-cloud-go) | `0.2.3` | `0.4.0` |
    | [cloud.google.com/go/iam](https://github.com/googleapis/google-cloud-go) | `1.1.2` | `1.1.10` |
    | [cloud.google.com/go/longrunning](https://github.com/googleapis/google-cloud-go) | `0.5.1` | `0.5.9` |
    | [cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go) | `1.30.1` | `1.42.0` |

    Updates `cloud.google.com/go/bigquery` from 1.55.0 to 1.62.0
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@spanner/v1.55.0...spanner/v1.62.0)

    Updates `cloud.google.com/go/monitoring` from 1.16.0 to 1.20.1
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/documentai/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@kms/v1.16.0...video/v1.20.1)

    Updates `cloud.google.com/go/pubsub` from 1.33.0 to 1.40.0
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@pubsub/v1.33.0...pubsub/v1.40.0)

    Updates `cloud.google.com/go/compute` from 1.23.0 to 1.27.2
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/documentai/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@pubsub/v1.23.0...compute/v1.27.2)

    Updates `cloud.google.com/go/redis` from 1.13.1 to 1.16.2
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@asset/v1.13.1...redis/v1.16.2)

    Updates `cloud.google.com/go/compute/metadata` from 0.2.3 to 0.4.0
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@netapp/v0.2.3...v0.4.0)

    Updates `cloud.google.com/go/iam` from 1.1.2 to 1.1.10
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@iam/v1.1.2...iam/v1.1.10)

    Updates `cloud.google.com/go/longrunning` from 0.5.1 to 0.5.9
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@auth/v0.5.1...longrunning/v0.5.9)

    Updates `cloud.google.com/go/storage` from 1.30.1 to 1.42.0
    - [Release notes](https://github.com/googleapis/google-cloud-go/releases)
    - [Changelog](https://github.com/googleapis/google-cloud-go/blob/main/documentai/CHANGES.md)
    - [Commits](googleapis/google-cloud-go@pubsub/v1.30.1...spanner/v1.42.0)

    ---
    updated-dependencies:
    - dependency-name: cloud.google.com/go/bigquery
      dependency-type: direct:production
      update-type: version-update:semver-minor
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/monitoring
      dependency-type: direct:production
      update-type: version-update:semver-minor
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/pubsub
      dependency-type: direct:production
      update-type: version-update:semver-minor
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/compute
      dependency-type: direct:production
      update-type: version-update:semver-minor
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/redis
      dependency-type: direct:production
      update-type: version-update:semver-minor
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/compute/metadata
      dependency-type: indirect
      update-type: version-update:semver-minor
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/iam
      dependency-type: indirect
      update-type: version-update:semver-patch
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/longrunning
      dependency-type: indirect
      update-type: version-update:semver-patch
      dependency-group: gcp-sdks
    - dependency-name: cloud.google.com/go/storage
      dependency-type: direct:production
      update-type: version-update:semver-minor
      dependency-group: gcp-sdks
    ...

    Signed-off-by: dependabot[bot] <[email protected]>

    * Update NOTICE.txt

    ---------

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    Co-authored-by: dependabot[bot] <dependabot[bot]@users.noreply.github.com>

commit 8940f7d
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 21:02:27 2024 +0530

    fix: update notice

commit 58bc2ff
Merge: 9433065 dd671a6
Author: VihasMakwana <[email protected]>
Date:   Tue Jul 23 20:59:16 2024 +0530

    Merge branch 'main' into metricbeat-process-multierr

commit 9433065
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 20:57:58 2024 +0530

    chore: update tests

commit c1d4aba
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 20:55:45 2024 +0530

    fix: add specifc version metric-system

commit dd671a6
Author: Vinit Chauhan <[email protected]>
Date:   Tue Jul 23 10:20:37 2024 -0400

    filebeat/decode_cef - Add option to ignore empty values (#40268)

    Added option to ignore empty values in the decode_cef processor.

    In the decode_cef processor, when there are empty values in the extensions section, we get errors during log parsing. This change provides a flag in decode_cef config to override this default behavior and ignore the fields with empty value. Some example errors that this helps handle are:

        error in field 'cn1': strconv.ParseInt: parsing "": invalid syntax
        error in field 'destinationTranslatedAddress': value is not a valid IP address

    Closes #40236

commit add7a45
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 19:13:54 2024 +0530

    fix: unit test

commit 0293645
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 16:31:49 2024 +0530

    fix: remove ioutil

commit e842010
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 16:14:01 2024 +0530

    fix: update notice

commit 246d730
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 16:13:15 2024 +0530

    fix: add license, remove uuid5

commit ac01831
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 15:03:08 2024 +0530

    update: go.mod

commit 42101c8
Merge: 091fff8 7263696
Author: VihasMakwana <[email protected]>
Date:   Tue Jul 23 15:02:20 2024 +0530

    Merge branch 'main' into metricbeat-process-multierr

commit 091fff8
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 14:58:51 2024 +0530

    fix: test

commit fd6d312
Author: Vihas Makwana <[email protected]>
Date:   Tue Jul 23 14:57:13 2024 +0530

    fix: update go.mod, update uuid and metrics version

commit 7263696
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Mon Jul 22 19:32:38 2024 +0000

    build(deps): bump github.com/elastic/elastic-agent-libs from 0.9.13 to 0.9.15 (#40300)

    * build(deps): bump github.com/elastic/elastic-agent-libs

    Bumps [github.com/elastic/elastic-agent-libs](https://github.com/elastic/elastic-agent-libs) from 0.9.13 to 0.9.15.
    - [Release notes](https://github.com/elastic/elastic-agent-libs/releases)
    - [Commits](elastic/elastic-agent-libs@v0.9.13...v0.9.15)

    ---
    updated-dependencies:
    - dependency-name: github.com/elastic/elastic-agent-libs
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...

    Signed-off-by: dependabot[bot] <[email protected]>

    * Update NOTICE.txt

    ---------

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    Co-authored-by: dependabot[bot] <dependabot[bot]@users.noreply.github.com>

commit e3d8f3b
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Mon Jul 22 13:44:51 2024 -0400

    build(deps): bump github.com/elastic/elastic-agent-client/v7 from 7.14.0 to 7.15.0 (#40304)

    * build(deps): bump github.com/elastic/elastic-agent-client/v7

    Bumps [github.com/elastic/elastic-agent-client/v7](https://github.com/elastic/elastic-agent-client) from 7.14.0 to 7.15.0.
    - [Release notes](https://github.com/elastic/elastic-agent-client/releases)
    - [Commits](elastic/elastic-agent-client@v7.14.0...v7.15.0)

    ---
    updated-dependencies:
    - dependency-name: github.com/elastic/elastic-agent-client/v7
      dependency-type: direct:production
      update-type: version-update:semver-minor
    ...

    Signed-off-by: dependabot[bot] <[email protected]>

    * Update NOTICE.txt

    ---------

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    Co-authored-by: dependabot[bot] <dependabot[bot]@users.noreply.github.com>

commit 3e95d53
Author: Maurizio Branca <[email protected]>
Date:   Mon Jul 22 18:24:59 2024 +0200

    Add CSP SDKs to the `allow` list (#40150)

commit f3f772f
Author: VihasMakwana <[email protected]>
Date:   Fri Jul 19 17:52:19 2024 +0530

    [filebeat][log] Enable status reporter for log input (#40075)

    * chore: initial commit, without tests

    * chore: tests

    * chore: add test cases

    * fix: add null check

    * fix: remove println

    * fix: lint

    * goimports

    * remove println

    * fix: changelog

    * update test for windows

    * fix: fix some comments

    * chore: add starting state in NewInput

    * fix: add sample output to verify the status

    * fix: remove println

    * fix: add integration tag

    * Update CHANGELOG.next.asciidoc

    Co-authored-by: Denis <[email protected]>

    * fix: remove redundant bool

    * fix: add degraded

    ---------

    Co-authored-by: Pierre HILBERT <[email protected]>
    Co-authored-by: Denis <[email protected]>

commit 463bbb4
Author: Dan Kortschak <[email protected]>
Date:   Fri Jul 19 06:32:29 2024 +0930

    x-pack/filebeat/input/websocket: do minor clean-up in main loop (#40145)

    * remove unneeded goroutine
    * fix logging: The body was previously not being logged since an io.ReadCloser
      is not a JSON-serialisable type.

commit 908553d
Author: Vihas Makwana <[email protected]>
Date:   Thu Jul 18 19:04:52 2024 +0530

    chore: rename function

commit 51a7854
Author: Vihas Makwana <[email protected]>
Date:   Thu Jul 18 18:42:00 2024 +0530

    chore: update process summary

commit 21b102b
Author: Vihas Makwana <[email protected]>
Date:   Thu Jul 18 16:44:19 2024 +0530

    chore: add degradable error

commit 942f8c7
Author: Alejandro Fernández Haro <[email protected]>
Date:   Wed Jul 17 20:52:14 2024 +0200

    [Metricbeat/kibana/status] Add support for v8format (#40275)

commit 1bfcecb
Author: Vihas Makwana <[email protected]>
Date:   Wed Jul 17 23:31:10 2024 +0530

    fix: multierror support

* fix: nits and comments

* fix: fix notice, and test

* fix notice

* fix notice

* fix: lint

* fix: nits

* fix: update notice, go.mod

* fix: update notice, go.mod to v0.11.0

* temp

* fix: use ErrorIs

* fix: use ErrorIsf

---------

Co-authored-by: Pierre HILBERT <[email protected]>
  • Loading branch information
VihasMakwana and pierrehilbert authored Aug 6, 2024
1 parent 99c1138 commit 2060383
Show file tree
Hide file tree
Showing 17 changed files with 355 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]
- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025]
- Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation
- Add support for Kibana status metricset in v8 format {pull}40275[40275]

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13180,11 +13180,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-system-metrics
Version: v0.10.8
Version: v0.11.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.10.8/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.11.0/LICENSE.txt:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ require (
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.8.1
github.com/elastic/elastic-agent-libs v0.9.15
github.com/elastic/elastic-agent-system-metrics v0.10.8
github.com/elastic/elastic-agent-system-metrics v0.11.0
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff
github.com/elastic/mito v1.15.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,8 @@ github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7b
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.9.15 h1:WCLtuErafUxczT/rXJa4Vr6mxwC8dgtqMbEq+qWGD4M=
github.com/elastic/elastic-agent-libs v0.9.15/go.mod h1:2VgYxHaeM+cCDBjiS2wbmTvzPGbnlXAtYrlcLefheS8=
github.com/elastic/elastic-agent-system-metrics v0.10.8 h1:YoX3GfWWDtL5YrBkIbl7jQ/usOxBi+0N9jHke2EzFCk=
github.com/elastic/elastic-agent-system-metrics v0.10.8/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk=
github.com/elastic/elastic-agent-system-metrics v0.11.0 h1:/bWrgTsHZWLUhdT7WPNuQDFkrSfm+A4qf6QDQnZo9d8=
github.com/elastic/elastic-agent-system-metrics v0.11.0/go.mod h1:3QiMu9wTKJFvpCN+5klgGqasTMNKJbgY3xcoN1KQXJk=
github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA=
github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
Expand Down
13 changes: 8 additions & 5 deletions metricbeat/helper/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package helper

import (
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -55,7 +56,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
content := []byte(test.Content)
tmpfile, err := ioutil.TempFile("", "token")
tmpfile, err := os.CreateTemp("", "token")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -236,14 +237,14 @@ func TestOverUnixSocket(t *testing.T) {
fmt.Fprintf(w, "ehlo!")
})

go http.Serve(l, mux)
go http.Serve(l, mux) //nolint:errcheck,gosec // Ignore the error, it's a test file

return l
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
tmpDir, err := os.MkdirTemp("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

Expand All @@ -262,7 +263,7 @@ func TestOverUnixSocket(t *testing.T) {
r, err := h.FetchResponse()
require.NoError(t, err)
defer r.Body.Close()
content, err := ioutil.ReadAll(r.Body)
content, err := io.ReadAll(r.Body)
require.NoError(t, err)
assert.Equal(t, []byte("ehlo!"), content)
})
Expand Down Expand Up @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig {
func (*dummyModule) UnpackConfig(interface{}) error {
return nil
}
func (dummyModule) UpdateStatus(_ status.Status, _ string) {}
func (dummyModule) SetStatusReporter(_ status.StatusReporter) {}
28 changes: 22 additions & 6 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/url"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -62,9 +63,11 @@ const (

// Module is the common interface for all Module implementations.
type Module interface {
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent.
SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module.
}

// BaseModule implements the Module interface.
Expand All @@ -73,9 +76,10 @@ type Module interface {
// MetricSets, it can embed this type into another struct to satisfy the
// Module interface requirements.
type BaseModule struct {
name string
config ModuleConfig
rawConfig *conf.C
name string
config ModuleConfig
rawConfig *conf.C
statusReporter status.StatusReporter
}

func (m *BaseModule) String() string {
Expand All @@ -95,6 +99,18 @@ func (m *BaseModule) UnpackConfig(to interface{}) error {
return m.rawConfig.Unpack(to)
}

// UpdateStatus updates the status of the module. Reflected on elastic-agent.
func (m *BaseModule) UpdateStatus(status status.Status, msg string) {
if m.statusReporter != nil {
m.statusReporter.UpdateStatus(status, msg)
}
}

// SetStatusReporter sets the status repoter of the module.
func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) {
m.statusReporter = statusReporter
}

// WithConfig re-configures the module with the given raw configuration and returns a
// copy of the module.
// Intended to be called from module factories. Note that if metricsets are specified
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/mb/module/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand Down Expand Up @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup {
func (mr *runner) String() string {
return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets))
}

func (mr *runner) SetStatusReporter(reporter status.StatusReporter) {
mr.mod.SetStatusReporter(reporter)
}
9 changes: 9 additions & 0 deletions metricbeat/mb/module/runner_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
)

type runnerGroup struct {
Expand All @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner {
}
}

func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) {
for _, runner := range rg.runners {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(reporter)
}
}
}

func (rg *runnerGroup) Start() {
rg.startOnce.Do(func() {
for _, runner := range rg.runners {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -146,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event {
registry.Add(metricsPath, msw.Metrics(), monitoring.Full)
monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String())

msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name()))
msw.run(done, out)
}(msw)
}
Expand Down Expand Up @@ -253,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
Expand Down
10 changes: 7 additions & 3 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ that Metricbeat does it and with the same validations.
}
}
*/

package testing

import (
"context"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -72,9 +74,11 @@ type TestModule struct {
RawConfig *conf.C
}

func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) UpdateStatus(_ status.Status, _ string) {}
func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {}

func NewTestModule(t testing.TB, config interface{}) *TestModule {
c, err := conf.NewConfigFrom(config)
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/elasticsearch/node_stats/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package node_stats
import (
"testing"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig {
func (m mockModule) UnpackConfig(to interface{}) error {
return nil
}

func (m mockModule) UpdateStatus(_ status.Status, _ string) {}
func (m mockModule) SetStatusReporter(_ status.StatusReporter) {}
7 changes: 5 additions & 2 deletions metricbeat/module/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package process

import (
"errors"
"fmt"
"os"
"runtime"
Expand Down Expand Up @@ -111,7 +112,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
// monitor either a single PID, or the configured set of processes.
if m.setpid == 0 {
procs, roots, err := m.stats.Get()
if err != nil {
if err != nil && !errors.Is(err, process.NonFatalErr{}) {
// return only if the error is fatal in nature
return fmt.Errorf("process stats: %w", err)
}

Expand All @@ -121,9 +123,10 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
RootFields: roots[evtI],
})
if !isOpen {
return nil
return err
}
}
return err
} else {
proc, root, err := m.stats.GetOneRootEvent(m.setpid)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions metricbeat/module/system/process/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ func TestFetch(t *testing.T) {

f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
assert.Empty(t, errs)
for _, err := range errs {
assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err)
}
assert.NotEmpty(t, events)

time.Sleep(2 * time.Second)

events, errs = mbtest.ReportingFetchV2Error(f)
assert.Empty(t, errs)
for _, err := range errs {
assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err)
}
assert.NotEmpty(t, events)

t.Logf("fetched %d events, showing events[0]:", len(events))
Expand Down
16 changes: 9 additions & 7 deletions metricbeat/module/system/process_summary/process_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
package process_summary

import (
"errors"
"fmt"
"io/ioutil"
"os"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -68,9 +69,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// descriptive error must be returned.
func (m *MetricSet) Fetch(r mb.ReporterV2) error {

procList, err := process.ListStates(m.sys)
if err != nil {
return fmt.Errorf("error fetching process list: %w", err)
procList, degradeErr := process.ListStates(m.sys)
if degradeErr != nil && !errors.Is(degradeErr, process.NonFatalErr{}) {
// return only if the error is fatal in nature
return fmt.Errorf("error fetching process list: %w", degradeErr)
}

procStates := map[string]int{}
Expand All @@ -83,7 +85,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
}

outMap := mapstr.M{}
err = typeconv.Convert(&outMap, procStates)
err := typeconv.Convert(&outMap, procStates)
if err != nil {
return fmt.Errorf("error formatting process stats: %w", err)
}
Expand All @@ -101,13 +103,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
MetricSetFields: outMap,
})

return nil
return degradeErr
}

// threadStats returns a map of state counts for running threads on a system
func threadStats(sys resolve.Resolver) (mapstr.M, error) {
statPath := sys.ResolveHostFS("/proc/stat")
procData, err := ioutil.ReadFile(statPath)
procData, err := os.ReadFile(statPath)
if err != nil {
return nil, fmt.Errorf("error reading procfs file %s: %w", statPath, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func TestFetch(t *testing.T) {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)

require.Empty(t, errs)
for _, err := range errs {
assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err)
}
require.NotEmpty(t, events)
event := events[0].BeatEvent("system", "process_summary").Fields
t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(),
Expand All @@ -62,7 +64,9 @@ func TestStateNames(t *testing.T) {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)

require.Empty(t, errs)
for _, err := range errs {
assert.ErrorIsf(t, err, process.NonFatalErr{}, "Expected non-fatal error, got %v", err)
}
require.NotEmpty(t, events)
event := events[0].BeatEvent("system", "process_summary").Fields

Expand Down
Loading

0 comments on commit 2060383

Please sign in to comment.