Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dead letter queue to handle failed messages #687

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions infra/lambda/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ ruby '3.3.0'
gem 'dynamoid', '3.8.0'
gem 'aws-sdk-dynamodb'
gem 'aws-sdk-sqs'
gem 'aws-sdk-sns'
gem 'superconfig'
gem 'zeitwerk'
51 changes: 51 additions & 0 deletions infra/lambda/dlq_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# frozen_string_literal: true

require 'json'
require 'dynamoid'
require 'aws-sdk-dynamodb'
require 'aws-sdk-sns'
require 'superconfig'
require 'zeitwerk'

Dynamoid.configure do |config|
config.region = ENV.fetch('AWS_REGION', 'us-west-2')
config.namespace = nil
end

EnvConfig = SuperConfig.new do
mandatory :DYNAMO_REGISTRATIONS_TABLE, :string
mandatory :REGISTRATION_HISTORY_DYNAMO_TABLE, :string
mandatory :SNS_TOPIC_ARN, :string
end

loader = Zeitwerk::Loader.new
loader.push_dir('./registration_lib')
loader.setup

def lambda_handler(event:, context:)
# Parse the incoming event data
step_data = JSON.parse(event['Records'][0]['body'])

attendee_id = step_data['attendee_id']
competition_id = step_data['competition_id']
user_id = step_data['user_id']

# Find the registration in DynamoDB
registration = Registration.find(attendee_id)

# Initialize SNS client
sns = Aws::SNS::Client.new(region: ENV['AWS_REGION'])

# Handle the result based on registration status
if registration
message = "Competitor with user ID #{user_id} for competition #{competition_id} was registered, meaning the error occurred after registration. This should be fixed, but can be ignored temporally."
else
message = "Competitor with user ID #{user_id} for competition #{competition_id} was NOT registered. This should be fixed as soon as possible."
end

# Publish message to SNS topic
sns.publish({
topic_arn: ENV['SNS_TOPIC_ARN'],
message: message
})
end
17 changes: 17 additions & 0 deletions infra/lambda/package_dlq_lambda.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
bundle install --path vendor/bundle
# remove old zip if it exists
rm -f registration_status.zip
# We include the models here so we don't need to maintain two versions
# We have to copy them over, because we want to maintain the paths for the vendor folder, but not for the models
lib_folder=registration_lib
mkdir $lib_folder
cp ../../app/models/registration.rb ./$lib_folder/registration.rb
cp ../../app/models/registration_history.rb ./$lib_folder/registration_history.rb
cp ../../lib/lane.rb ./$lib_folder/lane.rb
cp ../../lib/history.rb ./$lib_folder/history.rb
zip -r registration_status.zip dlq_handler.rb ./$lib_folder/*.rb vendor
# remove lib files again
rm -rf $lib_folder
# remove any bundler or vendor files
rm -rf .bundle
rm -rf vendor
File renamed without changes.
8 changes: 8 additions & 0 deletions infra/shared/sqs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ resource "aws_sqs_queue" "this" {
receive_wait_time_seconds = 1 # The time the queue waits until it sends messages when polling to better batch message
visibility_timeout_seconds = 60 # The time until the message is set to be available again to be picked up by another worker
# because the initial worker might have died
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.deadletter-queue.arn
maxReceiveCount = 4
})
}

resource "aws_sqs_queue" "deadletter-queue" {
name = "registrations-dlq"
}

output "queue" {
Expand Down
28 changes: 28 additions & 0 deletions infra/staging/lambda.tf
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
resource "aws_lambda_function" "dlq_lambda" {
filename = "./lambda/registration_dlq.zip"
function_name = "${var.name_prefix}-dlq-handler-staging"
role = aws_iam_role.lambda_role.arn
handler = "dlq_handler.lambda_handler"
runtime = "ruby3.3"
source_code_hash = filebase64sha256("./lambda/registration_dlq.zip")
timeout = 10
environment {
variables = {
DYNAMO_REGISTRATIONS_TABLE = aws_dynamodb_table.registrations.name
REGISTRATION_HISTORY_DYNAMO_TABLE = aws_dynamodb_table.registration_history.name
}
}
}

resource "aws_lambda_event_source_mapping" "dlq" {
event_source_arn = aws_sqs_queue.deadletter-queue.arn
function_name = aws_lambda_function.dlq_lambda.arn
}

resource "aws_lambda_function" "registration_status_lambda" {
filename = "./lambda/registration_status.zip"
function_name = "${var.name_prefix}-poller-lambda-staging"
Expand Down Expand Up @@ -71,6 +92,13 @@ data "aws_iam_policy_document" "lambda_policy" {
]
resources = [aws_sqs_queue.this.arn]
}
statement {
effect = "Allow"
actions = [
"sqs:*",
]
resources = [aws_sqs_queue.deadletter-queue.arn]
}
}

resource "aws_iam_role_policy" "lambda_policy_attachment" {
Expand Down
8 changes: 8 additions & 0 deletions infra/staging/sqs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ resource "aws_sqs_queue" "this" {
receive_wait_time_seconds = 1 # The time the queue waits until it sends messages when polling to better batch message
visibility_timeout_seconds = 60 # The time until the message is set to be available again to be picked up by another worker
# because the initial worker might have died
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.deadletter-queue.arn
maxReceiveCount = 4
})
tags = {
Env = "staging"
}
}

resource "aws_sqs_queue" "deadletter-queue" {
name = "registrations-staging-dlq"
}
22 changes: 22 additions & 0 deletions infra/worker/lambda.tf
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
resource "aws_lambda_function" "dlq_lambda" {
filename = "./lambda/registration_dlq.zip"
function_name = "${var.name_prefix}-dlq-handler-staging"
role = aws_iam_role.lambda_role.arn
handler = "dlq_handler.lambda_handler"
runtime = "ruby3.3"
source_code_hash = filebase64sha256("./lambda/registration_dlq.zip")
timeout = 10
environment {
variables = {
DYNAMO_REGISTRATIONS_TABLE = var.shared_resources.dynamo_registration_table.name
REGISTRATION_HISTORY_DYNAMO_TABLE = var.shared_resources.dynamo_registration_history_table.name
}
}
}

resource "aws_lambda_event_source_mapping" "dlq" {
event_source_arn = var.shared_resources.deadletter-queue.arn
function_name = aws_lambda_function.dlq_lambda.arn
}


resource "aws_lambda_function" "registration_status_lambda" {
filename = "./lambda/registration_status.zip"
function_name = "${var.name_prefix}-poller-lambda-prod"
Expand Down
Loading