feat: support virtual sharding and stream failure processing (#1261) (#…
Shreya authored Nov 26, 2019
1 parent bf5720d commit 27b5934
Showing 10 changed files with 311 additions and 115 deletions.
14 changes: 13 additions & 1 deletion examples/2016-10-31/stream_processor/template.yaml
@@ -8,21 +8,33 @@ Resources:
      Handler: index.handler
      Runtime: nodejs10.x
      CodeUri: src/
      Policies:
        - SNSPublishMessagePolicy:
            TopicName: !GetAtt MySnsTopic.TopicName
      Events:
        Stream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt Stream.Arn
            MaximumBatchingWindowInSeconds: 20
            ParallelizationFactor: 8
            MaximumRetryAttempts: 100
            BisectBatchOnFunctionError: true
            MaximumRecordAgeInSeconds: 604800
            StartingPosition: TRIM_HORIZON
            DestinationConfig:
              OnFailure:
                Destination: !Ref MySnsTopic

  Stream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1

Outputs:
  MySnsTopic:
    Type: AWS::SNS::Topic

Outputs:
  KinesisStream:
    Description: "Kinesis Stream that will trigger Lambda function upon new records"
    Value: !GetAtt Stream.Arn
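
The Kinesis event in the template above sets all five of the newly supported stream-processing properties. As a rough sketch of what the translator does with them (shown concretely in the pull.py change further down), the snippet below copies each optional property straight through onto the Properties of the generated AWS::Lambda::EventSourceMapping resource. Plain Python; the function name, constant, and the "StreamProcessor" logical ID are invented for this example and are not samtranslator APIs.

```python
# Illustrative sketch of the SAM -> CloudFormation pass-through in this commit.
# Names here (build_event_source_mapping, OPTIONAL_MAPPING_PROPERTIES) are
# invented for the example and are not part of samtranslator.

# Optional EventSourceMapping properties; the last five are new in this commit.
OPTIONAL_MAPPING_PROPERTIES = [
    "BatchSize",
    "Enabled",
    "MaximumBatchingWindowInSeconds",
    "MaximumRetryAttempts",        # cap retries for a failed batch
    "BisectBatchOnFunctionError",  # split a failing batch and retry the halves
    "MaximumRecordAgeInSeconds",   # drop records older than this
    "DestinationConfig",           # OnFailure destination (SNS topic or SQS queue ARN)
    "ParallelizationFactor",       # concurrent batches per shard ("virtual sharding")
]


def build_event_source_mapping(function_logical_id, event_properties):
    """Return an AWS::Lambda::EventSourceMapping resource for a stream event."""
    properties = {
        "FunctionName": {"Ref": function_logical_id},
        "EventSourceArn": event_properties["Stream"],
        "StartingPosition": event_properties.get("StartingPosition"),
    }
    # Optional properties are copied through unchanged when the template sets them.
    for name in OPTIONAL_MAPPING_PROPERTIES:
        if name in event_properties:
            properties[name] = event_properties[name]
    return {"Type": "AWS::Lambda::EventSourceMapping", "Properties": properties}


if __name__ == "__main__":
    # Mirrors the Kinesis event of the stream_processor example above;
    # "StreamProcessor" is a placeholder logical ID.
    event = {
        "Stream": {"Fn::GetAtt": ["Stream", "Arn"]},
        "MaximumBatchingWindowInSeconds": 20,
        "ParallelizationFactor": 8,
        "MaximumRetryAttempts": 100,
        "BisectBatchOnFunctionError": True,
        "MaximumRecordAgeInSeconds": 604800,
        "StartingPosition": "TRIM_HORIZON",
        "DestinationConfig": {"OnFailure": {"Destination": {"Ref": "MySnsTopic"}}},
    }
    print(build_event_source_mapping("StreamProcessor", event))
```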
2 changes: 1 addition & 1 deletion samtranslator/__init__.py
@@ -1 +1 @@
__version__ = '1.15.1'
__version__ = '1.16.0'
13 changes: 12 additions & 1 deletion samtranslator/model/eventsources/pull.py
@@ -22,7 +22,12 @@ class PullEventSource(ResourceMacro):
        'BatchSize': PropertyType(False, is_type(int)),
        'StartingPosition': PropertyType(False, is_str()),
        'Enabled': PropertyType(False, is_type(bool)),
        'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int))
        'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int)),
        'MaximumRetryAttempts': PropertyType(False, is_type(int)),
        'BisectBatchOnFunctionError': PropertyType(False, is_type(bool)),
        'MaximumRecordAgeInSeconds': PropertyType(False, is_type(int)),
        'DestinationConfig': PropertyType(False, is_type(dict)),
        'ParallelizationFactor': PropertyType(False, is_type(int))
    }

    def get_policy_arn(self):
@@ -66,6 +71,12 @@ def to_cloudformation(self, **kwargs):
        lambda_eventsourcemapping.BatchSize = self.BatchSize
        lambda_eventsourcemapping.Enabled = self.Enabled
        lambda_eventsourcemapping.MaximumBatchingWindowInSeconds = self.MaximumBatchingWindowInSeconds
        lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
        lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
        lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
        lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
        lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor

        if 'Condition' in function.resource_attributes:
            lambda_eventsourcemapping.set_resource_attribute('Condition', function.resource_attributes['Condition'])

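pull.py declares the new attributes with the same PropertyType pattern used for the existing ones and then copies them onto the LambdaEventSourceMapping in to_cloudformation. Below is a minimal, self-contained sketch of that type-gating pattern; the PropertyType, is_type, is_str and check definitions are simplified stand-ins written for this note, not the library's actual implementation.

```python
# Simplified stand-ins for the PropertyType / is_type pattern used above.
# Not samtranslator code; just enough to show how declared types gate input.

class PropertyType:
    def __init__(self, required, validate):
        self.required = required  # must the template supply this property?
        self.validate = validate  # callable returning True for a valid value


def is_type(expected):
    return lambda value: isinstance(value, expected)


def is_str():
    return is_type(str)


# The optional properties this commit adds to pull-based event sources.
PROPERTY_TYPES = {
    'MaximumRetryAttempts': PropertyType(False, is_type(int)),
    'BisectBatchOnFunctionError': PropertyType(False, is_type(bool)),
    'MaximumRecordAgeInSeconds': PropertyType(False, is_type(int)),
    'DestinationConfig': PropertyType(False, is_type(dict)),
    'ParallelizationFactor': PropertyType(False, is_type(int)),
}


def check(properties):
    """Return a list of type errors for the supplied event-source properties."""
    errors = []
    for name, prop in PROPERTY_TYPES.items():
        if prop.required and name not in properties:
            errors.append('missing required property: ' + name)
        elif name in properties and not prop.validate(properties[name]):
            errors.append('wrong type for property: ' + name)
    return errors


print(check({'ParallelizationFactor': 8, 'BisectBatchOnFunctionError': 'yes'}))
# -> ['wrong type for property: BisectBatchOnFunctionError']
```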
5 changes: 5 additions & 0 deletions samtranslator/model/lambda_.py
@@ -66,6 +66,11 @@ class LambdaEventSourceMapping(Resource):
        'EventSourceArn': PropertyType(True, is_str()),
        'FunctionName': PropertyType(True, is_str()),
        'MaximumBatchingWindowInSeconds': PropertyType(False, is_type(int)),
        'MaximumRetryAttempts': PropertyType(False, is_type(int)),
        'BisectBatchOnFunctionError': PropertyType(False, is_type(bool)),
        'MaximumRecordAgeInSeconds': PropertyType(False, is_type(int)),
        'DestinationConfig': PropertyType(False, is_type(dict)),
        'ParallelizationFactor': PropertyType(False, is_type(int)),
        'StartingPosition': PropertyType(False, is_str())
    }

@@ -1,6 +1,5 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: EventSourceMapping example with MaximumBatchingWindowInSeconds property

Parameters:
  MyBatchingWindowParam:
@@ -23,6 +22,9 @@ Resources:
          }
        }
      Runtime: nodejs8.10
      Policies:
        - SQSSendMessagePolicy:
            QueueName: !GetAtt MySqsQueue.QueueName
      Events:
        Stream:
          Type: Kinesis
@@ -42,7 +44,14 @@
            Stream: !GetAtt DynamoDBTable.StreamArn
            BatchSize: 100
            MaximumBatchingWindowInSeconds: !Ref MyBatchingWindowParam
            ParallelizationFactor: 8
            MaximumRetryAttempts: 100
            BisectBatchOnFunctionError: true
            MaximumRecordAgeInSeconds: 86400
            StartingPosition: TRIM_HORIZON
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt MySqsQueue.Arn

  KinesisStream:
    Type: AWS::Kinesis::Stream
@@ -66,4 +75,7 @@ Resources:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
      StreamSpecification:
        StreamViewType: NEW_IMAGE
        StreamViewType: NEW_IMAGE

  MySqsQueue:
    Type: AWS::SQS::Queue
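
Unlike the stream_processor example, this input template routes failed batches to an SQS queue, so the OnFailure destination is referenced with !GetAtt MySqsQueue.Arn rather than !Ref: Ref on an AWS::SQS::Queue returns the queue URL, while Ref on an AWS::SNS::Topic already returns the topic ARN. The small helper below illustrates that choice; it is written for this note and is not part of samtranslator.

```python
# Illustrative helper (not samtranslator code) for the OnFailure destination.
# DestinationConfig needs an ARN: Ref works for SNS topics because Ref on
# AWS::SNS::Topic returns the ARN, but SQS queues need Fn::GetAtt "Arn".

def on_failure_destination_config(logical_id, resource_type):
    if resource_type == "AWS::SNS::Topic":
        destination = {"Ref": logical_id}
    elif resource_type == "AWS::SQS::Queue":
        destination = {"Fn::GetAtt": [logical_id, "Arn"]}
    else:
        raise ValueError("unsupported OnFailure destination type: " + resource_type)
    return {"OnFailure": {"Destination": destination}}


print(on_failure_destination_config("MySnsTopic", "AWS::SNS::Topic"))
print(on_failure_destination_config("MySqsQueue", "AWS::SQS::Queue"))
```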
@@ -8,6 +8,9 @@
}
},
"Resources": {
"MySqsQueue": {
"Type": "AWS::SQS::Queue"
},
"MyFunctionForBatchingExampleStream": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
@@ -24,6 +27,30 @@
"StartingPosition": "LATEST"
}
},
"DynamoDBTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"KeyType": "HASH",
"AttributeName": "id"
}
],
"StreamSpecification": {
"StreamViewType": "NEW_IMAGE"
},
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"ProvisionedThroughput": {
"WriteCapacityUnits": 5,
"ReadCapacityUnits": 5
}
}
},
"MyFunctionForBatchingExampleRole": {
"Type": "AWS::IAM::Role",
"Properties": {
@@ -32,6 +59,34 @@
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole"
],
"Policies": [
{
"PolicyName": "MyFunctionForBatchingExampleRolePolicy0",
"PolicyDocument": {
"Statement": [
{
"Action": [
"sqs:SendMessage*"
],
"Resource": {
"Fn::Sub": [
"arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${queueName}",
{
"queueName": {
"Fn::GetAtt": [
"MySqsQueue",
"QueueName"
]
}
}
]
},
"Effect": "Allow"
}
]
}
}
],
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
@@ -62,17 +117,31 @@
"MaximumBatchingWindowInSeconds": {
"Ref": "MyBatchingWindowParam"
},
"BatchSize": 100,
"FunctionName": {
"Ref": "MyFunctionForBatchingExample"
},
"StartingPosition": "TRIM_HORIZON",
"MaximumRecordAgeInSeconds": 86400,
"BatchSize": 100,
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Fn::GetAtt": [
"MySqsQueue",
"Arn"
]
}
}
},
"EventSourceArn": {
"Fn::GetAtt": [
"DynamoDBTable",
"StreamArn"
]
}
},
"StartingPosition": "TRIM_HORIZON",
"ParallelizationFactor": 8,
"MaximumRetryAttempts": 100,
"BisectBatchOnFunctionError": true
}
},
"MyFunctionForBatchingExample": {
@@ -97,30 +166,6 @@
]
}
},
"DynamoDBTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"KeySchema": [
{
"KeyType": "HASH",
"AttributeName": "id"
}
],
"StreamSpecification": {
"StreamViewType": "NEW_IMAGE"
},
"AttributeDefinitions": [
{
"AttributeName": "id",
"AttributeType": "S"
}
],
"ProvisionedThroughput": {
"WriteCapacityUnits": 5,
"ReadCapacityUnits": 5
}
}
},
"MyFunctionForBatchingExampleStreamEvent": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
@@ -145,13 +190,5 @@
"ShardCount": 1
}
}
},
"Parameters": {
"MyBatchingWindowParam": {
"Default": 45,
"Type": "Number",
"Description": "parameter for batching window in seconds"
}
},
"Description": "EventSourceMapping example with MaximumBatchingWindowInSeconds property"
}
}
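
A quick way to sanity-check a translated template like the one above is to confirm that every new property survived translation into the EventSourceMapping resource. The sketch below is not the project's test harness; the file path and logical ID in the commented usage are placeholders, not the real test fixture names.

```python
# Sketch of a standalone check that the new EventSourceMapping properties made
# it into a translated CloudFormation template. Not samtranslator's test suite.
import json

NEW_PROPERTIES = [
    "MaximumRetryAttempts",
    "BisectBatchOnFunctionError",
    "MaximumRecordAgeInSeconds",
    "DestinationConfig",
    "ParallelizationFactor",
]


def missing_new_properties(template_path, mapping_logical_id):
    """Return the new properties absent from the given EventSourceMapping."""
    with open(template_path) as f:
        template = json.load(f)
    properties = template["Resources"][mapping_logical_id]["Properties"]
    return [name for name in NEW_PROPERTIES if name not in properties]


# Placeholder usage (path and logical ID are illustrative):
# print(missing_new_properties("translated-template.json", "MyEventSourceMapping"))
```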