Upgrade to KCL v2 #55

Closed
wants to merge 3 commits into from
23 changes: 22 additions & 1 deletion README.md
@@ -54,7 +54,28 @@ This are the properties you can configure and what are the default values:
* **default value**: `"TRIM_HORIZON"`

### Additional KCL Settings
* `additional_settings`: The KCL provides several configuration options which can be set in [KinesisClientLibConfiguration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java). These options are configured via various function calls that all begin with `with`. Some of these functions take complex types, which are not supported. However, you may invoke any one of the `withX()` functions that take a primitive by providing key-value pairs in `snake_case`. For example, to set the dynamodb read and write capacity values, two functions exist, withInitialLeaseTableReadCapacity and withInitialLeaseTableWriteCapacity. To set a value for these, provide a hash of `additional_settings => {"initial_lease_table_read_capacity" => 25, "initial_lease_table_write_capacity" => 100}`

Each configuration value defined in the KCL config classes listed below can be passed in `snake_case`. For example, to set `initialLeaseTableReadCapacity` in `LeaseManagementConfig` to 30, use the following configuration block: `"lease_management_additional_settings" => { "initial_lease_table_read_capacity" => 30 }`

* `checkpoint_additional_settings`: Configuration values to set in [CheckpointConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/CheckpointConfig.java).
* **required**: false
* **default value**: `{}`
* `coordinator_additional_settings`: Configuration values to set in [CoordinatorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/CoordinatorConfig.java).
* **required**: false
* **default value**: `{}`
* `lease_management_additional_settings`: Configuration values to set in [LeaseManagementConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java).
* **required**: false
* **default value**: `{}`
* `lifecycle_additional_settings`: Configuration values to set in [LifecycleConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/LifecycleConfig.java).
* **required**: false
* **default value**: `{}`
* `metrics_additional_settings`: Configuration values to set in [MetricsConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/metrics/MetricsConfig.java).
* **required**: false
* **default value**: `{}`
* `retrieval_additional_settings`: Configuration values to set in [RetrievalConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java).
* **required**: false
* **default value**: `{}`
* `processor_additional_settings`: Configuration values to set in [ProcessorConfig](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/ProcessorConfig.java).
* **required**: false
* **default value**: `{}`
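
The snake_case convention described in this README section can be illustrated with a small sketch. The `camelize` helper below is hypothetical and not part of the plugin; it only shows which Java-style property name a given settings key is expected to correspond to. The write-capacity key is borrowed from the earlier README example and is assumed to still exist in `LeaseManagementConfig`.

```ruby
# Hypothetical helper (not part of the plugin): derives the camelCase
# property name that a snake_case settings key is expected to map to.
def camelize(key)
  first, *rest = key.to_s.split("_")
  first + rest.map(&:capitalize).join
end

# Example settings hash, shaped like `lease_management_additional_settings`.
settings = {
  "initial_lease_table_read_capacity" => 30,
  # Assumption: taken from the previous README text, not verified against KCL v2.
  "initial_lease_table_write_capacity" => 100,
}

settings.each do |key, value|
  puts "#{key} -> #{camelize(key)} = #{value}"
end
```

In the plugin itself no such helper appears to be needed, because the keys are passed straight to the config object as method names.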

154 changes: 114 additions & 40 deletions lib/logstash/inputs/kinesis.rb
@@ -10,6 +10,10 @@
require "logstash/inputs/kinesis/version"


def software
Java::Software
end

# Receive events through an AWS Kinesis stream.
#
# This input plugin uses the Java Kinesis Client Library underneath, so the
@@ -24,15 +28,29 @@
#
# The library can optionally also send worker statistics to CloudWatch.
class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
KCL = com.amazonaws.services.kinesis.clientlibrary.lib.worker
KCL_PROCESSOR_FACTORY_CLASS = com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
ConfigsBuilder = software.amazon.kinesis.common.ConfigsBuilder
ProcessorConfig = software.amazon.kinesis.processor.ProcessorConfig
Scheduler = software.amazon.kinesis.coordinator.Scheduler

KinesisClientUtil = software.amazon.kinesis.common.KinesisClientUtil
KinesisAsyncClient = software.amazon.awssdk.services.kinesis.KinesisAsyncClient
DynamoDbAsyncClient = software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
CloudWatchAsyncClient = software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient

require "logstash/inputs/kinesis/worker"

config_name 'kinesis'

attr_reader(
:kcl_config,
:kcl_worker,

:checkpoint_config,
:coordinator_config,
:lease_management_config,
:lifecycle_config,
:metrics_config,
:retrieval_config,
)

# The application name used for the dynamodb coordination table. Must be
@@ -58,73 +76,129 @@ class LogStash::Inputs::Kinesis < LogStash::Inputs::Base
# Select initial_position_in_stream. Accepts TRIM_HORIZON or LATEST
config :initial_position_in_stream, :validate => ["TRIM_HORIZON", "LATEST"], :default => "TRIM_HORIZON"

# Any additional arbitrary kcl options configurable in the KinesisClientLibConfiguration
config :additional_settings, :validate => :hash, :default => {}
# Any additional arbitrary kcl options configurable in the CheckpointConfig
config :checkpoint_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the CoordinatorConfig
config :coordinator_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the LeaseManagementConfig
config :lease_management_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the LifecycleConfig
config :lifecycle_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the MetricsConfig
config :metrics_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the RetrievalConfig
config :retrieval_additional_settings, :validate => :hash, :default => {}

# Any additional arbitrary kcl options configurable in the ProcessorConfig
config :processor_additional_settings, :validate => :hash, :default => {}

def initialize(params = {})
super(params)
end

def register
# the INFO log level is extremely noisy in KCL
kinesis_logger = org.apache.commons.logging::LogFactory.getLog("com.amazonaws.services.kinesis").logger
if kinesis_logger.java_kind_of?(java.util.logging::Logger)
kinesis_logger.setLevel(java.util.logging::Level::WARNING)
else
kinesis_logger.setLevel(org.apache.log4j::Level::WARN)
end

hostname = Socket.gethostname
uuid = java.util::UUID.randomUUID.to_s
worker_id = "#{hostname}:#{uuid}"

# If the AWS profile is set, use the profile credentials provider.
# Otherwise fall back to the default chain.
unless @profile.nil?
creds = com.amazonaws.auth.profile::ProfileCredentialsProvider.new(@profile)
creds = software.amazon.awssdk.auth.credentials::ProfileCredentialsProvider.create(@profile)
else
creds = com.amazonaws.auth::DefaultAWSCredentialsProviderChain.new
creds = software.amazon.awssdk.auth.credentials::DefaultCredentialsProvider.create
end
initial_position_in_stream = if @initial_position_in_stream == "TRIM_HORIZON"
KCL::InitialPositionInStream::TRIM_HORIZON
software.amazon.kinesis.common.InitialPositionInStream::TRIM_HORIZON
else
KCL::InitialPositionInStream::LATEST
software.amazon.kinesis.common.InitialPositionInStream::LATEST
end

@kcl_config = KCL::KinesisClientLibConfiguration.new(
@application_name,
region = software.amazon.awssdk.regions.Region.of(@region)

kinesis_client = KinesisClientUtil.create_kinesis_async_client(KinesisAsyncClient.builder().region(region).credentials_provider(creds))
dynamodb_client = DynamoDbAsyncClient.builder().region(region).credentials_provider(creds).build
cloudwatch_client = CloudWatchAsyncClient.builder().region(region).credentials_provider(creds).build

@kcl_config = ConfigsBuilder.new(
@kinesis_stream_name,
creds,
worker_id).
withInitialPositionInStream(initial_position_in_stream).
withRegionName(@region)

# Call arbitrary "withX()" functions
# snake_case => withCamelCase happens automatically
@additional_settings.each do |key, value|
fn = "with_#{key}"
@kcl_config.send(fn, value)
end
@application_name,
kinesis_client,
dynamodb_client,
cloudwatch_client,
worker_id,
# cannot pass nil into ConfigsBuilder because Lombok will throw an NPE - this value
# (processor_config) should not be used.
worker_factory([])
)

@checkpoint_config = send_additional_settings(@kcl_config.checkpoint_config, @checkpoint_additional_settings)

@coordinator_config = send_additional_settings(@kcl_config.coordinator_config, @coordinator_additional_settings)

@lifecycle_config = send_additional_settings(@kcl_config.lifecycle_config, @lifecycle_additional_settings)

@metrics_config = @kcl_config.metrics_config.metrics_factory(metrics_factory)
@metrics_config = send_additional_settings(@metrics_config, @metrics_additional_settings)

@retrieval_config = @kcl_config.retrieval_config.
initial_position_in_stream_extended(software.amazon.kinesis.common.InitialPositionInStreamExtended.new_initial_position(initial_position_in_stream))
@retrieval_config = send_additional_settings(@retrieval_config, @retrieval_additional_settings)

@lease_management_config = @kcl_config.lease_management_config.
failover_time_millis(@checkpoint_interval_seconds * 1000 * 3)
@lease_management_config = send_additional_settings(@lease_management_config, @lease_management_additional_settings)
end

def send_additional_settings(obj, options)
options.each do |key, value|
obj = obj.send(key, value)
end

obj
end

def run(output_queue)
@kcl_worker = kcl_builder(output_queue).build
@kcl_worker = kcl_builder(output_queue)
@kcl_worker.run
end

def kcl_builder(output_queue)
KCL::Worker::Builder.new.tap do |builder|
builder.java_send(:recordProcessorFactory, [KCL_PROCESSOR_FACTORY_CLASS.java_class], worker_factory(output_queue))
builder.config(@kcl_config)

if metrics_factory
builder.metricsFactory(metrics_factory)
end
end
Scheduler.new(
@checkpoint_config,
@coordinator_config,
@lease_management_config,
@lifecycle_config,
@metrics_config,
send_additional_settings(ProcessorConfig.new(worker_factory(output_queue)), @processor_additional_settings),
@retrieval_config
)
end

def stop
@kcl_worker.shutdown if @kcl_worker
if not @kcl_worker
return
end

graceful_shutdown_future = @kcl_worker.start_graceful_shutdown

begin
graceful_shutdown_future.get(20, java.util.concurrent.TimeUnit::SECONDS)
rescue java.lang.InterruptedException
@logger.info("Interrupted while waiting for graceful shutdown. Attempting to force shutdown.")
@kcl_worker.shutdown
rescue java.util.concurrent.ExecutionException => error
@logger.error("Exception while executing graceful shutdown.", error)
raise error
rescue java.util.concurrent.TimeoutException
@logger.error("Timeout while waiting for shutdown. Scheduler may not have exited. Attempting to force shutdown.")
@kcl_worker.shutdown
end
end

def worker_factory(output_queue)
@@ -136,7 +210,7 @@ def worker_factory(output_queue)
def metrics_factory
case @metrics
when nil
com.amazonaws.services.kinesis.metrics.impl::NullMetricsFactory.new
software.amazon.kinesis.metrics::NullMetricsFactory.new
when 'cloudwatch'
nil # default in the underlying library
end
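
The `send_additional_settings` approach in this file relies on each config object exposing fluent setters that return a config. A minimal sketch of that pattern in plain Ruby follows. `SketchConfig` and `apply_settings` are hypothetical stand-ins, not KCL or plugin classes, and the 60-second checkpoint interval is an assumed value. Unlike the diff, the sketch uses `public_send` with a `respond_to?` check, which is one possible way to reject unknown keys rather than what the PR does.

```ruby
# Hypothetical stand-in for a KCL config object with fluent setters.
class SketchConfig
  attr_reader :values

  def initialize(values = {})
    @values = values
  end

  def initial_lease_table_read_capacity(n)
    with(:initial_lease_table_read_capacity, n)
  end

  def failover_time_millis(ms)
    with(:failover_time_millis, ms)
  end

  private

  # Returns a new config carrying the extra value; the original is untouched.
  def with(key, value)
    SketchConfig.new(@values.merge(key => value))
  end
end

# Applies each key/value pair as a setter call, threading the result through.
def apply_settings(config, options)
  options.reduce(config) do |current, (key, value)|
    unless current.respond_to?(key)
      raise ArgumentError, "unknown setting: #{key}"
    end
    current.public_send(key, value)
  end
end

# Assumed 60-second checkpoint interval, mirroring the `* 1000 * 3` in the diff.
base = SketchConfig.new.failover_time_millis(60 * 1000 * 3)
config = apply_settings(base, { "initial_lease_table_read_capacity" => 30 })
p config.values
```

Threading the return value through each call matters when setters return a new object instead of mutating the receiver, which is why the result is reassigned on every iteration.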
35 changes: 24 additions & 11 deletions lib/logstash/inputs/kinesis/worker.rb
@@ -1,6 +1,8 @@
# encoding: utf-8
class LogStash::Inputs::Kinesis::Worker
include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor
software = Java::Software

include software.amazon.kinesis.processor::ShardRecordProcessor

attr_reader(
:checkpoint_interval,
@@ -30,37 +32,48 @@ def processRecords(records_input)
end
end

def shutdown(shutdown_input)
if shutdown_input.shutdown_reason == com.amazonaws.services.kinesis.clientlibrary.lib.worker::ShutdownReason::TERMINATE
checkpoint(shutdown_input.checkpointer)
end
def leaseLost(lease_lost_input)
end

def shardEnded(shard_ended_input)
checkpoint(shard_ended_input.checkpointer)
end

def shutdownRequested(shutdown_input)
checkpoint(shutdown_input.checkpointer)
end

protected

def checkpoint(checkpointer)
checkpointer.checkpoint()
rescue => error
@logger.error("Kinesis worker failed checkpointing: #{error}")
@logger.error("Kinesis worker failed checkpointing: #{error}", error)
end

def process_record(record)
raw = String.from_java_bytes(record.getData.array)
buf = record.data
buf.rewind
bytes = Java::byte[record.data.remaining].new
buf.get(bytes)

raw = String.from_java_bytes(bytes)

metadata = build_metadata(record)
@codec.decode(raw) do |event|
@decorator.call(event)
event.set('@metadata', metadata)
@output_queue << event
end
rescue => error
@logger.error("Error processing record: #{error}")
@logger.error("Error processing record: #{error}", error)
end

def build_metadata(record)
metadata = Hash.new
metadata['approximate_arrival_timestamp'] = record.getApproximateArrivalTimestamp.getTime
metadata['partition_key'] = record.getPartitionKey
metadata['sequence_number'] = record.getSequenceNumber
metadata['approximate_arrival_timestamp'] = record.approximateArrivalTimestamp.toEpochMilli
metadata['partition_key'] = record.partitionKey
metadata['sequence_number'] = record.sequenceNumber
metadata
end
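
The worker changes in this file split the single `shutdown` callback into three lifecycle callbacks. The sketch below shows that shape in plain Ruby: checkpoint on shard end and on requested shutdown, do nothing on lease loss, and log checkpoint failures without re-raising. `SketchProcessor`, `SketchCheckpointer`, and `SketchInput` are hypothetical stand-ins rather than KCL types, and the method names are snake_case for readability.

```ruby
# Hypothetical stand-ins for the KCL input and checkpointer objects.
SketchInput = Struct.new(:checkpointer)

class SketchCheckpointer
  attr_reader :calls

  def initialize(broken: false)
    @broken = broken
    @calls = 0
  end

  def checkpoint
    @calls += 1
    raise "lease table unavailable" if @broken
  end
end

class SketchProcessor
  attr_reader :log

  def initialize
    @log = []
  end

  # The lease is gone, so this processor no longer checkpoints.
  def lease_lost(_input)
  end

  def shard_ended(input)
    checkpoint(input.checkpointer)
  end

  def shutdown_requested(input)
    checkpoint(input.checkpointer)
  end

  private

  # Failures are recorded, not re-raised, so shutdown can continue.
  def checkpoint(checkpointer)
    checkpointer.checkpoint
  rescue => error
    @log << "Kinesis worker failed checkpointing: #{error.message}"
  end
end

processor = SketchProcessor.new
healthy = SketchCheckpointer.new
failing = SketchCheckpointer.new(broken: true)

processor.shard_ended(SketchInput.new(healthy))
processor.shutdown_requested(SketchInput.new(failing))
processor.lease_lost(SketchInput.new(nil))
puts processor.log
```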

6 changes: 3 additions & 3 deletions logstash-input-kinesis.gemspec
@@ -22,12 +22,12 @@ Gem::Specification.new do |spec|

spec.platform = 'java'

spec.requirements << "jar 'com.amazonaws:amazon-kinesis-client', '1.9.2'"
spec.requirements << "jar 'com.amazonaws:aws-java-sdk-core', '1.11.414'"
spec.requirements << "jar 'software.amazon.kinesis:amazon-kinesis-client', '2.0.5'"
spec.requirements << "jar 'com.amazonaws:aws-java-sdk-core', '1.11.414'"

spec.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

spec.add_development_dependency 'logstash-devutils'
spec.add_development_dependency 'jar-dependencies', '~> 0.3.4'
spec.add_development_dependency 'jar-dependencies', '~> 0.4.0'
spec.add_development_dependency "logstash-codec-json"
end