Skip to content

Commit

Permalink
Merge branch 'main' into tls13
Browse files Browse the repository at this point in the history
* main:
  Fix: hosts => "es_host:port" regression (when ssl => true) (logstash-plugins#156)
  • Loading branch information
kares committed Mar 14, 2022
2 parents 834887c + 57482ef commit 3414e5e
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 30 deletions.
7 changes: 6 additions & 1 deletion .ci/logstash-run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ set -ex

export PATH=$BUILD_DIR/gradle/bin:$PATH

CURL_OPTS="-k --tlsv1.2"

wait_for_es() {
echo "Waiting for elasticsearch to respond..."
es_url="http://elasticsearch:9200"
if [[ "$SECURE_INTEGRATION" == "true" ]]; then
es_url="https://elasticsearch:9200"
fi
count=120
while ! curl -u elastic:$ELASTIC_PASSWORD --silent $es_url && [[ $count -ne 0 ]]; do
while ! curl $CURL_OPTS -u elastic:$ELASTIC_PASSWORD --silent $es_url && [[ $count -ne 0 ]]; do
count=$(( $count - 1 ))
[[ $count -eq 0 ]] && return 1
sleep 1
Expand Down
8 changes: 5 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import:
env:
- INTEGRATION=false ELASTIC_STACK_VERSION=7.x
- INTEGRATION=false ELASTIC_STACK_VERSION=8.x
- INTEGRATION=false ELASTIC_STACK_VERSION=7.16.3 MANTICORE_VERSION=0.7.1 ELASTICSEARCH_VERSION=7.15.0
- INTEGRATION=true ELASTIC_STACK_VERSION=7.x
- INTEGRATION=true ELASTIC_STACK_VERSION=7.x ELASTIC_PASSWORD=password ELASTIC_SECURITY_ENABLED=true
- INTEGRATION=true ELASTIC_STACK_VERSION=7.14.0 ELASTIC_PASSWORD=password ELASTIC_SECURITY_ENABLED=true
- SECURE_INTEGRATION=true INTEGRATION=true ELASTIC_STACK_VERSION=7.x ELASTIC_PASSWORD=password ELASTIC_SECURITY_ENABLED=true
- SECURE_INTEGRATION=true INTEGRATION=true ELASTIC_STACK_VERSION=7.x ELASTIC_PASSWORD=password ELASTIC_SECURITY_ENABLED=true LOG_LEVEL=info
- SECURE_INTEGRATION=true INTEGRATION=true ELASTIC_STACK_VERSION=7.x ELASTIC_PASSWORD=password ELASTIC_SECURITY_ENABLED=true ES_SSL_SUPPORTED_PROTOCOLS=TLSv1.3
- INTEGRATION=true ELASTIC_STACK_VERSION=7.x SNAPSHOT=true
- INTEGRATION=true ELASTIC_STACK_VERSION=8.x SNAPSHOT=true
- SECURE_INTEGRATION=true INTEGRATION=true ELASTIC_STACK_VERSION=7.14.2 MANTICORE_VERSION=0.7.1 ELASTICSEARCH_VERSION=7.14.1 ELASTIC_SECURITY_ENABLED=true LOG_LEVEL=info
- INTEGRATION=true ELASTIC_STACK_VERSION=7.x SNAPSHOT=true LOG_LEVEL=info
- INTEGRATION=true ELASTIC_STACK_VERSION=8.x SNAPSHOT=true LOG_LEVEL=info
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.11.1
- Fix: hosts => "es_host:port" regression [#156](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/156)

## 3.11.0
- Feat: update Elasticsearch client to 7.14.0 [#150](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/150)

Expand Down
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ if Dir.exist?(logstash_path) && use_logstash_source
gem 'logstash-core', :path => "#{logstash_path}/logstash-core"
gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api"
end

gem 'manticore', ENV['MANTICORE_VERSION'] if ENV['MANTICORE_VERSION']
gem 'elasticsearch', ENV['ELASTICSEARCH_VERSION'] if ENV['ELASTICSEARCH_VERSION']
13 changes: 12 additions & 1 deletion lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def initialize(logger, hosts, options = {})
logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
transport_options[:proxy] = proxy.to_s if proxy && !proxy.eql?('')

hosts = hosts.map { |host| { host: host, scheme: 'https' } } if ssl
hosts = setup_hosts(hosts, ssl)
# set ca_file even if ssl isn't on, since the host can be an https url
ssl_options = { ssl: true, ca_file: options[:ca_file] } if options[:ca_file]
ssl_options ||= {}
Expand All @@ -41,6 +41,17 @@ def search(params)

private

def setup_hosts(hosts, ssl)
hosts.map do |h|
if h.start_with?('http:/', 'https:/')
h
else
host, port = h.split(':')
{ host: host, port: port, scheme: (ssl ? 'https' : 'http') }
end
end
end

def setup_basic_auth(user, password)
return {} unless user && password && password.value

Expand Down
2 changes: 1 addition & 1 deletion logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.11.0'
s.version = '3.11.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
32 changes: 20 additions & 12 deletions spec/es_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,26 @@ def self.get_host_port
end
end

def self.get_client(credentials)
require 'elasticsearch/transport/transport/http/faraday' # supports user/password options
host, port = get_host_port.split(':')
host_opts = credentials.inject({}) { |h, (k, v)| h[k.to_sym] = v; h } # user: _, password: _
host_opts.merge! host: host, port: port, scheme: 'http'
Elasticsearch::Client.new(hosts: [host_opts], transport_class: Elasticsearch::Transport::Transport::HTTP::Faraday)
def self.curl_and_get_json_response(url, method: :get, args: nil); require 'open3'
cmd = "curl -s -v --show-error #{args} -X #{method.to_s.upcase} -k #{url}"
begin
out, err, status = Open3.capture3(cmd)
rescue Errno::ENOENT
fail "curl not available, make sure curl binary is installed and available on $PATH"
end

if status.success?
http_status = err.match(/< HTTP\/1.1 (.*?)/)[1] || '0' # < HTTP/1.1 200 OK\r\n
if http_status.strip[0].to_i > 2
warn out
fail "#{cmd.inspect} unexpected response: #{http_status}\n\n#{err}"
end

LogStash::Json.load(out)
else
warn out
fail "#{cmd.inspect} process failed: #{status}\n\n#{err}"
end
end

def self.doc_type
Expand All @@ -25,12 +39,6 @@ def self.doc_type
end
end

def self.index_doc(es, params)
type = doc_type
params[:type] = doc_type unless type.nil?
es.index(params)
end

def self.es_version
ENV['ES_VERSION'] || ENV['ELASTIC_STACK_VERSION']
end
Expand Down
10 changes: 7 additions & 3 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

describe LogStash::Filters::Elasticsearch do

subject(:plugin) { described_class.new(config) }

let(:event) { LogStash::Event.new({}) }

context "registration" do

let(:plugin) { LogStash::Plugin.lookup("filter", "elasticsearch").new({}) }
Expand Down Expand Up @@ -53,8 +57,6 @@
"aggregation_fields" => { "bytes_avg" => "bytes_avg_ls_field" }
}
end
let(:plugin) { described_class.new(config) }
let(:event) { LogStash::Event.new({}) }

let(:response) do
LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_1.json")))
Expand Down Expand Up @@ -569,7 +571,9 @@ def wait_receive_request
it "should set localhost:9200 as hosts" do
plugin.register
client = plugin.send(:get_client).client
expect( extract_transport(client).hosts ).to eql [{ :host => "localhost", :port => 9200, :protocol => "http"}]
hosts = extract_transport(client).hosts
expect( hosts.size ).to be 1
expect( hosts[0] ).to include(:host => "localhost", :port => 9200, :scheme => "http")
end
end

Expand Down
45 changes: 36 additions & 9 deletions spec/filters/integration/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,53 @@
describe LogStash::Filters::Elasticsearch, :integration => true do

ELASTIC_SECURITY_ENABLED = ENV['ELASTIC_SECURITY_ENABLED'].eql? 'true'
SECURE_INTEGRATION = ENV['SECURE_INTEGRATION'].eql? 'true'

let(:base_config) do
{
"index" => 'logs',
"hosts" => [ESHelper.get_host_port],
"hosts" => ["http#{SECURE_INTEGRATION ? 's' : nil}://#{ESHelper.get_host_port}"],
"query" => "response: 404",
"sort" => "response",
"fields" => [ ["response", "code"] ],
}
end

let(:credentials) do
{ 'user' => 'elastic', 'password' => ENV['ELASTIC_PASSWORD'] }
if SECURE_INTEGRATION
{ 'user' => 'tests', 'password' => 'Tests123' } # added user
else
{ 'user' => 'elastic', 'password' => ENV['ELASTIC_PASSWORD'] }
end
end

let(:config) do
ELASTIC_SECURITY_ENABLED ? base_config.merge(credentials) : base_config
config = ELASTIC_SECURITY_ENABLED ? base_config.merge(credentials) : base_config
config = { 'ca_file' => ca_path }.merge(config) if SECURE_INTEGRATION
config
end

let(:ca_path) do
File.expand_path('../fixtures/test_certs/ca.crt', File.dirname(__FILE__))
end

let(:plugin) { described_class.new(config) }
let(:event) { LogStash::Event.new({}) }

before(:each) do
@es = ESHelper.get_client(ELASTIC_SECURITY_ENABLED ? credentials : {})
# Delete all templates first.
es_url = ESHelper.get_host_port
es_url = SECURE_INTEGRATION ? "https://#{es_url}" : "http://#{es_url}"
args = ELASTIC_SECURITY_ENABLED ? "-u #{credentials['user']}:#{credentials['password']}" : ''
# Clean ES of data before we start.
@es.indices.delete_template(:name => "*")
# Delete all templates first.
ESHelper.curl_and_get_json_response "#{es_url}/_index_template/*", method: 'DELETE', args: args
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
ESHelper.curl_and_get_json_response "#{es_url}/_index/*", method: 'DELETE', args: args
doc_args = "#{args} -H 'Content-Type: application/json' -d '{\"response\": 404, \"this\":\"that\"}'"
10.times do
ESHelper.index_doc(@es, :index => 'logs', :body => { :response => 404, :this => 'that'})
ESHelper.curl_and_get_json_response "#{es_url}/logs/_doc", method: 'POST', args: doc_args
end
@es.indices.refresh
ESHelper.curl_and_get_json_response "#{es_url}/_refresh", method: 'POST', args: args
end

it "should enhance the current event with new data" do
Expand Down Expand Up @@ -75,4 +89,17 @@

end if ELASTIC_SECURITY_ENABLED

context 'setting host:port (and ssl)' do # reproduces GH-155

let(:config) do
super().merge "hosts" => [ESHelper.get_host_port], "ssl" => SECURE_INTEGRATION
end

it "works" do
expect { plugin.register }.to_not raise_error
plugin.filter(event)
end

end

end

0 comments on commit 3414e5e

Please sign in to comment.