Skip to content

Commit

Permalink
Improve the esleg bulk response parsing (#36275)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsapran authored Aug 23, 2023
1 parent dde4079 commit 1a7e9aa
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 44 deletions.
54 changes: 27 additions & 27 deletions libbeat/esleg/eslegclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func readCountResult(obj []byte) (*CountResults, error) {
// searchable. In case id is empty, a new id is created over a HTTP POST request.
// Otherwise, a HTTP PUT request is issued.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
func (es *Connection) Index(
func (conn *Connection) Index(
index, docType, id string,
params map[string]string,
body interface{},
Expand All @@ -148,11 +148,11 @@ func (es *Connection) Index(
if id == "" {
method = "POST"
}
return withQueryResult(es.apiCall(method, index, docType, id, "", params, body))
return withQueryResult(conn.apiCall(method, index, docType, id, "", params, body))
}

// Ingest pushes a pipeline of updates.
func (es *Connection) Ingest(
func (conn *Connection) Ingest(
index, docType, pipeline, id string,
params map[string]string,
body interface{},
Expand All @@ -161,39 +161,39 @@ func (es *Connection) Ingest(
if id == "" {
method = "POST"
}
return withQueryResult(es.apiCall(method, index, docType, id, pipeline, params, body))
return withQueryResult(conn.apiCall(method, index, docType, id, pipeline, params, body))
}

// Refresh an index. Call this after doing inserts or creating/deleting
// indexes in unit tests.
func (es *Connection) Refresh(index string) (int, *QueryResult, error) {
return withQueryResult(es.apiCall("POST", index, "", "_refresh", "", nil, nil))
func (conn *Connection) Refresh(index string) (int, *QueryResult, error) {
return withQueryResult(conn.apiCall("POST", index, "", "_refresh", "", nil, nil))
}

// CreateIndex creates a new index, optionally with settings and mappings passed in
// the body.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
func (es *Connection) CreateIndex(index string, body interface{}) (int, *QueryResult, error) {
return withQueryResult(es.apiCall("PUT", index, "", "", "", nil, body))
func (conn *Connection) CreateIndex(index string, body interface{}) (int, *QueryResult, error) {
return withQueryResult(conn.apiCall("PUT", index, "", "", "", nil, body))
}

// IndexExists checks if an index exists.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-exists.html
func (es *Connection) IndexExists(index string) (int, error) {
status, _, err := es.apiCall("HEAD", index, "", "", "", nil, nil)
func (conn *Connection) IndexExists(index string) (int, error) {
status, _, err := conn.apiCall("HEAD", index, "", "", "", nil, nil)
return status, err
}

// Delete deletes a typed JSON document from a specific index based on its id.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html
func (es *Connection) Delete(index string, docType string, id string, params map[string]string) (int, *QueryResult, error) {
return withQueryResult(es.apiCall("DELETE", index, docType, id, "", params, nil))
func (conn *Connection) Delete(index string, docType string, id string, params map[string]string) (int, *QueryResult, error) {
return withQueryResult(conn.apiCall("DELETE", index, docType, id, "", params, nil))
}

// PipelineExists checks if a pipeline with name id already exists.
// Using: https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html
func (es *Connection) PipelineExists(id string) (bool, error) {
status, _, err := es.apiCall("GET", "_ingest", "pipeline", id, "", nil, nil)
func (conn *Connection) PipelineExists(id string) (bool, error) {
status, _, err := conn.apiCall("GET", "_ingest", "pipeline", id, "", nil, nil)
if status == 404 {
return false, nil
}
Expand All @@ -202,41 +202,41 @@ func (es *Connection) PipelineExists(id string) (bool, error) {

// CreatePipeline create a new ingest pipeline with name id.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html
func (es *Connection) CreatePipeline(
func (conn *Connection) CreatePipeline(
id string,
params map[string]string,
body interface{},
) (int, *QueryResult, error) {
return withQueryResult(es.apiCall("PUT", "_ingest", "pipeline", id, "", params, body))
return withQueryResult(conn.apiCall("PUT", "_ingest", "pipeline", id, "", params, body))
}

// DeletePipeline deletes an ingest pipeline by id.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html
func (es *Connection) DeletePipeline(
func (conn *Connection) DeletePipeline(
id string,
params map[string]string,
) (int, *QueryResult, error) {
return withQueryResult(es.apiCall("DELETE", "_ingest", "pipeline", id, "", params, nil))
return withQueryResult(conn.apiCall("DELETE", "_ingest", "pipeline", id, "", params, nil))
}

// SearchURI executes a search request using a URI by providing request parameters.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
func (es *Connection) SearchURI(index string, docType string, params map[string]string) (int, *SearchResults, error) {
return es.SearchURIWithBody(index, docType, params, nil)
func (conn *Connection) SearchURI(index string, docType string, params map[string]string) (int, *SearchResults, error) {
return conn.SearchURIWithBody(index, docType, params, nil)
}

// SearchURIWithBody executes a search request using a URI by providing request
// parameters and a request body.
func (es *Connection) SearchURIWithBody(
func (conn *Connection) SearchURIWithBody(
index string,
docType string,
params map[string]string,
body interface{},
) (int, *SearchResults, error) {
if !es.version.LessThan(&version.V{Major: 8}) {
if !conn.version.LessThan(&version.V{Major: 8}) {
docType = ""
}
status, resp, err := es.apiCall("GET", index, docType, "_search", "", params, body)
status, resp, err := conn.apiCall("GET", index, docType, "_search", "", params, body)
if err != nil {
return status, nil, err
}
Expand All @@ -245,19 +245,19 @@ func (es *Connection) SearchURIWithBody(
}

// CountSearchURI counts the results for a search request.
func (es *Connection) CountSearchURI(
func (conn *Connection) CountSearchURI(
index string, docType string,
params map[string]string,
) (int, *CountResults, error) {
status, resp, err := es.apiCall("GET", index, docType, "_count", "", params, nil)
status, resp, err := conn.apiCall("GET", index, docType, "_count", "", params, nil)
if err != nil {
return status, nil, err
}
result, err := readCountResult(resp)
return status, result, err
}

func (es *Connection) apiCall(
func (conn *Connection) apiCall(
method, index, docType, id, pipeline string,
params map[string]string,
body interface{},
Expand All @@ -266,5 +266,5 @@ func (es *Connection) apiCall(
if err != nil {
return 0, nil, err
}
return es.Request(method, path, pipeline, params, body)
return conn.Request(method, path, pipeline, params, body)
}
4 changes: 2 additions & 2 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"net/http"
"strings"

"go.elastic.co/apm/module/apmhttp/v2"
apmHttpV2 "go.elastic.co/apm/module/apmhttp/v2"
"go.elastic.co/apm/v2"

"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -88,7 +88,7 @@ func (conn *Connection) Bulk(
apm.CaptureError(ctx, err).Send()
return 0, nil, err
}
requ.requ = apmhttp.RequestWithContext(ctx, requ.requ)
requ.requ = apmHttpV2.RequestWithContext(ctx, requ.requ)

return conn.sendBulkRequest(requ)
}
Expand Down
12 changes: 6 additions & 6 deletions libbeat/esleg/eslegclient/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
)

func TestOneHostSuccessResp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

setupErr := logp.TestingSetup(logp.WithSelectors("elasticsearch"))
require.NoError(t, setupErr)
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
expectedResp := []byte(`{"took":7,"errors":false,"items":[]}`)

Expand Down Expand Up @@ -73,8 +73,8 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
}

func TestOneHost500Resp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

setupErr := logp.TestingSetup(logp.WithSelectors("elasticsearch"))
require.NoError(t, setupErr)
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

ops := []map[string]interface{}{
Expand Down Expand Up @@ -113,8 +113,8 @@ func TestOneHost500Resp_Bulk(t *testing.T) {
}

func TestOneHost503Resp_Bulk(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("elasticsearch"))

setupErr := logp.TestingSetup(logp.WithSelectors("elasticsearch"))
require.NoError(t, setupErr)
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())

ops := []map[string]interface{}{
Expand Down
20 changes: 14 additions & 6 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package eslegclient

import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
Expand All @@ -48,7 +48,8 @@ type esHTTPClient interface {
CloseIdleConnections()
}

// Connection manages the connection for a given client.
// Connection manages the connection for a given client. Each connection is not-thread-safe and should not be shared
// between 2 different goroutines.
type Connection struct {
ConnectionSettings

Expand All @@ -58,6 +59,7 @@ type Connection struct {
apiKeyAuthHeader string // Authorization HTTP request header with base64-encoded API key
version libversion.V
log *logp.Logger
responseBuffer *bytes.Buffer
}

// ConnectionSettings are the settings needed for a Connection
Expand Down Expand Up @@ -152,14 +154,15 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
if err != nil {
return nil, err
}
logp.Info("kerberos client created")
logger.Info("kerberos client created")
}

conn := Connection{
ConnectionSettings: s,
HTTP: esClient,
Encoder: encoder,
log: logger,
responseBuffer: bytes.NewBuffer(nil),
}

if s.APIKey != "" {
Expand All @@ -173,6 +176,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
// configuration. It accepts the same configuration parameters as the Elasticsearch
// output, except for the output specific configuration options. If multiple hosts
// are defined in the configuration, a client is returned for each of them.
// The returned Connection is a non-thread-safe connection.
func NewClients(cfg *cfg.C, beatname string) ([]Connection, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
Expand Down Expand Up @@ -220,6 +224,7 @@ func NewClients(cfg *cfg.C, beatname string) ([]Connection, error) {
return clients, nil
}

// NewConnectedClient returns a non-thread-safe connection. Make sure for each goroutine you initialize a new connection.
func NewConnectedClient(cfg *cfg.C, beatname string) (*Connection, error) {
clients, err := NewClients(cfg, beatname)
if err != nil {
Expand Down Expand Up @@ -418,6 +423,8 @@ func (conn *Connection) LoadJSON(path string, json map[string]interface{}) ([]by
return body, nil
}

// execHTTPRequest executes the http request and consumes the response in a non-thread-safe way.
// The return is a triple of status code, response as byte array, error if the request produced any error.
func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error) {
req.Header.Add("Accept", "application/json")

Expand Down Expand Up @@ -452,17 +459,18 @@ func (conn *Connection) execHTTPRequest(req *http.Request) (int, []byte, error)
defer closing(resp.Body, conn.log)

status := resp.StatusCode
obj, err := ioutil.ReadAll(resp.Body)
conn.responseBuffer.Reset()
_, err = io.Copy(conn.responseBuffer, resp.Body)
if err != nil {
return status, nil, err
}

if status >= 300 {
// add the response body with the error returned by Elasticsearch
err = fmt.Errorf("%v: %s", resp.Status, obj)
err = fmt.Errorf("%v: %s", resp.Status, conn.responseBuffer.Bytes())
}

return status, obj, err
return status, conn.responseBuffer.Bytes(), err
}

func closing(c io.Closer, logger *logp.Logger) {
Expand Down
57 changes: 55 additions & 2 deletions libbeat/esleg/eslegclient/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package eslegclient
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"net/http"
"testing"
Expand All @@ -41,7 +42,7 @@ func TestAPIKeyEncoding(t *testing.T) {
httpClient := newMockClient()
conn.HTTP = httpClient

req, err := http.NewRequest("GET", "http://fakehost/some/path", nil)
req, err := http.NewRequestWithContext(context.Background(), "GET", "http://fakehost/some/path", nil)
require.NoError(t, err)

_, _, err = conn.execHTTPRequest(req)
Expand Down Expand Up @@ -97,11 +98,63 @@ func TestHeaders(t *testing.T) {
httpClient := newMockClient()
conn.HTTP = httpClient

req, err := http.NewRequest("GET", "http://fakehost/some/path", nil)
req, err := http.NewRequestWithContext(context.Background(), "GET", "http://fakehost/some/path", nil)
require.NoError(t, err)
_, _, err = conn.execHTTPRequest(req)
require.NoError(t, err)

require.Equal(t, req.Header, http.Header(td.expected))
}
}

func BenchmarkExecHTTPRequest(b *testing.B) {
for _, td := range []struct {
input map[string]string
expected map[string][]string
}{
{
input: map[string]string{
"Accept": "application/vnd.elasticsearch+json;compatible-with=7",
"Content-Type": "application/vnd.elasticsearch+json;compatible-with=7",
productorigin.Header: "elastic-product",
"X-My-Header": "true",
},
expected: map[string][]string{
"Accept": {"application/vnd.elasticsearch+json;compatible-with=7"},
"Content-Type": {"application/vnd.elasticsearch+json;compatible-with=7"},
productorigin.Header: {"elastic-product"},
"X-My-Header": {"true"},
},
},
{
input: map[string]string{
"X-My-Header": "true",
},
expected: map[string][]string{
"Accept": {"application/json"},
productorigin.Header: {productorigin.Beats},
"X-My-Header": {"true"},
},
},
} {
conn, err := NewConnection(ConnectionSettings{
Headers: td.input,
})
require.NoError(b, err)

httpClient := newMockClient()
conn.HTTP = httpClient

var bb []byte
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
req, err := http.NewRequestWithContext(context.Background(), "GET", "http://fakehost/some/path", nil)
require.NoError(b, err)
_, bb, err = conn.execHTTPRequest(req)
require.NoError(b, err)
require.Equal(b, req.Header, http.Header(td.expected))
require.NotEmpty(b, bb)
}
}
}
3 changes: 2 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event)
return nil, nil
}

status, result, sendErr := client.conn.Bulk(ctx, "", "", nil, bulkItems)
params := map[string]string{"filter_path": "errors,items.*.error,items.*.status"}
status, result, sendErr := client.conn.Bulk(ctx, "", "", params, bulkItems)

if sendErr != nil {
if status == http.StatusRequestEntityTooLarge {
Expand Down
Loading

0 comments on commit 1a7e9aa

Please sign in to comment.