From c5bb19c36699f407ff704fa3bdc05168155e4f22 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 12 Sep 2024 12:56:52 -0400 Subject: [PATCH 01/26] Fix elasticsearch re-connection after network error When the Elasticsearch client fails to publish events, it ends up calling `Close` in the connection (that is reused). To cancel the in-flight requests, the context is cancelled and a new one is created to used in future requests. The callback to check the version holds a reference to the connection via a closure, now the Elasticsearch client holds a pointer to that connection, so whenever Close is called, the callback can create a request with the new, not cancelled, context. An integration test is added to ensure the ES output can always recover from network errors. --- CHANGELOG.next.asciidoc | 14 ++ NOTICE.txt | 53 +++++++ go.mod | 2 + go.sum | 4 + libbeat/esleg/eslegclient/connection.go | 3 + .../connection_integration_test.go | 7 + libbeat/outputs/elasticsearch/client.go | 8 +- .../tests/integration/elasticsearch_test.go | 142 ++++++++++++++++++ 8 files changed, 231 insertions(+), 2 deletions(-) create mode 100644 libbeat/tests/integration/elasticsearch_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d582c3c3691..9804c8d929a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -108,6 +108,20 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] - Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356] +- Fix the paths in the .cmd script added to the path by the Windows MSI to point to the new C:\Program Files installation location. https://github.com/elastic/elastic-stack-installers/pull/238 +- Change cache processor documentation from `write_period` to `write_interval`. {pull}38561[38561] +- Fix cache processor expiries heap cleanup on partial file writes. {pull}38561[38561] +- Fix cache processor expiries infinite growth when large a large TTL is used and recurring keys are cached. {pull}38561[38561] +- Fix parsing of RFC 3164 process IDs in syslog processor. {issue}38947[38947] {pull}38982[38982] +- Rename the field "apache2.module.error" to "apache.module.error" in Apache error visualization. {issue}39480[39480] {pull}39481[39481] +- Validate config of the `replace` processor {pull}40047[40047] +- Allow port number 0 in the community ID flowhash processor {pull}40259[40259] +- Fix handling of escaped brackets in syslog structured data. {issue}40445[40445] {pull}40446[40446] +- Update Go version to 1.22.6. {pull}40528[40528] +- Aborts all active connections for Elasticsearch output. {pull}40572[40572] +- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572] +- The journald input now restarts if there is an error/crash {issue}32782[32782] {pull}40558[40558] +- Fix Elasticsearch output not recovering from network errors {issue}40705[40705] {pull}40794[40794] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index b5df79133f7..bdcede0f6ee 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -16057,6 +16057,29 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/mito@v1.15.0/LI limitations under the License. +-------------------------------------------------------------------------------- +Dependency : github.com/elastic/mock-es +Version: v0.0.0-20240712014503-e5b47ece0015 +Licence type (autodetected): Apache-2.0 +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/elastic/mock-es@v0.0.0-20240712014503-e5b47ece0015/LICENSE: + +Copyright 2024 Elasticsearch B.V. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + -------------------------------------------------------------------------------- Dependency : github.com/elastic/tk-btf Version: v0.1.0 @@ -48374,6 +48397,36 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/mileusna/useragent +Version: v1.3.4 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/mileusna/useragent@v1.3.4/LICENSE.md: + +MIT License + +Copyright (c) 2017 Miloš Mileusnić + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- Dependency : github.com/minio/asm2plan9s Version: v0.0.0-20200509001527-cdd76441f9d8 diff --git a/go.mod b/go.mod index 75aba460236..1b43f2de007 100644 --- a/go.mod +++ b/go.mod @@ -195,6 +195,7 @@ require ( github.com/elastic/go-quark v0.1.2 github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 github.com/elastic/mito v1.15.0 + github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 github.com/elastic/tk-btf v0.1.0 github.com/elastic/toutoumomoma v0.0.0-20240626215117-76e39db18dfb github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 @@ -339,6 +340,7 @@ require ( github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.9 // indirect + github.com/mileusna/useragent v1.3.4 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect diff --git a/go.sum b/go.sum index 3eadfda4509..d1536fa7ef9 100644 --- a/go.sum +++ b/go.sum @@ -383,6 +383,8 @@ github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/u github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/mito v1.15.0 h1:MicOxLSVkgU2Aonbh3i+++66Wl5wvD8y9gALK8PQDYs= github.com/elastic/mito v1.15.0/go.mod h1:J+wCf4HccW2YoSFmZMGu+d06gN+WmnIlj5ehBqine74= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 h1:z8cC8GASpPo8yKlbnXI36HQ/BM9wYjhBPNbDjAWm0VU= +github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015/go.mod h1:qH9DX/Dmflz6EAtaks/+2SsdQzecVAKE174Zl66hk7E= github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA= github.com/elastic/pkcs8 v1.0.0/go.mod h1:ipsZToJfq1MxclVTwpG7U/bgeDtf+0HkUiOxebk95+0= github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3 h1:FzA0/n4iMt8ojGDGRoiFPSHFvvdVIvxOxyLtiFnrLBM= @@ -703,6 +705,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/miekg/dns v1.1.22/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.61 h1:nLxbwF3XxhwVSm8g9Dghm9MHPaUZuqhPiGL+675ZmEs= github.com/miekg/dns v1.1.61/go.mod h1:mnAarhS3nWaW+NVP2wTkYVIZyHNJ098SJZUki3eykwQ= +github.com/mileusna/useragent v1.3.4 h1:MiuRRuvGjEie1+yZHO88UBYg8YBC/ddF6T7F56i3PCk= +github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4ccGOMKNYYc= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 6a22132080f..2d3579e9a9b 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -326,6 +326,9 @@ func (conn *Connection) Ping() (ESPingData, error) { // Close closes a connection. func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() + conn.cancelReqs() + // Creates a new context to be use in new requests + conn.reqsContext, conn.cancelReqs = context.WithCancel(context.Background()) return nil } diff --git a/libbeat/esleg/eslegclient/connection_integration_test.go b/libbeat/esleg/eslegclient/connection_integration_test.go index b4e277ed1a6..3b0afe575a4 100644 --- a/libbeat/esleg/eslegclient/connection_integration_test.go +++ b/libbeat/esleg/eslegclient/connection_integration_test.go @@ -45,6 +45,13 @@ func TestConnect(t *testing.T) { assert.NoError(t, err) } +func TestConnectionCanBeClosedAndReused(t *testing.T) { + conn := getTestingElasticsearch(t) + assert.NoError(t, conn.Connect(), "first connect must succeed") + assert.NoError(t, conn.Close(), "close must succeed") + assert.NoError(t, conn.Connect(), "calling connect after close must succeed") +} + func TestConnectWithProxy(t *testing.T) { wrongPort, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 70c4cc1cce5..e2282ea0eaf 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -48,7 +48,11 @@ var ( // Client is an elasticsearch client. type Client struct { - conn eslegclient.Connection + // We use a pointer here because the connection holds a context + // that needs to be recreated in case of error and the connection + // that is passed to this client is also used in a closure, we need + // to ensure both hold a reference to the same instance of the connection. + conn *eslegclient.Connection indexSelector outputs.IndexSelector pipelineSelector *outil.Selector @@ -186,7 +190,7 @@ func NewClient( pLogIndex.Start() pLogIndexTryDeadLetter.Start() client := &Client{ - conn: *conn, + conn: conn, indexSelector: s.indexSelector, pipelineSelector: pipeline, observer: observer, diff --git a/libbeat/tests/integration/elasticsearch_test.go b/libbeat/tests/integration/elasticsearch_test.go new file mode 100644 index 00000000000..a076e767f27 --- /dev/null +++ b/libbeat/tests/integration/elasticsearch_test.go @@ -0,0 +1,142 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "errors" + "io" + "net/http" + "testing" + "time" + + "github.com/gofrs/uuid/v5" + "github.com/rcrowley/go-metrics" + "github.com/stretchr/testify/require" + + "github.com/elastic/mock-es/pkg/api" +) + +var esCfg = ` +mockbeat: +logging: + level: debug + selectors: + - publisher_pipeline_output + - esclientleg +queue.mem: + events: 4096 + flush.min_events: 8 + flush.timeout: 0.1s +output.elasticsearch: + allow_older_versions: true + hosts: + - "http://localhost:4242" + backoff: + init: 0.1s + max: 0.2s +` + +func TestESOutputRecoversFromNetworkError(t *testing.T) { + mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test") + mockbeat.WriteConfigFile(esCfg) + + s, mr := startMockES(t, "localhost:4242") + + mockbeat.Start() + + // 1. Wait for one _bulk call + waitForEventToBePublished(t, mr) + + // 2. Stop the mock-es server + if err := s.Close(); err != nil { + t.Fatalf("cannot close mock-es server: %s", err) + } + + // 3. Wait for connection error logs + mockbeat.WaitForLogs( + `Get \"http://localhost:4242\": dial tcp 127.0.0.1:4242: connect: connection refused`, + 2*time.Second, + "did not find connection refused error") + + mockbeat.WaitForLogs( + "Attempting to reconnect to backoff(elasticsearch(http://localhost:4242)) with 2 reconnect attempt(s)", + 2*time.Second, + "did not find two tries to reconnect") + + // 4. Restart mock-es on the same port + s, mr = startMockES(t, "localhost:4242") + + // 5. Wait for reconnection logs + mockbeat.WaitForLogs( + "Connection to backoff(elasticsearch(http://localhost:4242)) established", + 5*time.Second, // There is a backoff, so ensure we wait enough + "did not find re connection confirmation") + + // 6. Ensure one new call to _bulk is made + waitForEventToBePublished(t, mr) + s.Close() +} + +func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) { + uid := uuid.Must(uuid.NewV4()) + mr := metrics.NewRegistry() + es := api.NewAPIHandler(uid, "foo2", mr, time.Now().Add(24*time.Hour), 0, 0, 0, 0, 0) + + s := http.Server{Addr: addr, Handler: es} + go func() { + if err := s.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) { + t.Errorf("could not start mock-es server: %s", err) + } + }() + + require.Eventually(t, func() bool { + resp, err := http.Get("http://" + addr) + if err != nil { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return false + } + return true + }, + time.Second, time.Millisecond, "first server must be up") + + return &s, mr +} + +func waitForEventToBePublished(t *testing.T, mr metrics.Registry) { + t.Helper() + require.Eventually(t, func() bool { + total := mr.Get("bulk.create.total") + if total == nil { + return false + } + + sc, ok := total.(*metrics.StandardCounter) + if !ok { + t.Fatalf("expecting 'bulk.create.total' to be *metrics.StandardCounter, but got '%T' instead", + total, + ) + } + + return sc.Count() > 1 + }, + 10*time.Second, 100*time.Millisecond, + "at least one bulk request must be made") +} From 1e025433168778e4b62c184496acd5d76bf69848 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 13 Sep 2024 11:55:06 -0400 Subject: [PATCH 02/26] Move creating the request context to Connect This commit moves the creation of the request context to the connect method. --- libbeat/esleg/eslegclient/connection.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 2d3579e9a9b..eaa3ebb11c4 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -109,7 +109,8 @@ type ESVersionData struct { BuildFlavor string `json:"build_flavor"` } -// NewConnection returns a new Elasticsearch client +// NewConnection returns a new Elasticsearch client. +// Connect must be called before using this Connection. func NewConnection(s ConnectionSettings) (*Connection, error) { logger := logp.NewLogger("esclientleg") @@ -184,15 +185,12 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } - ctx, cancelFunc := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, HTTP: esClient, Encoder: encoder, log: logger, responseBuffer: bytes.NewBuffer(nil), - reqsContext: ctx, - cancelReqs: cancelFunc, } if s.APIKey != "" { @@ -284,6 +282,9 @@ func (conn *Connection) Connect() error { if conn.log == nil { conn.log = logp.NewLogger("esclientleg") } + + conn.reqsContext, conn.cancelReqs = context.WithCancel(context.Background()) + if err := conn.getVersion(); err != nil { return err } @@ -327,8 +328,6 @@ func (conn *Connection) Ping() (ESPingData, error) { func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() conn.cancelReqs() - // Creates a new context to be use in new requests - conn.reqsContext, conn.cancelReqs = context.WithCancel(context.Background()) return nil } From 59541dcd66731bed5553effbc6bc61cc7cd7ae5e Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 13 Sep 2024 12:27:00 -0400 Subject: [PATCH 03/26] Fix tests and lint warnings --- libbeat/esleg/eslegclient/api_mock_test.go | 8 ++++---- libbeat/esleg/eslegclient/api_test.go | 6 +++++- libbeat/esleg/eslegclient/bulkapi_mock_test.go | 6 +++--- .../eslegclient/connection_integration_test.go | 18 +++--------------- .../tests/integration/elasticsearch_test.go | 5 +++-- 5 files changed, 18 insertions(+), 25 deletions(-) diff --git a/libbeat/esleg/eslegclient/api_mock_test.go b/libbeat/esleg/eslegclient/api_mock_test.go index 97834dcda51..bcb0d7a03c1 100644 --- a/libbeat/esleg/eslegclient/api_mock_test.go +++ b/libbeat/esleg/eslegclient/api_mock_test.go @@ -63,14 +63,14 @@ func TestOneHostSuccessResp(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", } _, resp, err := client.Index(index, "test", "1", params, body) if err != nil { - t.Errorf("Index() returns error: %s", err) + t.Fatalf("Index() returns error: %s", err) } if !resp.Created { t.Errorf("Index() fails: %s", resp) @@ -89,7 +89,7 @@ func TestOneHost500Resp(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) err := client.Connect() if err != nil { t.Fatalf("Failed to connect: %v", err) @@ -121,7 +121,7 @@ func TestOneHost503Resp(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", diff --git a/libbeat/esleg/eslegclient/api_test.go b/libbeat/esleg/eslegclient/api_test.go index 6c7dd675ccf..e930068cc12 100644 --- a/libbeat/esleg/eslegclient/api_test.go +++ b/libbeat/esleg/eslegclient/api_test.go @@ -170,11 +170,15 @@ func TestReadSearchResult_invalid(t *testing.T) { assert.Error(t, err) } -func newTestConnection(url string) *Connection { +func newTestConnection(t *testing.T, url string) *Connection { conn, _ := NewConnection(ConnectionSettings{ URL: url, }) conn.Encoder = NewJSONEncoder(nil, false) + if err := conn.Connect(); err != nil { + t.Fatalf("cannot connect to Elasticsearch: %s", err) + } + return conn } diff --git a/libbeat/esleg/eslegclient/bulkapi_mock_test.go b/libbeat/esleg/eslegclient/bulkapi_mock_test.go index 96434819eca..598204386f9 100644 --- a/libbeat/esleg/eslegclient/bulkapi_mock_test.go +++ b/libbeat/esleg/eslegclient/bulkapi_mock_test.go @@ -60,7 +60,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) { server := ElasticsearchMock(200, expectedResp) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", @@ -95,7 +95,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", @@ -134,7 +134,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) { server := ElasticsearchMock(503, []byte("Something wrong happened")) - client := newTestConnection(server.URL) + client := newTestConnection(t, server.URL) params := map[string]string{ "refresh": "true", diff --git a/libbeat/esleg/eslegclient/connection_integration_test.go b/libbeat/esleg/eslegclient/connection_integration_test.go index 3b0afe575a4..019df0563d9 100644 --- a/libbeat/esleg/eslegclient/connection_integration_test.go +++ b/libbeat/esleg/eslegclient/connection_integration_test.go @@ -21,8 +21,7 @@ package eslegclient import ( "context" - "io/ioutil" - "math/rand" + "io" "net" "net/http" "net/http/httptest" @@ -34,7 +33,6 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" - "github.com/elastic/beats/v7/libbeat/outputs" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) @@ -146,16 +144,6 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *Connection { return conn } -func randomClient(grp outputs.Group) outputs.NetworkClient { - L := len(grp.Clients) - if L == 0 { - panic("no elasticsearch client") - } - - client := grp.Clients[rand.Intn(L)] - return client.(outputs.NetworkClient) -} - // startTestProxy starts a proxy that redirects all connections to the specified URL func startTestProxy(t *testing.T, redirectURL string) *httptest.Server { t.Helper() @@ -173,14 +161,14 @@ func startTestProxy(t *testing.T, redirectURL string) *httptest.Server { require.NoError(t, err) defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) require.NoError(t, err) for _, header := range []string{"Content-Encoding", "Content-Type"} { w.Header().Set(header, resp.Header.Get(header)) } w.WriteHeader(resp.StatusCode) - w.Write(body) + w.Write(body) //nolint: errcheck // It's a test, we can ignore this error })) return proxy } diff --git a/libbeat/tests/integration/elasticsearch_test.go b/libbeat/tests/integration/elasticsearch_test.go index a076e767f27..23737e9ce08 100644 --- a/libbeat/tests/integration/elasticsearch_test.go +++ b/libbeat/tests/integration/elasticsearch_test.go @@ -99,7 +99,7 @@ func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) { mr := metrics.NewRegistry() es := api.NewAPIHandler(uid, "foo2", mr, time.Now().Add(24*time.Hour), 0, 0, 0, 0, 0) - s := http.Server{Addr: addr, Handler: es} + s := http.Server{Addr: addr, Handler: es, ReadHeaderTimeout: time.Second} go func() { if err := s.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) { t.Errorf("could not start mock-es server: %s", err) @@ -107,8 +107,9 @@ func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) { }() require.Eventually(t, func() bool { - resp, err := http.Get("http://" + addr) + resp, err := http.Get("http://" + addr) //nolint: noctx // It's just a test if err != nil { + //nolint: errcheck // We're just draining the body, we can ignore the error io.Copy(io.Discard, resp.Body) resp.Body.Close() return false From 76d986ba4818bc33368e678daccde78bec69c228 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 13 Sep 2024 15:20:23 -0400 Subject: [PATCH 04/26] Initialise reqsContext in NewConnections There are some cases where the Connection will be used without calling Connect, so we initialise reqsContext and cancelReqs in the NewConnection function to avoid panics. --- libbeat/esleg/eslegclient/connection.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index eaa3ebb11c4..11b5b8530df 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -110,7 +110,6 @@ type ESVersionData struct { } // NewConnection returns a new Elasticsearch client. -// Connect must be called before using this Connection. func NewConnection(s ConnectionSettings) (*Connection, error) { logger := logp.NewLogger("esclientleg") @@ -185,12 +184,18 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } + // There are some cases where the connection is created but Connect + // is not called before it's used, so we populate reqsContext and cancelReqs + // here. + reqsContext, cancelReqs := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, HTTP: esClient, Encoder: encoder, log: logger, responseBuffer: bytes.NewBuffer(nil), + reqsContext: reqsContext, + cancelReqs: cancelReqs, } if s.APIKey != "" { From fe2140a1d04f2311358c7ac585e3e7bd376c8e63 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 13 Sep 2024 15:48:58 -0400 Subject: [PATCH 05/26] Fix lint warnings --- libbeat/esleg/eslegclient/api_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/esleg/eslegclient/api_test.go b/libbeat/esleg/eslegclient/api_test.go index e930068cc12..45e5fbd52ab 100644 --- a/libbeat/esleg/eslegclient/api_test.go +++ b/libbeat/esleg/eslegclient/api_test.go @@ -170,6 +170,7 @@ func TestReadSearchResult_invalid(t *testing.T) { assert.Error(t, err) } +// nolint: unused // it's used by files with the !integration constraint func newTestConnection(t *testing.T, url string) *Connection { conn, _ := NewConnection(ConnectionSettings{ URL: url, From f8411f9be6cea447269d9653fe9d57170ddacd80 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 13 Sep 2024 16:55:54 -0400 Subject: [PATCH 06/26] Fix lint warnings --- libbeat/esleg/eslegclient/api_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libbeat/esleg/eslegclient/api_test.go b/libbeat/esleg/eslegclient/api_test.go index 45e5fbd52ab..d0d3c94a6f4 100644 --- a/libbeat/esleg/eslegclient/api_test.go +++ b/libbeat/esleg/eslegclient/api_test.go @@ -170,7 +170,9 @@ func TestReadSearchResult_invalid(t *testing.T) { assert.Error(t, err) } -// nolint: unused // it's used by files with the !integration constraint +// newTestConnection creates a new connection for testing +// +//nolint:unused // it's used by files with the !integration constraint func newTestConnection(t *testing.T, url string) *Connection { conn, _ := NewConnection(ConnectionSettings{ URL: url, From fcab79c7c88da4c6faea03ae2bcee847a9aa7250 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 16 Sep 2024 09:32:52 -0400 Subject: [PATCH 07/26] Fix error messages and improve documentation --- libbeat/tests/integration/elasticsearch_test.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/libbeat/tests/integration/elasticsearch_test.go b/libbeat/tests/integration/elasticsearch_test.go index 23737e9ce08..6d8d1a46a08 100644 --- a/libbeat/tests/integration/elasticsearch_test.go +++ b/libbeat/tests/integration/elasticsearch_test.go @@ -116,11 +116,16 @@ func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) { } return true }, - time.Second, time.Millisecond, "first server must be up") + time.Second, time.Millisecond, "mock-es server did not start on '%s'", addr) return &s, mr } +// waitForEventToBePublished waits for at least one event published +// by inspecting the count for `bulk.create.total` in `mr`. Once +// the counter is > 1, waitForEventToBePublished returns. If that +// does not happen within 10min, then the test fails with a call to +// t.Fatal. func waitForEventToBePublished(t *testing.T, mr metrics.Registry) { t.Helper() require.Eventually(t, func() bool { From 4b41bb63f1fa93ddd80cfbebe924acc77825e849 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 16 Sep 2024 16:58:33 -0400 Subject: [PATCH 08/26] Cancel context before replacing it --- libbeat/esleg/eslegclient/connection.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 11b5b8530df..5284c9f288e 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -288,6 +288,10 @@ func (conn *Connection) Connect() error { conn.log = logp.NewLogger("esclientleg") } + // Cancel the old context before replacing it. Because this Connection + // is not goroutine safe, there should be no in-flight request using + // this context, hence it's safe to cancel it. + conn.cancelReqs() conn.reqsContext, conn.cancelReqs = context.WithCancel(context.Background()) if err := conn.getVersion(); err != nil { From d76c0d2ab9a387de06e34cb9d8608fe01406a8a4 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 16 Sep 2024 17:02:08 -0400 Subject: [PATCH 09/26] Improve comments --- libbeat/esleg/eslegclient/connection.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 5284c9f288e..e1e6e30eb85 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -184,9 +184,9 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } - // There are some cases where the connection is created but Connect - // is not called before it's used, so we populate reqsContext and cancelReqs - // here. + // There are some cases where the connection is created and closed + // without being used, so we populate reqsContext and cancelReqs + // here to make the returned Connection always safe to use. reqsContext, cancelReqs := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, From 6cef80075759a936df82ff769939ccaf1667ba63 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 18 Sep 2024 13:09:02 -0400 Subject: [PATCH 10/26] Accept a context on Connect Connection.Connect now accepts a context to control the life cycle of its requests. --- libbeat/esleg/eslegclient/connection.go | 38 +++++++++---------- libbeat/licenser/elastic_fetcher.go | 5 ++- .../monitoring/report/elasticsearch/client.go | 4 +- libbeat/outputs/elasticsearch/client.go | 22 ++++++----- 4 files changed, 37 insertions(+), 32 deletions(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index e1e6e30eb85..80a02edd7fb 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -67,7 +67,6 @@ type Connection struct { // requests will share the same cancellable context // so they can be aborted on Close() reqsContext context.Context - cancelReqs func() } // ConnectionSettings are the settings needed for a Connection @@ -82,7 +81,7 @@ type ConnectionSettings struct { Kerberos *kerberos.Config - OnConnectCallback func() error + OnConnectCallback func(Connection) error Observer transport.IOStatser Parameters map[string]string @@ -184,18 +183,12 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { logger.Info("kerberos client created") } - // There are some cases where the connection is created and closed - // without being used, so we populate reqsContext and cancelReqs - // here to make the returned Connection always safe to use. - reqsContext, cancelReqs := context.WithCancel(context.Background()) conn := Connection{ ConnectionSettings: s, HTTP: esClient, Encoder: encoder, log: logger, responseBuffer: bytes.NewBuffer(nil), - reqsContext: reqsContext, - cancelReqs: cancelReqs, } if s.APIKey != "" { @@ -267,14 +260,20 @@ func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) { errors := []string{} for _, client := range clients { - err = client.Connect() + // client.Connect makes a call to get the ES version, so we create + // a context for those calls. Users of this Connection will have + // to call Connect again. + ctx, cancel := context.WithCancel(context.Background()) + err = client.Connect(ctx) if err != nil { const errMsg = "error connecting to Elasticsearch at %v: %v" client.log.Errorf(errMsg, client.URL, err) err = fmt.Errorf(errMsg, client.URL, err) errors = append(errors, err.Error()) + cancel() continue } + cancel() return &client, nil } return nil, fmt.Errorf("couldn't connect to any of the configured Elasticsearch hosts. Errors: %v", errors) @@ -282,24 +281,22 @@ func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) { // Connect connects the client. It runs a GET request against the root URL of // the configured host, updates the known Elasticsearch version and calls -// globally configured handlers. -func (conn *Connection) Connect() error { +// globally configured handlers. The context is used to control the lifecycle +// of the HTTP requests/connections, the caller is responsible for cancelling +// the context to stop any in-flight requests. +func (conn *Connection) Connect(ctx context.Context) error { if conn.log == nil { conn.log = logp.NewLogger("esclientleg") } - // Cancel the old context before replacing it. Because this Connection - // is not goroutine safe, there should be no in-flight request using - // this context, hence it's safe to cancel it. - conn.cancelReqs() - conn.reqsContext, conn.cancelReqs = context.WithCancel(context.Background()) + conn.reqsContext = ctx if err := conn.getVersion(); err != nil { return err } if conn.OnConnectCallback != nil { - if err := conn.OnConnectCallback(); err != nil { + if err := conn.OnConnectCallback(*conn); err != nil { return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %w", err) } } @@ -333,10 +330,9 @@ func (conn *Connection) Ping() (ESPingData, error) { return response, nil } -// Close closes a connection. +// Close closes any idle connections from the HTTP client. func (conn *Connection) Close() error { conn.HTTP.CloseIdleConnections() - conn.cancelReqs() return nil } @@ -369,7 +365,9 @@ func (conn *Connection) Test(d testing.Driver) { }) } - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) d.Fatal("talk to server", err) version := conn.GetVersion() d.Info("version", version.String()) diff --git a/libbeat/licenser/elastic_fetcher.go b/libbeat/licenser/elastic_fetcher.go index bcbe68a938f..80fc1cc978d 100644 --- a/libbeat/licenser/elastic_fetcher.go +++ b/libbeat/licenser/elastic_fetcher.go @@ -18,6 +18,7 @@ package licenser import ( + "context" "encoding/json" "errors" "fmt" @@ -115,7 +116,9 @@ func (mux *esClientMux) Request( ) (int, []byte, error) { c := mux.clients[mux.idx] - if err := c.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := c.Connect(ctx); err != nil { return 0, nil, err } defer c.Close() diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 56f56ac8e1e..2b0d42dc55e 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -62,7 +62,9 @@ func newPublishClient( func (c *publishClient) Connect() error { c.log.Debug("Monitoring client: connect.") - err := c.es.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := c.es.Connect(ctx) if err != nil { return fmt.Errorf("cannot connect underlying Elasticsearch client: %w", err) } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index e2282ea0eaf..89c190819c2 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -48,11 +48,9 @@ var ( // Client is an elasticsearch client. type Client struct { - // We use a pointer here because the connection holds a context - // that needs to be recreated in case of error and the connection - // that is passed to this client is also used in a closure, we need - // to ensure both hold a reference to the same instance of the connection. - conn *eslegclient.Connection + conn eslegclient.Connection + connCtx context.Context + connCancelCtx context.CancelFunc indexSelector outputs.IndexSelector pipelineSelector *outil.Selector @@ -137,12 +135,12 @@ func NewClient( return nil, err } - conn.OnConnectCallback = func() error { + conn.OnConnectCallback = func(conn eslegclient.Connection) error { globalCallbackRegistry.mutex.Lock() defer globalCallbackRegistry.mutex.Unlock() for _, callback := range globalCallbackRegistry.callbacks { - err := callback(conn) + err := callback(&conn) if err != nil { return err } @@ -153,7 +151,7 @@ func NewClient( defer onConnect.mutex.Unlock() for _, callback := range onConnect.callbacks { - err := callback(conn) + err := callback(&conn) if err != nil { return err } @@ -190,7 +188,7 @@ func NewClient( pLogIndex.Start() pLogIndexTryDeadLetter.Start() client := &Client{ - conn: conn, + conn: *conn, indexSelector: s.indexSelector, pipelineSelector: pipeline, observer: observer, @@ -537,10 +535,14 @@ func (client *Client) applyItemStatus( } func (client *Client) Connect() error { - return client.conn.Connect() + client.connCtx, client.connCancelCtx = context.WithCancel(context.Background()) + return client.conn.Connect(client.connCtx) } func (client *Client) Close() error { + if client.connCancelCtx != nil { + client.connCancelCtx() + } return client.conn.Close() } From 2e74ea4ac16ef13dd5dcf660100f7016d6ae2486 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 18 Sep 2024 16:53:03 -0400 Subject: [PATCH 11/26] Fix python dependencies --- libbeat/tests/system/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/tests/system/requirements.txt b/libbeat/tests/system/requirements.txt index 00f3914d1a4..a42e78a93ad 100644 --- a/libbeat/tests/system/requirements.txt +++ b/libbeat/tests/system/requirements.txt @@ -58,7 +58,7 @@ pytest==7.3.2 pytest-rerunfailures==9.1.1 pytest-timeout==1.4.2 python-dotenv==0.21.1 -PyYAML==6.0.1 +PyYAML<6 redis==4.4.4 semver==2.8.1 six==1.15.0 From 198b01bcf2a339807a22dc52b674a048974dacff Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 19 Sep 2024 10:27:21 -0400 Subject: [PATCH 12/26] Add context to outputs.Connectable interface Add a context to outputs.Connectable.Connect to correctly manage the life cycle of the connection and it's requests. --- filebeat/fileset/pipelines_test.go | 5 ++++- heartbeat/monitors/wrappers/monitorstate/testutil.go | 5 ++++- libbeat/monitoring/report/elasticsearch/client.go | 4 +--- .../monitoring/report/elasticsearch/elasticsearch.go | 5 ++++- libbeat/outputs/backoff.go | 4 ++-- libbeat/outputs/elasticsearch/client.go | 12 +++--------- libbeat/outputs/elasticsearch/client_proxy_test.go | 5 ++++- libbeat/outputs/elasticsearch/client_test.go | 8 ++++++-- libbeat/outputs/failover.go | 4 ++-- libbeat/outputs/logstash/async.go | 4 ++-- libbeat/outputs/logstash/async_test.go | 4 +++- libbeat/outputs/logstash/logstash_test.go | 8 ++++++-- libbeat/outputs/logstash/sync.go | 4 ++-- libbeat/outputs/logstash/sync_test.go | 4 +++- libbeat/outputs/outputs.go | 2 +- libbeat/outputs/redis/backoff.go | 2 +- libbeat/publisher/pipeline/client_worker.go | 10 ++++++---- 17 files changed, 54 insertions(+), 36 deletions(-) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index a358b0da9be..ac6aa5035de 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -20,6 +20,7 @@ package fileset import ( + "context" "net/http" "net/http/httptest" "testing" @@ -101,7 +102,9 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }) require.NoError(t, err) - err = testESClient.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = testESClient.Connect(ctx) require.NoError(t, err) err = testRegistry.LoadPipelines(testESClient, false) diff --git a/heartbeat/monitors/wrappers/monitorstate/testutil.go b/heartbeat/monitors/wrappers/monitorstate/testutil.go index 28a6c260655..be58dcdb924 100644 --- a/heartbeat/monitors/wrappers/monitorstate/testutil.go +++ b/heartbeat/monitors/wrappers/monitorstate/testutil.go @@ -18,6 +18,7 @@ package monitorstate import ( + "context" "encoding/json" "testing" @@ -50,7 +51,9 @@ func IntegES(t *testing.T) (esc *eslegclient.Connection) { conn.Encoder = eslegclient.NewJSONEncoder(nil, false) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) if err != nil { t.Fatal(err) panic(err) // panic in case TestLogger did not stop test diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 2b0d42dc55e..28be1c37917 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -59,11 +59,9 @@ func newPublishClient( return p, nil } -func (c *publishClient) Connect() error { +func (c *publishClient) Connect(ctx context.Context) error { c.log.Debug("Monitoring client: connect.") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() err := c.es.Connect(ctx) if err != nil { return fmt.Errorf("cannot connect underlying Elasticsearch client: %w", err) diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index da3f6135110..61e051d1222 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -18,6 +18,7 @@ package elasticsearch import ( + "context" "errors" "io" "math/rand" @@ -214,8 +215,10 @@ func (r *reporter) initLoop(c config) { for { // Select one configured endpoint by random and check if xpack is available + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() client := r.out[rand.Intn(len(r.out))] - err := client.Connect() + err := client.Connect(ctx) if err == nil { closing(log, client) break diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 3c7f8e51e10..87d94bb66d0 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -45,8 +45,8 @@ func WithBackoff(client NetworkClient, init, max time.Duration) NetworkClient { } } -func (b *backoffClient) Connect() error { - err := b.client.Connect() +func (b *backoffClient) Connect(ctx context.Context) error { + err := b.client.Connect(ctx) backoff.WaitOnError(b.backoff, err) return err } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 89c190819c2..c40fcffc393 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -48,9 +48,7 @@ var ( // Client is an elasticsearch client. type Client struct { - conn eslegclient.Connection - connCtx context.Context - connCancelCtx context.CancelFunc + conn eslegclient.Connection indexSelector outputs.IndexSelector pipelineSelector *outil.Selector @@ -534,15 +532,11 @@ func (client *Client) applyItemStatus( return true } -func (client *Client) Connect() error { - client.connCtx, client.connCancelCtx = context.WithCancel(context.Background()) - return client.conn.Connect(client.connCtx) +func (client *Client) Connect(ctx context.Context) error { + return client.conn.Connect(ctx) } func (client *Client) Close() error { - if client.connCancelCtx != nil { - client.connCancelCtx() - } return client.conn.Close() } diff --git a/libbeat/outputs/elasticsearch/client_proxy_test.go b/libbeat/outputs/elasticsearch/client_proxy_test.go index c2f23f34052..bd6739c3bf0 100644 --- a/libbeat/outputs/elasticsearch/client_proxy_test.go +++ b/libbeat/outputs/elasticsearch/client_proxy_test.go @@ -22,6 +22,7 @@ package elasticsearch import ( "bytes" + "context" "fmt" "net/http" "net/http/httptest" @@ -209,10 +210,12 @@ func doClientPing(t *testing.T) { client, err := NewClient(clientSettings, nil) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // This ping won't succeed; we aren't testing end-to-end communication // (which would require a lot more setup work), we just want to make sure // the client is pointed at the right server or proxy. - _ = client.Connect() + _ = client.Connect(ctx) } // serverState contains the state of the http listeners for proxy tests, diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 5124c0defe9..abda06a02ee 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -748,8 +748,10 @@ func TestClientWithHeaders(t *testing.T) { }, nil) assert.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // simple ping - err = client.Connect() + err = client.Connect(ctx) assert.NoError(t, err) assert.Equal(t, 1, requestCount) @@ -943,11 +945,13 @@ func TestClientWithAPIKey(t *testing.T) { }, nil) assert.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // This connection will fail since the server doesn't return a valid // response. This is fine since we're just testing the headers in the // original client request. //nolint:errcheck // connection doesn't need to succeed - client.Connect() + client.Connect(ctx) assert.Equal(t, "ApiKey aHlva0hHNEJmV2s1dmlLWjE3Mlg6bzQ1SlVreXVTLS15aVNBdXV4bDhVdw==", headers.Get("Authorization")) } diff --git a/libbeat/outputs/failover.go b/libbeat/outputs/failover.go index 3e999e8321f..d69e01b03cc 100644 --- a/libbeat/outputs/failover.go +++ b/libbeat/outputs/failover.go @@ -54,7 +54,7 @@ func NewFailoverClient(clients []NetworkClient) NetworkClient { } } -func (f *failoverClient) Connect() error { +func (f *failoverClient) Connect(ctx context.Context) error { var ( next int active = f.active @@ -82,7 +82,7 @@ func (f *failoverClient) Connect() error { client := f.clients[next] f.active = next - return client.Connect() + return client.Connect(ctx) } func (f *failoverClient) Close() error { diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index b1e20a0e774..a980d1cef32 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -91,7 +91,7 @@ func newAsyncClient( } c.connect = func() error { - err := c.Client.Connect() + err := c.Client.ConnectContext(context.Background()) if err == nil { c.client, err = clientFactory(c.Client) } @@ -116,7 +116,7 @@ func makeClientFactory( } } -func (c *asyncClient) Connect() error { +func (c *asyncClient) Connect(ctx context.Context) error { c.log.Debug("connect") return c.connect() } diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index 6e2a102edf2..12d2edd124c 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -72,6 +72,8 @@ func newAsyncTestDriver(client outputs.NetworkClient) *testAsyncDriver { go func() { defer driver.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { cmd, ok := <-driver.ch if !ok { @@ -82,7 +84,7 @@ func newAsyncTestDriver(client outputs.NetworkClient) *testAsyncDriver { case driverCmdQuit: return case driverCmdConnect: - driver.client.Connect() + driver.client.Connect(ctx) case driverCmdClose: driver.client.Close() case driverCmdPublish: diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index fa1b57fb841..5be2054cf2a 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -116,7 +116,9 @@ func testConnectionType( output := makeOutputer() t.Logf("new outputter: %v", output) - err := output.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := output.Connect(ctx) if err != nil { t.Error("test client failed to connect: ", err) return @@ -186,8 +188,10 @@ func newTestLumberjackOutput( t.Fatalf("init logstash output plugin failed: %v", err) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() client := grp.Clients[0].(outputs.NetworkClient) - if err := client.Connect(); err != nil { + if err := client.Connect(ctx); err != nil { t.Fatalf("Client failed to connected: %v", err) } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index d24ab1ebb97..6a456907365 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -74,9 +74,9 @@ func newSyncClient( return c, nil } -func (c *syncClient) Connect() error { +func (c *syncClient) Connect(ctx context.Context) error { c.log.Debug("connect") - err := c.Client.Connect() + err := c.Client.ConnectContext(ctx) if err != nil { return err } diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index d0410c2a8a7..0d8a3e0f513 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -86,6 +86,8 @@ func newClientTestDriver(client outputs.NetworkClient) *testSyncDriver { go func() { defer driver.wg.Done() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { cmd, ok := <-driver.ch if !ok { @@ -96,7 +98,7 @@ func newClientTestDriver(client outputs.NetworkClient) *testSyncDriver { case driverCmdQuit: return case driverCmdConnect: - driver.client.Connect() + driver.client.Connect(ctx) case driverCmdClose: driver.client.Close() case driverCmdPublish: diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index 0fdf4d9407b..3cfdb5aef66 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -57,5 +57,5 @@ type Connectable interface { // The connection attempt shall report an error if no connection could been // established within the given time interval. A timeout value of 0 == wait // forever. - Connect() error + Connect(context.Context) error } diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go index ef3dcd7cc48..2abc1f846f0 100644 --- a/libbeat/outputs/redis/backoff.go +++ b/libbeat/outputs/redis/backoff.go @@ -60,7 +60,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient { } } -func (b *backoffClient) Connect() error { +func (b *backoffClient) Connect(ctx context.Context) error { err := b.client.Connect() if err != nil { // give the client a chance to promote an internal error to a network error. diff --git a/libbeat/publisher/pipeline/client_worker.go b/libbeat/publisher/pipeline/client_worker.go index e05658d9749..dc51979d033 100644 --- a/libbeat/publisher/pipeline/client_worker.go +++ b/libbeat/publisher/pipeline/client_worker.go @@ -115,6 +115,9 @@ func (w *netClientWorker) run() { reconnectAttempts = 0 ) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { // We wait for either the worker to be closed or for there to be a batch of // events to publish. @@ -139,7 +142,7 @@ func (w *netClientWorker) run() { w.logger.Infof("Attempting to reconnect to %v with %d reconnect attempt(s)", w.client, reconnectAttempts) } - err := w.client.Connect() + err := w.client.Connect(ctx) connected = err == nil if connected { w.logger.Infof("Connection to %v established", w.client) @@ -152,15 +155,14 @@ func (w *netClientWorker) run() { continue } - if err := w.publishBatch(batch); err != nil { + if err := w.publishBatch(ctx, batch); err != nil { connected = false } } } } -func (w *netClientWorker) publishBatch(batch publisher.Batch) error { - ctx := context.Background() +func (w *netClientWorker) publishBatch(ctx context.Context, batch publisher.Batch) error { if w.tracer != nil && w.tracer.Recording() { tx := w.tracer.StartTransaction("publish", "output") defer tx.End() From fc20198b1a031fa248feb408798a01660dd79de1 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 19 Sep 2024 13:10:48 -0400 Subject: [PATCH 13/26] Add contexts when Beats create connections to ES --- filebeat/beater/filebeat.go | 37 ++++++++++++------- heartbeat/beater/heartbeat.go | 15 ++++++-- heartbeat/beater/heartbeat_test.go | 3 +- libbeat/cmd/instance/beat.go | 4 +- libbeat/esleg/eslegclient/api_mock_test.go | 5 ++- libbeat/esleg/eslegclient/api_test.go | 5 ++- libbeat/esleg/eslegclient/connection.go | 8 +--- libbeat/esleg/eslegclient/connection_test.go | 4 +- packetbeat/beater/packetbeat.go | 5 ++- winlogbeat/beater/winlogbeat.go | 4 +- x-pack/winlogbeat/module/testing.go | 5 ++- .../winlogbeat/module/wintest/docker_test.go | 4 +- .../module/wintest/simulate_test.go | 5 ++- 13 files changed, 70 insertions(+), 34 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 9d9cb220d4e..272b36d5212 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -18,6 +18,7 @@ package beater import ( + "context" "flag" "fmt" "path/filepath" @@ -195,14 +196,16 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { overwritePipelines := true b.OverwritePipelinesCallback = func(esConfig *conf.C) error { - esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat") if err != nil { return err } // When running the subcommand setup, configuration from modules.d directories // have to be loaded using cfg.Reloader. Otherwise those configurations are skipped. - pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config()) + pipelineLoaderFactory := newPipelineLoaderFactory(ctx, b.Config.Output.Config()) enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1) forceEnableModuleFilesets, _ := b.BeatConfig.Bool("config.modules.force_enable_module_filesets", -1) filesetOverrides := fileset.FilesetOverrides{ @@ -322,14 +325,6 @@ func (fb *Filebeat) Run(b *beat.Beat) error { outDone := make(chan struct{}) // outDone closes down all active pipeline connections pipelineConnector := channel.NewOutletFactory(outDone).Create - // Create a ES connection factory for dynamic modules pipeline loading - var pipelineLoaderFactory fileset.PipelineLoaderFactory - if b.Config.Output.Name() == "elasticsearch" { - pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config()) - } else { - logp.Warn(pipelinesWarning) - } - inputsLogger := logp.NewLogger("input") v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore) v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType) @@ -350,8 +345,22 @@ func (fb *Filebeat) Run(b *beat.Beat) error { compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader), input.NewRunnerFactory(pipelineConnector, registrar, fb.done), )) - moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) + // Create a ES connection factory for dynamic modules pipeline loading + var pipelineLoaderFactory fileset.PipelineLoaderFactory + // The pipelineFactory needs a context to control the connections to ES, + // when the pipelineFactory/ESClient are not needed any more the context + // must be cancelled. This pipeline factory will be used by the moduleLoader + // that is run by a crawler, whenever this crawler is stopped we also cancel + // the context. + pipelineFactoryCtx, cancelPipelineFactoryCtx := context.WithCancel(context.Background()) + defer cancelPipelineFactoryCtx() + if b.Config.Output.Name() == "elasticsearch" { + pipelineLoaderFactory = newPipelineLoaderFactory(pipelineFactoryCtx, b.Config.Output.Config()) + } else { + logp.Warn(pipelinesWarning) + } + moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once) if err != nil { logp.Err("Could not init crawler: %v", err) @@ -389,6 +398,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules) if err != nil { crawler.Stop() + cancelPipelineFactoryCtx() return fmt.Errorf("Failed to start crawler: %w", err) } @@ -444,6 +454,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { modules.Stop() adiscover.Stop() crawler.Stop() + cancelPipelineFactoryCtx() timeout := fb.config.ShutdownTimeout // Checks if on shutdown it should wait for all events to be published @@ -487,9 +498,9 @@ func (fb *Filebeat) Stop() { } // Create a new pipeline loader (es client) factory -func newPipelineLoaderFactory(esConfig *conf.C) fileset.PipelineLoaderFactory { +func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.PipelineLoaderFactory { pipelineLoaderFactory := func() (fileset.PipelineLoader, error) { - esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat") + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat") if err != nil { return nil, fmt.Errorf("Error creating Elasticsearch client: %w", err) } diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 9a849f6bc7e..91508dcef75 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -88,7 +88,11 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() { // Connect to ES and setup the State loader if the output is not managed by agent // Note this, intentionally, blocks until connected or max attempts reached - esClient, err := makeESClient(b.Config.Output.Config(), 3, 2*time.Second) + // TODO(Tiago): I believe this cannot be cancelled here, but all tests are passing + // so I need to fund out the correct life cycle for this connection/context + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + esClient, err := makeESClient(ctx, b.Config.Output.Config(), 3, 2*time.Second) if err != nil { if parsedConfig.RunOnce { trace.Abort() @@ -275,7 +279,10 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) { } // Backoff panics with 0 duration, set to smallest unit - esClient, err := makeESClient(outCfg.Config(), 1, 1*time.Nanosecond) + // TODO(Tiago): find out the correct lifecycle for this context/connection + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + esClient, err := makeESClient(ctx, outCfg.Config(), 1, 1*time.Nanosecond) if err != nil { logp.L().Warnf("skipping monitor state management during managed reload: %w", err) } else { @@ -324,7 +331,7 @@ func (bt *Heartbeat) Stop() { } // makeESClient establishes an ES connection meant to load monitors' state -func makeESClient(cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) { +func makeESClient(ctx context.Context, cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.Connection, error) { var ( esClient *eslegclient.Connection err error @@ -353,7 +360,7 @@ func makeESClient(cfg *conf.C, attempts int, wait time.Duration) (*eslegclient.C } for i := 0; i < attempts; i++ { - esClient, err = eslegclient.NewConnectedClient(newCfg, "Heartbeat") + esClient, err = eslegclient.NewConnectedClient(ctx, newCfg, "Heartbeat") if err == nil { connectDelay.Reset() return esClient, nil diff --git a/heartbeat/beater/heartbeat_test.go b/heartbeat/beater/heartbeat_test.go index 669811dc4c8..279366a0e7e 100644 --- a/heartbeat/beater/heartbeat_test.go +++ b/heartbeat/beater/heartbeat_test.go @@ -18,6 +18,7 @@ package beater import ( + "context" "testing" "time" @@ -39,7 +40,7 @@ func TestMakeESClient(t *testing.T) { anyAttempt := 1 anyDuration := 1 * time.Second - _, _ = makeESClient(origCfg, anyAttempt, anyDuration) + _, _ = makeESClient(context.Background(), origCfg, anyAttempt, anyDuration) timeout, err := origCfg.Int("timeout", -1) require.NoError(t, err) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 1a6250fad4d..458dbb49f31 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -898,7 +898,9 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er if !isElasticsearchOutput(outCfg.Name()) { return fmt.Errorf("index management requested but the Elasticsearch output is not configured/enabled") } - esClient, err := eslegclient.NewConnectedClient(outCfg.Config(), b.Info.Beat) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, outCfg.Config(), b.Info.Beat) if err != nil { return err } diff --git a/libbeat/esleg/eslegclient/api_mock_test.go b/libbeat/esleg/eslegclient/api_mock_test.go index bcb0d7a03c1..231ee437800 100644 --- a/libbeat/esleg/eslegclient/api_mock_test.go +++ b/libbeat/esleg/eslegclient/api_mock_test.go @@ -20,6 +20,7 @@ package eslegclient import ( + "context" "encoding/json" "fmt" "net/http" @@ -90,7 +91,9 @@ func TestOneHost500Resp(t *testing.T) { server := ElasticsearchMock(http.StatusInternalServerError, []byte("Something wrong happened")) client := newTestConnection(t, server.URL) - err := client.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err := client.Connect(ctx) if err != nil { t.Fatalf("Failed to connect: %v", err) } diff --git a/libbeat/esleg/eslegclient/api_test.go b/libbeat/esleg/eslegclient/api_test.go index d0d3c94a6f4..0bd0f5341b5 100644 --- a/libbeat/esleg/eslegclient/api_test.go +++ b/libbeat/esleg/eslegclient/api_test.go @@ -19,6 +19,7 @@ package eslegclient import ( + "context" "encoding/json" "testing" @@ -178,7 +179,9 @@ func newTestConnection(t *testing.T, url string) *Connection { URL: url, }) conn.Encoder = NewJSONEncoder(nil, false) - if err := conn.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := conn.Connect(ctx); err != nil { t.Fatalf("cannot connect to Elasticsearch: %s", err) } diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 80a02edd7fb..72833c40b62 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -251,7 +251,7 @@ func NewClients(cfg *cfg.C, beatname string) ([]Connection, error) { } // NewConnectedClient returns a non-thread-safe connection. Make sure for each goroutine you initialize a new connection. -func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) { +func NewConnectedClient(ctx context.Context, cfg *cfg.C, beatname string) (*Connection, error) { clients, err := NewClients(cfg, beatname) if err != nil { return nil, err @@ -260,20 +260,14 @@ func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) { errors := []string{} for _, client := range clients { - // client.Connect makes a call to get the ES version, so we create - // a context for those calls. Users of this Connection will have - // to call Connect again. - ctx, cancel := context.WithCancel(context.Background()) err = client.Connect(ctx) if err != nil { const errMsg = "error connecting to Elasticsearch at %v: %v" client.log.Errorf(errMsg, client.URL, err) err = fmt.Errorf(errMsg, client.URL, err) errors = append(errors, err.Error()) - cancel() continue } - cancel() return &client, nil } return nil, fmt.Errorf("couldn't connect to any of the configured Elasticsearch hosts. Errors: %v", errors) diff --git a/libbeat/esleg/eslegclient/connection_test.go b/libbeat/esleg/eslegclient/connection_test.go index 19fe67e9f55..77cbcdda674 100644 --- a/libbeat/esleg/eslegclient/connection_test.go +++ b/libbeat/esleg/eslegclient/connection_test.go @@ -162,7 +162,9 @@ func TestUserAgentHeader(t *testing.T) { testCase.connSettings.URL = server.URL conn, err := NewConnection(testCase.connSettings) require.NoError(t, err) - require.NoError(t, conn.Connect(), "conn.Connect must not return an error") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, conn.Connect(ctx), "conn.Connect must not return an error") }) } } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 6495a733379..e12573f8406 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -18,6 +18,7 @@ package beater import ( + "context" "flag" "fmt" "sync" @@ -111,7 +112,9 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { } overwritePipelines = config.OverwritePipelines b.OverwritePipelinesCallback = func(esConfig *conf.C) error { - esClient, err := eslegclient.NewConnectedClient(esConfig, "Packetbeat") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Packetbeat") if err != nil { return err } diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index e41aa54cb7f..4e6b2b3657d 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -108,7 +108,9 @@ func (eb *Winlogbeat) init(b *beat.Beat) error { } b.OverwritePipelinesCallback = func(esConfig *conf.C) error { overwritePipelines := config.OverwritePipelines - esClient, err := eslegclient.NewConnectedClient(esConfig, "Winlogbeat") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Winlogbeat") if err != nil { return err } diff --git a/x-pack/winlogbeat/module/testing.go b/x-pack/winlogbeat/module/testing.go index 3dc628b80a9..f1d38fceac8 100644 --- a/x-pack/winlogbeat/module/testing.go +++ b/x-pack/winlogbeat/module/testing.go @@ -5,6 +5,7 @@ package module import ( + "context" "encoding/json" "flag" "fmt" @@ -105,7 +106,9 @@ func testIngestPipeline(t *testing.T, pipeline, pattern string, p *params) { } defer conn.Close() - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) if err != nil { t.Fatalf("unexpected error making connection: %v", err) } diff --git a/x-pack/winlogbeat/module/wintest/docker_test.go b/x-pack/winlogbeat/module/wintest/docker_test.go index e45826f3b08..db7ab341a27 100644 --- a/x-pack/winlogbeat/module/wintest/docker_test.go +++ b/x-pack/winlogbeat/module/wintest/docker_test.go @@ -82,7 +82,9 @@ func TestDocker(t *testing.T) { } defer conn.Close() - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) if err != nil { t.Fatalf("unexpected error making connection: %v", err) } diff --git a/x-pack/winlogbeat/module/wintest/simulate_test.go b/x-pack/winlogbeat/module/wintest/simulate_test.go index 1bda1d5fb17..b54d12f1d96 100644 --- a/x-pack/winlogbeat/module/wintest/simulate_test.go +++ b/x-pack/winlogbeat/module/wintest/simulate_test.go @@ -11,6 +11,7 @@ package wintest_test import ( + "context" "encoding/json" "fmt" "os" @@ -72,7 +73,9 @@ func TestSimulate(t *testing.T) { } defer conn.Close() - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = conn.Connect(ctx) if err != nil { t.Fatalf("unexpected error making connection: %v", err) } From 3ca5b9c598d7ebd49b53658e5e3fa96b115fabce Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 20 Sep 2024 09:47:35 -0400 Subject: [PATCH 14/26] update tests --- filebeat/fileset/modules_integration_test.go | 5 ++++- libbeat/esleg/eslegtest/util.go | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 0d5ad2172c0..ffb149e53b3 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -20,6 +20,7 @@ package fileset import ( + "context" "encoding/json" "path/filepath" "testing" @@ -268,7 +269,9 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { conn.Encoder = eslegclient.NewJSONEncoder(nil, false) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) if err != nil { t.Fatal(err) panic(err) // panic in case TestLogger did not stop test diff --git a/libbeat/esleg/eslegtest/util.go b/libbeat/esleg/eslegtest/util.go index 28f33fde2dc..adabdf139e6 100644 --- a/libbeat/esleg/eslegtest/util.go +++ b/libbeat/esleg/eslegtest/util.go @@ -32,6 +32,7 @@ const ( // TestLogger is used to report fatal errors to the testing framework. type TestLogger interface { Fatal(args ...interface{}) + Cleanup(f func()) } // Connectable defines the minimum interface required to initialize a connected From 5b962d238f0cf5d3a6e7c6ed88b9fa386727bfc2 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 20 Sep 2024 10:38:19 -0400 Subject: [PATCH 15/26] Revert PyYAML changes to fix x-pack/auditbeat tests --- libbeat/tests/system/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/tests/system/requirements.txt b/libbeat/tests/system/requirements.txt index a42e78a93ad..00f3914d1a4 100644 --- a/libbeat/tests/system/requirements.txt +++ b/libbeat/tests/system/requirements.txt @@ -58,7 +58,7 @@ pytest==7.3.2 pytest-rerunfailures==9.1.1 pytest-timeout==1.4.2 python-dotenv==0.21.1 -PyYAML<6 +PyYAML==6.0.1 redis==4.4.4 semver==2.8.1 six==1.15.0 From 459b8693b029a9e702c803a31de9b09193996d5e Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 27 Sep 2024 16:45:23 -0400 Subject: [PATCH 16/26] Update tests to Connect with context --- .../eslegclient/connection_integration_test.go | 14 +++++++++----- libbeat/esleg/eslegtest/util.go | 7 +++++-- libbeat/licenser/elastic_fetcher_test.go | 5 +++++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/libbeat/esleg/eslegclient/connection_integration_test.go b/libbeat/esleg/eslegclient/connection_integration_test.go index 019df0563d9..b56360b4232 100644 --- a/libbeat/esleg/eslegclient/connection_integration_test.go +++ b/libbeat/esleg/eslegclient/connection_integration_test.go @@ -39,15 +39,17 @@ import ( func TestConnect(t *testing.T) { conn := getTestingElasticsearch(t) - err := conn.Connect() + err := conn.Connect(context.Background()) assert.NoError(t, err) } func TestConnectionCanBeClosedAndReused(t *testing.T) { conn := getTestingElasticsearch(t) - assert.NoError(t, conn.Connect(), "first connect must succeed") + ctx, cancel := context.WithCancel(context.Background()) + assert.NoError(t, conn.Connect(ctx), "first connect must succeed") assert.NoError(t, conn.Close(), "close must succeed") - assert.NoError(t, conn.Connect(), "calling connect after close must succeed") + cancel() + assert.NoError(t, conn.Connect(context.Background()), "calling connect after close must succeed") } func TestConnectWithProxy(t *testing.T) { @@ -71,7 +73,9 @@ func TestConnectWithProxy(t *testing.T) { "timeout": 5, // seconds }) require.NoError(t, err) - assert.Error(t, client.Connect(), "it should fail without proxy") + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + assert.Error(t, client.Connect(ctx), "it should fail without proxy") client, err = connectTestEs(t, map[string]interface{}{ "hosts": "http://" + wrongPort.Addr().String(), @@ -79,7 +83,7 @@ func TestConnectWithProxy(t *testing.T) { "timeout": 5, // seconds }) require.NoError(t, err) - assert.NoError(t, client.Connect()) + assert.NoError(t, client.Connect(ctx)) } func connectTestEs(t *testing.T, cfg interface{}) (*Connection, error) { diff --git a/libbeat/esleg/eslegtest/util.go b/libbeat/esleg/eslegtest/util.go index adabdf139e6..e86ca14363d 100644 --- a/libbeat/esleg/eslegtest/util.go +++ b/libbeat/esleg/eslegtest/util.go @@ -18,6 +18,7 @@ package eslegtest import ( + "context" "fmt" "os" ) @@ -38,15 +39,17 @@ type TestLogger interface { // Connectable defines the minimum interface required to initialize a connected // client. type Connectable interface { - Connect() error + Connect(context.Context) error } // InitConnection initializes a new connection if the no error value from creating the // connection instance is reported. // The test logger will be used if an error is found. func InitConnection(t TestLogger, conn Connectable, err error) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) if err == nil { - err = conn.Connect() + err = conn.Connect(ctx) } if err != nil { diff --git a/libbeat/licenser/elastic_fetcher_test.go b/libbeat/licenser/elastic_fetcher_test.go index 731bf5c0618..c3148041d2c 100644 --- a/libbeat/licenser/elastic_fetcher_test.go +++ b/libbeat/licenser/elastic_fetcher_test.go @@ -18,6 +18,7 @@ package licenser import ( + "context" "io/ioutil" "net/http" "net/http/httptest" @@ -43,6 +44,10 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv t.Fatalf("could not create the elasticsearch client, error: %s", err) } + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + client.Connect(ctx) + return server, client } From eee35b7ec4b701cd67b70aca44a102efabe8f3f1 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 27 Sep 2024 17:04:29 -0400 Subject: [PATCH 17/26] Fix lint warnings --- libbeat/licenser/elastic_fetcher.go | 19 ++----------------- libbeat/licenser/elastic_fetcher_test.go | 17 +++++++++-------- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/libbeat/licenser/elastic_fetcher.go b/libbeat/licenser/elastic_fetcher.go index 80fc1cc978d..1f869d61fef 100644 --- a/libbeat/licenser/elastic_fetcher.go +++ b/libbeat/licenser/elastic_fetcher.go @@ -22,7 +22,6 @@ import ( "encoding/json" "errors" "fmt" - "math/rand" "net/http" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" @@ -99,6 +98,7 @@ func (f *ElasticFetcher) parseJSON(b []byte) (License, error) { // esClientMux is taking care of round robin request over an array of elasticsearch client, note that // calling request is not threadsafe. +// nolint: unused // it's used on Linux type esClientMux struct { clients []eslegclient.Connection idx int @@ -108,6 +108,7 @@ type esClientMux struct { // at the end of the function call, if an error occur we return the error and will pick up the next client on the // next call. Not that we just round robin between hosts, any backoff strategy should be handled by // the consumer of this type. +// nolint: unused // it's used on Linux func (mux *esClientMux) Request( method, path string, pipeline string, @@ -130,19 +131,3 @@ func (mux *esClientMux) Request( } return status, response, err } - -// newESClientMux takes a list of clients and randomize where we start and the list of host we are -// querying. -func newESClientMux(clients []eslegclient.Connection) *esClientMux { - // randomize where we start - idx := rand.Intn(len(clients)) - - // randomize the list of round robin hosts. - tmp := make([]eslegclient.Connection, len(clients)) - copy(tmp, clients) - rand.Shuffle(len(tmp), func(i, j int) { - tmp[i], tmp[j] = tmp[j], tmp[i] - }) - - return &esClientMux{idx: idx, clients: tmp} -} diff --git a/libbeat/licenser/elastic_fetcher_test.go b/libbeat/licenser/elastic_fetcher_test.go index c3148041d2c..e53befba3dc 100644 --- a/libbeat/licenser/elastic_fetcher_test.go +++ b/libbeat/licenser/elastic_fetcher_test.go @@ -19,7 +19,6 @@ package licenser import ( "context" - "io/ioutil" "net/http" "net/http/httptest" "os" @@ -33,7 +32,7 @@ import ( func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *eslegclient.Connection) { mux := http.NewServeMux() - mux.Handle("/_license/", http.HandlerFunc(handler)) + mux.Handle("/_license/", handler) server := httptest.NewServer(mux) @@ -46,7 +45,9 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - client.Connect(ctx) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to ES: %s", err) + } return server, client } @@ -54,7 +55,7 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv func TestParseJSON(t *testing.T) { t.Run("OSS release of Elasticsearch (Code: 405)", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "Method Not Allowed", 405) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) } s, c := newServerClientPair(t, h) defer s.Close() @@ -80,7 +81,7 @@ func TestParseJSON(t *testing.T) { t.Run("malformed JSON", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte("hello bad JSON")) + _, _ = w.Write([]byte("hello bad JSON")) } s, c := newServerClientPair(t, h) defer s.Close() @@ -93,7 +94,7 @@ func TestParseJSON(t *testing.T) { t.Run("401 response", func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - http.Error(w, "Unauthorized", 401) + http.Error(w, "Unauthorized", http.StatusUnauthorized) } s, c := newServerClientPair(t, h) defer s.Close() @@ -118,14 +119,14 @@ func TestParseJSON(t *testing.T) { }) t.Run("200 response", func(t *testing.T) { - filepath.Walk("testdata/", func(path string, i os.FileInfo, err error) error { + _ = filepath.Walk("testdata/", func(path string, i os.FileInfo, err error) error { if i.IsDir() { return nil } t.Run(path, func(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { - json, err := ioutil.ReadFile(path) + json, err := os.ReadFile(path) if err != nil { t.Fatal("could not read JSON") } From ab39277c132b54d4b8672d451cd6f456af169abd Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 30 Sep 2024 15:38:00 -0400 Subject: [PATCH 18/26] Fix tests --- libbeat/outputs/elasticsearch/client_integration_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 765fd3eec5a..f4fb0e4f9a9 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -429,8 +429,12 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu } client := randomClient(output).(clientWrap).Client().(*Client) - // Load version number - _ = client.Connect() + // Load version ctx + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to ES: %s", err) + } return client, client } From 5f6edccaee55fd192346cdf5fcb5bd33891310e7 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 2 Oct 2024 08:40:01 -0400 Subject: [PATCH 19/26] Fix tests --- libbeat/licenser/elastic_fetcher_test.go | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/libbeat/licenser/elastic_fetcher_test.go b/libbeat/licenser/elastic_fetcher_test.go index e53befba3dc..82ca7e47ca2 100644 --- a/libbeat/licenser/elastic_fetcher_test.go +++ b/libbeat/licenser/elastic_fetcher_test.go @@ -19,6 +19,7 @@ package licenser import ( "context" + "fmt" "net/http" "net/http/httptest" "os" @@ -26,15 +27,41 @@ import ( "testing" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" + "github.com/elastic/beats/v7/libbeat/version" "github.com/stretchr/testify/assert" ) +func esRootHandler(w http.ResponseWriter, r *http.Request) { + respStr := fmt.Sprintf(` +{ + "name" : "582a64c35c16", + "cluster_name" : "docker-cluster", + "cluster_uuid" : "fnanWPBeSNS9KZ930Z5JmA", + "version" : { + "number" : "%s", + "build_flavor" : "default", + "build_type" : "docker", + "build_hash" : "14b7170921f2f0e4109255b83cb9af175385d87f", + "build_date" : "2024-08-23T00:26:58.284513650Z", + "build_snapshot" : true, + "lucene_version" : "9.11.1", + "minimum_wire_compatibility_version" : "7.17.0", + "minimum_index_compatibility_version" : "7.0.0" + }, + "tagline" : "You Know, for Search" +}`, version.GetDefaultVersion()) + + w.Write([]byte(respStr)) +} + func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Server, *eslegclient.Connection) { mux := http.NewServeMux() + mux.Handle("/", http.HandlerFunc(esRootHandler)) mux.Handle("/_license/", handler) server := httptest.NewServer(mux) + t.Cleanup(server.Close) client, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ URL: server.URL, From 09c7581c6d7388d13652e263d8f85cc39a2ac742 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 2 Oct 2024 17:03:51 -0400 Subject: [PATCH 20/26] Fix tests --- .../lifecycle/client_handler_integration_test.go | 5 ++++- .../licenser/elastic_fetcher_integration_test.go | 15 +++++++++++---- .../outputs/logstash/logstash_integration_test.go | 9 ++++++++- libbeat/outputs/redis/redis_integration_test.go | 4 +++- libbeat/template/load_integration_test.go | 13 ++++++++++--- 5 files changed, 36 insertions(+), 10 deletions(-) diff --git a/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go b/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go index 67b9a1cfb06..6f81bf98a02 100644 --- a/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go +++ b/libbeat/idxmgmt/lifecycle/client_handler_integration_test.go @@ -20,6 +20,7 @@ package lifecycle import ( + "context" "fmt" "os" "testing" @@ -141,7 +142,9 @@ func newRawESClient(t *testing.T) ESClient { t.Fatal(err) } - if err := client.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { t.Fatalf("Failed to connect to Test Elasticsearch instance: %v", err) } diff --git a/libbeat/licenser/elastic_fetcher_integration_test.go b/libbeat/licenser/elastic_fetcher_integration_test.go index f303bfe0d8c..7560ebb394d 100644 --- a/libbeat/licenser/elastic_fetcher_integration_test.go +++ b/libbeat/licenser/elastic_fetcher_integration_test.go @@ -20,6 +20,7 @@ package licenser import ( + "context" "testing" "time" @@ -35,7 +36,7 @@ const ( elasticsearchPort = "9200" ) -func getTestClient() *eslegclient.Connection { +func getTestClient(t *testing.T) *eslegclient.Connection { transport := httpcommon.DefaultHTTPTransportSettings() transport.Timeout = 60 * time.Second @@ -47,16 +48,22 @@ func getTestClient() *eslegclient.Connection { CompressionLevel: 3, Transport: transport, }) - if err != nil { - panic(err) + t.Fatalf("cannot get new ES connection: %s", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to ES: %s", err) } + return client } // Sanity check for schema change on the HTTP response from a live Elasticsearch instance. func TestElasticsearch(t *testing.T) { - f := NewElasticFetcher(getTestClient()) + f := NewElasticFetcher(getTestClient(t)) license, err := f.Fetch() if !assert.NoError(t, err) { return diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 442145835df..286717e49ed 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -115,6 +115,11 @@ func esConnect(t *testing.T, index string) *esConnection { Password: password, Transport: transport, }) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { + t.Fatalf("cannot connect to LS: %s:", err) + } if err != nil { t.Fatal(err) } @@ -207,7 +212,9 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { // The Elasticsearch output requires events to be encoded // before calling Publish, so create an event encoder. es.encoder = grp.EncoderFactory() - es.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + es.Connect(ctx) return es } diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index dfd48dc75d2..6fd3e09397a 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -336,7 +336,9 @@ func newRedisTestingOutput(t *testing.T, cfg map[string]interface{}) outputs.Cli } client := out.Clients[0].(outputs.NetworkClient) - if err := client.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { t.Fatalf("Failed to connect to redis host: %v", err) } diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index b3aafad5d69..4705f9be5a8 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -20,6 +20,7 @@ package template import ( + "context" "encoding/json" "fmt" "io/ioutil" @@ -66,7 +67,9 @@ func newTestSetup(t *testing.T, cfg TemplateConfig) *testSetup { cfg.Name = fmt.Sprintf("load-test-%+v", rand.Int()) } client := getTestingElasticsearch(t) - if err := client.Connect(); err != nil { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + if err := client.Connect(ctx); err != nil { t.Fatal(err) } handler := &mockClientHandler{serverless: false, mode: lifecycle.ILM} @@ -554,7 +557,9 @@ func getTestingElasticsearch(t eslegtest.TestLogger) *eslegclient.Connection { conn.Encoder = eslegclient.NewJSONEncoder(nil, false) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) if err != nil { t.Fatal(err) panic(err) // panic in case TestLogger did not stop test @@ -586,7 +591,9 @@ func getMockElasticsearchClient(t *testing.T, method, endpoint string, code int, Transport: httpcommon.DefaultHTTPTransportSettings(), }) require.NoError(t, err) - err = conn.Connect() + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + err = conn.Connect(ctx) require.NoError(t, err) return conn } From 8a56f6e7b0ac9a395b0b4d4cff80e7ec1e12ebaa Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Thu, 3 Oct 2024 08:13:06 -0400 Subject: [PATCH 21/26] Fix more tests --- libbeat/publisher/pipeline/testing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/testing.go b/libbeat/publisher/pipeline/testing.go index ca357646a81..61977377a75 100644 --- a/libbeat/publisher/pipeline/testing.go +++ b/libbeat/publisher/pipeline/testing.go @@ -54,7 +54,7 @@ type mockNetworkClient struct { outputs.Client } -func (c *mockNetworkClient) Connect() error { return nil } +func (c *mockNetworkClient) Connect(_ context.Context) error { return nil } type mockBatch struct { mu sync.Mutex From 5f0432308d2eea9a604ac8dde99a1fb81b2c2d68 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 8 Oct 2024 10:23:48 -0400 Subject: [PATCH 22/26] Use pointer for OnConnectCallback --- libbeat/esleg/eslegclient/connection.go | 4 ++-- libbeat/outputs/elasticsearch/client.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index 72833c40b62..310aa853e34 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -81,7 +81,7 @@ type ConnectionSettings struct { Kerberos *kerberos.Config - OnConnectCallback func(Connection) error + OnConnectCallback func(*Connection) error Observer transport.IOStatser Parameters map[string]string @@ -290,7 +290,7 @@ func (conn *Connection) Connect(ctx context.Context) error { } if conn.OnConnectCallback != nil { - if err := conn.OnConnectCallback(*conn); err != nil { + if err := conn.OnConnectCallback(conn); err != nil { return fmt.Errorf("Connection marked as failed because the onConnect callback failed: %w", err) } } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index c40fcffc393..56f28cdbf30 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -133,12 +133,12 @@ func NewClient( return nil, err } - conn.OnConnectCallback = func(conn eslegclient.Connection) error { + conn.OnConnectCallback = func(conn *eslegclient.Connection) error { globalCallbackRegistry.mutex.Lock() defer globalCallbackRegistry.mutex.Unlock() for _, callback := range globalCallbackRegistry.callbacks { - err := callback(&conn) + err := callback(conn) if err != nil { return err } @@ -149,7 +149,7 @@ func NewClient( defer onConnect.mutex.Unlock() for _, callback := range onConnect.callbacks { - err := callback(&conn) + err := callback(conn) if err != nil { return err } From f03154bb462c93e0771d78617ce176f4db493027 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 14 Oct 2024 15:41:29 -0400 Subject: [PATCH 23/26] Use context to client worker cancellation --- libbeat/publisher/pipeline/client_worker.go | 28 ++++++++++----------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/libbeat/publisher/pipeline/client_worker.go b/libbeat/publisher/pipeline/client_worker.go index dc51979d033..3e6b8202dd2 100644 --- a/libbeat/publisher/pipeline/client_worker.go +++ b/libbeat/publisher/pipeline/client_worker.go @@ -29,8 +29,8 @@ import ( ) type worker struct { - qu chan publisher.Batch - done chan struct{} + qu chan publisher.Batch + cancel func() } // clientWorker manages output client of type outputs.Client, not supporting reconnect. @@ -50,14 +50,15 @@ type netClientWorker struct { } func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger logger, tracer *apm.Tracer) outputWorker { + ctx, cancel := context.WithCancel(context.Background()) w := worker{ - qu: qu, - done: make(chan struct{}), + qu: qu, + cancel: cancel, } var c interface { outputWorker - run() + run(context.Context) } if nc, ok := client.(outputs.NetworkClient); ok { @@ -71,12 +72,12 @@ func makeClientWorker(qu chan publisher.Batch, client outputs.Client, logger log c = &clientWorker{worker: w, client: client} } - go c.run() + go c.run(ctx) return c } func (w *worker) close() { - close(w.done) + w.cancel() } func (w *clientWorker) Close() error { @@ -84,20 +85,20 @@ func (w *clientWorker) Close() error { return w.client.Close() } -func (w *clientWorker) run() { +func (w *clientWorker) run(ctx context.Context) { for { // We wait for either the worker to be closed or for there to be a batch of // events to publish. select { - case <-w.done: + case <-ctx.Done(): return case batch := <-w.qu: if batch == nil { continue } - if err := w.client.Publish(context.TODO(), batch); err != nil { + if err := w.client.Publish(ctx, batch); err != nil { return } } @@ -109,21 +110,18 @@ func (w *netClientWorker) Close() error { return w.client.Close() } -func (w *netClientWorker) run() { +func (w *netClientWorker) run(ctx context.Context) { var ( connected = false reconnectAttempts = 0 ) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - for { // We wait for either the worker to be closed or for there to be a batch of // events to publish. select { - case <-w.done: + case <-ctx.Done(): return case batch := <-w.qu: From f3502a3e4e53c4e44ff592c39c2419a23d22efae Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 18 Oct 2024 11:57:14 -0400 Subject: [PATCH 24/26] Address PR comments --- filebeat/beater/filebeat.go | 2 +- heartbeat/beater/heartbeat.go | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 272b36d5212..815b6fabfde 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -196,7 +196,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { overwritePipelines := true b.OverwritePipelinesCallback = func(esConfig *conf.C) error { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() esClient, err := eslegclient.NewConnectedClient(ctx, esConfig, "Filebeat") if err != nil { diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 91508dcef75..e22503ed741 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -88,11 +88,7 @@ func New(b *beat.Beat, rawConfig *conf.C) (beat.Beater, error) { if b.Config.Output.Name() == "elasticsearch" && !b.Manager.Enabled() { // Connect to ES and setup the State loader if the output is not managed by agent // Note this, intentionally, blocks until connected or max attempts reached - // TODO(Tiago): I believe this cannot be cancelled here, but all tests are passing - // so I need to fund out the correct life cycle for this connection/context - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - esClient, err := makeESClient(ctx, b.Config.Output.Config(), 3, 2*time.Second) + esClient, err := makeESClient(context.TODO(), b.Config.Output.Config(), 3, 2*time.Second) if err != nil { if parsedConfig.RunOnce { trace.Abort() From 5e09644af3458df377be6ff84cd77f4eb9bbd283 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Fri, 18 Oct 2024 12:09:23 -0400 Subject: [PATCH 25/26] PR improvements --- CHANGELOG.next.asciidoc | 1 + heartbeat/beater/heartbeat.go | 5 +---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9804c8d929a..9eb7d746718 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572] - The journald input now restarts if there is an error/crash {issue}32782[32782] {pull}40558[40558] - Fix Elasticsearch output not recovering from network errors {issue}40705[40705] {pull}40794[40794] +- Ensure Elasticsearch output can always recover from network errors {pull}40794[40794] *Auditbeat* diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index e22503ed741..227b375ee90 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -275,10 +275,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) { } // Backoff panics with 0 duration, set to smallest unit - // TODO(Tiago): find out the correct lifecycle for this context/connection - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - esClient, err := makeESClient(ctx, outCfg.Config(), 1, 1*time.Nanosecond) + esClient, err := makeESClient(context.TODO(), outCfg.Config(), 1, 1*time.Nanosecond) if err != nil { logp.L().Warnf("skipping monitor state management during managed reload: %w", err) } else { From f79c8a63792bcc81a1f597e2008508f8345d12c7 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 22 Oct 2024 10:53:05 -0400 Subject: [PATCH 26/26] fix merge conflicts --- CHANGELOG.next.asciidoc | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9eb7d746718..d05e6fb5d3d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -108,20 +108,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] - Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356] -- Fix the paths in the .cmd script added to the path by the Windows MSI to point to the new C:\Program Files installation location. https://github.com/elastic/elastic-stack-installers/pull/238 -- Change cache processor documentation from `write_period` to `write_interval`. {pull}38561[38561] -- Fix cache processor expiries heap cleanup on partial file writes. {pull}38561[38561] -- Fix cache processor expiries infinite growth when large a large TTL is used and recurring keys are cached. {pull}38561[38561] -- Fix parsing of RFC 3164 process IDs in syslog processor. {issue}38947[38947] {pull}38982[38982] -- Rename the field "apache2.module.error" to "apache.module.error" in Apache error visualization. {issue}39480[39480] {pull}39481[39481] -- Validate config of the `replace` processor {pull}40047[40047] -- Allow port number 0 in the community ID flowhash processor {pull}40259[40259] -- Fix handling of escaped brackets in syslog structured data. {issue}40445[40445] {pull}40446[40446] -- Update Go version to 1.22.6. {pull}40528[40528] -- Aborts all active connections for Elasticsearch output. {pull}40572[40572] -- Closes beat Publisher on beat stop and by the Agent manager. {pull}40572[40572] -- The journald input now restarts if there is an error/crash {issue}32782[32782] {pull}40558[40558] -- Fix Elasticsearch output not recovering from network errors {issue}40705[40705] {pull}40794[40794] - Ensure Elasticsearch output can always recover from network errors {pull}40794[40794] *Auditbeat*