Skip to content

Commit

Permalink
[Logstash plugins] fixed config validation bug in output; added tests…
Browse files Browse the repository at this point in the history
… and retries for input; some doc tweaks
  • Loading branch information
radu-gheorghe committed Dec 19, 2024
1 parent 4810e0b commit d4ffaab
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 22 deletions.
8 changes: 8 additions & 0 deletions integration/logstash-plugins/logstash-input-vespa/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@ if Dir.exist?(logstash_path) && use_logstash_source
gem 'logstash-core', :path => "#{logstash_path}/logstash-core"
gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api"
end

# Gems used only when developing and testing the plugin.
group :development, :test do
# Logstash's RSpec helpers for plugins (the spec loads logstash/devutils/rspec/spec_helper).
gem 'logstash-devutils'
# Stubs HTTP requests in the specs.
gem 'webmock'
gem 'rspec'
end

# NOTE(review): the reason for requiring minitar 1.x is not visible in this file; confirm before changing it.
gem 'minitar', '~> 1.0'
11 changes: 11 additions & 0 deletions integration/logstash-plugins/logstash-input-vespa/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ Download and unpack/install Logstash, then:
bin/logstash-plugin install logstash-input-vespa
```

## Development

To run the tests, first clone the Logstash repository (https://github.com/elastic/logstash) and check out the branch you're developing the plugin for.

Then:
```
export LOGSTASH_PATH=/path/to/logstash/repository/clone
export LOGSTASH_SOURCE=1
bundle exec rspec
```

## Usage

Minimal Logstash config example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ class LogStash::Inputs::Vespa < LogStash::Inputs::Base
# The cluster parameter to use in the request.
config :cluster, :validate => :string, :required => true

# Maximum number of attempts for a failing HTTP request. The first request counts as an
# attempt, so the default of 3 means the request is retried at most twice. The limit is
# applied both to requests that raise an error and to requests answered with a 5xx status.
config :max_retries, :validate => :number, :default => 3

# Delay in seconds before the first retry. The delay is doubled after each retry.
config :retry_delay, :validate => :number, :default => 1

# Path to the client certificate file for mTLS.
config :client_cert, :validate => :path

Expand Down Expand Up @@ -88,6 +94,9 @@ def run(queue)
uri.query = URI.encode_www_form(@uri_params)
continuation = nil

retries = 0
current_delay = @retry_delay

loop do
response = fetch_documents_from_vespa(uri)
# response should look like:
Expand Down Expand Up @@ -123,28 +132,58 @@ def run(queue)
end

else
@logger.error("Failed to fetch documents from Vespa", :request => uri.to_s,
# Handle retriable status codes (5xx)
if (500..599).include?(response.code.to_i) && retries < (@max_retries - 1)
retries += 1
@logger.warn("Retriable error from Vespa, retrying",
:response_code => response.code,
:retry_count => retries,
:max_retries => @max_retries,
:next_retry_delay => current_delay)
sleep(current_delay)
current_delay *= 2
else
@logger.error("Failed to fetch documents from Vespa", :request => uri.to_s,
:response_code => response.code, :response_message => response.message)
break # TODO retry? Only on certain codes?
break
end
end # if response.is_a?(Net::HTTPSuccess)

end # loop do
end # def run

# Sends one GET request to Vespa, retrying with a growing delay when the
# request raises.
#
# Any StandardError raised while building the client or sending the request is
# rescued. The request is then attempted again after waiting; the wait starts
# at @retry_delay seconds and doubles after every failed attempt. At most
# @max_retries attempts are made in total. This method does not inspect the
# HTTP status of the response; whatever http.request returns is handed back
# to the caller.
#
# @param uri [URI] the full request URI, including query parameters
# @return [Net::HTTPResponse, nil] the response, or nil once all attempts
#   have raised
def fetch_documents_from_vespa(uri)
  retries = 0
  current_delay = @retry_delay # Start with the initial delay

  # The explicit begin block is what `retry` re-runs. With a method-level
  # rescue, `retry` would restart the whole method and reset both counters
  # above, so the attempt limit would never be reached.
  begin
    http = Net::HTTP.new(uri.host, uri.port)
    if uri.scheme == "https"
      http.use_ssl = true
      http.cert = @cert
      http.key = @key
      http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    end

    request = Net::HTTP::Get.new(uri.request_uri)
    http.request(request)
  rescue => e
    retries += 1
    if retries < @max_retries
      @logger.warn("Failed to make HTTP request to Vespa, retrying",
        :error => e.message,
        :retry_count => retries,
        :max_retries => @max_retries,
        :next_retry_delay => current_delay)
      sleep(current_delay)
      current_delay *= 2 # Double the delay for next retry
      retry
    else
      @logger.error("Failed to make HTTP request to Vespa after #{@max_retries} attempts",
        :error => e.message)
      nil
    end
  end
end # def fetch_documents_from_vespa

def parse_response(response)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-vespa'
s.version = '0.2.0'
s.version = '0.3.0'
s.licenses = ['Apache-2.0']
s.summary = "Logstash input plugin reading from Vespa"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -18,8 +18,13 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "input" }

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency "logstash-core", ">= 8.0.0"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22'
s.add_runtime_dependency 'logstash-codec-json'

# Development dependencies
s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'rspec', '~> 3.0'
s.add_development_dependency 'webmock', '~> 3.0'
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/vespa"
require "webmock/rspec"

describe LogStash::Inputs::Vespa do
  # Plugin settings shared by all examples. The retry delay is tiny because
  # sleep is stubbed out below anyway.
  let(:config) {
    {
      "cluster" => "test-cluster",
      "vespa_url" => "http://localhost:8080",
      "retry_delay" => 0.1,
      "max_retries" => 3
    }
  }

  let(:plugin) { described_class.new(config) }
  let(:queue) { Queue.new }
  let(:base_uri) { "#{config['vespa_url']}/document/v1/" }
  let(:uri_params) { "cluster=test-cluster&wantedDocumentCount=100&concurrency=1&timeout=180" }
  # URL of the first request, before any continuation token is known.
  let(:visit_url) { "#{base_uri}?#{uri_params}" }

  # Builds a fresh stubbed-response hash for WebMock.
  def reply(status, body)
    { status: status, body: body }
  end

  # A successful response that carries no documents and no continuation.
  def empty_page
    reply(200, '{"documents": [], "documentCount": 0}')
  end

  # Asserts how many times the first-page URL was requested.
  def expect_visit_requests(count)
    expect(a_request(:get, visit_url)).to have_been_made.times(count)
  end

  before do
    plugin.register
    # Retries call sleep on the plugin itself; stubbing it keeps the suite fast.
    allow(plugin).to receive(:sleep)
  end

  describe "#run" do
    context "when server returns retriable errors" do
      it "retries on 503 Service Unavailable" do
        stub_request(:get, visit_url).to_return(
          reply(503, "Service Unavailable"),
          reply(503, "Service Unavailable"),
          empty_page
        )

        plugin.run(queue)

        expect_visit_requests(3)
      end

      it "retries on 502 Bad Gateway" do
        stub_request(:get, visit_url).to_return(
          reply(502, "Bad Gateway"),
          empty_page
        )

        plugin.run(queue)

        expect_visit_requests(2)
      end

      it "stops after max_retries attempts" do
        stub_request(:get, visit_url)
          .to_return(reply(503, "Service Unavailable")).times(4)

        plugin.run(queue)

        expect_visit_requests(config["max_retries"])
      end
    end

    context "when server returns non-retriable errors" do
      it "does not retry on 404 Not Found" do
        stub_request(:get, visit_url).to_return(reply(404, "Not Found"))

        plugin.run(queue)

        expect_visit_requests(1)
      end

      it "does not retry on 401 Unauthorized" do
        stub_request(:get, visit_url).to_return(reply(401, "Unauthorized"))

        plugin.run(queue)

        expect_visit_requests(1)
      end
    end

    context "when server returns successful responses" do
      it "processes documents and follows continuation tokens" do
        next_page_url = "#{base_uri}?#{uri_params}&continuation=AAAAAA"

        # Page one: two documents plus a continuation token.
        first_response = {
          "pathId" => "/document/v1/",
          "documents" => [
            {"id" => "id:namespace:doctype::doc1", "fields" => {"field1" => "value1", "field2" => 7.0}},
            {"id" => "id:namespace:doctype::doc2", "fields" => {"field1" => "value2", "field2" => 8.0}}
          ],
          "documentCount" => 2,
          "continuation" => "AAAAAA"
        }

        # Page two: one document and no continuation, so the visit ends here.
        last_response = {
          "pathId" => "/document/v1/",
          "documents" => [
            {"id" => "id:namespace:doctype::doc3", "fields" => {"field1" => "value3", "field2" => 9.0}}
          ],
          "documentCount" => 1
        }

        stub_request(:get, visit_url).to_return(reply(200, first_response.to_json))
        stub_request(:get, next_page_url).to_return(reply(200, last_response.to_json))

        plugin.run(queue)

        expect(queue.size).to eq(3) # Total of 3 documents
        expect(a_request(:get, visit_url)).to have_been_made.once
        expect(a_request(:get, next_page_url)).to have_been_made.once
      end
    end
  end
end
4 changes: 3 additions & 1 deletion integration/logstash-plugins/logstash-output-vespa/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ filter {
columns => ["id", "description", ...]
}
# remove fields that we don't need. Here you can do a lot more processing
# remove fields we don't need
# NOTE: the fields below are added by Logstash by default. You probably *need* this block;
# otherwise Vespa will reject documents, complaining that e.g. @timestamp is an unknown field
mutate {
remove_field => ["@timestamp", "@version", "event", "host", "log", "message"]
}
Expand Down
2 changes: 1 addition & 1 deletion integration/logstash-plugins/logstash-output-vespa/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.0
0.5.1
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
7 changes: 5 additions & 2 deletions integration/logstash-plugins/logstash-output-vespa/gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#

##############################################################################
#
Expand Down Expand Up @@ -55,7 +57,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
Expand Down Expand Up @@ -84,7 +86,8 @@ done
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
' "$PWD" ) || exit

# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@rem SPDX-License-Identifier: Apache-2.0
@rem

@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# encoding: utf-8

require 'jar_dependencies'
require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.5.0')
require_jar('org.logstashplugins', 'logstash-output-vespa_feed', '0.5.1')
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public void awaitStop() throws InterruptedException {
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
return List.of(VESPA_URL, CLIENT_CERT, CLIENT_KEY, OPERATION, CREATE, NAMESPACE, DOCUMENT_TYPE, ID_FIELD,
MAX_CONNECTIONS, MAX_STREAMS, MAX_RETRIES, OPERATION_TIMEOUT);
MAX_CONNECTIONS, MAX_STREAMS, MAX_RETRIES, OPERATION_TIMEOUT, GRACE_PERIOD, DOOM_PERIOD);
}

@Override
Expand Down

0 comments on commit d4ffaab

Please sign in to comment.