From f47e91ec46a8144a462a0487163f229b7f6e1f70 Mon Sep 17 00:00:00 2001 From: Jordan Johnson-Doyle Date: Sat, 17 Nov 2018 19:57:55 +0000 Subject: [PATCH] Allow custom KCL config values to be passed --- README.md | 26 ++++++++++ lib/logstash/inputs/kinesis.rb | 56 ++++++++++++++++++--- spec/inputs/kinesis_spec.rb | 92 ++++++++++++++++++++++++++++++++-- 3 files changed, 161 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index f6b3087..ec9d6a7 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,32 @@ This are the properties you can configure and what are the default values: * **required**: false * **default value**: `"TRIM_HORIZON"` +### Additional KCL Settings + +Each configuration value defined in the KCL config files given below can be passed in snake_case. For example, to set `initialLeaseTableReadCapacity` in `LeaseManagementConfig` to 30, use the following configuration block: `"lease_management_additional_settings" => { "initial_lease_table_read_capacity" => 30 }` + +* `checkpoint_additional_settings`: Configuration values to set in [CheckpointConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java). + * **required**: false + * **default value**: `{}` +* `coordinator_additional_settings`: Configuration values to set in [CoordinatorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java). + * **required**: false + * **default value**: `{}` +* `lease_management_additional_settings`: Configuration values to set in [LeaseManagementConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java). 
+ * **required**: false + * **default value**: `{}` +* `lifecycle_additional_settings`: Configuration values to set in [LifecycleConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java). + * **required**: false + * **default value**: `{}` +* `metrics_additional_settings`: Configuration values to set in [MetricsConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java). + * **required**: false + * **default value**: `{}` +* `retrieval_additional_settings`: Configuration values to set in [RetrievalConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java). + * **required**: false + * **default value**: `{}` +* `processor_additional_settings`: Configuration values to set in [ProcessorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java). + * **required**: false + * **default value**: `{}` + ## Authentication This plugin uses the default AWS SDK auth chain, [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html), to determine which credentials the client will use, unless `profile` is set, in which case [ProfileCredentialsProvider](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/profile/ProfileCredentialsProvider.html) is used. 
diff --git a/lib/logstash/inputs/kinesis.rb b/lib/logstash/inputs/kinesis.rb index c49ed15..18eb23a 100644 --- a/lib/logstash/inputs/kinesis.rb +++ b/lib/logstash/inputs/kinesis.rb @@ -43,10 +43,14 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base attr_reader( :kcl_config, + :kcl_worker, + + :checkpoint_config, + :coordinator_config, + :lease_management_config, + :lifecycle_config, :metrics_config, :retrieval_config, - :lease_management_config, - :kcl_worker, ) # The application name used for the dynamodb coordination table. Must be @@ -72,6 +76,27 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base # Select initial_position_in_stream. Accepts TRIM_HORIZON or LATEST config :initial_position_in_stream, :validate => ["TRIM_HORIZON", "LATEST"], :default => "TRIM_HORIZON" + # Any additional arbitrary kcl options configurable in the CheckpointConfig + config :checkpoint_additional_settings, :validate => :hash, :default => {} + + # Any additional arbitrary kcl options configurable in the CoordinatorConfig + config :coordinator_additional_settings, :validate => :hash, :default => {} + + # Any additional arbitrary kcl options configurable in the LeaseManagementConfig + config :lease_management_additional_settings, :validate => :hash, :default => {} + + # Any additional arbitrary kcl options configurable in the LifecycleConfig + config :lifecycle_additional_settings, :validate => :hash, :default => {} + + # Any additional arbitrary kcl options configurable in the MetricsConfig + config :metrics_additional_settings, :validate => :hash, :default => {} + + # Any additional arbitrary kcl options configurable in the RetrievalConfig + config :retrieval_additional_settings, :validate => :hash, :default => {} + + # Any additional arbitrary kcl options configurable in the ProcessorConfig + config :processor_additional_settings, :validate => :hash, :default => {} + def initialize(params = {}) super(params) end @@ -112,13 +137,30 @@ def register worker_factory([]) ) 
+ @checkpoint_config = send_additional_settings(@kcl_config.checkpoint_config, @checkpoint_additional_settings) + + @coordinator_config = send_additional_settings(@kcl_config.coordinator_config, @coordinator_additional_settings) + + @lifecycle_config = send_additional_settings(@kcl_config.lifecycle_config, @lifecycle_additional_settings) + @metrics_config = @kcl_config.metrics_config.metrics_factory(metrics_factory) + @metrics_config = send_additional_settings(@metrics_config, @metrics_additional_settings) @retrieval_config = @kcl_config.retrieval_config. initial_position_in_stream_extended(software.amazon.kinesis.common.InitialPositionInStreamExtended.new_initial_position(initial_position_in_stream)) + @retrieval_config = send_additional_settings(@retrieval_config, @retrieval_additional_settings) @lease_management_config = @kcl_config.lease_management_config. failover_time_millis(@checkpoint_interval_seconds * 1000 * 3) + @lease_management_config = send_additional_settings(@lease_management_config, @lease_management_additional_settings) + end + + def send_additional_settings(obj, options) + options.each do |key, value| + obj = obj.public_send(key, value) + end + + obj end def run(output_queue) @@ -128,14 +170,14 @@ def run(output_queue) def kcl_builder(output_queue) Scheduler.new( - @kcl_config.checkpoint_config, - @kcl_config.coordinator_config, + @checkpoint_config, + @coordinator_config, @lease_management_config, - @kcl_config.lifecycle_config, + @lifecycle_config, @metrics_config, # checkpointing is done on processRecords so we need Kinesis to always call us so we don't lose this shard # even when we're active - ProcessorConfig.new(worker_factory(output_queue)).call_process_records_even_for_empty_record_list(true), + send_additional_settings(ProcessorConfig.new(worker_factory(output_queue)).call_process_records_even_for_empty_record_list(true), @processor_additional_settings), @retrieval_config ) end diff --git a/spec/inputs/kinesis_spec.rb b/spec/inputs/kinesis_spec.rb index 2e9d696..f941326 100644 --- 
a/spec/inputs/kinesis_spec.rb +++ b/spec/inputs/kinesis_spec.rb @@ -38,6 +38,64 @@ "initial_position_in_stream" => "LATEST" }} + # Config hash to test valid additional_settings + let(:config_with_valid_additional_settings) {{ + "application_name" => "my-processor", + "kinesis_stream_name" => "run-specs", + "codec" => codec, + "metrics" => metrics, + "checkpoint_interval_seconds" => 120, + "region" => "ap-southeast-1", + "profile" => nil, + "coordinator_additional_settings" => { + "max_initialization_attempts" => 2 + }, + "lifecycle_additional_settings" => { + "task_backoff_time_millis" => 20 + }, + "lease_management_additional_settings" => { + "initial_lease_table_read_capacity" => 25, + "initial_lease_table_write_capacity" => 100, + }, + "metrics_additional_settings" => { + "metrics_max_queue_size" => 20000 + }, + "retrieval_additional_settings" => { + "list_shards_backoff_time_in_millis" => 3000 + }, + "processor_additional_settings" => { + "call_process_records_even_for_empty_record_list" => true + } + }} + + # Config hash to test invalid additional_settings where the name is not found + let(:config_with_invalid_additional_settings_name_not_found) {{ + "application_name" => "my-processor", + "kinesis_stream_name" => "run-specs", + "codec" => codec, + "metrics" => metrics, + "checkpoint_interval_seconds" => 120, + "region" => "ap-southeast-1", + "profile" => nil, + "lease_management_additional_settings" => { + "foo" => "bar" + } + }} + + # Config hash to test invalid additional_settings where the type is complex or wrong + let(:config_with_invalid_additional_settings_wrong_type) {{ + "application_name" => "my-processor", + "kinesis_stream_name" => "run-specs", + "codec" => codec, + "metrics" => metrics, + "checkpoint_interval_seconds" => 120, + "region" => "ap-southeast-1", + "profile" => nil, + "coordinator_additional_settings" => { + "max_initialization_attempts" => "invalid_init_attempts" + } + }} + subject!(:kinesis) { LogStash::Inputs::Kinesis.new(config) } 
let(:kcl_worker) { double('kcl_worker') } let(:metrics) { nil } @@ -57,7 +115,7 @@ expect(kinesis.kcl_config.streamName).to eq("run-specs") expect(kinesis.retrieval_config.initialPositionInStreamExtended.initialPositionInStream).to eq(KCL::InitialPositionInStream::TRIM_HORIZON) - assert_client_region = -> (client) do + assert_client_region = lambda do |client| config = get_client_configuration client expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::AWS_REGION).to_s).to eq("ap-southeast-1") end @@ -66,7 +124,7 @@ assert_client_region.call kinesis.kcl_config.dynamoDBClient assert_client_region.call kinesis.kcl_config.kinesisClient - assert_credentials_provider = -> (client) do + assert_credentials_provider = lambda do |client| config = get_client_configuration client expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::CREDENTIALS_PROVIDER).getClass.to_s).to eq("software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider") end @@ -76,12 +134,36 @@ assert_credentials_provider.call kinesis.kcl_config.kinesisClient end + + subject!(:kinesis_with_valid_additional_settings) { LogStash::Inputs::Kinesis.new(config_with_valid_additional_settings) } + it "configures the KCL" do + kinesis_with_valid_additional_settings.register + expect(kinesis_with_valid_additional_settings.coordinator_config.applicationName).to eq("my-processor") + expect(kinesis_with_valid_additional_settings.retrieval_config.streamName).to eq("run-specs") + expect(kinesis_with_valid_additional_settings.coordinator_config.maxInitializationAttempts).to eq(2) + expect(kinesis_with_valid_additional_settings.lifecycle_config.taskBackoffTimeMillis).to eq(20) + expect(kinesis_with_valid_additional_settings.lease_management_config.initialLeaseTableReadCapacity).to eq(25) + expect(kinesis_with_valid_additional_settings.lease_management_config.initialLeaseTableWriteCapacity).to eq(100) + 
expect(kinesis_with_valid_additional_settings.metrics_config.metricsMaxQueueSize).to eq(20000) + expect(kinesis_with_valid_additional_settings.retrieval_config.listShardsBackoffTimeInMillis).to eq(3000) + end + + subject!(:kinesis_with_invalid_additional_settings_name_not_found) { LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_name_not_found) } + it "raises NoMethodError for invalid configuration options" do + expect{ kinesis_with_invalid_additional_settings_name_not_found.register }.to raise_error(NoMethodError) + end + + subject!(:kinesis_with_invalid_additional_settings_wrong_type) { LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_wrong_type) } + it "raises an error for invalid configuration values such as the wrong type" do + expect{ kinesis_with_invalid_additional_settings_wrong_type.register }.to raise_error(NameError) + end + subject!(:kinesis_with_profile) { LogStash::Inputs::Kinesis.new(config_with_profile) } it "uses ProfileCredentialsProvider if profile is specified" do kinesis_with_profile.register - assert_credentials_provider = -> (client) do + assert_credentials_provider = lambda do |client| config = get_client_configuration client expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::CREDENTIALS_PROVIDER).getClass.to_s).to eq("software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider") end @@ -100,7 +182,7 @@ expect(kinesis_with_latest.kcl_config.streamName).to eq("run-specs") expect(kinesis_with_latest.retrieval_config.initialPositionInStreamExtended.initialPositionInStream).to eq(KCL::InitialPositionInStream::LATEST) - assert_client_region = -> (client) do + assert_client_region = lambda do |client| config = get_client_configuration client expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::AWS_REGION).to_s).to eq("ap-southeast-1") end @@ -109,7 +191,7 @@ assert_client_region.call kinesis_with_latest.kcl_config.dynamoDBClient 
assert_client_region.call kinesis_with_latest.kcl_config.kinesisClient - assert_credentials_provider = -> (client) do + assert_credentials_provider = lambda do |client| config = get_client_configuration client expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::CREDENTIALS_PROVIDER).getClass.to_s).to eq("software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider") end