diff --git a/cdk-kinesis-lambda-java/.gitignore b/cdk-kinesis-lambda-java/.gitignore
new file mode 100644
index 000000000..1db21f162
--- /dev/null
+++ b/cdk-kinesis-lambda-java/.gitignore
@@ -0,0 +1,13 @@
+.classpath.txt
+target
+.classpath
+.project
+.idea
+.settings
+.vscode
+*.iml
+
+# CDK asset staging directory
+.cdk.staging
+cdk.out
+
diff --git a/cdk-kinesis-lambda-java/README.md b/cdk-kinesis-lambda-java/README.md
new file mode 100644
index 000000000..9ebe17103
--- /dev/null
+++ b/cdk-kinesis-lambda-java/README.md
@@ -0,0 +1,80 @@
+# Kinesis Data Streams with Lambda Integration
+
+![architecture diagram](architecture.png)
+
+## Requirements
+
+* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
+* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
+* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
+* [AWS CDK Toolkit](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured
+* [Java 11+](https://docs.aws.amazon.com/corretto/latest/corretto-11-ug/downloads-list.html) installed
+* [Docker](https://docs.docker.com/get-docker/) Installed
+
+## Deployment Instructions
+
+1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
+
+ ```
+ git clone https://github.com/aws-samples/serverless-patterns
+ ```
+2. Change directory to the pattern directory:
+
+ ```
+ cd serverless-patterns/cdk-kinesis-lambda-java
+ ```
+3. From the command line, use AWS CDK to deploy the AWS resources for the serverless application
+
+ ```bash
+ cd infrastructure
+ ```
+4. From the command line, Synthesize the cdk stack to emits the synthesized CloudFormation template. Set up will make sure to build and package
+ the lambda functions residing in software directory.
+
+ ```bash
+ cdk synth
+ ```
+5. From the command line, use AWS CDK to deploy the AWS resources.
+
+ ```bash
+ cdk deploy
+ ```
+ Alternatively infrastructure/deploy.sh can be used to build and deploy the stack
+
+6. Note the outputs of CDK and copy the Kinesis Resource Name. Use the copied stream name in the producer.sh file which will be used in the next step to put records in to the stream created.
+
+## How it works
+
+This Kinesis-Lambda integration pattern makes use of the aws-kinesisstreams-lambda [Solution construct](https://docs.aws.amazon.com/solutions/latest/constructs/aws-kinesisstreams-lambda.html) to create the infrastructure.
+
+Lambda get triggered based on the events from the Kinesis Data Stream. For any error in invocation of the lambda function events are persisted in the configured dead-letter SQS queue.
+
+In the example the Kinesis Event Source is configured with `maxretryattempt` as 1, bisectBatchOnError set to true, and `reportBatchItemFailures` set to true with batch size of 3.
+
+Lambda code has been updated to handle exception on any error due to event processing as per the best practice to return the sequence number. Using this configuration and approach duplicate message reprocessing can be avoided.
+
+For more details on handling Success and Failure conditions in Kinesis data streams consumption, refer the [documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-errors).
+## Testing
+Update the producer.sh file with Kinesis stream name which got created. Update the number of messages to get published in to the stream by updating the number in loop as shown in the below statement
+
+while [ $a -lt 24 ]
+
+From the command line
+
+ ```bash
+ cd ../software
+ cd KinesisCliProducers
+ sh producers.sh
+ ```
+This will publish the messages in the Kinesis stream and the lambda function gets triggered based on that.
+
+## Cleanup
+
+1. Delete the stack
+ ```bash
+ cdk destroy
+ ```
+----
+Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+SPDX-License-Identifier: MIT-0
\ No newline at end of file
diff --git a/cdk-kinesis-lambda-java/architecture.png b/cdk-kinesis-lambda-java/architecture.png
new file mode 100644
index 000000000..0b9724cd7
Binary files /dev/null and b/cdk-kinesis-lambda-java/architecture.png differ
diff --git a/cdk-kinesis-lambda-java/example-pattern.json b/cdk-kinesis-lambda-java/example-pattern.json
new file mode 100644
index 000000000..dbc99507e
--- /dev/null
+++ b/cdk-kinesis-lambda-java/example-pattern.json
@@ -0,0 +1,54 @@
+{
+ "title": "Kinesis to Lambda with error handling",
+ "description": "Create a Java Lambda function with Event Source as Kinesis Data source",
+ "language": "Java",
+ "level": "200",
+ "framework": "CDK",
+ "introBox": {
+ "headline": "How it works",
+ "text": [
+ "This sample project demonstrates how to use AWS Lambda (Java runtime) to subscribe to events from a Kinesis Data Stream with error handling to avoid redundant message processing",
+ "This example uses AWS CDK constructs"
+ ]
+ },
+ "gitHub": {
+ "template": {
+ "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/cdk-kinesis-lambda-java",
+ "templateURL": "serverless-patterns/cdk-kinesis-lambda-java",
+ "projectFolder": "cdk-kinesis-lambda-java",
+ "templateFile": "infrastructure/src/main/java/com/myorg/InfrastructureStack.java"
+ }
+ },
+ "resources": {
+ "bullets": [
+ {
+ "text": "AWS CDK Construct for Kinesis and Lambda",
+ "link": "https://docs.aws.amazon.com/solutions/latest/constructs/aws-kinesisstreams-lambda.html"
+ }
+ ]
+ },
+ "deploy": {
+ "text": [
+ "cdk deploy"
+ ]
+ },
+ "testing": {
+ "text": [
+ "See the GitHub repo for detailed testing instructions."
+ ]
+ },
+ "cleanup": {
+ "text": [
+ "Delete the stack: cdk destroy
."
+ ]
+ },
+ "authors": [
+ {
+ "name": "Shiva Mahalingam",
+ "image": "",
+ "bio": "Solutions Architect @ AWS",
+ "linkedin": "shivamahalingam",
+ "twitter": ""
+ }
+ ]
+}
diff --git a/cdk-kinesis-lambda-java/infrastructure/.gitignore b/cdk-kinesis-lambda-java/infrastructure/.gitignore
new file mode 100644
index 000000000..1db21f162
--- /dev/null
+++ b/cdk-kinesis-lambda-java/infrastructure/.gitignore
@@ -0,0 +1,13 @@
+.classpath.txt
+target
+.classpath
+.project
+.idea
+.settings
+.vscode
+*.iml
+
+# CDK asset staging directory
+.cdk.staging
+cdk.out
+
diff --git a/cdk-kinesis-lambda-java/infrastructure/cdk.json b/cdk-kinesis-lambda-java/infrastructure/cdk.json
new file mode 100644
index 000000000..5bf1bc04f
--- /dev/null
+++ b/cdk-kinesis-lambda-java/infrastructure/cdk.json
@@ -0,0 +1,5 @@
+{
+ "app": "mvn -e -q compile exec:java",
+ "context": {
+ }
+}
diff --git a/cdk-kinesis-lambda-java/infrastructure/deploy.sh b/cdk-kinesis-lambda-java/infrastructure/deploy.sh
new file mode 100755
index 000000000..202548591
--- /dev/null
+++ b/cdk-kinesis-lambda-java/infrastructure/deploy.sh
@@ -0,0 +1,6 @@
+cd ../software/KinesisLambdaClient
+mvn clean package
+cd ../../infrastructure
+mvn clean compile
+cdk synth
+cdk deploy
\ No newline at end of file
diff --git a/cdk-kinesis-lambda-java/infrastructure/pom.xml b/cdk-kinesis-lambda-java/infrastructure/pom.xml
new file mode 100644
index 000000000..03cd45ad3
--- /dev/null
+++ b/cdk-kinesis-lambda-java/infrastructure/pom.xml
@@ -0,0 +1,77 @@
+
+
+ 4.0.0
+ com.myorg
+ infrastructure
+ 0.1
+
+ UTF-8
+ 2.44.0
+ 5.9.1
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.10.1
+
+
+ 11
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.1.0
+
+ com.myorg.InfrastructureApp
+
+
+
+
+
+
+
+ software.amazon.awscdk
+ aws-cdk-lib
+ ${cdk.version}
+
+
+
+
+ software.amazon.awscdk
+ apigatewayv2-integrations-alpha
+ 2.44.0-alpha.0
+
+
+ software.constructs
+ constructs
+ 10.1.120
+
+
+ software.amazon.awsconstructs
+ kinesisstreamslambda
+ 2.42.0
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit.version}
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.23.1
+ test
+
+
+
diff --git a/cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureApp.java b/cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureApp.java
new file mode 100644
index 000000000..1c9d6e9fa
--- /dev/null
+++ b/cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureApp.java
@@ -0,0 +1,13 @@
+package com.myorg;
+
+import software.amazon.awscdk.App;
+
+public final class InfrastructureApp {
+ public static void main(final String[] args) {
+ App app = new App();
+
+ new InfrastructureStack(app, "KinesisDSLambdaStack");
+
+ app.synth();
+ }
+}
diff --git a/cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureStack.java b/cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureStack.java
new file mode 100644
index 000000000..17fa3011a
--- /dev/null
+++ b/cdk-kinesis-lambda-java/infrastructure/src/main/java/com/myorg/InfrastructureStack.java
@@ -0,0 +1,77 @@
+package com.myorg;
+
+import java.util.Arrays;
+import java.util.List;
+
+import software.amazon.awscdk.App;
+import software.amazon.awscdk.BundlingOptions;
+import software.amazon.awscdk.services.lambda.*;
+import software.amazon.awscdk.services.lambda.Runtime;
+import software.amazon.awscdk.services.lambda.eventsources.KinesisEventSourceProps;
+import software.amazon.awsconstructs.services.kinesisstreamslambda.KinesisStreamsToLambda;
+import software.amazon.awsconstructs.services.kinesisstreamslambda.KinesisStreamsToLambdaProps;
+import software.constructs.Construct;
+import software.amazon.awscdk.DockerVolume;
+import software.amazon.awscdk.Duration;
+import software.amazon.awscdk.Stack;
+import software.amazon.awscdk.StackProps;
+import software.amazon.awscdk.services.s3.assets.AssetOptions;
+
+import static java.util.Collections.singletonList;
+import static software.amazon.awscdk.BundlingOutput.ARCHIVED;
+
+public class InfrastructureStack extends Stack {
+ public InfrastructureStack(final App parent, final String id) {
+ this(parent, id, null);
+ }
+
+ public InfrastructureStack(final Construct parent, final String id, final StackProps props) {
+ super(parent, id, props);
+
+ List kinesisLambdaClientPackagingInstructions = Arrays.asList(
+ "/bin/sh",
+ "-c",
+ "cd KinesisLambdaClient " +
+ "&& mvn clean install " +
+ "&& cp /asset-input/KinesisLambdaClient/target/KinesisLambdaClient.jar /asset-output/"
+ );
+
+ BundlingOptions.Builder builderOptions = BundlingOptions.builder()
+ .command(kinesisLambdaClientPackagingInstructions)
+ .image(Runtime.JAVA_11.getBundlingImage())
+ .volumes(singletonList(
+ // Mount local .m2 repo to avoid download all the dependencies again inside the container
+ DockerVolume.builder()
+ .hostPath(System.getProperty("user.home") + "/.m2/")
+ .containerPath("/root/.m2/")
+ .build()
+ ))
+ .user("root")
+ .outputType(ARCHIVED);
+
+ new KinesisStreamsToLambda(this, "KinesisToLambdaPattern", new KinesisStreamsToLambdaProps.Builder()
+ .kinesisEventSourceProps(new KinesisEventSourceProps.Builder()
+ .startingPosition(StartingPosition.TRIM_HORIZON)
+ .batchSize(3)
+ .maxBatchingWindow(Duration.seconds(20))
+ .maxRecordAge(Duration.seconds(3600))
+ .bisectBatchOnError(true)
+ .retryAttempts(1)
+ .reportBatchItemFailures(true)
+ .build())
+ .lambdaFunctionProps(new FunctionProps.Builder()
+ .runtime(Runtime.JAVA_11)
+ .code(Code.fromAsset("../software/", AssetOptions.builder()
+ .bundling(builderOptions
+ .command(kinesisLambdaClientPackagingInstructions)
+ .build())
+ .build()))
+ .handler("com.myorg.kinesis.client.App")
+ .memorySize(1024)
+ .functionName("KinesisLambdaClient")
+ .reservedConcurrentExecutions(1)
+ .timeout(Duration.seconds(10))
+ .build())
+ .build());
+ }
+}
diff --git a/cdk-kinesis-lambda-java/infrastructure/src/test/java/com/myorg/InfrastructureStackTest.java b/cdk-kinesis-lambda-java/infrastructure/src/test/java/com/myorg/InfrastructureStackTest.java
new file mode 100644
index 000000000..14e7677bc
--- /dev/null
+++ b/cdk-kinesis-lambda-java/infrastructure/src/test/java/com/myorg/InfrastructureStackTest.java
@@ -0,0 +1,27 @@
+package com.myorg;
+
+import software.amazon.awscdk.App;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import java.io.IOException;
+
+import org.junit.jupiter.api.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class InfrastructureStackTest {
+ private final static ObjectMapper JSON =
+ new ObjectMapper().configure(SerializationFeature.INDENT_OUTPUT, true);
+
+ @Test
+ public void testStack() throws IOException {
+ App app = new App();
+ InfrastructureStack stack = new InfrastructureStack(app, "test");
+
+ JsonNode actual = JSON.valueToTree(app.synth().getStackArtifact(stack.getArtifactId()).getTemplate());
+
+ assertThat(actual.toString())
+ .contains("AWS::ApiGatewayV2::Api")
+ .contains("AWS::Lambda::Function");
+ }
+}
diff --git a/cdk-kinesis-lambda-java/software/KinesisCliProducers/producers.sh b/cdk-kinesis-lambda-java/software/KinesisCliProducers/producers.sh
new file mode 100644
index 000000000..c4538345e
--- /dev/null
+++ b/cdk-kinesis-lambda-java/software/KinesisCliProducers/producers.sh
@@ -0,0 +1,15 @@
+a=0
+# -lt is less than operator
+
+#Iterate the loop until a less than 10
+while [ $a -lt 24 ]
+do
+ # Print the values
+ aws kinesis put-record --stream-name LambdaPackagingStack-KinesisToLambdaPatternKinesisStreamFA60BE3F-ODRbhifDN9wS \
+ --data '{"user_id":"user1", "score": 100}' \
+ --partition-key $a
+ echo $a
+
+ # increment the value
+ a=`expr $a + 1`
+done
diff --git a/cdk-kinesis-lambda-java/software/KinesisLambdaClient/pom.xml b/cdk-kinesis-lambda-java/software/KinesisLambdaClient/pom.xml
new file mode 100644
index 000000000..37a1990ec
--- /dev/null
+++ b/cdk-kinesis-lambda-java/software/KinesisLambdaClient/pom.xml
@@ -0,0 +1,105 @@
+
+ 4.0.0
+ cdk-sample
+ KinesisLambdaClient
+ 1.0
+ jar
+ KinesisLambdaClient
+
+ 11
+ 11
+ 2.19.0
+ UTF-8
+
+
+
+
+ software.amazon.lambda
+ powertools-tracing
+ 1.12.3
+
+
+ software.amazon.lambda
+ powertools-metrics
+ 1.12.3
+
+
+ com.amazonaws
+ aws-lambda-java-core
+ 1.2.1
+
+
+ com.amazonaws
+ aws-lambda-java-events
+ 3.11.0
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j.version}
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j.version}
+
+
+
+ junit
+ junit
+ 4.13.2
+ test
+
+
+
+
+
+
+ org.codehaus.mojo
+ aspectj-maven-plugin
+ 1.14.0
+
+
+ ${maven.compiler.target}
+ ${maven.compiler.target}
+
+
+ software.amazon.lambda
+ powertools-tracing
+
+
+ software.amazon.lambda
+ powertools-metrics
+
+
+
+
+
+
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.4.0
+
+ false
+ KinesisLambdaClient
+
+
+
+ package
+
+ shade
+
+
+
+
+
+
+
diff --git a/cdk-kinesis-lambda-java/software/KinesisLambdaClient/src/main/java/com/myorg/kinesis/client/App.java b/cdk-kinesis-lambda-java/software/KinesisLambdaClient/src/main/java/com/myorg/kinesis/client/App.java
new file mode 100644
index 000000000..f1ffbf43a
--- /dev/null
+++ b/cdk-kinesis-lambda-java/software/KinesisLambdaClient/src/main/java/com/myorg/kinesis/client/App.java
@@ -0,0 +1,65 @@
+package com.myorg.kinesis.client;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
+import software.amazon.lambda.powertools.metrics.Metrics;
+import software.amazon.lambda.powertools.tracing.Tracing;
+
+import static software.amazon.lambda.powertools.tracing.CaptureMode.DISABLED;
+
+/**
+ * Handler for requests to Lambda function.
+ */
+public class App implements RequestHandler {
+ @Override
+ public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {
+
+ System.out.println("Lambda Trigger Event: "+input);
+ List batchItemFailures = new ArrayList<>();
+ String curRecordSequenceNumber = "";
+ int i = 1;
+ for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
+ try {
+ //Process your record
+ KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
+ curRecordSequenceNumber = kinesisRecord.getSequenceNumber();
+ String partitionKey = kinesisRecord.getPartitionKey();
+ System.out.println("Record Number: "+curRecordSequenceNumber);
+ System.out.println("Partition Key: "+partitionKey + " Retrieval ID: "+ i++);
+// System.out.println("Record Data: "+kinesisRecord.getData().asCharBuffer().toString());
+// if (partitionKey.endsWith("9"))
+// throw new RuntimeException();
+ } catch (Exception e) {
+ /* Since we are working with streams, we can return the failed item immediately.
+ Lambda will immediately begin to retry processing from this failed item onwards. */
+ System.out.println("Running Catch Block");
+ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
+
+ return new StreamsEventResponse(batchItemFailures);
+ }
+ }
+
+ return new StreamsEventResponse(batchItemFailures);
+ }
+ @Tracing(namespace = "getPageContents")
+ private String getPageContents(String address) throws IOException {
+ URL url = new URL(address);
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream()))) {
+ return br.lines().collect(Collectors.joining(System.lineSeparator()));
+ }
+ }
+}
\ No newline at end of file