Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Serverless Pattern - Amazon EventBridge pipe - filter and transform using cdk python #2043

Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0580d52
eventbridge pipe new files
manikandanaws Jan 13, 2024
85eb13b
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
216ce51
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
6a77775
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
cdb9c64
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
40943b4
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
6c72dab
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
4fcca68
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
478cfe9
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
ac2b400
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
6bbdcc6
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
682b8df
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
9a3bd87
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
dd50145
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
56023e3
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 16, 2024
170d705
Merge branch 'aws-samples:main' into manikandanks2000-serverless-patt…
manikandanks2000 Jan 17, 2024
20d381f
Delete cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 17, 2024
e1c351f
Delete cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 17, 2024
047e07b
Update example-pattern.json
manikandanks2000 Jan 22, 2024
50d2734
folder name changes
manikandanks2000 Jan 22, 2024
4335f10
folder name changes
manikandanks2000 Jan 23, 2024
27c5e01
pattern.json file changes for the folder name
manikandanks2000 Jan 24, 2024
d601dab
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-py…
manikandanks2000 Jan 27, 2024
762d054
Add final pattern file
bfreiberg Jan 29, 2024
f2e9046
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-python.json
julianwood Feb 1, 2024
9aebe3b
Update cdk-eventbridge-pipes-sqs-to-stepfunctions-python.json
julianwood Feb 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# EventBridge Pipe from SQS to Step Functions with filter and transformation

This pattern will use Amazon EventBridge Pipes to connect an Amazon SQS queue with an AWS Step Functions workflow. The pipe will apply a filter and transformation before sending the message to AWS Step Functions.
This pattern is implemented with CDK and Python.

Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/eventbridge-pipes-sqs-to-stepfunctions-cdk-python

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

- [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
- [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
- [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```bash
git clone https://github.com/aws-samples/serverless-patterns
```
2. Change directory to the pattern directory:
```bash
cd serverless-patterns/eventbridge-pipes-sqs-to-stepfunctions-cdk-python/
```
3. To manually create a virtualenv on MacOS and Linux:
```bash
$ python3 -m venv .venv
```
4. After the init process completes and the virtualenv is created, you can use the following
step to activate your virtualenv.
```bash
$ source .venv/bin/activate
```
5. If you are a Windows platform, you would activate the virtualenv like this:
```bash
% .venv\Scripts\activate.bat
```
6. Once the virtualenv is activated, you can install the required dependencies.
```bash
$ pip install -r requirements.txt
```
7. To deploy the application:
```bash
$ cdk deploy
```

## How it works

The template will create an Amazon SQS queue, AWS Step Functions workflow and Amazon EventBridge Pipe.
Sending messages to the queue will trigger the pipe to initiate AWS Step Function workflow execution.
The messages from SQS will be filtered and transformed before being send to AWS Step Functions.


Replace the "SQS_URL" with your SQS URL in the below command to send message to SQS that will trigger Step Function workflow execution:

```sh
aws sqs send-message \
--queue-url=SQS_URL \
--message-body '{"id":"id1","team": "Team1", "status": "COMPLETE"}'

aws sqs send-message \
--queue-url=SQS_URL \
--message-body '{"id":"id2","team": "Team2", "status": "COMPLETE"}'

aws sqs send-message \
--queue-url=SQS_URL \
--message-body '{"id":"id3","team": "Team3", "status": "NOTSTARTED"}'

Validate the result from the Step Functions execution history page.
The filter allows only messages with a status of `COMPLETE` to be delivered to AWS Step Functions.
AWS Step Functions receive the transformed message attributes as shown below.
"id" tranformed to "playerid"
"team" transformed to "teamname"
"status" transformed to "teamstatus"

```

## Delete stack

```bash
cdk destroy
```

---

Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/usr/bin/env python3
import os

import aws_cdk as cdk

from src.cdkstack import SqsToStepfunctionsFilterTransformStack


app = cdk.App()
SqsToStepfunctionsFilterTransformStack(app, "SqsToStepfunctionsFilterTransformStack")

app.synth()
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"app": "python3 app.py",
"watch": {
"include": [
"**"
],
"exclude": [
"README.md",
"cdk*.json",
"requirements*.txt",
"source.bat",
"**/__init__.py",
"python/__pycache__",
"tests"
]
},
"context": {
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
"@aws-cdk/core:checkSecretUsage": true,
"@aws-cdk/core:target-partitions": [
"aws",
"aws-cn"
],
"@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
"@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
"@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
"@aws-cdk/aws-iam:minimizePolicies": true,
"@aws-cdk/core:validateSnapshotRemovalPolicy": true,
"@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
"@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
"@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
"@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
"@aws-cdk/core:enablePartitionLiterals": true,
"@aws-cdk/aws-events:eventsTargetQueueSameAccount": true,
"@aws-cdk/aws-iam:standardizedServicePrincipals": true,
"@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true,
"@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true,
"@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true,
"@aws-cdk/customresources:installLatestAwsSdkDefault": false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"title": "EventBridge Pipe from SQS to Step Functions with filter and transformation",
"description": "Amazon EventBridge Pipe that filters and transforms messages from Amazon SQS to AWS Step Functions.",
"language": "Python",
"level": "300",
"framework": "CDK",
"introBox": {
"headline": "How it works",
"text": [
"The example will create an Amazon SQS queue, a AWS Step Functions workflow and an Amazon EventBridge Pipe.",
"Sending messages to the Amazon Simple Queue Service will trigger the pipe to start the AWS Step Function workflow execution.",
"SQS Messages will be filtered and transformed before sending to AWS Step Functions."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-python",
"templateURL": "serverless-patterns/cdk-eventbridge-pipes-sqs-to-stepfunctions-filter-transform-python",
"projectFolder": "src",
manikandanks2000 marked this conversation as resolved.
Show resolved Hide resolved
"templateFile": "src/cdkstack.py"
}
},
"resources": {
"bullets": [
{
"text": "Eventbridge Pipes filter and transformation",
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html"
}
]
},
"deploy": {
"text": [
"cdk deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"<code>cdk delete</code>"
]
},
"authors": [
{
"name": "Manikandan Karimanal",
"image": "https://drive.google.com/file/d/16r4jGGMpPhu4Igez-lzstIe-L5nJVMxf/view",
"bio": "Solution Architect @AWS",
"linkedin": "manikandanks"
}
]
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
aws-cdk-lib==2.60.0
constructs>=10.0.0,<11.0.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from aws_cdk import (
CfnOutput,
Stack,
aws_iam as iam,
aws_pipes as pipes,
aws_sqs as sqs,
aws_stepfunctions as sfn,
)
from constructs import Construct
import json

class SqsToStepfunctionsFilterTransformStack(Stack):

def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)

sourcepipe = sqs.Queue(self, 'sqsqueue-sourcepipe')


targetpipe = sfn.StateMachine(self, "sf-targetqueue-state-machine",
definition=sfn.Pass(self, "start-state")
)

source_policy = iam.PolicyStatement(
actions=['sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes'],
resources=[sourcepipe.queue_arn],
effect=iam.Effect.ALLOW,
)

target_policy = iam.PolicyStatement(
actions=['states:StartExecution'],
resources=[targetpipe.state_machine_arn],
effect=iam.Effect.ALLOW,
)

pipe_role = iam.Role(self, 'pipe-role',
assumed_by=iam.ServicePrincipal('pipes.amazonaws.com'),
)

pipe_role.add_to_policy(source_policy)
pipe_role.add_to_policy(target_policy)

pipe = pipes.CfnPipe(self, "pipe",
role_arn=pipe_role.role_arn,
source=sourcepipe.queue_arn,
source_parameters=pipes.CfnPipe.PipeSourceParametersProperty(
sqs_queue_parameters=pipes.CfnPipe.PipeSourceSqsQueueParametersProperty(
batch_size=1
)

,

filter_criteria=pipes.CfnPipe.FilterCriteriaProperty(
filters=[pipes.CfnPipe.FilterProperty(
pattern=json.dumps({"body":{"status": ["COMPLETE"]}})
)]
)


),

target=targetpipe.state_machine_arn,
target_parameters=pipes.CfnPipe.PipeTargetParametersProperty(
step_function_state_machine_parameters=pipes.CfnPipe.PipeTargetStateMachineParametersProperty(
invocation_type="FIRE_AND_FORGET"
),
input_template=json.dumps({
"playerid": "<$.body.id>",
"teamname": "<$.body.team>",
"teamstatus": "<$.body.status>"
})
)
)


# Output
CfnOutput(self, "SQS_QUEUE_URL", value=sourcepipe.queue_url)
CfnOutput(self, "EVENTBRIDGE_PIPE_ARN", value=pipe.attr_arn)
CfnOutput(self, "STATE_MACHINE_ARN", value=targetpipe.state_machine_arn)
Loading