Add documentation for Kinesis source in Data prepper #8252

Merged 17 commits on Oct 23, 2024 (changes shown from 10 commits)
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -78,7 +78,7 @@ Follow these steps to set up your local copy of the repository:

1. Navigate to your cloned repository.

- ##### Building using locally installed packages
+ ##### Building by using locally installed packages

1. Install [Ruby](https://www.ruby-lang.org/en/) if you don't already have it. We recommend [RVM](https://rvm.io/), but you can use any method you prefer:

@@ -100,7 +100,7 @@ Follow these steps to set up your local copy of the repository:
bundle install
```

- ##### Building using containerization
+ ##### Building by using containerization

Assuming you have `docker-compose` installed, run the following command:

4 changes: 3 additions & 1 deletion _analyzers/index.md
@@ -171,14 +171,16 @@ The response provides information about the analyzers for each field:
```

## Normalizers

Tokenization divides text into individual terms, but it does not address variations in token forms. Normalization resolves these issues by converting tokens into a standard format. This ensures that similar terms are matched appropriately, even if they are not identical.

### Normalization techniques

The following normalization techniques can help address variations in token forms:

1. **Case normalization**: Converts all tokens to lowercase to ensure case-insensitive matching. For example, "Hello" is normalized to "hello".

- 2. **Stemming**: Reduces words to their root form. For instance, "cars" is stemmed to "car", and "running" is normalized to "run".
+ 2. **Stemming**: Reduces words to their root form. For instance, "cars" is stemmed to "car" and "running" is normalized to "run".

3. **Synonym handling:** Treats synonyms as equivalent. For example, "jogging" and "running" can be indexed under a common term, such as "run".

158 changes: 158 additions & 0 deletions _data-prepper/pipelines/configuration/sources/kinesis.md
@@ -0,0 +1,158 @@
---
layout: default
title: kinesis
parent: Sources
grand_parent: Pipelines
nav_order: 3
---

# kinesis


You can use the Data Prepper `kinesis` source to ingest records from one or more [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/).



## Usage

The following example pipeline specifies `kinesis` as a source. It ingests data from two Kinesis data streams, `stream1` and `stream2`, and sets the `initial_position`, which determines where the pipeline starts reading stream records:


```yaml
version: "2"
kinesis-pipeline:
  source:
    kinesis:
      streams:
        - stream_name: "stream1"
          initial_position: "LATEST"
        - stream_name: "stream2"
          initial_position: "LATEST"
      aws:
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
```

## Configuration options

The following configuration options are supported for the `kinesis` source.

Option | Required | Type | Description
:--- |:---------|:---------| :---
`aws` | Yes | AWS | The AWS configuration. See [aws](#aws) for more information.
`acknowledgments` | No | Boolean | When `true`, enables the `kinesis` source to receive [end-to-end acknowledgments]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/pipelines#end-to-end-acknowledgments) when events are received by OpenSearch sinks.
`streams` | Yes | List | The Kinesis data streams from which the `kinesis` source reads records. You can configure up to four streams. For more information about `streams` configuration options, see [Streams](#streams).
`codec` | Yes | Codec | The [codec](#codec) to apply.
`buffer_timeout` | No | Duration | The amount of time allowed for writing events to the Data Prepper buffer before a timeout occurs. Any events that the source cannot write to the buffer during the specified amount of time are discarded. Default is `1s`.
`records_to_accumulate` | No | Integer | The number of messages that accumulate before being written to the buffer. Default is `100`.
`consumer_strategy` | No | String | The consumer strategy used to ingest Kinesis data streams. Default is `fan-out`, but `polling` can also be used. If `polling` is enabled, you must also provide the [polling](#polling) configuration.
`polling` | No | Polling | The polling configuration. See [polling](#polling).


### Streams

Use the following options in the `streams` array.

Option | Required | Type | Description
:--- |:---------| :--- | :---
`stream_name` | Yes | String | The name of each Kinesis stream.
`initial_position` | No | String | The position from which the `kinesis` source starts reading stream records. `LATEST` starts reading from the most recent stream record, and `EARLIEST` starts reading from the beginning of the stream. Default is `LATEST`.
`checkpoint_interval` | No | Duration | The interval at which the source checkpoints Kinesis streams in order to avoid reprocessing records. Default is `PT2M`.
`compression` | No | String | Specifies the compression format. To decompress records added to Kinesis by a [CloudWatch Logs subscription filter](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html), use the `gzip` compression format.
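
For example, the following `streams` sketch (the stream names are hypothetical) reads one stream from the beginning of its retained records and checkpoints every minute, while decompressing records written to a second stream by a CloudWatch Logs subscription filter:

```yaml
streams:
  - stream_name: "application-logs"    # hypothetical stream name
    initial_position: "EARLIEST"
    checkpoint_interval: "PT1M"
  - stream_name: "cloudwatch-export"   # hypothetical stream name
    compression: "gzip"
```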

## codec

The `codec` determines how the `kinesis` source parses each Kinesis stream record. For improved performance and efficiency, you can use [codec combinations]({{site.url}}{{site.baseurl}}/data-prepper/common-use-cases/codec-processor-combinations/) with certain processors.


### `newline` codec



The `newline` codec parses each line as a single log event. This codec is ideal when each Kinesis stream record contains a single line. It also pairs well with the [parse_json]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/configuration/processors/parse-json/) processor for parsing each line as JSON.

Use the following options to configure the `newline` codec.

Option | Required | Type | Description
:--- | :--- |:--------| :---
`skip_lines` | No | Integer | The number of lines to skip before creating events. You can use this configuration to skip common header rows. Default is `0`.
`header_destination` | No | String | The key used to store the header line of the stream. If this option is specified, then each event contains the header line in this field.
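
As an example, the following sketch (using a hypothetical stream name) applies the `newline` codec to skip one header row and store that header in a `header` field on each event:

```yaml
kinesis:
  streams:
    - stream_name: "csv-events"        # hypothetical stream name
  codec:
    newline:
      skip_lines: 1
      header_destination: "header"
  aws:
    region: "us-west-2"
    sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
```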

### polling

Use the following options to configure the `polling` consumer strategy.

Option | Required | Type | Description
:--- | :--- |:--------| :---
`max_polling_records` | No | Integer | The number of records to fetch from Kinesis in a single request for stream records.
`idle_time_between_reads` | No | Duration | The amount of time to sleep between requests for Kinesis stream records.
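
For example, the following sketch switches the source to the `polling` consumer strategy; the limits shown are illustrative values, not recommendations:

```yaml
kinesis:
  streams:
    - stream_name: "stream1"
  consumer_strategy: "polling"
  polling:
    max_polling_records: 100           # illustrative value
    idle_time_between_reads: "250ms"   # illustrative value
  aws:
    region: "us-west-2"
    sts_role_arn: "arn:aws:iam::123456789012:role/my-iam-role"
```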

### aws

Use the following options in the AWS configuration.

Option | Required | Type | Description
:--- | :--- | :--- | :---
`region` | No | String | The AWS Region to use for credentials. Defaults to [standard SDK behavior to determine the Region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
`sts_role_arn` | No | String | The AWS Security Token Service (AWS STS) role to assume for requests to Amazon Kinesis Data Streams and Amazon DynamoDB. Defaults to `null`, which uses the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
`aws_sts_header_overrides` | No | Map | A map of header overrides that the AWS Identity and Access Management (IAM) role assumes for the source plugin.
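
For example, the following `aws` sketch (the role ARN and header values are placeholders) sets the Region, assumes an STS role, and adds a header override:

```yaml
aws:
  region: "us-east-1"
  sts_role_arn: "arn:aws:iam::123456789012:role/my-pipeline-role"   # placeholder role ARN
  aws_sts_header_overrides:
    x-custom-header: "custom-value"                                 # placeholder header and value
```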

## Exposed metadata attributes

The following metadata is added to each event that is processed by the `kinesis` source. These metadata attributes can be accessed using the [expression syntax `getMetadata` function]({{site.url}}{{site.baseurl}}/data-prepper/pipelines/get-metadata/).

* `kinesis_stream_name`: The name of the Kinesis stream that an event came from.
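
For example, you can use this attribute to route events according to the stream they came from. The following is a minimal sketch (the route name, sink endpoint, and index name are hypothetical) that uses `getMetadata` in a conditional route:

```yaml
route:
  - stream1-events: 'getMetadata("kinesis_stream_name") == "stream1"'
sink:
  - opensearch:
      hosts: ["https://opensearch:9200"]   # hypothetical endpoint
      index: "stream1-index"               # hypothetical index name
      routes: ["stream1-events"]
```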

## Permissions

The following are the minimum required permissions for running `kinesis` as a source:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamConsumer",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:ListStreamConsumers",
        "kinesis:RegisterStreamConsumer",
        "kinesis:SubscribeToShard"
      ],
      "Resource": [
        "arn:aws:kinesis:us-east-1:{account-id}:stream/stream1",
        "arn:aws:kinesis:us-east-1:{account-id}:stream/stream2"
      ]
    },
    {
      "Sid": "allowCreateTable",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:PutItem",
        "dynamodb:DescribeTable",
        "dynamodb:DeleteItem",
        "dynamodb:GetItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem",
        "dynamodb:Query"
      ],
      "Resource": [
        "arn:aws:dynamodb:us-east-1:{account-id}:table/kinesis-pipeline"
      ]
    }
  ]
}
```

DynamoDB permissions are required because the Data Prepper `kinesis` source uses a DynamoDB table to coordinate ingestion among multiple workers.

## Metrics

The `kinesis` source includes the following metrics.

### Counters

* `recordsProcessed`: The number of stream records processed from Kinesis streams.
* `recordProcessingErrors`: The number of processing errors for stream records from Kinesis streams.
* `acknowledgementSetSuccesses`: The total number of processed stream records successfully added to the sink.
* `acknowledgementSetFailures`: The total number of processed stream records that could not be added to the sink.


