Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changed sampleItems to order, so that the example is more specific an… #1429

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dynamodb-eventbridge-transformer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The key components of this architecture are DynamoDB as source and EventBridge a

To demonstrate the end-to-end message flow, the Lambda function writes sample data to the DynamoDB table.

The pattern uses an input transformer to change the event's structure from DynamoDB's response format, which includes data type descriptors, to a simpler JSON structure. The input transformer also allows us to transform the list using the following notation: `<$.dynamodb.NewImage.list.L[*].S>`
The pattern uses an input transformer to change the event's structure from DynamoDB's response format, which includes data type descriptors, to a simpler JSON structure. The input transformer also allows us to transform the list using the following notation: `<$.dynamodb.NewImage.Items.L[*].S>`

Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/claim-check-pattern-cdk

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, PutCommand} = require('@aws-sdk/lib-dynamodb');
const tableName = process.env.TABLE_NAME;

const messageWithEmptyList = {
"id": "111",
"list": []
const testOrderWithoutItem = {
"orderID": "111",
"Items": []
};
const messageWithOneItemList = {
"id": "222",
"list": ["One"]
const testOrderWithOneItem = {
"orderID": "222",
"Items": ["One"]
};
const messageWithMultipleItemList = {
"id": "333",
"list": ["One", "Two", "Three"]
const testOrderWithThreeItems = {
"orderID": "333",
"Items": ["One", "Two", "Three"]
};


async function writeToDb(item) {
// put the message in the table
const params = {
TableName: tableName,
Item: item
Expand All @@ -30,8 +29,7 @@ async function writeToDb(item) {

exports.handler = async function (event, context) {

var items = [messageWithEmptyList, messageWithOneItemList, messageWithMultipleItemList];

var items = [testOrderWithoutItem, testOrderWithOneItem, testOrderWithThreeItems];
console.log("items:", items);

var result = await Promise.all(items.map(writeToDb))
Expand All @@ -40,7 +38,7 @@ exports.handler = async function (event, context) {
const resonse = {
eventType: "Some_Event_Type",
ids: items
.map(element => element.id)
.map(element => element.orderID)
}
console.log('response:', resonse);
return resonse;
Expand Down
48 changes: 23 additions & 25 deletions dynamodb-eventbridge-transformer/src/lib/pipes-from-dynamo-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,40 @@ export class PipesFromDynamoStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);

//dynamodb table "dataOrigin" with DynamoDB stream enabled
const dataOriginTable = new dynamodb.Table(this, 'dataOriginTable', {
// the DynamoDB table that orders are written to
const shoppingOrderTable = new dynamodb.Table(this, 'shoppingOrderTable', {
partitionKey: {
name: 'id',
name: 'orderID',
type: dynamodb.AttributeType.STRING
},
removalPolicy: cdk.RemovalPolicy.DESTROY,
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// lambda function to write to dynamodb
const writeDemoData = new lambda.Function(this, 'writeDemoData', {
functionName: 'writeDemoData',
// the Lambda function that creates three sample orders for testing
const sampleOrderCreationFunction = new lambda.Function(this, 'sampleOrderCreationFunction', {
functionName: 'sampleOrderCreationFunction',
runtime: lambda.Runtime.NODEJS_18_X,
code: lambda.Code.fromAsset('lib/lambda'),
handler: 'writeDemoData.handler',
handler: 'sampleOrderCreationFunction.handler',
environment: {
TABLE_NAME: dataOriginTable.tableName,
TABLE_NAME: shoppingOrderTable.tableName,
}
});

// target event bus
const targetEventBus = new events.EventBus(this, 'targetEventBus', {
eventBusName: 'targetEventBus'
const eventBus = new events.EventBus(this, 'pipesFromDynamoStackEventBus', {
eventBusName: 'pipesFromDynamoStackEventBus'
});

// create an Amazon EventBridge rule to send all events to Amazon CloudWatch Logs:
const targetLogRule = new events.Rule(this, 'targetLogRule', {
ruleName: 'targetLogRule',
eventBus: targetEventBus,
// All events on the eventBus are written to Amazon CloudWatch Logs for visualization
const catchAllLogRule = new events.Rule(this, 'catchAllLogRule', {
ruleName: 'catchAllLogRule',
eventBus: eventBus,
eventPattern: {
source: events.Match.prefix(''),
},
targets: [new targets.CloudWatchLogGroup(new logs.LogGroup(this, 'TargetLogGroup', {
logGroupName: '/aws/events/targetLog',
targets: [new targets.CloudWatchLogGroup(new logs.LogGroup(this, 'PipesFromDynamoStackLogGroup', {
logGroupName: '/aws/events/pipesFromDynamoStackLogGroup/catchallLogGroup',
removalPolicy: cdk.RemovalPolicy.DESTROY,
}))],
});
Expand All @@ -55,15 +54,13 @@ export class PipesFromDynamoStack extends cdk.Stack {
assumedBy: new iam.ServicePrincipal('pipes.amazonaws.com'),
});

dataOriginTable.grantReadWriteData(writeDemoData);
dataOriginTable.grantStreamRead(pipeRole);
targetEventBus.grantPutEventsTo(pipeRole);
shoppingOrderTable.grantReadWriteData(sampleOrderCreationFunction);
shoppingOrderTable.grantStreamRead(pipeRole);
eventBus.grantPutEventsTo(pipeRole);

// create an EventBridge Pipe that streams data from dataOriginTable to targetEventBus
const pipe = new pipes.CfnPipe(this, 'pipe', {
const newAndModifiedOrdersPipe = new pipes.CfnPipe(this, 'pipe', {
roleArn: pipeRole.roleArn,
source: dataOriginTable.tableStreamArn!,
target: targetEventBus.eventBusArn,
source: shoppingOrderTable.tableStreamArn!,
sourceParameters: {
dynamoDbStreamParameters: {
startingPosition: 'LATEST',
Expand All @@ -75,8 +72,9 @@ export class PipesFromDynamoStack extends cdk.Stack {
}],
},
},
target: eventBus.eventBusArn,
targetParameters: {
inputTemplate: '{"id": <$.dynamodb.NewImage.id.S>, "list": <$.dynamodb.NewImage.list.L[*].S>}',
inputTemplate: '{"orderID": <$.dynamodb.NewImage.orderID.S>, "Items": <$.dynamodb.NewImage.Items.L[*].S>}',
}
});
}
Expand Down