Commit 4cb7caf: Address review comments.

Signed-off-by: Souvik Bose <[email protected]>
Committed by sbose2k21 on Oct 22, 2024 (parent: c749f2f).
Showing 1 changed file with 27 additions and 16 deletions: _data-prepper/pipelines/configuration/sources/kinesis.md

# kinesis

You can use the Data Prepper `kinesis` source to ingest records from one or more [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/).

## Usage

You can use the following options in the `streams` array.

Option | Required | Type | Description
:--- |:---------| :--- | :---
`stream_name` | Yes | String | Defines the name of each Kinesis data stream.
`initial_position` | No | String | Sets the `initial_position` to determine at what point the `kinesis` source starts reading stream records. Use `LATEST` to start from the most recent record or `EARLIEST` to start from the beginning of the stream. Default is `LATEST`.
`checkpoint_interval` | No | Duration | Configure the `checkpoint_interval` to periodically checkpoint Kinesis data streams and avoid duplication of record processing. Default is `PT2M`.
`compression` | No | String | Specifies the compression format. To decompress records added by a [CloudWatch Logs Subscription Filter](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html) to Kinesis, use the `gzip` compression format.
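The options above can be combined in a `streams` array like the following minimal sketch. The stream names and AWS Region are hypothetical placeholders, and the surrounding pipeline structure is assumed from standard Data Prepper pipeline configuration.

```yaml
kinesis-pipeline:
  source:
    kinesis:
      streams:
        # Hypothetical stream names for illustration only
        - stream_name: "application-logs"
          initial_position: "EARLIEST"
          checkpoint_interval: "PT5M"
          compression: "gzip"
        - stream_name: "audit-events"
      aws:
        region: "us-east-1"
```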

## codec

The `codec` determines how the `kinesis` source parses each Kinesis stream record. For increased and more efficient performance, you can use [codec combinations]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/codec-processor-combinations/) with certain processors.

### `json` codec

The `json` codec parses each object in a JSON array as a single JSON object and creates a Data Prepper event for each object in the array. It can be used to parse nested CloudWatch events into individual log entries. It also supports the following configuration options.

Option | Required | Type | Description
:--- | :--- |:--------| :---
`key_name` | No | String | The name of the input field from which to extract the JSON array and create Data Prepper events.
`include_keys` | No | List | The list of input fields to be extracted and added as additional fields in the Data Prepper event.
`include_keys_metadata` | No | List | The list of input fields to be extracted and added to the Data Prepper event metadata object.
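As a sketch, a `json` codec configuration for records delivered by a CloudWatch Logs subscription filter might look like the following. The field names (`logEvents`, `owner`, `logGroup`, `logStream`) come from the CloudWatch Logs subscription filter payload format; verify them against your record structure.

```yaml
codec:
  json:
    key_name: "logEvents"
    include_keys: ["owner", "logGroup"]
    include_keys_metadata: ["logStream"]
```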

### `newline` codec

The `newline` codec parses each Kinesis stream record as a single log event, making it ideal for processing single-line records. It also works well with the [`parse_json` processor]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/processors/parse-json/) to parse each line.

You can use the following options to configure the `newline` codec.

Option | Required | Type | Description

### polling

When the `consumer_strategy` is set to `polling`, the `kinesis` source uses a polling-based approach to read records from the Kinesis data streams, instead of the default `fan-out` approach.

Option | Required | Type | Description
:--- | :--- |:--------| :---
`max_polling_records` | No | Integer | Sets the number of records to fetch from Kinesis during a single call.
`idle_time_between_reads` | No | Duration | Defines the amount of idle time between calls.
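A polling configuration might be sketched as follows. The option values are illustrative, and the placement of `consumer_strategy` and the `polling` block within the `kinesis` source is assumed.

```yaml
kinesis:
  consumer_strategy: "polling"
  polling:
    max_polling_records: 1000
    idle_time_between_reads: "PT0.25S"
```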

### aws

You can use the following options in the `aws` configuration.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`region` | No | String | Sets the AWS Region to use for credentials. Defaults to the [standard SDK behavior for determining the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
`sts_role_arn` | No | String | Defines the AWS Security Token Service (AWS STS) role to assume for requests to Amazon Kinesis Data Streams and Amazon DynamoDB. Defaults to `null`, which uses the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
`aws_sts_header_overrides` | No | Map | Defines a map of header overrides that the AWS Identity and Access Management (IAM) role assumes for the sink plugin.
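A minimal `aws` block sketch follows. The Region, role ARN, and header values are hypothetical placeholders for illustration.

```yaml
kinesis:
  aws:
    region: "us-east-1"
    # Hypothetical role ARN; replace with a role that can access Kinesis and DynamoDB
    sts_role_arn: "arn:aws:iam::123456789012:role/data-prepper-kinesis-role"
    aws_sts_header_overrides:
      x-example-header: "example-value"
```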

## Exposed metadata attributes

The `kinesis` source adds the following metadata to each processed event. You can access the metadata attributes using the [expression syntax `getMetadata` function]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/get-metadata/).

- `stream_name`: Contains the name of the Kinesis data stream from which the event was obtained.
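For example, the metadata attribute can be copied into the event body with a processor. The use of `add_entries` with `value_expression` here is an assumption about your pipeline; the key name `origin_stream` is a hypothetical example.

```yaml
processor:
  - add_entries:
      entries:
        - key: "origin_stream"  # hypothetical destination key
          value_expression: 'getMetadata("stream_name")'
```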

## Permissions

The following minimum permissions are required to run `kinesis` as a source:

```json
{
}
```

The `kinesis` source uses a DynamoDB table for ingestion coordination among multiple workers, so you need DynamoDB permissions.
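As a sketch, the DynamoDB portion of the policy might resemble the following statement. These actions match the standard Kinesis Client Library lease-table usage, but the exact action list and table name pattern are assumptions; the account ID and Region are placeholders.

```json
{
  "Sid": "LeaseCoordination",
  "Effect": "Allow",
  "Action": [
    "dynamodb:CreateTable",
    "dynamodb:DescribeTable",
    "dynamodb:GetItem",
    "dynamodb:PutItem",
    "dynamodb:UpdateItem",
    "dynamodb:DeleteItem",
    "dynamodb:Scan"
  ],
  "Resource": "arn:aws:dynamodb:us-east-1:123456789012:table/kinesis-pipeline-*"
}
```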

## Metrics

The `kinesis` source includes the following metrics.

* `recordsProcessed`: Counts the number of processed stream records.
* `recordProcessingErrors`: Counts the number of stream record processing errors.
* `acknowledgementSetSuccesses`: Counts the number of processed stream records that were successfully added to the sink.
* `acknowledgementSetFailures`: Counts the number of processed stream records that failed to be added to the sink.
