Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Serverless Pattern - SQS to SQS message copy with filter using EventBridge Pipe using SAM #2058

136 changes: 136 additions & 0 deletions sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copying Amazon SQS messages between queues using an Amazon EventBridge Pipes filte

The pattern shows the filter capability of Amazon EventBridge Pipes while copying data from one Amazon SQS queue to another queue. The SAM template deploys two Amazon SQS queues, an Amazon EventBridge Pipe with filter having source as one of the SQS queues and target as another queue.

Learn more about this pattern at Serverless Land Patterns:https://serverlessland.com/patterns/eventbridge-pipes-sqs-to-sqs-with-data-filter

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

- [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
- [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
- [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
2. Change directory to the pattern directory:
```
cd sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam
```
3. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file:
```
sam deploy --guided
```
4. During the prompts:
* Enter a stack name
* Enter `us-east-1` or any other AWS Region.
* Allow SAM CLI to create IAM roles with the required permissions. Please keep all other options to default.
5. Make a note of the output, which will be used during testing.

## How it works

* This template creates two Amazon SQS queues - `source-queue` and `target-queue` along with an Amazon EventBridge Pipe.
* The Amazon EventBridge pipe copies messages from `source-queue` to `target-queue` only if message payload (JSON) contains `color` attribute having values `red` or `blue`.
* Once the stack is deployed, we will use `send-messages-to-source-queue.sh` to send messages to the `source-queue`. This script sends four messages with different values for `color` attribute.
* We will validate received messages on `target-queue` using `receive-messages-from-target-queue.sh`. Only the messages with `red` and `blue` values for `color` attribute will be available in `target-queue`.


Please refer to the architecture diagram below:

![End to End Architecture](image/architecture.png)

## Testing

1. Run the script `send-messages-to-source-queue.sh` to send four messages to the `source-queue` with a payload in `{"id":1, "color":"<value>"}` format having `red`, `blue`, `green` and `yellow` as color values. The script will ask for the queue URL and the region. Please provide the `SourceQueueURL` value received from the outout of the deployment step and your deployment region.
```bash
bash send-messages-to-source-queue.sh
```

Sample output:
```bash
Enter the SQS source queue URL:
{SourceQueueURL}
Enter your AWS region:
{your-region}
{
"MD5OfMessageBody": "266d1841ee6d382c85595fa0ca6e16b0",
"MessageId": "b1d51544-e725-451a-ba2f-13ad5f2acd9d"
}
{
"MD5OfMessageBody": "9cd1c133e3bc1406cf92d9ca0b39d7bc",
"MessageId": "8c68b9a1-a09c-4c88-aa8f-60353a5701ee"
}
{
"MD5OfMessageBody": "846a7a77cc95d371d002296f09dd6286",
"MessageId": "c4d87ea3-732f-4ae9-b227-9df4c4c04051"
}
{
"MD5OfMessageBody": "b77316a2ed44a33471ba262079fe8ce3",
"MessageId": "794d654a-95a7-45ee-8cd5-eb197de084b1"
}
```
2. Now, run the script `receive-messages-from-target-queue.sh` to receive messages from the `target-queue`. Please use the below aws cli command to read message from the SQS queue. Please replace the {SQSQueueURL} with the URL from the deployent output and also replace the {your-region} with the region that you selected during deployment:
```bash
bash receive-messages-from-target-queue.sh
```

Sample output:
```bash
Enter SQS target queue URL:
https://sqs.{your-region}.amazonaws.com/123456789012/target-queue
Enter your AWS region:
{your-region}
{
"Messages": [
{
"MessageId": "82e7xxx-xx-xx-xx-xxxxxde",
"ReceiptHandle": "AQEBWmPxxxxxx2sMw==",
"MD5OfBody": "64bxxxxx232b",
"Body": "{\"messageId\":\"e517xxxxx513\",\"receiptHandle\":\"AQEBxxxxVusg==\",\"body\":\"{\\\"id\\\":1, \\\"color\\\":\\\"red\\\"}\",\"attributes\":{\"ApproximateReceiveCount\":\"1\",\"SentTimestamp\":\"1705663525194\",\"SenderId\":\"AIxxxxIFR\",\"ApproximateFirstReceiveTimestamp\":\"1705663525205\"},\"messageAttributes\":{},\"md5OfBody\":\"266xxxxx16b0\",\"eventSource\":\"aws:sqs\",\"eventSourceARN\":\"arn:aws:sqs:{your-region}:123456789012:source-queue\",\"awsRegion\":\"{your-region}\"}",
"Attributes": {
"SenderId": "ARxxxxZC:c11xxxx92f",
"ApproximateFirstReceiveTimestamp": "1705663569479",
"ApproximateReceiveCount": "6",
"SentTimestamp": "1705663525269"
}
}
]
}
{
"Messages": [
{
"MessageId": "95f7xxx-xx-xx-xx-xxxxxde",
"ReceiptHandle": "AQEBWmPxxxxxx2sMw==",
"MD5OfBody": "64bxxxxx232b",
"Body": "{\"messageId\":\"e517xxxxx513\",\"receiptHandle\":\"AQEBxxxxVusg==\",\"body\":\"{\\\"id\\\":1, \\\"color\\\":\\\"blue\\\"}\",\"attributes\":{\"ApproximateReceiveCount\":\"1\",\"SentTimestamp\":\"1705663525194\",\"SenderId\":\"AIxxxxIFR\",\"ApproximateFirstReceiveTimestamp\":\"1705663525205\"},\"messageAttributes\":{},\"md5OfBody\":\"266xxxxx16b0\",\"eventSource\":\"aws:sqs\",\"eventSourceARN\":\"arn:aws:sqs:{your-region}:123456789012:source-queue\",\"awsRegion\":\"{your-region}\"}",
"Attributes": {
"SenderId": "ARxxxxZC:c11xxxx92f",
"ApproximateFirstReceiveTimestamp": "1705663566929",
"ApproximateReceiveCount": "6",
"SentTimestamp": "1705663525345"
}
}
]
}
```

As you see from the message, even though we pushed four messages into the `source-queue`, only the two messages that matched the filter of EventBridge pipe was sent to the `target-queue`.

## Cleanup


1. Delete the stack
```bash
sam delete
```

----
Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"title": "Copying Amazon SQS messages between queues using Amazon EventBridge Pipes",
"description": "The pattern shows the filter capability of Amazon EventBridge Pipes while copying data from one Amazon SQS queue to another queue.",
"language": "YAML",
"level": "200",
"framework": "SAM",
"introBox": {
"headline": "How it works",
"text": [
"This template creates two Amazon SQS queues - source-queue and target-queue along with an Amazon EventBridge Pipe.",
"The Amazon EventBridge pipe copies messages from source-queue to target-queue only if message payload (JSON) contains color attribute having values red or blue.",
"Once the stack is deployed, use `send-messages-to-source-queue.sh` to send messages to the source-queue. This scripts sends four messages with different values for color attribute.",
"Validate received messages on `target-queue` using `receive-messages-from-target-queue.sh`. Only the messages with red and blue values for color attribute are available in `target-queue`."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/eventbridge-pipes-sqs-to-sqs-with-data-filter",
"templateURL": "serverless-patterns/eventbridge-pipes-sqs-to-sqs-with-data-filter",
"projectFolder": "eventbridge-pipes-sqs-to-sqs-with-data-filter",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "Amazon EventBridge Pipes filtering",
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html"
}
]
},
"deploy": {
"text": [
"sam deploy --guided"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Biswanath Mukherjee",
"image": "https://d1rwvjey2iif32.cloudfront.net",
"bio": "I am a Sr. Solutions Architect working at AWS India.",
"linkedin": "biswanathmukherjee"
}
]
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/bin/bash

# Get SQS queue URL
echo "Enter SQS target queue URL:"
read queue_url

# Get the AWS region
echo "Enter your AWS region:"
read region

# Get the number of messages in the queue
msg_count=$(aws sqs get-queue-attributes --queue-url $queue_url --attribute-names ApproximateNumberOfMessages --region $region --output text | awk '{print $2}')

# Keep reading messages until the queue is empty
while [ $msg_count -gt 0 ]; do
aws sqs receive-message --queue-url $queue_url --attribute-names All --message-attribute-names All --max-number-of-messages 1 --visibility-timeout 30 --wait-time-seconds 1 --region $region --output json

msg_count=$(aws sqs get-queue-attributes --queue-url $queue_url --attribute-names ApproximateNumberOfMessages --region $region --output text | awk '{print $2}')
done

echo "Queue is now empty"
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash

# Prompt for SQS queue URL
echo "Enter the SQS source queue URL:"
read sqsUrl

# Prompt for AWS region
echo "Enter your AWS region:"
read region

# Messages
msg1='{"id":1, "color":"red"}'
msg2='{"id":2, "color":"blue"}'
msg3='{"id":3, "color":"yellow"}'
msg4='{"id":4, "color":"green"}'

# Send messages
aws sqs send-message --queue-url $sqsUrl --message-body "$msg1" --region $region
aws sqs send-message --queue-url $sqsUrl --message-body "$msg2" --region $region
aws sqs send-message --queue-url $sqsUrl --message-body "$msg3" --region $region
aws sqs send-message --queue-url $sqsUrl --message-body "$msg4" --region $region

echo "4 messages sent to SQS queue: $sqsUrl"
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{
"title": "Copying Amazon SQS messages between queues using Amazon EventBridge Pipes",
"description": "The pattern shows the filter capability of Amazon EventBridge Pipes while copying data from one Amazon SQS queue to another queue.",
"language": "YAML",
"level": "200",
"framework": "SAM",
"introBox": {
"headline": "How it works",
"text": [
"This template creates two Amazon SQS queues - source-queue and target-queue along with an Amazon EventBridge Pipe.",
"The Amazon EventBridge pipe copies messages from source-queue to target-queue only if message payload (JSON) contains color attribute having values red or blue.",
"Once the stack is deployed, use `send-messages-to-source-queue.sh` to send messages to the source-queue. This scripts sends four messages with different values for color attribute.",
"Validate received messages on `target-queue` using `receive-messages-from-target-queue.sh`. Only the messages with red and blue values for color attribute are available in `target-queue`."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/eventbridge-pipes-sqs-to-sqs-with-data-filter",
"templateURL": "serverless-patterns/eventbridge-pipes-sqs-to-sqs-with-data-filter",
"projectFolder": "eventbridge-pipes-sqs-to-sqs-with-data-filter",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "Amazon EventBridge Pipes filtering",
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-filtering.html"
}
]
},
"deploy": {
"text": [
"sam deploy --guided"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Biswanath Mukherjee",
"image": "https://d1rwvjey2iif32.cloudfront.net",
"bio": "I am a Sr. Solutions Architect working at AWS India.",
"linkedin": "biswanathmukherjee"
}
],
"patternArch": {
"icon1": {
"x": 15,
"y": 50,
"service": "sqs",
"label": "SQS (Source)"
},
"icon2": {
"x": 50,
"y": 50,
"service": "eventbridge-pipes",
"label": "EventBridge Pipes (extract-filter-load)"
},
"icon3": {
"x": 85,
"y": 50,
"service": "sqs",
"label": "SQS (Target)"
},
"line1": {
"from": "icon1",
"to": "icon2"
},
"line2": {
"from": "icon2",
"to": "icon3"
}
}
}
73 changes: 73 additions & 0 deletions sqs-eventbridgepipes-sqs-msg-copy-data-filter-sam/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Sample SAM template for an EventBridge pipe from SQS to SQS

Globals:
Function:
Timeout: 100

Resources:
# Define the source queue
SourceQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: source-queue
# Define the target queue
TargetQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: target-queue
# Define the pipe
Pipe:
Type: AWS::Pipes::Pipe
Properties:
RoleArn: !GetAtt PipeRole.Arn
Source: !GetAtt SourceQueue.Arn
Target: !GetAtt TargetQueue.Arn
# filter the incoming SQS messages
SourceParameters:
SqsQueueParameters:
BatchSize: 1
FilterCriteria:
Filters:
- Pattern: '{"body": {"color": ["red", "blue"]}}'

# Role for Pipe
PipeRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- pipes.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: SourcePolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'sqs:ReceiveMessage'
- 'sqs:DeleteMessage'
- 'sqs:GetQueueAttributes'
Resource: !GetAtt SourceQueue.Arn
- PolicyName: TargetPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'sqs:sendMessage'
Resource: !GetAtt TargetQueue.Arn
Outputs:
SourceQueueURL:
Description: "Source Queue URL"
Value: !Ref SourceQueue
TargetQueueURL:
Description: "Target Queue URL"
Value: !Ref TargetQueue
Loading