diff --git a/internal/elasticsearch/client_test.go b/internal/elasticsearch/client_test.go index 4126c879031..301c7075e4c 100644 --- a/internal/elasticsearch/client_test.go +++ b/internal/elasticsearch/client_test.go @@ -18,6 +18,7 @@ package elasticsearch import ( + "bytes" "context" "fmt" "net/http" @@ -30,6 +31,7 @@ import ( apmVersion "github.com/elastic/apm-server/internal/version" esv8 "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" ) func TestClient(t *testing.T) { @@ -86,3 +88,92 @@ func TestClientCustomUserAgent(t *testing.T) { t.Fatal("timed out while waiting for request") } } + +func esMockHandler(responder http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Elastic-Product", "Elasticsearch") + + switch { + case r.Method == http.MethodPost && r.URL.Path == "/_bulk": + responder(w, r) + return + default: + http.Error(w, "unsupported request", 419) // Signal unexpected error + return + } + } +} + +func TestClientRetryableStatuses(t *testing.T) { + tests := []struct { + name string + responseStatusCode int + expectedStatusCode int + expectedRequestCount int + }{ + { + name: "retry 429 Too Many Requests", + responseStatusCode: http.StatusTooManyRequests, + expectedStatusCode: http.StatusOK, + expectedRequestCount: 2, + }, + { + name: "retry 502 Bad Gateway", + responseStatusCode: http.StatusBadGateway, + expectedStatusCode: http.StatusBadGateway, + expectedRequestCount: 1, + }, + { + name: "retry 503 Service Not Available", + responseStatusCode: http.StatusServiceUnavailable, + expectedStatusCode: http.StatusServiceUnavailable, + expectedRequestCount: 1, + }, + { + name: "retry 504 Gateway Timeout", + responseStatusCode: http.StatusGatewayTimeout, + expectedStatusCode: http.StatusGatewayTimeout, + expectedRequestCount: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + maxRetries := 2 + count := 0 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if count < maxRetries { + count += 1 + http.Error(w, "", tt.responseStatusCode) + return + } + + w.WriteHeader(http.StatusOK) + }) + + es := esMockHandler(handler) + srv := httptest.NewServer(&es) + defer srv.Close() + + c := Config{ + Username: "test", + Password: "foobar", + Backoff: BackoffConfig{ + Init: 0, + Max: 0, + }, + MaxRetries: maxRetries, + Hosts: []string{srv.URL}, + } + client, err := NewClient(&c) + require.NoError(t, err) + + var buf bytes.Buffer + var res *esapi.Response + res, err = client.Bulk(bytes.NewReader(buf.Bytes())) + require.NoError(t, err) + assert.Equal(t, tt.expectedStatusCode, res.StatusCode) + assert.Equal(t, tt.expectedRequestCount, count) + }) + } +}