diff --git a/CHANGELOG.md b/CHANGELOG.md index 432341f..84bc410 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.1.8 + - Fix a thread safety issue when using this filter with multiple workers under heavy load; we now create an Elasticsearch client for every Logstash worker. #76 + ## 3.1.6 - Fix some documentation issues diff --git a/lib/logstash/filters/elasticsearch.rb b/lib/logstash/filters/elasticsearch.rb index 7ca7452..f5cad83 100644 --- a/lib/logstash/filters/elasticsearch.rb +++ b/lib/logstash/filters/elasticsearch.rb @@ -3,6 +3,7 @@ require "logstash/namespace" require_relative "elasticsearch/client" require "logstash/json" +java_import "java.util.concurrent.ConcurrentHashMap" # .Compatibility Note # [NOTE] @@ -125,14 +126,10 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base # Tags the event on failure to look up geo information. This can be used in later analysis. config :tag_on_failure, :validate => :array, :default => ["_elasticsearch_lookup_failure"] + attr_reader :clients_pool + def register - options = { - :ssl => @ssl, - :hosts => @hosts, - :ca_file => @ca_file, - :logger => @logger - } - @client = LogStash::Filters::ElasticsearchClient.new(@user, @password, options) + @clients_pool = java.util.concurrent.ConcurrentHashMap.new #Load query if it exists if @query_template @@ -162,7 +159,7 @@ def filter(event) @logger.debug("Querying elasticsearch for lookup", :params => params) - results = @client.search(params) + results = get_client.search(params) @fields.each do |old_key, new_key| if !results['hits']['hits'].empty? 
set = [] @@ -178,4 +175,22 @@ def filter(event) end filter_matched(event) end # def filter + + private + def client_options + { + :ssl => @ssl, + :hosts => @hosts, + :ca_file => @ca_file, + :logger => @logger + } + end + + def new_client + LogStash::Filters::ElasticsearchClient.new(@user, @password, client_options) + end + + def get_client + @clients_pool.computeIfAbsent(Thread.current, lambda { |x| new_client }) + end end #class LogStash::Filters::Elasticsearch diff --git a/lib/logstash/filters/elasticsearch/client.rb b/lib/logstash/filters/elasticsearch/client.rb index d9f7708..2d33f61 100644 --- a/lib/logstash/filters/elasticsearch/client.rb +++ b/lib/logstash/filters/elasticsearch/client.rb @@ -24,7 +24,7 @@ def initialize(user, password, options={}) # set ca_file even if ssl isn't on, since the host can be an https url transport_options[:ssl] = { ca_file: options[:ca_file] } if options[:ca_file] - @logger.info("New ElasticSearch filter", :hosts => hosts) + @logger.info("New ElasticSearch filter client", :hosts => hosts) @client = ::Elasticsearch::Client.new(hosts: hosts, transport_options: transport_options) end diff --git a/logstash-filter-elasticsearch.gemspec b/logstash-filter-elasticsearch.gemspec index d5de80f..6ab63b6 100644 --- a/logstash-filter-elasticsearch.gemspec +++ b/logstash-filter-elasticsearch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-filter-elasticsearch' - s.version = '3.1.6' + s.version = '3.1.8' s.licenses = ['Apache License (2.0)'] s.summary = "Search elasticsearch for a previous log event and copy some fields from it into the current event" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/filters/elasticsearch_spec.rb b/spec/filters/elasticsearch_spec.rb index 30877b2..924e6b6 100644 --- a/spec/filters/elasticsearch_spec.rb +++ b/spec/filters/elasticsearch_spec.rb @@ -38,6 +38,25 @@ plugin.register end + after(:each) do + Thread.current[:filter_elasticsearch_client] = nil + end + + # Since the Elasticsearch Ruby client is not thread safe, + # and under high load we can get errors with the connection pool, + # we have decided to create a new instance per worker thread, which + # will be lazily created on the first call to `#filter`. + # + # I am adding a simple test case for future changes + it "uses a different connection object per thread wait" do + expect(plugin.clients_pool.size).to eq(0) + + Thread.new { plugin.filter(event) }.join + Thread.new { plugin.filter(event) }.join + + expect(plugin.clients_pool.size).to eq(2) + end + it "should enhance the current event with new data" do plugin.filter(event) expect(event.get("code")).to eq(404)