Skip to content

Commit

Permalink
Use the helper to filter out messages missing a timestamp...
Browse files Browse the repository at this point in the history
Note that if the index doesn't require a timestamp to resolve, events are
let through.

Closes logstash-plugins#739.
  • Loading branch information
Mathieu Martin committed May 25, 2018
1 parent 4f9bc4f commit 2492c0c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
10 changes: 8 additions & 2 deletions lib/logstash/outputs/elasticsearch/common.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "logstash/outputs/elasticsearch/template_manager"
require "logstash/outputs/elasticsearch/helpers"

module LogStash; module Outputs; class ElasticSearch;
module Common
Expand Down Expand Up @@ -35,7 +36,7 @@ def multi_receive(events)
until @template_installed.true?
sleep 1
end
retrying_submit(events.map {|e| event_action_tuple(e)})
retrying_submit(events.map {|e| event_action_tuple(e)}.compact)
end

def install_template_after_successful_connection
Expand All @@ -58,11 +59,16 @@ def successful_connection?
!!maximum_seen_major_version
end

# Convert the event into a 3-tuple of action, params, and event
# Convert the event into a 3-tuple of [action, params, and event]
# Returns nil if the event cannot be indexed
def event_action_tuple(event)

action = event.sprintf(@action)

if Helpers.predict_timestamp_issue_for(@index, event)
@logger.error "Cannot index event missing @timestamp to a timestamped index", index: @index, event: event.to_json
return nil
end
params = {
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => event.sprintf(@index),
Expand Down
51 changes: 51 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -555,4 +555,55 @@
end
end
end

context 'interpolating an index name' do
let(:event) { LogStash::Event.new }
let(:event_no_ts) { LogStash::Event.new.tap { |e| e.remove('@timestamp') } }

context 'when preparing a tuple' do

context 'when the index name is timestamped' do
it 'should prepare a tuple for the event if it has a timestamp' do
params = subject.event_action_tuple(event)[1]
expect(params[:_index]).to match(/logstash-\d+\.\d+\.\d+/)
end

it 'should not error if the event has no timestamp' do
expect do
expect(subject.event_action_tuple(event_no_ts)).to be_nil
end.not_to raise_error
end
end

context 'when the index name is not timestamped' do
let(:options) { super.merge('index' => 'logstash') }
it 'should prepare a tuple for the event if it has a timestamp' do
params = subject.event_action_tuple(event)[1]
expect(params[:_index]).to eq('logstash')
end

it 'should prepare a tuple for the event even if it has no timestamp' do
expect do
params = subject.event_action_tuple(event_no_ts)[1]
expect(params[:_index]).to eq('logstash')
end.not_to raise_error
end
end
end

context 'when submitting events with and without timestamps' do
let(:options) { { "manage_template" => false } }

context 'and index is timestamped' do
it 'should only submit events that have a timestamp' do
expect(subject.logger).to receive(:error).with(/Cannot/, hash_including(:index, :event))
expect(subject).to receive(:retrying_submit) { |args|
expect( args.map{ |(action, params, event)| event} ).to eq([event])
}
subject.multi_receive([event, event_no_ts])
end
end
end
end

end

0 comments on commit 2492c0c

Please sign in to comment.