Perform ETL events via Step Functions (#757)
## Ticket

Resolves #744

## Changes

- Changes the ETL job from invoking ECS directly to instead invoking
Step Functions, which then invokes ECS
- Makes some changes to the scheduled jobs to better distinguish them
from the events jobs and to improve the readability of their IAM roles

## Context for reviewers

This is a similar PR to
#745; in fact, they use
many of the same resources. The goal of this PR is to run ETL events via
Step Functions. Step Functions adds a tracking layer that is not
available when invoking ECS directly, letting you see the success and
failure status of your jobs.
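
As a sketch of what that tracking layer enables (not part of this PR, and the job key `"etl"` and alarm name below are hypothetical): because each run is now a Step Functions execution, you can alarm on failed executions of a job's state machine directly:

```hcl
# Hypothetical example: alert when any execution of the "etl" file upload job fails.
resource "aws_cloudwatch_metric_alarm" "etl_failures" {
  alarm_name          = "etl-failed-executions" # hypothetical name
  namespace           = "AWS/States"
  metric_name         = "ExecutionsFailed"
  statistic           = "Sum"
  period              = 300
  evaluation_periods  = 1
  threshold           = 1
  comparison_operator = "GreaterThanOrEqualToThreshold"

  dimensions = {
    StateMachineArn = aws_sfn_state_machine.file_upload_jobs["etl"].arn
  }
}
```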

## Testing

navapbc/platform-test#136
coilysiren authored Oct 18, 2024
1 parent 07a4237 commit 077a980
Showing 3 changed files with 97 additions and 49 deletions.
99 changes: 69 additions & 30 deletions infra/modules/service/events_jobs.tf
@@ -40,30 +40,18 @@ resource "aws_cloudwatch_event_target" "document_upload_jobs" {

target_id = "${local.cluster_name}-${each.key}"
rule = aws_cloudwatch_event_rule.file_upload_jobs[each.key].name
- arn = aws_ecs_cluster.cluster.arn
+ arn = aws_sfn_state_machine.file_upload_jobs[each.key].arn
role_arn = aws_iam_role.events.arn

- ecs_target {
- task_definition_arn = aws_ecs_task_definition.app.arn
- launch_type = "FARGATE"
- propagate_tags = "TASK_DEFINITION"
-
- # Configuring Network Configuration is required when the task definition uses the awsvpc network mode.
- network_configuration {
- subnets = var.private_subnet_ids
- security_groups = [aws_security_group.app.id]
- }
- }

input_transformer {
input_paths = {
bucket_name = "$.detail.bucket.name",
object_key = "$.detail.object.key",
}

- # When triggering the ECS task, override the command to run in the container to the
- # command specified by the file_upload_job config. To do this define an input_template
- # that transforms the input S3 event:
+ # When triggering the ECS task (via step functions), override the command to run in
+ # the container to the command specified by the file_upload_job config. To do this
+ # define an input_template that transforms the input S3 event:
# {
# detail: {
# bucket: { name: "mybucket" },
@@ -72,36 +60,87 @@ resource "aws_cloudwatch_event_target" "document_upload_jobs" {
# }
# to match the Amazon ECS RunTask TaskOverride structure:
# {
- #   containerOverrides: [{
- #     name: "container_name",
- #     command: ["command", "to", "run"]
- #   }]
+ #   task_command = ["command", "to", "run"]
# }
# (see https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html#targets-specifics-ecs-task
# and https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskOverride.html)
#
# The task command can optionally use the bucket name or the object key in the command
# by including the placeholder values "<bucket_name>" or "<object_key>", e.g.
# {
- #   containerOverrides: [{
- #     name: "container_name",
- #     command: ["process_file.sh", "--bucket", "<bucket_name>", "--object", "<object_key>"]
- #   }]
+ #   task_command: ["process_file.sh", "--bucket", "<bucket_name>", "--object", "<object_key>"]
# }
#
# Since jsonencode will cause the string "<bucket_name>" to turn into
# "U+003Cbucket_nameU+003E" and "<object_key>" to turn into "U+003Cobject_keyU+003E",
# we need to replace the unicode characters U+003C and U+003E with < and > to reverse
# the encoding.
#
# (see https://developer.hashicorp.com/terraform/language/functions/jsonencode and
# https://github.com/hashicorp/terraform/pull/18871)
input_template = replace(replace(jsonencode({
- containerOverrides = [
- {
- name = local.container_name,
- command = each.value.task_command
- }
- ]
+ task_command = each.value.task_command
}), "\\u003c", "<"), "\\u003e", ">")
}
}
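
To make the transform above concrete: a minimal sketch, assuming a hypothetical job whose `task_command` is `["process_file.sh", "--bucket", "<bucket_name>", "--object", "<object_key>"]` and an upload of `data.csv` to `mybucket`. EventBridge substitutes the `input_paths` values for the placeholders, so the state machine execution receives:

```hcl
# Sketch of the rendered execution input (bucket and key are hypothetical):
locals {
  example_execution_input = jsonencode({
    task_command = ["process_file.sh", "--bucket", "mybucket", "--object", "data.csv"]
  })
}
```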

resource "aws_sfn_state_machine" "file_upload_jobs" {
for_each = var.file_upload_jobs

name = "${var.service_name}-${each.key}"
role_arn = aws_iam_role.workflow_orchestrator.arn

definition = jsonencode({
"StartAt" : "RunTask",
"States" : {
"RunTask" : {
"Type" : "Task",
# docs: https://docs.aws.amazon.com/step-functions/latest/dg/connect-ecs.html
"Resource" : "arn:aws:states:::ecs:runTask.sync",
"Parameters" : {
"Cluster" : aws_ecs_cluster.cluster.arn,
"TaskDefinition" : aws_ecs_task_definition.app.arn,
"LaunchType" : "FARGATE",
"NetworkConfiguration" : {
"AwsvpcConfiguration" : {
"Subnets" : var.private_subnet_ids,
"SecurityGroups" : [aws_security_group.app.id],
}
},
"Overrides" : {
"ContainerOverrides" : [
{
"Name" : local.container_name,
"Command.$" : "$.task_command"
}
]
}
},
"End" : true
}
}
})

logging_configuration {
log_destination = "${aws_cloudwatch_log_group.file_upload_jobs[each.key].arn}:*"
include_execution_data = true
level = "ERROR"
}

tracing_configuration {
enabled = true
}
}
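
Two details worth noting in the definition above. First, the `runTask.sync` resource runs the ECS task as a synchronous job: the execution waits for the task to stop and fails if the task fails, which is what surfaces job status in Step Functions. Second, the `.$` suffix on `Command.$` tells Step Functions to resolve the value at runtime from the execution input via the JSONPath `$.task_command` (the document produced by the input transformer above), while unsuffixed keys such as `LaunchType` are literals. A minimal sketch of that distinction:

```hcl
# Sketch: literal vs. input-derived parameters in an Amazon States Language task.
locals {
  example_parameters = {
    "LaunchType" = "FARGATE"        # literal: passed to ECS exactly as written
    "Command.$"  = "$.task_command" # JSONPath: resolved from the execution input at runtime
  }
}
```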

resource "aws_cloudwatch_log_group" "file_upload_jobs" {
for_each = var.file_upload_jobs

name_prefix = "/aws/vendedlogs/states/${var.service_name}-${each.key}/file-upload-jobs/"

# Conservatively retain logs for 5 years.
# Looser requirements may allow shorter retention periods
retention_in_days = 1827

# TODO(https://github.com/navapbc/template-infra/issues/164) Encrypt with customer managed KMS key
# checkov:skip=CKV_AWS_158:Encrypt service logs with customer key in future work
}
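
For context, a sketch of the shape of `var.file_upload_jobs` that this file iterates over. The job key and command below are hypothetical, and only `task_command` is referenced here; the variable's actual schema is defined elsewhere in the module:

```hcl
# Hypothetical example value for var.file_upload_jobs:
file_upload_jobs = {
  etl = {
    task_command = ["process_file.sh", "--bucket", "<bucket_name>", "--object", "<object_key>"]
  }
}
```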
45 changes: 27 additions & 18 deletions infra/modules/service/events_role.tf
@@ -29,28 +29,37 @@ resource "aws_iam_policy" "run_task" {
}

data "aws_iam_policy_document" "run_task" {

statement {
effect = "Allow"
actions = ["ecs:RunTask"]
resources = ["${aws_ecs_task_definition.app.arn_without_revision}:*"]
condition {
test = "ArnLike"
variable = "ecs:cluster"
values = [aws_ecs_cluster.cluster.arn]
sid = "StepFunctionsEvents"
actions = [
"events:PutTargets",
"events:PutRule",
"events:DescribeRule",
]
resources = ["arn:aws:events:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"]
}

dynamic "statement" {
for_each = aws_sfn_state_machine.file_upload_jobs

content {
actions = [
"states:StartExecution",
]
resources = [statement.value.arn]
}
}

- statement {
- effect = "Allow"
- actions = ["iam:PassRole"]
- resources = [
- aws_iam_role.task_executor.arn,
- aws_iam_role.app_service.arn,
- ]
- condition {
- test = "StringLike"
- variable = "iam:PassedToService"
- values = ["ecs-tasks.amazonaws.com"]
+ dynamic "statement" {
+ for_each = aws_sfn_state_machine.file_upload_jobs
+
+ content {
+ actions = [
+ "states:DescribeExecution",
+ "states:StopExecution",
+ ]
+ resources = ["${statement.value.arn}:*"]
}
}
}
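
Two notes on the policy above. The `events:*` grant on the `StepFunctionsGetEventsForStepFunctionsExecutionRule` managed rule supports the `runTask.sync` integration, which uses that managed EventBridge rule to track task completion. And each `dynamic "statement"` block expands to one statement per state machine; for a single hypothetical job keyed `"etl"`, the `states:StartExecution` grant would expand to roughly:

```hcl
# Sketch of the expanded form of the dynamic block (hypothetical "etl" job):
data "aws_iam_policy_document" "example_expanded" {
  statement {
    actions   = ["states:StartExecution"]
    resources = [aws_sfn_state_machine.file_upload_jobs["etl"].arn]
  }
}
```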
2 changes: 1 addition & 1 deletion infra/modules/service/scheduled_jobs.tf
@@ -75,7 +75,7 @@ resource "aws_sfn_state_machine" "scheduled_jobs" {
resource "aws_cloudwatch_log_group" "scheduled_jobs" {
for_each = var.scheduled_jobs

- name_prefix = "/aws/vendedlogs/states/${var.service_name}-${each.key}"
+ name_prefix = "/aws/vendedlogs/states/${var.service_name}-${each.key}/scheduled-jobs/"

# Conservatively retain logs for 5 years.
# Looser requirements may allow shorter retention periods