Skip to content

Commit

Permalink
Merge pull request #1525 from uditaws/main
Browse files Browse the repository at this point in the history
Kinesis Data Stream to API Gateway via Data Firehose
  • Loading branch information
julianwood authored Aug 2, 2023
2 parents c8d8261 + 52f4adc commit 78c546c
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 0 deletions.
59 changes: 59 additions & 0 deletions kinesis-data-stream-firehose-apigw-sam/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Kinesis Data Stream to API Gateway via Data Firehose

This sample project demonstrates how to send data received by Kinesis Data Stream to a HTTP Endpoint with Authorization (here API Gateway REST API) using Kinesis Firehose

Learn more about this pattern at [Serverless Land Patterns](https://serverlessland.com/patterns/kinesis-data-stream-firehose-apigw-sam).

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
1. Change directory to the pattern directory:
```
cd kinesis-data-stream-firehose-apigw-sam
```
1. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file:
```
sam deploy --guided
```
1. During the prompts:
* Enter a stack name
* Enter the desired AWS Region
* Enter Secret Name. To use default, click Enter.
* Allow SAM CLI to create IAM roles with the required permissions.

Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults.

## Testing

* Visit [Kinesis Management Console](https://us-east-1.console.aws.amazon.com/kinesis/home)
* Select 'Data streams', copy name of the Data Stream having prefix `firehose-apigw-MyKinesisStream-`.
* Replace `<KinesisDataStreamName>` in the following command with the copied name of the Kinesis Data Stream in above step to get the test command.
```
aws kinesis put-record --stream-name <KinesisDataStreamName> --partition-key 001 --data $(echo -n "{\"hello\" : \"world\"}" | base64)
```
* Run the above command a few times and check the logs of Test Lambda Function to see the records put to Kinesis Data Stream

## AWS Documentation
- [Stream data to an HTTP endpoint with Amazon Kinesis Data Firehose](https://aws.amazon.com/blogs/big-data/stream-data-to-an-http-endpoint-with-amazon-kinesis-data-firehose/)
- [Kinesis Data Firehose HTTP Endpoint Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http)

## Cleanup

1. Delete the stack
```bash
sam delete
```


56 changes: 56 additions & 0 deletions kinesis-data-stream-firehose-apigw-sam/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"title": "Kinesis Data Stream to API Gateway via Data Firehose",
"description": "POST data received by Kinesis Data Stream to HTTP Endpoint securely using Kinesis Data Firehose",
"level": "200",
"language": "Python",
"framework": "SAM",
"introBox": {
"headline": "How it works",
"text": [
"This sample project demonstrates how to send data received by Kinesis Data Stream to a HTTP Endpoint with Authorization (here API Gateway REST API) using Kinesis Firehose"
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/kinesis-data-stream-firehose-apigw-sam",
"templateURL": "serverless-patterns/kinesis-data-stream-firehose-apigw-sam",
"projectFolder": "kinesis-data-stream-firehose-apigw-sam",
"templateFile": "kinesis-data-stream-firehose-apigw-sam/template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "Stream data to an HTTP endpoint with Amazon Kinesis Data Firehose",
"link": "https://aws.amazon.com/blogs/big-data/stream-data-to-an-http-endpoint-with-amazon-kinesis-data-firehose/"
},
{
"text": "Kinesis Data Firehose HTTP Endpoint Destination",
"link": "https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http"
}
]
},
"deploy": {
"text": [
"sam deploy"
]
},
"testing": {
"text": [
"See the Github repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Udit Parikh",
"image": "https://drive.google.com/file/d/1SfEjQtLkPaOHC7JlfugR4OaUD0yNjHu6/view?usp=drivesdk",
"bio": "Udit is passionate about serverless & event-driven architectures.",
"linkedin": "https://www.linkedin.com/in/parikhudit"
}
]
}
51 changes: 51 additions & 0 deletions kinesis-data-stream-firehose-apigw-sam/src/lambda_authorizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import json
import boto3
import os

secretsmanager_client = boto3.client('secretsmanager')

def lambda_handler(event, context):

# print(event)
access_key = event['authorizationToken']
methodArn = event['methodArn']
secret_name = os.environ['SECRET_NAME']
# print('Secret Name: ' + secret_name)
# print('Access Key: ' + access_key)

try:
response = secretsmanager_client.get_secret_value(
SecretId=secret_name
)
expected_key = response['SecretString']
# print('Expected key: ' + expected_key)
except Exception:
return build_response(False, methodArn)

if access_key != expected_key:
return build_response(False, methodArn)

return build_response(True, methodArn)

def build_response(allowRequest, methodArn):

output = {
"principalId": "yyyyyyyy",
"policyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Action": "execute-api:Invoke",
"Effect": "Deny",
"Resource": "arn:aws:execute-api:*"
}
]
}
}

if allowRequest is True:
output["policyDocument"]["Statement"][0]["Effect"] = "Allow"

output["policyDocument"]["Statement"][0]["Resource"] = methodArn

return output
20 changes: 20 additions & 0 deletions kinesis-data-stream-firehose-apigw-sam/src/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import json
import time

def lambda_handler(event, context):
print(event)

body = {
"requestId": event["headers"]["X-Amz-Firehose-Request-Id"],
"timestamp": int(time.time() * 1000)
}

output = {
"statusCode": 200,
"body": json.dumps(body),
"headers": {
"Content-Type": "application/json"
}
}
print(output)
return output
147 changes: 147 additions & 0 deletions kinesis-data-stream-firehose-apigw-sam/template.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: SAM application to POST data to API Gateway from Kinesis Data Streams via Kinesis Firehose

Parameters:
SecretName:
Type: String
Default: KinesisFirehoseHttpEndpointAccessKey

Resources:

MySecret:
Type: "AWS::SecretsManager::Secret"
Properties:
Name: "KinesisFirehoseHttpEndpointAccessKey"
Description: "Secret with dynamically generated secret password."
GenerateSecretString:
PasswordLength: 30
ExcludeCharacters: '"@/\'

MyApiGateway:
Type: AWS::Serverless::Api
Properties:
OpenApiVersion: 3.0.1
StageName: dev
Auth:
DefaultAuthorizer: MyLambdaAuthorizer
Authorizers:
MyLambdaAuthorizer:
FunctionArn: !GetAtt LambdaAuthorizerFunction.Arn
Identity:
Header: "X-Amz-Firehose-Access-Key"

LambdaAuthorizerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: lambda_authorizer.lambda_handler
Runtime: python3.10
Policies:
- AWSSecretsManagerGetSecretValuePolicy:
SecretArn: !Ref MySecret
Environment:
Variables:
SECRET_NAME: !Ref SecretName

TestLambdaFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/
Handler: lambda_function.lambda_handler
Runtime: python3.10
Events:
Post:
Type: Api
Properties:
Path: /
Method: POST
RestApiId: !Ref MyApiGateway

MyKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 2

MyKinesisFirehoseRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: KinesisFirehosePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- "firehose:CreateDeliveryStream"
- "firehose:DeleteDeliveryStream"
- "firehose:DescribeDeliveryStream"
- "firehose:PutRecord"
- "firehose:PutRecords"
- "firehose:PutRecordBatch"
- "firehose:UpdateDestination"
Resource:
- !GetAtt MyKinesisStream.Arn
- Effect: Allow
Action:
- "kinesis:GetRecords"
- "kinesis:GetShardIterator"
- "kinesis:DescribeStream"
- "kinesis:DescribeStreamSummary"
- "kinesis:DescribeStreamConsumer"
- "kinesis:SubscribeToShard"
- "kinesis:RegisterStreamConsumer"
Resource:
- "*"
- Effect: Allow
Action:
- "s3:GetObject"
- "s3:ListBucket"
- "s3:GetObjectVersion"
- "s3:PutObject"
- "s3:PutObjectAcl"
Resource:
- !Sub arn:aws:s3:::${MyS3Bucket}/*

MyS3Bucket:
Type: AWS::S3::Bucket

KinesisFirehoseDeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
DeliveryStreamName: "MyDeliveryStream"
DeliveryStreamType: "KinesisStreamAsSource"
KinesisStreamSourceConfiguration:
KinesisStreamARN: !GetAtt MyKinesisStream.Arn
RoleARN: !GetAtt MyKinesisFirehoseRole.Arn
HttpEndpointDestinationConfiguration:
EndpointConfiguration:
Url: !Sub "https://${MyApiGateway}.execute-api.${AWS::Region}.amazonaws.com/dev/"
AccessKey: '{{resolve:secretsmanager:KinesisFirehoseHttpEndpointAccessKey:SecretString}}'
RoleARN: !GetAtt MyKinesisFirehoseRole.Arn
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 1
S3Configuration:
RoleARN: !GetAtt MyKinesisFirehoseRole.Arn
BucketARN: !GetAtt MyS3Bucket.Arn
BufferingHints:
IntervalInSeconds: 60
SizeInMBs: 1
ErrorOutputPrefix: "error/"
CompressionFormat: "UNCOMPRESSED"
Prefix: "raw/"

Outputs:
ApiEndpoint:
Description: API Gateway endpoint URL
Value:
Fn::Sub: "https://${MyApiGateway}.execute-api.${AWS::Region}.amazonaws.com/dev/"

0 comments on commit 78c546c

Please sign in to comment.