Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/use thread locals #77

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.1.8
- Fix a thread safety issue when using this filter with multiple workers on heavy load, we now create an elasticsearch client for every LogStash worker. #76

## 3.1.6
- Fix some documentation issues

Expand Down
31 changes: 23 additions & 8 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/namespace"
require_relative "elasticsearch/client"
require "logstash/json"
java_import "java.util.concurrent.ConcurrentHashMap"

# .Compatibility Note
# [NOTE]
Expand Down Expand Up @@ -125,14 +126,10 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
# Tags the event on failure to look up geo information. This can be used in later analysis.
config :tag_on_failure, :validate => :array, :default => ["_elasticsearch_lookup_failure"]

attr_reader :clients_pool

def register
options = {
:ssl => @ssl,
:hosts => @hosts,
:ca_file => @ca_file,
:logger => @logger
}
@client = LogStash::Filters::ElasticsearchClient.new(@user, @password, options)
@clients_pool = java.util.concurrent.ConcurrentHashMap.new

#Load query if it exists
if @query_template
Expand Down Expand Up @@ -162,7 +159,7 @@ def filter(event)

@logger.debug("Querying elasticsearch for lookup", :params => params)

results = @client.search(params)
results = get_client.search(params)
@fields.each do |old_key, new_key|
if !results['hits']['hits'].empty?
set = []
Expand All @@ -178,4 +175,22 @@ def filter(event)
end
filter_matched(event)
end # def filter

private
def client_options
{
:ssl => @ssl,
:hosts => @hosts,
:ca_file => @ca_file,
:logger => @logger
}
end

def new_client
LogStash::Filters::ElasticsearchClient.new(@user, @password, client_options)
end

def get_client
@clients_pool.computeIfAbsent(Thread.current, lambda { |x| new_client })
end
end #class LogStash::Filters::Elasticsearch
2 changes: 1 addition & 1 deletion lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def initialize(user, password, options={})
# set ca_file even if ssl isn't on, since the host can be an https url
transport_options[:ssl] = { ca_file: options[:ca_file] } if options[:ca_file]

@logger.info("New ElasticSearch filter", :hosts => hosts)
@logger.info("New ElasticSearch filter client", :hosts => hosts)
@client = ::Elasticsearch::Client.new(hosts: hosts, transport_options: transport_options)
end

Expand Down
2 changes: 1 addition & 1 deletion logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.1.6'
s.version = '3.1.8'
s.licenses = ['Apache License (2.0)']
s.summary = "Search elasticsearch for a previous log event and copy some fields from it into the current event"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
19 changes: 19 additions & 0 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@
plugin.register
end

after(:each) do
Thread.current[:filter_elasticsearch_client] = nil
end

# Since the Elasticsearch Ruby client is not thread safe
# and under high load we can get error with the connection pool
# we have decided to create a new instance per worker thread which
# will be lazy created on the first call to `#filter`
#
# I am adding a simple test case for future changes
it "uses a different connection object per thread wait" do
expect(plugin.clients_pool.size).to eq(0)

Thread.new { plugin.filter(event) }.join
Thread.new { plugin.filter(event) }.join

expect(plugin.clients_pool.size).to eq(2)
end

it "should enhance the current event with new data" do
plugin.filter(event)
expect(event.get("code")).to eq(404)
Expand Down