-
Notifications
You must be signed in to change notification settings - Fork 928
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1616 from shivawsm/shivawsm-feature-KinesisDS-Lam…
…bda-with-error-handling
- Loading branch information
Showing
14 changed files
with
550 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
.classpath.txt | ||
target | ||
.classpath | ||
.project | ||
.idea | ||
.settings | ||
.vscode | ||
*.iml | ||
|
||
# CDK asset staging directory | ||
.cdk.staging | ||
cdk.out | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
# Kinesis Data Streams with Lambda Integration | ||
|
||
![architecture diagram](architecture.png) | ||
|
||
## Requirements | ||
|
||
* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. | ||
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured | ||
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) | ||
* [AWS CDK Toolkit](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured | ||
* [Java 11+](https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html) installed | ||
* [Docker](https://docs.docker.com/get-docker/) Installed | ||
|
||
## Deployment Instructions | ||
|
||
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: | ||
|
||
``` | ||
git clone https://github.com/aws-samples/serverless-patterns | ||
``` | ||
2. Change directory to the pattern directory: | ||
|
||
``` | ||
cd serverless-patterns/cdk-kinesis-lambda-java | ||
``` | ||
3. From the command line, use AWS CDK to deploy the AWS resources for the serverless application | ||
|
||
```bash | ||
cd infrastructure | ||
``` | ||
4. From the command line, Synthesize the cdk stack to emits the synthesized CloudFormation template. Set up will make sure to build and package | ||
the lambda functions residing in software directory. | ||
|
||
```bash | ||
cdk synth | ||
``` | ||
5. From the command line, use AWS CDK to deploy the AWS resources. | ||
|
||
```bash | ||
cdk deploy | ||
``` | ||
Alternatively infrastructure/deploy.sh can be used to build and deploy the stack | ||
|
||
6. Note the outputs of CDK and copy the Kinesis Resource Name. Use the copied stream name in the producer.sh file which will be used in the next step to put records in to the stream created. | ||
|
||
## How it works | ||
|
||
This Kinesis-Lambda integration pattern makes use of the aws-kinesisstreams-lambda [Solution construct](https://docs.aws.amazon.com/solutions/latest/constructs/aws-kinesisstreams-lambda.html) to create the infrastructure. | ||
|
||
Lambda get triggered based on the events from the Kinesis Data Stream. For any error in invocation of the lambda function events are persisted in the configured dead-letter SQS queue. | ||
|
||
In the example the Kinesis Event Source is configured with `maxretryattempt` as 1, bisectBatchOnError set to true, and `reportBatchItemFailures` set to true with batch size of 3. | ||
|
||
Lambda code has been updated to handle exception on any error due to event processing as per the best practice to return the sequence number. Using this configuration and approach duplicate message reprocessing can be avoided. | ||
|
||
For more details on handling Success and Failure conditions in Kinesis data streams consumption, refer the [documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-errors). | ||
## Testing | ||
Update the producer.sh file with Kinesis stream name which got created. Update the number of messages to get published in to the stream by updating the number in loop as shown in the below statement | ||
|
||
while [ $a -lt 24 ] | ||
|
||
From the command line | ||
|
||
```bash | ||
cd ../software | ||
cd KinesisCliProducers | ||
sh producers.sh | ||
``` | ||
This will publish the messages in the Kinesis stream and the lambda function gets triggered based on that. | ||
|
||
## Cleanup | ||
|
||
1. Delete the stack | ||
```bash | ||
cdk destroy | ||
``` | ||
---- | ||
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
|
||
SPDX-License-Identifier: MIT-0 |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
{ | ||
"title": "Kinesis to Lambda with error handling", | ||
"description": "Create a Java Lambda function with Event Source as Kinesis Data source", | ||
"language": "Java", | ||
"level": "200", | ||
"framework": "CDK", | ||
"introBox": { | ||
"headline": "How it works", | ||
"text": [ | ||
"This sample project demonstrates how to use AWS Lambda (Java runtime) to subscribe to events from a Kinesis Data Stream with error handling to avoid redundant message processing", | ||
"This example uses AWS CDK constructs" | ||
] | ||
}, | ||
"gitHub": { | ||
"template": { | ||
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/cdk-kinesis-lambda-java", | ||
"templateURL": "serverless-patterns/cdk-kinesis-lambda-java", | ||
"projectFolder": "cdk-kinesis-lambda-java", | ||
"templateFile": "infrastructure/src/main/java/com/myorg/InfrastructureStack.java" | ||
} | ||
}, | ||
"resources": { | ||
"bullets": [ | ||
{ | ||
"text": "AWS CDK Construct for Kinesis and Lambda", | ||
"link": "https://docs.aws.amazon.com/solutions/latest/constructs/aws-kinesisstreams-lambda.html" | ||
} | ||
] | ||
}, | ||
"deploy": { | ||
"text": [ | ||
"cdk deploy" | ||
] | ||
}, | ||
"testing": { | ||
"text": [ | ||
"See the GitHub repo for detailed testing instructions." | ||
] | ||
}, | ||
"cleanup": { | ||
"text": [ | ||
"Delete the stack: <code>cdk destroy</code>." | ||
] | ||
}, | ||
"authors": [ | ||
{ | ||
"name": "Shiva Mahalingam", | ||
"image": "", | ||
"bio": "Solutions Architect @ AWS", | ||
"linkedin": "shivamahalingam", | ||
"twitter": "" | ||
} | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
.classpath.txt | ||
target | ||
.classpath | ||
.project | ||
.idea | ||
.settings | ||
.vscode | ||
*.iml | ||
|
||
# CDK asset staging directory | ||
.cdk.staging | ||
cdk.out | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"app": "mvn -e -q compile exec:java", | ||
"context": { | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
cd ../software/KinesisLambdaClient | ||
mvn clean package | ||
cd ../../infrastructure | ||
mvn clean compile | ||
cdk synth | ||
cdk deploy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" | ||
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> | ||
<modelVersion>4.0.0</modelVersion> | ||
<groupId>com.myorg</groupId> | ||
<artifactId>infrastructure</artifactId> | ||
<version>0.1</version> | ||
<properties> | ||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
<cdk.version>2.44.0</cdk.version> | ||
<junit.version>5.9.1</junit.version> | ||
</properties> | ||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
<version>3.10.1</version> | ||
<configuration> | ||
<source>11</source> | ||
<target>11</target> | ||
</configuration> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.codehaus.mojo</groupId> | ||
<artifactId>exec-maven-plugin</artifactId> | ||
<version>3.1.0</version> | ||
<configuration> | ||
<mainClass>com.myorg.InfrastructureApp</mainClass> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
<dependencies> | ||
<!-- AWS Cloud Development Kit --> | ||
<dependency> | ||
<groupId>software.amazon.awscdk</groupId> | ||
<artifactId>aws-cdk-lib</artifactId> | ||
<version>${cdk.version}</version> | ||
</dependency> | ||
|
||
<!-- Respective AWS Construct Libraries --> | ||
<dependency> | ||
<groupId>software.amazon.awscdk</groupId> | ||
<artifactId>apigatewayv2-integrations-alpha</artifactId> | ||
<version>2.44.0-alpha.0</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.constructs</groupId> | ||
<artifactId>constructs</artifactId> | ||
<version>10.1.120</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>software.amazon.awsconstructs</groupId> | ||
<artifactId>kinesisstreamslambda</artifactId> | ||
<version>2.42.0</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-api</artifactId> | ||
<version>${junit.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-engine</artifactId> | ||
<version>${junit.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.assertj</groupId> | ||
<artifactId>assertj-core</artifactId> | ||
<version>3.23.1</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
</project> |
13 changes: 13 additions & 0 deletions
13
cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureApp.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package com.myorg; | ||
|
||
import software.amazon.awscdk.App; | ||
|
||
public final class InfrastructureApp { | ||
public static void main(final String[] args) { | ||
App app = new App(); | ||
|
||
new InfrastructureStack(app, "KinesisDSLambdaStack"); | ||
|
||
app.synth(); | ||
} | ||
} |
77 changes: 77 additions & 0 deletions
77
cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureStack.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package com.myorg; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
import software.amazon.awscdk.App; | ||
import software.amazon.awscdk.BundlingOptions; | ||
import software.amazon.awscdk.services.lambda.*; | ||
import software.amazon.awscdk.services.lambda.Runtime; | ||
import software.amazon.awscdk.services.lambda.eventsources.KinesisEventSourceProps; | ||
import software.amazon.awsconstructs.services.kinesisstreamslambda.KinesisStreamsToLambda; | ||
import software.amazon.awsconstructs.services.kinesisstreamslambda.KinesisStreamsToLambdaProps; | ||
import software.constructs.Construct; | ||
import software.amazon.awscdk.DockerVolume; | ||
import software.amazon.awscdk.Duration; | ||
import software.amazon.awscdk.Stack; | ||
import software.amazon.awscdk.StackProps; | ||
import software.amazon.awscdk.services.s3.assets.AssetOptions; | ||
|
||
import static java.util.Collections.singletonList; | ||
import static software.amazon.awscdk.BundlingOutput.ARCHIVED; | ||
|
||
public class InfrastructureStack extends Stack { | ||
public InfrastructureStack(final App parent, final String id) { | ||
this(parent, id, null); | ||
} | ||
|
||
public InfrastructureStack(final Construct parent, final String id, final StackProps props) { | ||
super(parent, id, props); | ||
|
||
List<String> kinesisLambdaClientPackagingInstructions = Arrays.asList( | ||
"/bin/sh", | ||
"-c", | ||
"cd KinesisLambdaClient " + | ||
"&& mvn clean install " + | ||
"&& cp /asset-input/KinesisLambdaClient/target/KinesisLambdaClient.jar /asset-output/" | ||
); | ||
|
||
BundlingOptions.Builder builderOptions = BundlingOptions.builder() | ||
.command(kinesisLambdaClientPackagingInstructions) | ||
.image(Runtime.JAVA_11.getBundlingImage()) | ||
.volumes(singletonList( | ||
// Mount local .m2 repo to avoid download all the dependencies again inside the container | ||
DockerVolume.builder() | ||
.hostPath(System.getProperty("user.home") + "/.m2/") | ||
.containerPath("/root/.m2/") | ||
.build() | ||
)) | ||
.user("root") | ||
.outputType(ARCHIVED); | ||
|
||
new KinesisStreamsToLambda(this, "KinesisToLambdaPattern", new KinesisStreamsToLambdaProps.Builder() | ||
.kinesisEventSourceProps(new KinesisEventSourceProps.Builder() | ||
.startingPosition(StartingPosition.TRIM_HORIZON) | ||
.batchSize(3) | ||
.maxBatchingWindow(Duration.seconds(20)) | ||
.maxRecordAge(Duration.seconds(3600)) | ||
.bisectBatchOnError(true) | ||
.retryAttempts(1) | ||
.reportBatchItemFailures(true) | ||
.build()) | ||
.lambdaFunctionProps(new FunctionProps.Builder() | ||
.runtime(Runtime.JAVA_11) | ||
.code(Code.fromAsset("../software/", AssetOptions.builder() | ||
.bundling(builderOptions | ||
.command(kinesisLambdaClientPackagingInstructions) | ||
.build()) | ||
.build())) | ||
.handler("com.myorg.kinesis.client.App") | ||
.memorySize(1024) | ||
.functionName("KinesisLambdaClient") | ||
.reservedConcurrentExecutions(1) | ||
.timeout(Duration.seconds(10)) | ||
.build()) | ||
.build()); | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
cdk-kinesis-lambda-java/infrastructure/src/test/java/com/myorg/InfrastructureStackTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package com.myorg; | ||
|
||
import software.amazon.awscdk.App; | ||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.databind.SerializationFeature; | ||
import java.io.IOException; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
public class InfrastructureStackTest { | ||
private final static ObjectMapper JSON = | ||
new ObjectMapper().configure(SerializationFeature.INDENT_OUTPUT, true); | ||
|
||
@Test | ||
public void testStack() throws IOException { | ||
App app = new App(); | ||
InfrastructureStack stack = new InfrastructureStack(app, "test"); | ||
|
||
JsonNode actual = JSON.valueToTree(app.synth().getStackArtifact(stack.getArtifactId()).getTemplate()); | ||
|
||
assertThat(actual.toString()) | ||
.contains("AWS::ApiGatewayV2::Api") | ||
.contains("AWS::Lambda::Function"); | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
cdk-kinesis-lambda-java/software/KinesisCliProducers/producers.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
a=0 | ||
# -lt is less than operator | ||
|
||
#Iterate the loop until a less than 10 | ||
while [ $a -lt 24 ] | ||
do | ||
# Print the values | ||
aws kinesis put-record --stream-name LambdaPackagingStack-KinesisToLambdaPatternKinesisStreamFA60BE3F-ODRbhifDN9wS \ | ||
--data '{"user_id":"user1", "score": 100}' \ | ||
--partition-key $a | ||
echo $a | ||
|
||
# increment the value | ||
a=`expr $a + 1` | ||
done |
Oops, something went wrong.