From 2367ffe94ca0eac3514445892a07f9f9f2f051fe Mon Sep 17 00:00:00 2001
From: Kethan sai
Date: Fri, 12 Apr 2024 17:15:44 -0400
Subject: [PATCH] add event source on Lambda examples Gen 2 (#7184)

* add event source example
---
 cspell.json                             |   2 +
 src/directory/directory.mjs             |   6 +
 .../examples/dynamo-db-stream/index.mdx |  98 +++++++++++++
 .../examples/kinesis-stream/index.mdx   | 132 ++++++++++++++++++
 4 files changed, 238 insertions(+)
 create mode 100644 src/pages/[platform]/build-a-backend/functions/examples/dynamo-db-stream/index.mdx
 create mode 100644 src/pages/[platform]/build-a-backend/functions/examples/kinesis-stream/index.mdx

diff --git a/cspell.json b/cspell.json
index 49d85ad95f4..46e01714a16 100644
--- a/cspell.json
+++ b/cspell.json
@@ -1019,6 +1019,7 @@
     "posts.graphql",
     "PostsTable",
     "posttitle",
+    "powertools",
     "pre-annotated",
     "pre-built",
     "pre-created",
@@ -1047,6 +1048,7 @@
     "pubsub",
     "Pubsub",
     "PubSub",
+    "Powertools",
     "PUSHINFOPROVIDER",
     "PushListenerService",
     "pushnotification",
diff --git a/src/directory/directory.mjs b/src/directory/directory.mjs
index 291b476fbb2..8a524bbf593 100644
--- a/src/directory/directory.mjs
+++ b/src/directory/directory.mjs
@@ -255,6 +255,12 @@ export const directory = {
               {
                 path: 'src/pages/[platform]/build-a-backend/functions/examples/google-recaptcha-challenge/index.mdx'
               },
+              {
+                path: 'src/pages/[platform]/build-a-backend/functions/examples/kinesis-stream/index.mdx'
+              },
+              {
+                path: 'src/pages/[platform]/build-a-backend/functions/examples/dynamo-db-stream/index.mdx'
+              },
               {
                 path: 'src/pages/[platform]/build-a-backend/functions/examples/bedrock-response/index.mdx'
               }
diff --git a/src/pages/[platform]/build-a-backend/functions/examples/dynamo-db-stream/index.mdx b/src/pages/[platform]/build-a-backend/functions/examples/dynamo-db-stream/index.mdx
new file mode 100644
index 00000000000..78ac096eac0
--- /dev/null
+++ b/src/pages/[platform]/build-a-backend/functions/examples/dynamo-db-stream/index.mdx
@@ -0,0 +1,98 @@
import { getCustomStaticPath } from '@/utils/getCustomStaticPath';

export const meta = {
  title: 'DynamoDB stream',
  description:
    'Create a Lambda event source using Amazon DynamoDB Streams to trigger a Lambda function in response to real-time events.',
  platforms: [
    'android',
    'angular',
    'flutter',
    'javascript',
    'nextjs',
    'react',
    'react-native',
    'swift',
    'vue'
  ]
};

export function getStaticPaths() {
  return getCustomStaticPath(meta.platforms);
}

export function getStaticProps() {
  return {
    props: {
      meta
    }
  };
}

With AWS Lambda, you can seamlessly integrate various event sources, such as Amazon DynamoDB, Amazon SQS, and others, to trigger Lambda functions in response to real-time events. This feature enables you to build responsive, event-driven applications that react to changes in data or system state without the need for polling services.

In this guide, let's configure a Lambda function with an Amazon DynamoDB stream as an event source. The Lambda function is automatically triggered whenever an item is added, updated, or deleted from the table, enabling you to build real-time applications that react to changes in your data. In this example, we will use a `Todo` table created by a data model on the GraphQL API.

To get started, install the AWS Lambda Powertools Logger package, which provides structured logging capabilities for your Lambda function.

```bash title="Terminal" showLineNumbers={false}
npm add @aws-lambda-powertools/logger
```

Second, create a new directory and a resource file, `amplify/functions/dynamoDB-function/resource.ts`. Then, define the function with `defineFunction`:

```ts title="amplify/functions/dynamoDB-function/resource.ts"
import { defineFunction } from "@aws-amplify/backend";

export const myDynamoDBFunction = defineFunction({
  name: "dynamoDB-function",
});
```

Third, create the corresponding handler file, `amplify/functions/dynamoDB-function/handler.ts`, with the following contents:

```ts title="amplify/functions/dynamoDB-function/handler.ts"
import type { DynamoDBStreamHandler } from "aws-lambda";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "dynamodb-stream-handler",
});

export const handler: DynamoDBStreamHandler = async (event) => {
  for (const record of event.Records) {
    logger.info(`Processing record: ${record.eventID}`);
    logger.info(`Event Type: ${record.eventName}`);

    if (record.eventName === "INSERT") {
      // business logic to process new records
      logger.info(`New Image: ${JSON.stringify(record.dynamodb?.NewImage)}`);
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
};
```

Lastly, in the `amplify/backend.ts` file, set the `Todo` table as the event source for the Lambda function:

```ts title="amplify/backend.ts"
import { defineBackend } from "@aws-amplify/backend";
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
import { DynamoEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { myDynamoDBFunction } from "./functions/dynamoDB-function/resource";

const backend = defineBackend({
  auth,
  data,
  myDynamoDBFunction,
});

const eventSource = new DynamoEventSource(backend.data.resources.tables["Todo"], {
  startingPosition: StartingPosition.LATEST,
});

backend.myDynamoDBFunction.resources.lambda.addEventSource(eventSource);
```
diff --git a/src/pages/[platform]/build-a-backend/functions/examples/kinesis-stream/index.mdx b/src/pages/[platform]/build-a-backend/functions/examples/kinesis-stream/index.mdx
new file mode 100644
index 00000000000..b863c486263
--- /dev/null
+++ b/src/pages/[platform]/build-a-backend/functions/examples/kinesis-stream/index.mdx
@@ -0,0 +1,132 @@
import { getCustomStaticPath } from '@/utils/getCustomStaticPath';

export const meta = {
  title: 'Kinesis stream',
  description:
    'Create a Lambda event source for an Amazon Kinesis stream to trigger Lambda functions in response to real-time events.',
  platforms: [
    'android',
    'angular',
    'flutter',
    'javascript',
    'nextjs',
    'react',
    'react-native',
    'swift',
    'vue'
  ]
};

export function getStaticPaths() {
  return getCustomStaticPath(meta.platforms);
}

export function getStaticProps() {
  return {
    props: {
      meta
    }
  };
}

With AWS Lambda, you can seamlessly integrate various event sources, such as Amazon Kinesis, Amazon SQS, and others, to trigger Lambda functions in response to real-time events. This feature enables you to build responsive, event-driven applications that react to changes in data or system state without the need for polling services.

In this guide, let's configure a Lambda function with an Amazon Kinesis stream as an event source.
The Lambda function is automatically triggered whenever new data is published to the stream, whether you're processing streaming data, reacting to application events, or automating workflows.

To get started, install the AWS Lambda Powertools Logger package, which provides structured logging capabilities for your Lambda function.

```bash title="Terminal" showLineNumbers={false}
npm add @aws-lambda-powertools/logger
```

Second, create a new directory and a resource file, `amplify/functions/kinesis-function/resource.ts`. Then, define the function with `defineFunction`:

```ts title="amplify/functions/kinesis-function/resource.ts"
import { defineFunction } from "@aws-amplify/backend";

export const myKinesisFunction = defineFunction({
  name: "kinesis-function",
});
```

Third, create the corresponding handler file, `amplify/functions/kinesis-function/handler.ts`, with the following contents:

```ts title="amplify/functions/kinesis-function/handler.ts"
import type {
  KinesisStreamBatchResponse,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "node:buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler",
});

export const handler: KinesisStreamHandler = async (
  event,
  context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /**
       * When processing stream data, if any item fails, returning the failed item's position immediately
       * prompts Lambda to retry from this item forward, ensuring continuous processing without skipping data.
       */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  const data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); // Placeholder for an async process
  return data;
}
```

Lastly, create the Kinesis stream and add it as an event source in the `amplify/backend.ts` file:

```ts title="amplify/backend.ts"
import { defineBackend } from "@aws-amplify/backend";
import { KinesisEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import { StartingPosition } from "aws-cdk-lib/aws-lambda";
import { Stream } from "aws-cdk-lib/aws-kinesis";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { myKinesisFunction } from "./functions/kinesis-function/resource";

const backend = defineBackend({
  auth,
  data,
  myKinesisFunction,
});

const kinesisStack = backend.createStack("kinesis-stack");

const kinesisStream = new Stream(kinesisStack, "KinesisStream", {
  streamName: "myKinesisStream",
  shardCount: 1,
});

const eventSource = new KinesisEventSource(kinesisStream, {
  startingPosition: StartingPosition.LATEST,
  reportBatchItemFailures: true,
});

backend.myKinesisFunction.resources.lambda.addEventSource(eventSource);
```

For an example of streaming analytics data to the Amazon Kinesis stream from your frontend, see the [Streaming analytics data documentation](/[platform]/build-a-backend/add-aws-services/analytics/streaming-data/).
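To make the end-to-end flow concrete, the following is a minimal sketch of publishing a record to this stream from a JavaScript frontend with the `record` API from `aws-amplify/analytics/kinesis`. It assumes Amplify's Kinesis analytics provider has already been configured for `myKinesisStream` as described in the linked page; the partition key and payload fields are illustrative placeholders, not values defined in this guide:

```ts showLineNumbers={false}
import { record } from "aws-amplify/analytics/kinesis";

// Publish a sample payload to the stream created in amplify/backend.ts.
// "myKinesisStream" matches the streamName above; the partition key and
// data fields are hypothetical values used only for illustration.
record({
  streamName: "myKinesisStream",
  partitionKey: "user-123",
  data: { eventType: "itemViewed", itemId: "item-42" },
});
```

With the event source mapping in place, records published this way are delivered to the Lambda handler defined earlier.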