diff --git a/dynamodb-eventbridge-transformer/README.md b/dynamodb-eventbridge-transformer/README.md index 0c9328ba7..2ce9d3e36 100644 --- a/dynamodb-eventbridge-transformer/README.md +++ b/dynamodb-eventbridge-transformer/README.md @@ -6,7 +6,7 @@ The key components of this architecture are DynamoDB as source and EventBridge a To demonstrate the end-to-end message flow, the Lambda function writes sample data to the DynamoDB table. -The pattern uses an input transformer to change the event's structure from DynamoDB's response format, which includes data type descriptors, to a simpler JSON structure. The input transformer also allows us to transform the list using the following notation: `<$.dynamodb.NewImage.list.L[*].S>` +The pattern uses an input transformer to change the event's structure from DynamoDB's response format, which includes data type descriptors, to a simpler JSON structure. The input transformer also allows us to transform the list using the following notation: `<$.dynamodb.NewImage.Items.L[*].S>` Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/claim-check-pattern-cdk diff --git a/dynamodb-eventbridge-transformer/src/lib/lambda/writeDemoData.js b/dynamodb-eventbridge-transformer/src/lib/lambda/sampleOrderCreationFunction.js similarity index 67% rename from dynamodb-eventbridge-transformer/src/lib/lambda/writeDemoData.js rename to dynamodb-eventbridge-transformer/src/lib/lambda/sampleOrderCreationFunction.js index ff3b26049..407aa4ede 100644 --- a/dynamodb-eventbridge-transformer/src/lib/lambda/writeDemoData.js +++ b/dynamodb-eventbridge-transformer/src/lib/lambda/sampleOrderCreationFunction.js @@ -2,22 +2,21 @@ const { DynamoDBClient } = require('@aws-sdk/client-dynamodb'); const { DynamoDBDocumentClient, PutCommand} = require('@aws-sdk/lib-dynamodb'); const tableName = process.env.TABLE_NAME; -const messageWithEmptyList = { - "id": "111", - "list": [] +const testOrderWithoutItem = { + "orderID": "111", + "Items": [] }; -const messageWithOneItemList = { - "id": "222", - "list": ["One"] +const testOrderWithOneItem = { + "orderID": "222", + "Items": ["One"] }; -const messageWithMultipleItemList = { - "id": "333", - "list": ["One", "Two", "Three"] +const testOrderWithThreeItems = { + "orderID": "333", + "Items": ["One", "Two", "Three"] }; async function writeToDb(item) { - // put the message in the table const params = { TableName: tableName, Item: item @@ -30,8 +29,7 @@ async function writeToDb(item) { exports.handler = async function (event, context) { - var items = [messageWithEmptyList, messageWithOneItemList, messageWithMultipleItemList]; - + var items = [testOrderWithoutItem, testOrderWithOneItem, testOrderWithThreeItems]; console.log("items:", items); var result = await Promise.all(items.map(writeToDb)) @@ -40,7 +38,7 @@ exports.handler = async function (event, context) { const resonse = { eventType: "Some_Event_Type", ids: items - .map(element => element.id) + .map(element => element.orderID) } console.log('response:', resonse); return resonse; diff --git a/dynamodb-eventbridge-transformer/src/lib/pipes-from-dynamo-stack.ts b/dynamodb-eventbridge-transformer/src/lib/pipes-from-dynamo-stack.ts index 8ee18a472..ba878804d 100644 --- a/dynamodb-eventbridge-transformer/src/lib/pipes-from-dynamo-stack.ts +++ b/dynamodb-eventbridge-transformer/src/lib/pipes-from-dynamo-stack.ts @@ -12,41 +12,40 @@ export class PipesFromDynamoStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props); - //dynamodb table "dataOrigin" with DynamoDB stream enabled - const dataOriginTable = new dynamodb.Table(this, 'dataOriginTable', { + // the DynamoDB table that orders are written to + const shoppingOrderTable = new dynamodb.Table(this, 'shoppingOrderTable', { partitionKey: { - name: 'id', + name: 'orderID', type: dynamodb.AttributeType.STRING }, removalPolicy: cdk.RemovalPolicy.DESTROY, stream: dynamodb.StreamViewType.NEW_IMAGE }); - // lambda function to write to dynamodb - const writeDemoData = new lambda.Function(this, 'writeDemoData', { - functionName: 'writeDemoData', + // the Lambda function that creates three sample orders for testing + const sampleOrderCreationFunction = new lambda.Function(this, 'sampleOrderCreationFunction', { + functionName: 'sampleOrderCreationFunction', runtime: lambda.Runtime.NODEJS_18_X, code: lambda.Code.fromAsset('lib/lambda'), - handler: 'writeDemoData.handler', + handler: 'sampleOrderCreationFunction.handler', environment: { - TABLE_NAME: dataOriginTable.tableName, + TABLE_NAME: shoppingOrderTable.tableName, } }); - // target event bus - const targetEventBus = new events.EventBus(this, 'targetEventBus', { - eventBusName: 'targetEventBus' + const eventBus = new events.EventBus(this, 'pipesFromDynamoStackEventBus', { + eventBusName: 'pipesFromDynamoStackEventBus' }); - // create an Amazon EventBridge rule to send all events to Amazon CloudWatch Logs: - const targetLogRule = new events.Rule(this, 'targetLogRule', { - ruleName: 'targetLogRule', - eventBus: targetEventBus, + // All events on the eventBus are written to Amazon CloudWatch Logs for visualization + const catchAllLogRule = new events.Rule(this, 'catchAllLogRule', { + ruleName: 'catchAllLogRule', + eventBus: eventBus, eventPattern: { source: events.Match.prefix(''), }, - targets: [new targets.CloudWatchLogGroup(new logs.LogGroup(this, 'TargetLogGroup', { - logGroupName: '/aws/events/targetLog', + targets: [new targets.CloudWatchLogGroup(new logs.LogGroup(this, 'PipesFromDynamoStackLogGroup', { + logGroupName: '/aws/events/pipesFromDynamoStackLogGroup/catchallLogGroup', removalPolicy: cdk.RemovalPolicy.DESTROY, }))], }); @@ -55,15 +54,13 @@ export class PipesFromDynamoStack extends cdk.Stack { assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com'), }); - dataOriginTable.grantReadWriteData(writeDemoData); - dataOriginTable.grantStreamRead(pipeRole); - targetEventBus.grantPutEventsTo(pipeRole); + shoppingOrderTable.grantReadWriteData(sampleOrderCreationFunction); + shoppingOrderTable.grantStreamRead(pipeRole); + eventBus.grantPutEventsTo(pipeRole); - // create an EventBridge Pipe that streams data from dataOriginTable to targetEventBus - const pipe = new pipes.CfnPipe(this, 'pipe', { + const newAndModifiedOrdersPipe = new pipes.CfnPipe(this, 'pipe', { roleArn: pipeRole.roleArn, - source: dataOriginTable.tableStreamArn!, - target: targetEventBus.eventBusArn, + source: shoppingOrderTable.tableStreamArn!, sourceParameters: { dynamoDbStreamParameters: { startingPosition: 'LATEST', @@ -75,8 +72,9 @@ export class PipesFromDynamoStack extends cdk.Stack { }], }, }, + target: eventBus.eventBusArn, targetParameters: { - inputTemplate: '{"id": <$.dynamodb.NewImage.id.S>, "list": <$.dynamodb.NewImage.list.L[*].S>}', + inputTemplate: '{"orderID": <$.dynamodb.NewImage.orderID.S>, "Items": <$.dynamodb.NewImage.Items.L[*].S>}', } }); }