Skip to content

Commit

Permalink
Fix agentcfg cache renewal scroll (#13958) (#13980)
Browse files Browse the repository at this point in the history
APM Agents configuration cache appears to be broken, resulting frequent invalid requests made by APM Server to Elasticsearch cluster. Fix ScrollID used for retrieval of APM Agents configuration.

(cherry picked from commit ecffa8e)

Co-authored-by: up2neck <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
mergify[bot] and up2neck authored Sep 9, 2024
1 parent 61b27f4 commit 1db3087
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions internal/agentcfg/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,40 +251,38 @@ func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) {

func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) {
var result cacheResult
var err error
var resp *esapi.Response

if scrollID == "" {
resp, err := esapi.SearchRequest{
switch scrollID {
case "":
resp, err = esapi.SearchRequest{
Index: []string{ElasticsearchIndexName},
Size: &f.searchSize,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
if err != nil {
return result, err
}
defer resp.Body.Close()

if resp.StatusCode >= http.StatusBadRequest {
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
f.invalidESCfg.Store(true)
}
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
f.logger.Debugf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
}
return result, json.NewDecoder(resp.Body).Decode(&result)
default:
resp, err = esapi.ScrollRequest{
ScrollID: scrollID,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
}

resp, err := esapi.ScrollRequest{
ScrollID: result.ScrollID,
Scroll: f.cacheDuration,
}.Do(ctx, f.client)
if err != nil {
return result, err
}
defer resp.Body.Close()

if resp.StatusCode >= http.StatusBadRequest {
// Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
f.invalidESCfg.Store(true)
}
bodyBytes, err := io.ReadAll(resp.Body)
if err == nil {
f.logger.Debugf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))
}
return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode)
}
return result, json.NewDecoder(resp.Body).Decode(&result)
}

Expand Down

0 comments on commit 1db3087

Please sign in to comment.