-
Notifications
You must be signed in to change notification settings - Fork 928
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2058 from biswanathmukherjee/biswanathmukherjee-f…
…eature-pipes-sqs-to-sqs-with-data-filter New Serverless Pattern - SQS to SQS message copy with filter using EventBridge Pipe using SAM
- Loading branch information
Showing
7 changed files
with
391 additions
and
0 deletions.
There are no files selected for viewing
136 changes: 136 additions & 0 deletions
136
sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/README.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
# Copying Amazon SQS messages between queues using an Amazon EventBridge Pipes filte | ||
|
||
The pattern shows the filter capability of Amazon EventBridge Pipes while copying data from one Amazon SQS queue to another queue. The SAM template deploys two Amazon SQS queues, an Amazon EventBridge Pipe with filter having source as one of the SQS queues and target as another queue. | ||
|
||
Learn more about this pattern at Serverless Land Patterns:https://serverlessland.com/patterns/eventbridge-pipes-sqs-to-sqs-with-data-filter | ||
|
||
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. | ||
|
||
## Requirements | ||
|
||
- [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. | ||
- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured | ||
- [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) | ||
- [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed | ||
|
||
## Deployment Instructions | ||
|
||
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: | ||
``` | ||
git clone https://github.com/aws-samples/serverless-patterns | ||
``` | ||
2. Change directory to the pattern directory: | ||
``` | ||
cd sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam | ||
``` | ||
3. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file: | ||
``` | ||
sam deploy --guided | ||
``` | ||
4. During the prompts: | ||
* Enter a stack name | ||
* Enter `us-east-1` or any other AWS Region. | ||
* Allow SAM CLI to create IAM roles with the required permissions. Please keep all other options to default. | ||
5. Make a note of the output, which will be used during testing. | ||
|
||
## How it works | ||
|
||
* This template creates two Amazon SQS queues - `source-queue` and `target-queue` along with an Amazon EventBridge Pipe. | ||
* The Amazon EventBridge pipe copies messages from `source-queue` to `target-queue` only if message payload (JSON) contains `color` attribute having values `red` or `blue`. | ||
* Once the stack is deployed, we will use `send-messages-to-source-queue.sh` to send messages to the `source-queue`. This script sends four messages with different values for `color` attribute. | ||
* We will validate received messages on `target-queue` using `receive-messages-from-target-queue.sh`. Only the messages with `red` and `blue` values for `color` attribute will be available in `target-queue`. | ||
|
||
|
||
Please refer to the architecture diagram below: | ||
|
||
![End to End Architecture](image/architecture.png) | ||
|
||
## Testing | ||
|
||
1. Run the script `send-messages-to-source-queue.sh` to send four messages to the `source-queue` with a payload in `{"id":1, "color":"<value>"}` format having `red`, `blue`, `green` and `yellow` as color values. The script will ask for the queue URL and the region. Please provide the `SourceQueueURL` value received from the outout of the deployment step and your deployment region. | ||
```bash | ||
bash send-messages-to-source-queue.sh | ||
``` | ||
|
||
Sample output: | ||
```bash | ||
Enter the SQS source queue URL: | ||
{SourceQueueURL} | ||
Enter your AWS region: | ||
{your-region} | ||
{ | ||
"MD5OfMessageBody": "266d1841ee6d382c85595fa0ca6e16b0", | ||
"MessageId": "b1d51544-e725-451a-ba2f-13ad5f2acd9d" | ||
} | ||
{ | ||
"MD5OfMessageBody": "9cd1c133e3bc1406cf92d9ca0b39d7bc", | ||
"MessageId": "8c68b9a1-a09c-4c88-aa8f-60353a5701ee" | ||
} | ||
{ | ||
"MD5OfMessageBody": "846a7a77cc95d371d002296f09dd6286", | ||
"MessageId": "c4d87ea3-732f-4ae9-b227-9df4c4c04051" | ||
} | ||
{ | ||
"MD5OfMessageBody": "b77316a2ed44a33471ba262079fe8ce3", | ||
"MessageId": "794d654a-95a7-45ee-8cd5-eb197de084b1" | ||
} | ||
``` | ||
2. Now, run the script `receive-messages-from-target-queue.sh` to receive messages from the `target-queue`. Please use the below aws cli command to read message from the SQS queue. Please replace the {SQSQueueURL} with the URL from the deployent output and also replace the {your-region} with the region that you selected during deployment: | ||
```bash | ||
bash receive-messages-from-target-queue.sh | ||
``` | ||
|
||
Sample output: | ||
```bash | ||
Enter SQS target queue URL: | ||
https://sqs.{your-region}.amazonaws.com/123456789012/target-queue | ||
Enter your AWS region: | ||
{your-region} | ||
{ | ||
"Messages": [ | ||
{ | ||
"MessageId": "82e7xxx-xx-xx-xx-xxxxxde", | ||
"ReceiptHandle": "AQEBWmPxxxxxx2sMw==", | ||
"MD5OfBody": "64bxxxxx232b", | ||
"Body": "{\"messageId\":\"e517xxxxx513\",\"receiptHandle\":\"AQEBxxxxVusg==\",\"body\":\"{\\\"id\\\":1, \\\"color\\\":\\\"red\\\"}\",\"attributes\":{\"ApproximateReceiveCount\":\"1\",\"SentTimestamp\":\"1705663525194\",\"SenderId\":\"AIxxxxIFR\",\"ApproximateFirstReceiveTimestamp\":\"1705663525205\"},\"messageAttributes\":{},\"md5OfBody\":\"266xxxxx16b0\",\"eventSource\":\"aws:sqs\",\"eventSourceARN\":\"arn:aws:sqs:{your-region}:123456789012:source-queue\",\"awsRegion\":\"{your-region}\"}", | ||
"Attributes": { | ||
"SenderId": "ARxxxxZC:c11xxxx92f", | ||
"ApproximateFirstReceiveTimestamp": "1705663569479", | ||
"ApproximateReceiveCount": "6", | ||
"SentTimestamp": "1705663525269" | ||
} | ||
} | ||
] | ||
} | ||
{ | ||
"Messages": [ | ||
{ | ||
"MessageId": "95f7xxx-xx-xx-xx-xxxxxde", | ||
"ReceiptHandle": "AQEBWmPxxxxxx2sMw==", | ||
"MD5OfBody": "64bxxxxx232b", | ||
"Body": "{\"messageId\":\"e517xxxxx513\",\"receiptHandle\":\"AQEBxxxxVusg==\",\"body\":\"{\\\"id\\\":1, \\\"color\\\":\\\"blue\\\"}\",\"attributes\":{\"ApproximateReceiveCount\":\"1\",\"SentTimestamp\":\"1705663525194\",\"SenderId\":\"AIxxxxIFR\",\"ApproximateFirstReceiveTimestamp\":\"1705663525205\"},\"messageAttributes\":{},\"md5OfBody\":\"266xxxxx16b0\",\"eventSource\":\"aws:sqs\",\"eventSourceARN\":\"arn:aws:sqs:{your-region}:123456789012:source-queue\",\"awsRegion\":\"{your-region}\"}", | ||
"Attributes": { | ||
"SenderId": "ARxxxxZC:c11xxxx92f", | ||
"ApproximateFirstReceiveTimestamp": "1705663566929", | ||
"ApproximateReceiveCount": "6", | ||
"SentTimestamp": "1705663525345" | ||
} | ||
} | ||
] | ||
} | ||
``` | ||
|
||
As you see from the message, even though we pushed four messages into the `source-queue`, only the two messages that matched the filter of EventBridge pipe was sent to the `target-queue`. | ||
|
||
## Cleanup | ||
|
||
|
||
1. Delete the stack | ||
```bash | ||
sam delete | ||
``` | ||
|
||
---- | ||
Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
|
||
SPDX-License-Identifier: MIT-0 |
55 changes: 55 additions & 0 deletions
55
sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/example-pattern.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
{ | ||
"title": "Copying Amazon SQS messages between queues using Amazon EventBridge Pipes", | ||
"description": "The pattern shows the filter capability of Amazon EventBridge Pipes while copying data from one Amazon SQS queue to another queue.", | ||
"language": "YAML", | ||
"level": "200", | ||
"framework": "SAM", | ||
"introBox": { | ||
"headline": "How it works", | ||
"text": [ | ||
"This template creates two Amazon SQS queues - source-queue and target-queue along with an Amazon EventBridge Pipe.", | ||
"The Amazon EventBridge pipe copies messages from source-queue to target-queue only if message payload (JSON) contains color attribute having values red or blue.", | ||
"Once the stack is deployed, use `send-messages-to-source-queue.sh` to send messages to the source-queue. This scripts sends four messages with different values for color attribute.", | ||
"Validate received messages on `target-queue` using `receive-messages-from-target-queue.sh`. Only the messages with red and blue values for color attribute are available in `target-queue`." | ||
] | ||
}, | ||
"gitHub": { | ||
"template": { | ||
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/eventbridge-pipes-sqs-to-sqs-with-data-filter", | ||
"templateURL": "serverless-patterns/eventbridge-pipes-sqs-to-sqs-with-data-filter", | ||
"projectFolder": "eventbridge-pipes-sqs-to-sqs-with-data-filter", | ||
"templateFile": "template.yaml" | ||
} | ||
}, | ||
"resources": { | ||
"bullets": [ | ||
{ | ||
"text": "Amazon EventBridge Pipes filtering", | ||
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html" | ||
} | ||
] | ||
}, | ||
"deploy": { | ||
"text": [ | ||
"sam deploy --guided" | ||
] | ||
}, | ||
"testing": { | ||
"text": [ | ||
"See the GitHub repo for detailed testing instructions." | ||
] | ||
}, | ||
"cleanup": { | ||
"text": [ | ||
"Delete the stack: <code>sam delete</code>." | ||
] | ||
}, | ||
"authors": [ | ||
{ | ||
"name": "Biswanath Mukherjee", | ||
"image": "https://d1rwvjey2iif32.cloudfront.net", | ||
"bio": "I am a Sr. Solutions Architect working at AWS India.", | ||
"linkedin": "biswanathmukherjee" | ||
} | ||
] | ||
} |
Binary file added
BIN
+19.2 KB
sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/image/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
21 changes: 21 additions & 0 deletions
21
sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/receive-messages-from-target-queue.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
#!/bin/bash | ||
|
||
# Get SQS queue URL | ||
echo "Enter SQS target queue URL:" | ||
read queue_url | ||
|
||
# Get the AWS region | ||
echo "Enter your AWS region:" | ||
read region | ||
|
||
# Get the number of messages in the queue | ||
msg_count=$(aws sqs get-queue-attributes --queue-url $queue_url --attribute-names ApproximateNumberOfMessages --region $region --output text | awk '{print $2}') | ||
|
||
# Keep reading messages until the queue is empty | ||
while [ $msg_count -gt 0 ]; do | ||
aws sqs receive-message --queue-url $queue_url --attribute-names All --message-attribute-names All --max-number-of-messages 1 --visibility-timeout 30 --wait-time-seconds 1 --region $region --output json | ||
|
||
msg_count=$(aws sqs get-queue-attributes --queue-url $queue_url --attribute-names ApproximateNumberOfMessages --region $region --output text | awk '{print $2}') | ||
done | ||
|
||
echo "Queue is now empty" |
23 changes: 23 additions & 0 deletions
23
sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/send-messages-to-source-queue.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
#!/bin/bash | ||
|
||
# Prompt for SQS queue URL | ||
echo "Enter the SQS source queue URL:" | ||
read sqsUrl | ||
|
||
# Prompt for AWS region | ||
echo "Enter your AWS region:" | ||
read region | ||
|
||
# Messages | ||
msg1='{"id":1, "color":"red"}' | ||
msg2='{"id":2, "color":"blue"}' | ||
msg3='{"id":3, "color":"yellow"}' | ||
msg4='{"id":4, "color":"green"}' | ||
|
||
# Send messages | ||
aws sqs send-message --queue-url $sqsUrl --message-body "$msg1" --region $region | ||
aws sqs send-message --queue-url $sqsUrl --message-body "$msg2" --region $region | ||
aws sqs send-message --queue-url $sqsUrl --message-body "$msg3" --region $region | ||
aws sqs send-message --queue-url $sqsUrl --message-body "$msg4" --region $region | ||
|
||
echo "4 messages sent to SQS queue: $sqsUrl" |
83 changes: 83 additions & 0 deletions
83
...idgepipes-sqs-msg-copy-data-filter-sam/sqs-to-sqs-message-copy-with-eventbridgepipes.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
{ | ||
"title": "Copying Amazon SQS messages between queues using Amazon EventBridge Pipes", | ||
"description": "The pattern shows the filter capability of Amazon EventBridge Pipes while copying data from one Amazon SQS queue to another queue.", | ||
"language": "YAML", | ||
"level": "200", | ||
"framework": "SAM", | ||
"introBox": { | ||
"headline": "How it works", | ||
"text": [ | ||
"This template creates two Amazon SQS queues - source-queue and target-queue along with an Amazon EventBridge Pipe.", | ||
"The Amazon EventBridge pipe copies messages from source-queue to target-queue only if message payload (JSON) contains color attribute having values red or blue.", | ||
"Once the stack is deployed, use `send-messages-to-source-queue.sh` to send messages to the source-queue. This scripts sends four messages with different values for color attribute.", | ||
"Validate received messages on `target-queue` using `receive-messages-from-target-queue.sh`. Only the messages with red and blue values for color attribute are available in `target-queue`." | ||
] | ||
}, | ||
"gitHub": { | ||
"template": { | ||
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/eventbridge-pipes-sqs-to-sqs-with-data-filter", | ||
"templateURL": "serverless-patterns/eventbridge-pipes-sqs-to-sqs-with-data-filter", | ||
"projectFolder": "eventbridge-pipes-sqs-to-sqs-with-data-filter", | ||
"templateFile": "template.yaml" | ||
} | ||
}, | ||
"resources": { | ||
"bullets": [ | ||
{ | ||
"text": "Amazon EventBridge Pipes filtering", | ||
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html" | ||
} | ||
] | ||
}, | ||
"deploy": { | ||
"text": [ | ||
"sam deploy --guided" | ||
] | ||
}, | ||
"testing": { | ||
"text": [ | ||
"See the GitHub repo for detailed testing instructions." | ||
] | ||
}, | ||
"cleanup": { | ||
"text": [ | ||
"Delete the stack: <code>sam delete</code>." | ||
] | ||
}, | ||
"authors": [ | ||
{ | ||
"name": "Biswanath Mukherjee", | ||
"image": "https://d1rwvjey2iif32.cloudfront.net", | ||
"bio": "I am a Sr. Solutions Architect working at AWS India.", | ||
"linkedin": "biswanathmukherjee" | ||
} | ||
], | ||
"patternArch": { | ||
"icon1": { | ||
"x": 15, | ||
"y": 50, | ||
"service": "sqs", | ||
"label": "SQS (Source)" | ||
}, | ||
"icon2": { | ||
"x": 50, | ||
"y": 50, | ||
"service": "eventbridge-pipes", | ||
"label": "EventBridge Pipes (extract-filter-load)" | ||
}, | ||
"icon3": { | ||
"x": 85, | ||
"y": 50, | ||
"service": "sqs", | ||
"label": "SQS (Target)" | ||
}, | ||
"line1": { | ||
"from": "icon1", | ||
"to": "icon2" | ||
}, | ||
"line2": { | ||
"from": "icon2", | ||
"to": "icon3" | ||
} | ||
} | ||
} |
73 changes: 73 additions & 0 deletions
73
sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/template.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
AWSTemplateFormatVersion: '2010-09-09' | ||
Transform: AWS::Serverless-2016-10-31 | ||
Description: Sample SAM template for an EventBridge pipe from SQS to SQS | ||
|
||
Globals: | ||
Function: | ||
Timeout: 100 | ||
|
||
Resources: | ||
# Define the source queue | ||
SourceQueue: | ||
Type: AWS::SQS::Queue | ||
Properties: | ||
QueueName: source-queue | ||
# Define the target queue | ||
TargetQueue: | ||
Type: AWS::SQS::Queue | ||
Properties: | ||
QueueName: target-queue | ||
# Define the pipe | ||
Pipe: | ||
Type: AWS::Pipes::Pipe | ||
Properties: | ||
RoleArn: !GetAtt PipeRole.Arn | ||
Source: !GetAtt SourceQueue.Arn | ||
Target: !GetAtt TargetQueue.Arn | ||
# filter the incoming SQS messages | ||
SourceParameters: | ||
SqsQueueParameters: | ||
BatchSize: 1 | ||
FilterCriteria: | ||
Filters: | ||
- Pattern: '{"body": {"color": ["red", "blue"]}}' | ||
|
||
# Role for Pipe | ||
PipeRole: | ||
Type: AWS::IAM::Role | ||
Properties: | ||
AssumeRolePolicyDocument: | ||
Version: 2012-10-17 | ||
Statement: | ||
- Effect: Allow | ||
Principal: | ||
Service: | ||
- pipes.amazonaws.com | ||
Action: | ||
- sts:AssumeRole | ||
Policies: | ||
- PolicyName: SourcePolicy | ||
PolicyDocument: | ||
Version: 2012-10-17 | ||
Statement: | ||
- Effect: Allow | ||
Action: | ||
- 'sqs:ReceiveMessage' | ||
- 'sqs:DeleteMessage' | ||
- 'sqs:GetQueueAttributes' | ||
Resource: !GetAtt SourceQueue.Arn | ||
- PolicyName: TargetPolicy | ||
PolicyDocument: | ||
Version: 2012-10-17 | ||
Statement: | ||
- Effect: Allow | ||
Action: | ||
- 'sqs:sendMessage' | ||
Resource: !GetAtt TargetQueue.Arn | ||
Outputs: | ||
SourceQueueURL: | ||
Description: "Source Queue URL" | ||
Value: !Ref SourceQueue | ||
TargetQueueURL: | ||
Description: "Target Queue URL" | ||
Value: !Ref TargetQueue |