Skip to content

Commit

Permalink
Adding a pattern to copy an object between two S3 buckets using S3 Ev…
Browse files Browse the repository at this point in the history
…ent Notification to SQS to EventBridge Pipes to Step Function

resolves: #2086
  • Loading branch information
Dan Straw committed Feb 4, 2024
1 parent 8b48d00 commit 0e15a74
Show file tree
Hide file tree
Showing 4 changed files with 448 additions and 0 deletions.
90 changes: 90 additions & 0 deletions s3-sqs-eventbridge-pipe-sfn-s3/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copying a file from S3 to S3 via Eventbridge Pipes & Step Function

This pattern shows how to use S3 Event Notifications, queue them on SQS, and then use EventBridge Pipes to launch a
StepFunctions state machine and copy the file from the source S3 bucket to a destination. Modifying the state machine
would allow manipulation of the file.

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
2. Change directory to the pattern directory:
```
cd s3-sqs-eventbridge-pipe-sfn-s3
```
3. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file:
```
sam deploy --guided
```
4. During the prompts:
* Enter a stack name
* Enter the desired AWS Region
* Allow SAM CLI to create IAM roles with the required permissions.

Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you
can use `sam deploy` in future to use these defaults.

5. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing.

## How it works

When a file is created in the source S3 bucket, an
[S3 Event Notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html) fires with
[this structure](https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html).
This is enqueued onto a standard Amazon SQS Queue.

The EventBridge service polls the SQS Queue and invokes the EventBridge Pipe synchronously with an event that contains queue
messages. EventBridge reads messages in batches and invokes the pipe once for each batch. When the pipe successfully
processes a batch, EventBridge deletes its messages from the queue.
The structure of the batch is [described here](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html)

The EventBridge Pipe then executes the
[AWS Step Functions Express Workflow](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-standard-vs-express.html)
state machine
[described in s3-sqs-eventbridge-pipe-sfn-s3.asl.json](./workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json).
This loops through each message in the batch dequeued from SQS using a Map state, loops through each record from the S3
Event Notification using another Map state, and then copies the file from the source S3 Bucket to the destination S3
Bucket. In a real world scenario the state machine would be modified to manipulate the file as desired.


## Testing

Provide steps to trigger the integration and show what should be observed if successful.

1. Stream logs from StepFunctions LogGroup

```
sam logs --cw-log-group <LogGroup Name> --tail
```

2. Put a file into the source bucket

```
aws s3api put-object --bucket <source-bucket-name> --key <source-bucket-filename> --body <local-filename>
```

3. Observe the logs for the new execution.

## Cleanup

1. Delete the stack

```bash
sam delete
```

----
Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
61 changes: 61 additions & 0 deletions s3-sqs-eventbridge-pipe-sfn-s3/s3-sqs-eventbridge-pipe-sfn-s3.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"title": "Copy an object between two S3 buckets using S3 Event Notification to SQS to EventBridge Pipes to Step Function",
"description": "Create a Step Functions workflow to query Amazon Athena.",
"language": "",
"level": "200",
"framework": "SAM",
"introBox": {
"headline": "How it works",
"text": [
"This sample project demonstrates how to copy an object between two S3 buckets and manipulate it in transit.",
"When an object is created in the source S3 bucket, an S3 Event Notification is fires and enqueued onto an SQS Queue.",
"The EventBridge service polls the SQS Queue and invokes an EventBridge pipe.",
"The EventBridge Pipe then executes an AWS Step Functions Express Workflow state machine.",
"This copies the file from the source S3 Bucket to the destination S3 Bucket.",
"In a real world scenario the state machine would be modified to manipulate the file as desired"
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/s3-sqs-eventbridge-pipe-sfn-s3",
"templateURL": "serverless-patterns/s3-sqs-eventbridge-pipe-sfn-s3",
"projectFolder": "s3-sqs-eventbridge-pipe-sfn-s3",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "Configuring an S3 bucket for notifications",
"link": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html"
},
{
"text": "Amazon Simple Queue Service as an EventBridge Pipe source",
"link": "https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-sqs.html"
}
]
},
"deploy": {
"text": [
"sam deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Dan Straw",
"image": "https://avatars.githubusercontent.com/u/271028",
"bio": "Senior Solutions Architect at AWS",
"linkedin": "https://www.linkedin.com/in/danstraw/"
}
]
}
239 changes: 239 additions & 0 deletions s3-sqs-eventbridge-pipe-sfn-s3/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless patterns - S3 to SQS to EventBridge Pipes to Step Function to S3

Parameters:
SourceBucketName:
Type: String
DestinationBucketName:
Type: String

Resources:
## Source S3 bucket
SourceBucket:
Type: AWS::S3::Bucket
DependsOn:
- NotificationQueuePolicy
Properties:
BucketName: !Ref SourceBucketName
NotificationConfiguration:
QueueConfigurations:
- Event: "s3:ObjectCreated:*"
Queue: !GetAtt NotificationQueue.Arn

## Destination S3 Bucket
DestinationBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Ref DestinationBucketName

## SQS Queue
NotificationQueue:
Type: AWS::SQS::Queue
Properties:
RedrivePolicy:
deadLetterTargetArn: !GetAtt NotificationQueueDLQ.Arn
maxReceiveCount: 5

# DLQ for source
NotificationQueueDLQ:
Type: AWS::SQS::Queue

## Policy allowing S3 Event Notifications to be in SQS
NotificationQueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Id: QueuePolicy
Statement:
- Sid: Allow-SendMessage-To-Queue-From-S3-Event-Notification
Effect: Allow
Principal:
Service: "s3.amazonaws.com"
Action:
- "sqs:SendMessage"
Resource: !GetAtt NotificationQueue.Arn
Condition:
ArnLike:
aws:SourceArn: !Join [ "",[ 'arn:aws:s3:::',!Ref SourceBucketName ] ]
StringEquals:
aws:SourceAccount: !Ref AWS::AccountId
Queues:
- Ref: NotificationQueue

# Logs for EventBridge Pipe
EventBridgePipeLogGroup:
Type: AWS::Logs::LogGroup
Properties:
RetentionInDays: 7
LogGroupName: /s3-sqs-eventbridge-pipe-sfn-s3/EventBridgePipe

# Eventbridge Pipe Definition
S3FileCopy:
Type: AWS::Pipes::Pipe
Properties:
RoleArn: !GetAtt EventBridgePipesRole.Arn
Name: s3-sqs-eventbridge-pipe-sfn-s3
DesiredState: RUNNING
Source: !GetAtt NotificationQueue.Arn
SourceParameters:
SqsQueueParameters:
BatchSize: 1
LogConfiguration:
CloudwatchLogsLogDestination:
LogGroupArn: !GetAtt EventBridgePipeLogGroup.Arn
IncludeExecutionData:
- ALL
Level: TRACE
Target: !Ref TargetStateMachine

## Role for EventBridge Pipes to read from SQS and execute Step Function
EventBridgePipesRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- pipes.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: CloudWatchLogs
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogDelivery'
- 'logs:GetLogDelivery'
- 'logs:UpdateLogDelivery'
- 'logs:DeleteLogDelivery'
- 'logs:ListLogDeliveries'
- 'logs:PutResourcePolicy'
- 'logs:DescribeResourcePolicies'
- 'logs:DescribeLogGroups'
Resource: '*'
- PolicyName: ReadSQS
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'sqs:ReceiveMessage'
- 'sqs:DeleteMessage'
- 'sqs:GetQueueAttributes'
Resource: !GetAtt NotificationQueue.Arn
- PolicyName: ExecuteSFN
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 'states:StartExecution'
- 'states:StartSyncExecution'
Resource: !Ref TargetStateMachine

# Logs for StepFunctions
TargetStateMachineLogGroup:
Type: AWS::Logs::LogGroup
Properties:
RetentionInDays: 7
LogGroupName: /s3-sqs-eventbridge-pipe-sfn-s3/StateMachine

# Execution Role for StepFunctions
TargetStateMachineRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- states.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: CloudWatchLogs
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogDelivery'
- 'logs:GetLogDelivery'
- 'logs:UpdateLogDelivery'
- 'logs:DeleteLogDelivery'
- 'logs:ListLogDeliveries'
- 'logs:PutResourcePolicy'
- 'logs:DescribeResourcePolicies'
- 'logs:DescribeLogGroups'
Resource: '*'
- PolicyName: SourceBucketList
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 's3:ListBucket'
Resource: !GetAtt SourceBucket.Arn
- PolicyName: SourceBucketReadOnly
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 's3:Get*'
- 's3:List*'
- 's3:Describe*'
Resource: !Join
- ''
- - !GetAtt SourceBucket.Arn
- /*
- PolicyName: DestinationBucketList
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 's3:ListBucket'
Resource: !GetAtt DestinationBucket.Arn
- PolicyName: DestinationBucketReadWrite
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 's3:*Object'
Resource: !Join
- ''
- - !GetAtt DestinationBucket.Arn
- /*

# Step Function Definition
TargetStateMachine:
Type: AWS::Serverless::StateMachine
Properties:
Type: EXPRESS
DefinitionUri: workflow/s3-sqs-eventbridge-pipe-sfn-s3.asl.json
DefinitionSubstitutions:
DestinationBucketName: !Ref DestinationBucketName
Logging:
Destinations:
- CloudWatchLogsLogGroup:
LogGroupArn: !GetAtt TargetStateMachineLogGroup.Arn
Level: ALL
IncludeExecutionData: true
Role: !GetAtt TargetStateMachineRole.Arn

Outputs:
SourceBucketName:
Value: !Ref SourceBucketName
Description: S3 Bucket for object storage
DestinationBucketName:
Value: !Ref DestinationBucketName
Description: S3 destination Bucket for object storage
Loading

0 comments on commit 0e15a74

Please sign in to comment.