
Improve network error handling for batch jobs (HTTP retries) #216

Open
gmfmi opened this issue Nov 21, 2024 · 1 comment
Comments


gmfmi commented Nov 21, 2024

Context

In our project, we have to reprocess a large amount of Elastic data in a single batch. The elasticsearch input plugin may run for multiple hours to handle billions of events, but we are facing a problem: when a network issue occurs, the entire job is restarted (not only the last HTTP request).

I guess this issue rarely occurs when Logstash is deployed in the Elasticsearch subnet, but we have a hybrid cloud configuration, which makes the plugin impossible to use in production as of now.

The plugin configuration:

input {
  elasticsearch {
    hosts => "https://<cluster_id>.francecentral.azure.elastic-cloud.com:443"
    api_key => "<api_key>"
    ssl_enabled => "true"
    index => "<index_name>"
    search_api => "search_after"
    retries => 5
    scroll => "5m"
    response_type => "hits"
    size => 1000
    query => '{"query":{"bool":{"filter":[ ... ]}},"sort":[ ... ]}'
  }
}

Log sample of the restarting job when a network error occurred:

[2024-11-21T09:29:03,645][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress
[2024-11-21T09:29:03,806][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress
[2024-11-21T09:29:03,815][WARN ][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Attempt to search_after paginated search but failed. Sleeping for 0.02 {:fail_count=>1, :exception=>"<cluster_id>.francecentral.azure.elastic-cloud.com:443 failed to respond"}
[2024-11-21T09:29:03,835][INFO ][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query start
[2024-11-21T09:29:03,835][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress
[2024-11-21T09:29:04,222][DEBUG][logstash.inputs.elasticsearch.searchafter][main][9fee6d2baff37ecc70c364d3215ef2d3eab93bec9b08f68ed299cc50ed87e9b2] Query progress

Feature proposal

First, for future documentation readers, it would be nice to improve the retries section of the documentation to explain that it applies at the job level, not at the HTTP request level.

Then, adding a retry mechanism at the HTTP request level with an exponential backoff (or similar) would be a good option.
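For illustration, here is what the proposed exponential backoff schedule would look like; the parameter names are hypothetical, chosen only to show how the delay doubles on each failed attempt:

```ruby
# Hypothetical backoff schedule: the wait doubles after each failed attempt.
initial_delay = 1  # seconds before the first retry
max_retries   = 5  # give up after this many retries

delays = (0...max_retries).map { |attempt| initial_delay * (2 ** attempt) }
puts delays.inspect  # => [1, 2, 4, 8, 16]
```

With these values, a transient network failure costs at most ~31 seconds of waiting before the job gives up, instead of restarting from the beginning.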

I had a quick look at the code base; I think we could add a wrapper around the next_page() function to handle network errors and implement the retries properly.

Contribution

If it can help, we can contribute and develop this feature.


gmfmi commented Nov 22, 2024

If it helps, I patched the search_after section of the code by adding retry logic. Here is the modified next_page function:

def next_page(pit_id:, search_after: nil, slice_id: nil)
  options = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
  logger.trace("search options", options)

  max_retries = 5        # Maximum number of retries
  initial_delay = 1      # Initial delay (1 second)
  retries = 0
  begin
    @client.search(options)
  rescue => err
    if retries < max_retries
      delay = initial_delay * (2 ** retries)
      retries += 1
      logger.warn("Retrying search request, attempt #{retries} of #{max_retries}, waiting #{delay}s. Details: #{err.message}")
      sleep(delay)
      retry
    else
      logger.error("Max retries reached. Failing with error: #{err.message}")
      raise
    end
  end
end

That works as expected, but I didn't take the time to implement generic logic for both the "search_after" and "scroll" methods. Also, I have not added any tests for now.
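To share the retry logic between the two pagination strategies, one option would be a small mixin that wraps any request in the same backoff loop. This is only a sketch; the module and method names are hypothetical and not part of the plugin's actual API:

```ruby
# Sketch of a generic retry helper that both the "search_after" and
# "scroll" paginators could mix in, so the backoff logic lives in one place.
module Retryable
  # Runs the given block, retrying on any error with exponential backoff.
  # Re-raises the last error once max_retries is exceeded.
  def with_network_retries(max_retries: 5, initial_delay: 1, logger: nil)
    retries = 0
    begin
      yield
    rescue => err
      raise if retries >= max_retries
      delay = initial_delay * (2 ** retries)
      retries += 1
      logger&.warn("Retrying request, attempt #{retries}/#{max_retries}, " \
                   "waiting #{delay}s. Details: #{err.message}")
      sleep(delay)
      retry
    end
  end
end
```

Each paginator could then reduce its request to something like `with_network_retries { @client.search(options) }`, keeping the backoff parameters and logging consistent across both methods.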
