From 0064cad79adc74561d790ae4be66328f893602e7 Mon Sep 17 00:00:00 2001
From: FinnIckler
Date: Fri, 13 Sep 2024 16:49:57 +0200
Subject: [PATCH 1/2] add DLQ

---
 infra/shared/sqs.tf  |  9 +++++++++
 infra/staging/sqs.tf | 12 ++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/infra/shared/sqs.tf b/infra/shared/sqs.tf
index 9d1bee1e..29694290 100644
--- a/infra/shared/sqs.tf
+++ b/infra/shared/sqs.tf
@@ -10,6 +10,15 @@ resource "aws_sqs_queue" "this" {
   receive_wait_time_seconds  = 1 # The time the queue waits until it sends messages when polling to better batch message
   visibility_timeout_seconds = 60 # The time until the message is set to be available again to be picked up by another worker
   # because the initial worker might have died
+  redrive_policy = jsonencode({
+    deadLetterTargetArn = aws_sqs_queue.deadletter-queue.arn
+    maxReceiveCount     = 4
+  })
+}
+
+resource "aws_sqs_queue" "deadletter-queue" {
+  name                      = "registrations-dlq"
+  message_retention_seconds = 1209600 # 14 days, the SQS maximum: leaves time to inspect messages if the DLQ handler itself fails
 }
 
 output "queue" {
diff --git a/infra/staging/sqs.tf b/infra/staging/sqs.tf
index 5f85b255..a37a4c53 100644
--- a/infra/staging/sqs.tf
+++ b/infra/staging/sqs.tf
@@ -10,7 +10,19 @@ resource "aws_sqs_queue" "this" {
   receive_wait_time_seconds  = 1 # The time the queue waits until it sends messages when polling to better batch message
   visibility_timeout_seconds = 60 # The time until the message is set to be available again to be picked up by another worker
   # because the initial worker might have died
+  redrive_policy = jsonencode({
+    deadLetterTargetArn = aws_sqs_queue.deadletter-queue.arn
+    maxReceiveCount     = 4
+  })
   tags = {
     Env = "staging"
   }
 }
+
+resource "aws_sqs_queue" "deadletter-queue" {
+  name                      = "registrations-staging-dlq"
+  message_retention_seconds = 1209600 # 14 days, the SQS maximum: leaves time to inspect messages if the DLQ handler itself fails
+  tags = {
+    Env = "staging"
+  }
+}

From dfebd4e0093fb067644bc586786ef561aff31e9c Mon Sep 17 00:00:00 2001
From: FinnIckler
Date: Fri, 13 Sep 2024 16:50:13 +0200
Subject: [PATCH 2/2] add deadletterqueue lambda function

---
 infra/lambda/Gemfile                          |  1 +
 infra/lambda/dlq_handler.rb                   | 68 +++++++++++++++++++++++++
 infra/lambda/package_dlq_lambda.sh            | 17 ++++++
 ...ckage_lambda.sh => package_poll_lambda.sh} |  0
 infra/staging/lambda.tf                       | 33 ++++++++++++
 infra/worker/lambda.tf                        | 24 +++++++++
 6 files changed, 143 insertions(+)
 create mode 100644 infra/lambda/dlq_handler.rb
 create mode 100644 infra/lambda/package_dlq_lambda.sh
 rename infra/lambda/{package_lambda.sh => package_poll_lambda.sh} (100%)

diff --git a/infra/lambda/Gemfile b/infra/lambda/Gemfile
index c7a7d920..ed4f2acd 100644
--- a/infra/lambda/Gemfile
+++ b/infra/lambda/Gemfile
@@ -7,5 +7,6 @@ ruby '3.3.0'
 gem 'dynamoid', '3.8.0'
 gem 'aws-sdk-dynamodb'
 gem 'aws-sdk-sqs'
+gem 'aws-sdk-sns'
 gem 'superconfig'
 gem 'zeitwerk'
diff --git a/infra/lambda/dlq_handler.rb b/infra/lambda/dlq_handler.rb
new file mode 100644
index 00000000..43257bf4
--- /dev/null
+++ b/infra/lambda/dlq_handler.rb
@@ -0,0 +1,68 @@
+# frozen_string_literal: true
+
+require 'json'
+require 'dynamoid'
+require 'aws-sdk-dynamodb'
+require 'aws-sdk-sns'
+require 'superconfig'
+require 'zeitwerk'
+
+Dynamoid.configure do |config|
+  config.region = ENV.fetch('AWS_REGION', 'us-west-2')
+  config.namespace = nil
+end
+
+EnvConfig = SuperConfig.new do
+  mandatory :DYNAMO_REGISTRATIONS_TABLE, :string
+  mandatory :REGISTRATION_HISTORY_DYNAMO_TABLE, :string
+  mandatory :SNS_TOPIC_ARN, :string
+end
+
+loader = Zeitwerk::Loader.new
+loader.push_dir('./registration_lib')
+loader.setup
+
+# Entry point of the dead-letter-queue Lambda.
+#
+# For every SQS record in the event, looks up the registration the failed
+# message refers to and publishes an alert to the SNS topic saying whether
+# the competitor ended up registered or not.
+#
+# @param event [Hash] SQS event; each element of event['Records'] has a JSON
+#   'body' with the keys 'attendee_id', 'competition_id' and 'user_id'
+# @param context [Object] Lambda invocation context (unused)
+# @return [void]
+def lambda_handler(event:, context:)
+  sns = Aws::SNS::Client.new(region: ENV.fetch('AWS_REGION', 'us-west-2'))
+
+  # An SQS event source delivers messages in batches, so every record has to
+  # be handled; reading only Records[0] would silently drop the others.
+  event.fetch('Records', []).each do |record|
+    step_data = JSON.parse(record['body'])
+
+    sns.publish(
+      topic_arn: ENV.fetch('SNS_TOPIC_ARN'),
+      message: dlq_message(step_data, registered: registration_exists?(step_data['attendee_id']))
+    )
+  end
+end
+
+# Tells whether a registration with the given attendee ID is stored in DynamoDB.
+def registration_exists?(attendee_id)
+  !Registration.find(attendee_id).nil?
+rescue Dynamoid::Errors::RecordNotFound
+  # Dynamoid's `find` raises (by default) for a missing record rather than
+  # returning nil; that is the "NOT registered" case, not a handler failure.
+  false
+end
+
+# Builds the alert text for one dead-lettered registration message.
+def dlq_message(step_data, registered:)
+  competitor = "Competitor with user ID #{step_data['user_id']} for competition #{step_data['competition_id']}"
+  if registered
+    "#{competitor} was registered, meaning the error occurred after registration. " \
+      'This should be fixed, but can be ignored temporarily.'
+  else
+    "#{competitor} was NOT registered. This should be fixed as soon as possible."
+  end
+end
diff --git a/infra/lambda/package_dlq_lambda.sh b/infra/lambda/package_dlq_lambda.sh
new file mode 100644
index 00000000..a0af76ce
--- /dev/null
+++ b/infra/lambda/package_dlq_lambda.sh
@@ -0,0 +1,17 @@
+bundle install --path vendor/bundle
+# remove old zip if it exists
+rm -f registration_dlq.zip
+# We include the models here so we don't need to maintain two versions
+# We have to copy them over, because we want to maintain the paths for the vendor folder, but not for the models
+lib_folder=registration_lib
+mkdir -p $lib_folder
+cp ../../app/models/registration.rb ./$lib_folder/registration.rb
+cp ../../app/models/registration_history.rb ./$lib_folder/registration_history.rb
+cp ../../lib/lane.rb ./$lib_folder/lane.rb
+cp ../../lib/history.rb ./$lib_folder/history.rb
+zip -r registration_dlq.zip dlq_handler.rb ./$lib_folder/*.rb vendor
+# remove lib files again
+rm -rf $lib_folder
+# remove any bundler or vendor files
+rm -rf .bundle
+rm -rf vendor
diff --git a/infra/lambda/package_lambda.sh b/infra/lambda/package_poll_lambda.sh
similarity index 100%
rename from infra/lambda/package_lambda.sh
rename to infra/lambda/package_poll_lambda.sh
diff --git a/infra/staging/lambda.tf b/infra/staging/lambda.tf
index 723f2379..f96d24b3 100644
--- a/infra/staging/lambda.tf
+++ b/infra/staging/lambda.tf
@@ -1,3 +1,26 @@
+resource "aws_lambda_function" "dlq_lambda" {
+  filename         = "./lambda/registration_dlq.zip"
+  function_name    = "${var.name_prefix}-dlq-handler-staging"
+  role             = aws_iam_role.lambda_role.arn
+  handler          = "dlq_handler.lambda_handler"
+  runtime          = "ruby3.3"
+  source_code_hash = filebase64sha256("./lambda/registration_dlq.zip")
+  timeout          = 10
+  environment {
+    variables = {
+      # NOTE(review): dlq_handler.rb also declares SNS_TOPIC_ARN as mandatory, but it is not set here.
+      # Confirm where the topic is defined, pass its ARN, and grant sns:Publish to the role.
+      DYNAMO_REGISTRATIONS_TABLE        = aws_dynamodb_table.registrations.name
+      REGISTRATION_HISTORY_DYNAMO_TABLE = aws_dynamodb_table.registration_history.name
+    }
+  }
+}
+
+resource "aws_lambda_event_source_mapping" "dlq" {
+  event_source_arn = aws_sqs_queue.deadletter-queue.arn
+  function_name    = aws_lambda_function.dlq_lambda.arn
+}
+
 resource "aws_lambda_function" "registration_status_lambda" {
   filename         = "./lambda/registration_status.zip"
   function_name    = "${var.name_prefix}-poller-lambda-staging"
@@ -71,6 +94,16 @@ data "aws_iam_policy_document" "lambda_policy" {
     ]
     resources = [aws_sqs_queue.this.arn]
   }
+  statement {
+    effect = "Allow"
+    # Only the actions an SQS event source mapping needs to poll the DLQ
+    actions = [
+      "sqs:ReceiveMessage",
+      "sqs:DeleteMessage",
+      "sqs:GetQueueAttributes",
+    ]
+    resources = [aws_sqs_queue.deadletter-queue.arn]
+  }
 }
 
 resource "aws_iam_role_policy" "lambda_policy_attachment" {
diff --git a/infra/worker/lambda.tf b/infra/worker/lambda.tf
index bedd4778..923a3a40 100644
--- a/infra/worker/lambda.tf
+++ b/infra/worker/lambda.tf
@@ -1,3 +1,27 @@
+resource "aws_lambda_function" "dlq_lambda" {
+  filename         = "./lambda/registration_dlq.zip"
+  function_name    = "${var.name_prefix}-dlq-handler-prod"
+  role             = aws_iam_role.lambda_role.arn
+  handler          = "dlq_handler.lambda_handler"
+  runtime          = "ruby3.3"
+  source_code_hash = filebase64sha256("./lambda/registration_dlq.zip")
+  timeout          = 10
+  environment {
+    variables = {
+      # NOTE(review): dlq_handler.rb also declares SNS_TOPIC_ARN as mandatory, but it is not set here.
+      # Confirm where the topic is defined, pass its ARN, and grant sns:Publish to the role.
+      DYNAMO_REGISTRATIONS_TABLE        = var.shared_resources.dynamo_registration_table.name
+      REGISTRATION_HISTORY_DYNAMO_TABLE = var.shared_resources.dynamo_registration_history_table.name
+    }
+  }
+}
+
+resource "aws_lambda_event_source_mapping" "dlq" {
+  event_source_arn = var.shared_resources.deadletter-queue.arn
+  function_name    = aws_lambda_function.dlq_lambda.arn
+}
+
+
 resource "aws_lambda_function" "registration_status_lambda" {
   filename         = "./lambda/registration_status.zip"
   function_name    = "${var.name_prefix}-poller-lambda-prod"