Skip to content

Commit

Permalink
Allow custom KCL config values to be passed
Browse files Browse the repository at this point in the history
  • Loading branch information
w4 committed Nov 17, 2018
1 parent 351e35e commit f47e91e
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 13 deletions.
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,32 @@ This are the properties you can configure and what are the default values:
* **required**: false
* **default value**: `"TRIM_HORIZON"`

### Additional KCL Settings

Each configuration value defined in the KCL config files given below can be passed as snake_case, for example to set `initialLeaseTableReadCapacity` in `LeaseManagementConfig` to 30 the following configuration block could be used: `"lease_management_additional_settings" => { "initial_lease_table_read_capacity" => 30 }`

* `checkpoint_additional_settings`: Configuration values to set in [CheckpointConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java).
* **required**: false
* **default value**: `{}`
* `coordinator_additional_settings`: Configuration values to set in [CoordinatorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java).
* **required**: false
* **default value**: `{}`
* `lease_management_additional_settings`: Configuration values to set in [LeaseManagementConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java).
* **required**: false
* **default value**: `{}`
* `lifecycle_additional_settings`: Configuration values to set in [LifecycleConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java).
* **required**: false
* **default value**: `{}`
* `metrics_additional_settings`: Configuration values to set in [MetricsConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java).
* **required**: false
* **default value**: `{}`
* `retrieval_additional_settings`: Configuration values to set in [RetrievalConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java).
* **required**: false
* **default value**: `{}`
* `processor_additional_settings`: Configuration values to set in [ProcessorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java).
* **required**: false
* **default value**: `{}`

## Authentication

This plugin uses the default AWS SDK auth chain, [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html), to determine which credentials the client will use, unless `profile` is set, in which case [ProfileCredentialsProvider](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/profile/ProfileCredentialsProvider.html) is used.
Expand Down
56 changes: 48 additions & 8 deletions lib/logstash/inputs/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base

attr_reader(
:kcl_config,
:kcl_worker,

:checkpoint_config,
:coordinator_config,
:lease_management_config,
:lifecycle_config,
:metrics_config,
:retrieval_config,
:lease_management_config,
:kcl_worker,
)

# The application name used for the dynamodb coordination table. Must be
Expand All @@ -72,6 +76,27 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# Select initial_position_in_stream. Accepts TRIM_HORIZON or LATEST
config :initial_position_in_stream, :validate => ["TRIM_HORIZON", "LATEST"], :default => "TRIM_HORIZON"

# Any additional arbitrary kcl options configurable in the CheckpointConfig
config :checkpoint_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the CoordinatorConfig
config :coordinator_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the LeaseManagementConfig
config :lease_management_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the LifecycleConfig
config :lifecycle_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the MetricsConfig
config :metrics_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the RetrievalConfig
config :retrieval_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the ProcessorConfig
config :processor_additional_settings, :validate => :hash, :default => {}

def initialize(params = {})
super(params)
end
Expand Down Expand Up @@ -112,13 +137,30 @@ def register
worker_factory([])
)

@checkpoint_config = send_additional_settings(@kcl_config.checkpoint_config, @checkpoint_additional_settings)

@coordinator_config = send_additional_settings(@kcl_config.coordinator_config, @coordinator_additional_settings)

@lifecycle_config = send_additional_settings(@kcl_config.lifecycle_config, @lifecycle_additional_settings)

@metrics_config = @kcl_config.metrics_config.metrics_factory(metrics_factory)
@metrics_config = send_additional_settings(@metrics_config, @metrics_additional_settings)

@retrieval_config = @kcl_config.retrieval_config.
initial_position_in_stream_extended(software.amazon.kinesis.common.InitialPositionInStreamExtended.new_initial_position(initial_position_in_stream))
@retrieval_config = send_additional_settings(@retrieval_config, @retrieval_additional_settings)

@lease_management_config = @kcl_config.lease_management_config.
failover_time_millis(@checkpoint_interval_seconds * 1000 * 3)
@lease_management_config = send_additional_settings(@lease_management_config, @lease_management_additional_settings)
end

def send_additional_settings(obj, options)
options.each do |key, value|
obj = obj.send(key, value)
end

obj
end

def run(output_queue)
Expand All @@ -128,14 +170,12 @@ def run(output_queue)

def kcl_builder(output_queue)
Scheduler.new(
@kcl_config.checkpoint_config,
@kcl_config.coordinator_config,
@checkpoint_config,
@coordinator_config,
@lease_management_config,
@kcl_config.lifecycle_config,
@lifecycle_config,
@metrics_config,
# checkpointing is done on processRecords so we need Kinesis to always call us so we don't lose this shard
# even when we're active
ProcessorConfig.new(worker_factory(output_queue)).call_process_records_even_for_empty_record_list(true),
send_additional_settings(ProcessorConfig.new(worker_factory(output_queue)), @processor_additional_settings),
@retrieval_config
)
end
Expand Down
92 changes: 87 additions & 5 deletions spec/inputs/kinesis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,64 @@
"initial_position_in_stream" => "LATEST"
}}

# Config hash to test valid additional_settings
let(:config_with_valid_additional_settings) {{
"application_name" => "my-processor",
"kinesis_stream_name" => "run-specs",
"codec" => codec,
"metrics" => metrics,
"checkpoint_interval_seconds" => 120,
"region" => "ap-southeast-1",
"profile" => nil,
"coordinator_additional_settings" => {
"max_initialization_attempts" => 2
},
"lifecycle_additional_settings" => {
"task_backoff_time_millis" => 20
},
"lease_management_additional_settings" => {
"initial_lease_table_read_capacity" => 25,
"initial_lease_table_write_capacity" => 100,
},
"metrics_additional_settings" => {
"metrics_max_queue_size" => 20000
},
"retrieval_additional_settings" => {
"list_shards_backoff_time_in_millis" => 3000
},
"processor_additional_settings" => {
"call_process_records_even_for_empty_record_list" => true
}
}}

# Config hash to test invalid additional_settings where the name is not found
let(:config_with_invalid_additional_settings_name_not_found) {{
"application_name" => "my-processor",
"kinesis_stream_name" => "run-specs",
"codec" => codec,
"metrics" => metrics,
"checkpoint_interval_seconds" => 120,
"region" => "ap-southeast-1",
"profile" => nil,
"lease_management_additional_settings" => {
"foo" => "bar"
}
}}

# Config hash to test invalid additional_settings where the type is complex or wrong
let(:config_with_invalid_additional_settings_wrong_type) {{
"application_name" => "my-processor",
"kinesis_stream_name" => "run-specs",
"codec" => codec,
"metrics" => metrics,
"checkpoint_interval_seconds" => 120,
"region" => "ap-southeast-1",
"profile" => nil,
"coordinator_additional_settings" => {
"max_initialization_attempts" => "invalid_init_attempts"
}
}}

subject!(:kinesis) { LogStash::Inputs::Kinesis.new(config) }
let(:kcl_worker) { double('kcl_worker') }
let(:metrics) { nil }
Expand All @@ -57,7 +115,7 @@
expect(kinesis.kcl_config.streamName).to eq("run-specs")
expect(kinesis.retrieval_config.initialPositionInStreamExtended.initialPositionInStream).to eq(KCL::InitialPositionInStream::TRIM_HORIZON)

assert_client_region = -> (client) do
assert_client_region = lambda do |client|
config = get_client_configuration client
expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::AWS_REGION).to_s).to eq("ap-southeast-1")
end
Expand All @@ -66,7 +124,7 @@
assert_client_region.call kinesis.kcl_config.dynamoDBClient
assert_client_region.call kinesis.kcl_config.kinesisClient

assert_credentials_provider = -> (client) do
assert_credentials_provider = lambda do |client|
config = get_client_configuration client
expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::CREDENTIALS_PROVIDER).getClass.to_s).to eq("software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider")
end
Expand All @@ -76,12 +134,36 @@
assert_credentials_provider.call kinesis.kcl_config.kinesisClient
end


subject!(:kinesis_with_valid_additional_settings) { LogStash::Inputs::Kinesis.new(config_with_valid_additional_settings) }
it "configures the KCL" do
kinesis_with_valid_additional_settings.register
expect(kinesis_with_valid_additional_settings.coordinator_config.applicationName).to eq("my-processor")
expect(kinesis_with_valid_additional_settings.retrieval_config.streamName).to eq("run-specs")
expect(kinesis_with_valid_additional_settings.coordinator_config.maxInitializationAttempts).to eq(2)
expect(kinesis_with_valid_additional_settings.lifecycle_config.taskBackoffTimeMillis).to eq(20)
expect(kinesis_with_valid_additional_settings.lease_management_config.initialLeaseTableReadCapacity).to eq(25)
expect(kinesis_with_valid_additional_settings.lease_management_config.initialLeaseTableWriteCapacity).to eq(100)
expect(kinesis_with_valid_additional_settings.metrics_config.metricsMaxQueueSize).to eq(20000)
expect(kinesis_with_valid_additional_settings.retrieval_config.listShardsBackoffTimeInMillis).to eq(3000)
end

subject!(:kinesis_with_invalid_additional_settings_name_not_found) { LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_name_not_found) }
it "raises NoMethodError for invalid configuration options" do
expect{ kinesis_with_invalid_additional_settings_name_not_found.register }.to raise_error(NoMethodError)
end

subject!(:kinesis_with_invalid_additional_settings_wrong_type) { LogStash::Inputs::Kinesis.new(config_with_invalid_additional_settings_wrong_type) }
it "raises an error for invalid configuration values such as the wrong type" do
expect{ kinesis_with_invalid_additional_settings_wrong_type.register }.to raise_error(NameError)
end

subject!(:kinesis_with_profile) { LogStash::Inputs::Kinesis.new(config_with_profile) }

it "uses ProfileCredentialsProvider if profile is specified" do
kinesis_with_profile.register

assert_credentials_provider = -> (client) do
assert_credentials_provider = lambda do |client|
config = get_client_configuration client
expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::CREDENTIALS_PROVIDER).getClass.to_s).to eq("software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider")
end
Expand All @@ -100,7 +182,7 @@
expect(kinesis_with_latest.kcl_config.streamName).to eq("run-specs")
expect(kinesis_with_latest.retrieval_config.initialPositionInStreamExtended.initialPositionInStream).to eq(KCL::InitialPositionInStream::LATEST)

assert_client_region = -> (client) do
assert_client_region = lambda do |client|
config = get_client_configuration client
expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::AWS_REGION).to_s).to eq("ap-southeast-1")
end
Expand All @@ -109,7 +191,7 @@
assert_client_region.call kinesis_with_latest.kcl_config.dynamoDBClient
assert_client_region.call kinesis_with_latest.kcl_config.kinesisClient

assert_credentials_provider = -> (client) do
assert_credentials_provider = lambda do |client|
config = get_client_configuration client
expect(config.option(software.amazon.awssdk.awscore.client.config.AwsClientOption::CREDENTIALS_PROVIDER).getClass.to_s).to eq("software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider")
end
Expand Down

0 comments on commit f47e91e

Please sign in to comment.