Skip to content

Commit

Permalink
fix!: Remove FIFO queues (#4072)
Browse files Browse the repository at this point in the history
## Description
Removes FIFO queues as described in #4068 

## Breaking
The change will re-create queues in case FIFO is configured. Impact will
be that queued messages are lost


## Test

- [x] default example
- [x] multi runner example

---------

Co-authored-by: forest-pr|bot <forest-pr[bot]@users.noreply.github.com>
Co-authored-by: philips-labs-pr|bot <philips-labs-pr[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 20, 2024
1 parent a2280f7 commit 2f20a8b
Show file tree
Hide file tree
Showing 22 changed files with 22 additions and 101 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ Talk to the forestkeepers in the `runners-channel` on Slack.
| <a name="input_enable_cloudwatch_agent"></a> [enable\_cloudwatch\_agent](#input\_enable\_cloudwatch\_agent) | Enables the cloudwatch agent on the ec2 runner instances. The runner uses a default config that can be overridden via `cloudwatch_config`. | `bool` | `true` | no |
| <a name="input_enable_ephemeral_runners"></a> [enable\_ephemeral\_runners](#input\_enable\_ephemeral\_runners) | Enable ephemeral runners, runners will only be used once. | `bool` | `false` | no |
| <a name="input_enable_event_rule_binaries_syncer"></a> [enable\_event\_rule\_binaries\_syncer](#input\_enable\_event\_rule\_binaries\_syncer) | DEPRECATED: Replaced by `state_event_rule_binaries_syncer`. | `bool` | `null` | no |
| <a name="input_enable_fifo_build_queue"></a> [enable\_fifo\_build\_queue](#input\_enable\_fifo\_build\_queue) | Enable a FIFO queue to keep the order of events received by the webhook. Recommended for repo level runners. | `bool` | `false` | no |
| <a name="input_enable_jit_config"></a> [enable\_jit\_config](#input\_enable\_jit\_config) | Overwrite the default behavior for JIT configuration. By default JIT configuration is enabled for ephemeral runners and disabled for non-ephemeral runners. In case of GHES check first if the JIT config API is avaialbe. In case you upgradeing from 3.x to 4.x you can set `enable_jit_config` to `false` to avoid a breaking change when having your own AMI. | `bool` | `null` | no |
| <a name="input_enable_job_queued_check"></a> [enable\_job\_queued\_check](#input\_enable\_job\_queued\_check) | Only scale if the job event received by the scale up lambda is in the queued state. By default enabled for non ephemeral runners and disabled for ephemeral. Set this variable to overwrite the default behavior. | `bool` | `null` | no |
| <a name="input_enable_managed_runner_security_group"></a> [enable\_managed\_runner\_security\_group](#input\_enable\_managed\_runner\_security\_group) | Enables creation of the default managed security group. Unmanaged security groups can be specified via `runner_additional_security_group_ids`. | `bool` | `true` | no |
Expand Down Expand Up @@ -225,7 +224,6 @@ Talk to the forestkeepers in the `runners-channel` on Slack.
| <a name="input_runners_maximum_count"></a> [runners\_maximum\_count](#input\_runners\_maximum\_count) | The maximum number of runners that will be created. | `number` | `3` | no |
| <a name="input_runners_scale_down_lambda_memory_size"></a> [runners\_scale\_down\_lambda\_memory\_size](#input\_runners\_scale\_down\_lambda\_memory\_size) | Memory size limit in MB for scale-down lambda. | `number` | `512` | no |
| <a name="input_runners_scale_down_lambda_timeout"></a> [runners\_scale\_down\_lambda\_timeout](#input\_runners\_scale\_down\_lambda\_timeout) | Time out for the scale down lambda in seconds. | `number` | `60` | no |
| <a name="input_runners_scale_up_Lambda_memory_size"></a> [runners\_scale\_up\_Lambda\_memory\_size](#input\_runners\_scale\_up\_Lambda\_memory\_size) | Memory size limit in MB for scale-up lambda. | `number` | `null` | no |
| <a name="input_runners_scale_up_lambda_memory_size"></a> [runners\_scale\_up\_lambda\_memory\_size](#input\_runners\_scale\_up\_lambda\_memory\_size) | Memory size limit in MB for scale-up lambda. | `number` | `512` | no |
| <a name="input_runners_scale_up_lambda_timeout"></a> [runners\_scale\_up\_lambda\_timeout](#input\_runners\_scale\_up\_lambda\_timeout) | Time out for the scale up lambda in seconds. | `number` | `30` | no |
| <a name="input_runners_ssm_housekeeper"></a> [runners\_ssm\_housekeeper](#input\_runners\_ssm\_housekeeper) | Configuration for the SSM housekeeper lambda. This lambda deletes token / JIT config from SSM.<br/><br/> `schedule_expression`: is used to configure the schedule for the lambda.<br/> `enabled`: enable or disable the lambda trigger via the EventBridge.<br/> `lambda_memory_size`: lambda memery size limit.<br/> `lambda_timeout`: timeout for the lambda in seconds.<br/> `config`: configuration for the lambda function. Token path will be read by default from the module. | <pre>object({<br/> schedule_expression = optional(string, "rate(1 day)")<br/> enabled = optional(bool, true)<br/> lambda_memory_size = optional(number, 512)<br/> lambda_timeout = optional(number, 60)<br/> config = object({<br/> tokenPath = optional(string)<br/> minimumDaysOld = optional(number, 1)<br/> dryRun = optional(bool, false)<br/> })<br/> })</pre> | <pre>{<br/> "config": {}<br/>}</pre> | no |
Expand Down
1 change: 0 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ You can configure runners to be ephemeral, in which case runners will be used on
- The scale down lambda is still active, and should only remove orphan instances. But there is no strict check in place. So ensure you configure the `minimum_running_time_in_minutes` to a value that is high enough to get your runner booted and connected to avoid it being terminated before executing a job.
- The messages sent from the webhook lambda to the scale-up lambda are by default delayed by SQS, to give available runners a chance to start the job before the decision is made to scale more runners. For ephemeral runners there is no need to wait. Set `delay_webhook_event` to `0`.
- All events in the queue will lead to a new runner created by the lambda. By setting `enable_job_queued_check` to `true` you can enforce a rule of only creating a runner if the event has a correlated queued job. Setting this can avoid creating useless runners. For example, a job getting cancelled before a runner was created or if the job was already picked up by another runner. We suggest using this in combination with a pool.
- To ensure runners are created in the same order GitHub sends the events, by default we use a FIFO queue. This is mainly relevant for repo level runners. For ephemeral runners you can set `enable_fifo_build_queue` to `false`.
- Errors related to scaling should be retried via SQS. You can configure `job_queue_retention_in_seconds` and `redrive_build_queue` to tune the behavior. We have no mechanism to avoid events never being processed, which means potentially no runner gets created and the job in GitHub times out in 6 hours.

The example for [ephemeral runners](examples/ephemeral.md) is based on the [default example](examples/default.md). Have look at the diff to see the major configuration differences.
Expand Down
3 changes: 0 additions & 3 deletions examples/arm64/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ module "runners" {
delay_webhook_event = 5
runners_maximum_count = 1

# set up a fifo queue to remain order
enable_fifo_build_queue = true

# override scaling down
scale_down_schedule_expression = "cron(* * * * ? *)"
}
Expand Down
3 changes: 0 additions & 3 deletions examples/default/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ module "runners" {
delay_webhook_event = 5
runners_maximum_count = 2

# set up a fifo queue to remain order
enable_fifo_build_queue = true

# override scaling down
scale_down_schedule_expression = "cron(* * * * ? *)"

Expand Down
1 change: 0 additions & 1 deletion examples/multi-runner/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ module "runners" {
# labelMatchers = [["self-hosted", "linux", "x64", "amazon"]]
# exactMatch = false
# }
# fifo = true
# delay_webhook_event = 0
# runner_config = {
# runner_os = "linux"
Expand Down
17 changes: 0 additions & 17 deletions lambdas/functions/control-plane/src/aws/sqs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,6 @@ describe('Publish message to SQS', () => {
});
});

it('should publish message to SQS Fifo queue', async () => {
// setup
mockSQSClient.on(SendMessageCommand).resolves({
MessageId: '123',
});

// act
await publishMessage('test', 'https://sqs.eu-west-1.amazonaws.com/123456789/queued-builds.fifo');

// assert
expect(mockSQSClient).toHaveReceivedCommandWith(SendMessageCommand, {
QueueUrl: 'https://sqs.eu-west-1.amazonaws.com/123456789/queued-builds.fifo',
MessageBody: 'test',
MessageGroupId: '1', // Fifo queue
});
});

it('should log error if queue URL not found', async () => {
// setup
const logErrorSpy = jest.spyOn(logger, 'error');
Expand Down
1 change: 0 additions & 1 deletion lambdas/functions/control-plane/src/aws/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ export async function publishMessage(message: string, queueUrl: string, delayInS
QueueUrl: queueUrl,
MessageBody: message,
DelaySeconds: delayInSeconds,
MessageGroupId: queueUrl.endsWith('.fifo') ? '1' : undefined,
});

try {
Expand Down
2 changes: 1 addition & 1 deletion lambdas/functions/control-plane/src/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const sqsEvent = {
messageAttributes: {},
md5OfBody: '4aef3bd70526e152e86426a0938cbec6',
eventSource: 'aws:sqs',
eventSourceARN: 'arn:aws:sqs:us-west-2:916370655143:cicddev-queued-builds.fifo',
eventSourceARN: 'arn:aws:sqs:us-west-2:916370655143:cicddev-queued-builds',
awsRegion: 'us-west-2',
},
],
Expand Down
5 changes: 0 additions & 5 deletions lambdas/functions/webhook/src/ConfigLoader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ describe('ConfigLoader Tests', () => {
{
id: '1',
arn: 'arn:aws:sqs:us-east-1:123456789012:queue1',
fifo: false,
matcherConfig: {
labelMatchers: [['label1', 'label2']],
exactMatch: true,
Expand Down Expand Up @@ -100,7 +99,6 @@ describe('ConfigLoader Tests', () => {
{
id: '1',
arn: 'arn:aws:sqs:us-east-1:123456789012:queue1',
fifo: false,
matcherConfig: {
labelMatchers: [['label1', 'label2']],
exactMatch: true,
Expand Down Expand Up @@ -131,7 +129,6 @@ describe('ConfigLoader Tests', () => {
{
id: '1',
arn: 'arn:aws:sqs:us-east-1:123456789012:queue1',
fifo: false,
matcherConfig: {
labelMatchers: [['label1', 'label2']],
exactMatch: true,
Expand Down Expand Up @@ -211,7 +208,6 @@ describe('ConfigLoader Tests', () => {
const matcherConfig: RunnerMatcherConfig[] = [
{
arn: 'arn:aws:sqs:eu-central-1:123456:npalm-default-queued-builds',
fifo: true,
id: 'https://sqs.eu-central-1.amazonaws.com/123456/npalm-default-queued-builds',
matcherConfig: {
exactMatch: true,
Expand Down Expand Up @@ -248,7 +244,6 @@ describe('ConfigLoader Tests', () => {
const matcherConfig: RunnerMatcherConfig[] = [
{
arn: 'arn:aws:sqs:eu-central-1:123456:npalm-default-queued-builds',
fifo: true,
id: 'https://sqs.eu-central-1.amazonaws.com/123456/npalm-default-queued-builds',
matcherConfig: {
exactMatch: true,
Expand Down
2 changes: 0 additions & 2 deletions lambdas/functions/webhook/src/runners/dispatch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ describe('Dispatcher', () => {
eventType: 'workflow_job',
installationId: 0,
queueId: runnerConfig[0].id,
queueFifo: false,
repoOwnerType: 'Organization',
});
});
Expand Down Expand Up @@ -149,7 +148,6 @@ describe('Dispatcher', () => {
eventType: 'workflow_job',
installationId: 0,
queueId: 'match',
queueFifo: false,
repoOwnerType: 'Organization',
});
});
Expand Down
1 change: 0 additions & 1 deletion lambdas/functions/webhook/src/runners/dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ async function handleWorkflowJob(
eventType: githubEvent,
installationId: body.installation?.id ?? 0,
queueId: queue.id,
queueFifo: queue.fifo,
repoOwnerType: body.repository.owner.type,
});
logger.info(`Successfully dispatched job for ${body.repository.full_name} to the queue ${queue.id}`);
Expand Down
28 changes: 3 additions & 25 deletions lambdas/functions/webhook/src/sqs/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SendMessageCommandInput } from '@aws-sdk/client-sqs';
import { ActionRequestMessage, sendActionRequest } from '.';
import { sendActionRequest } from '.';

const mockSQS = {
sendMessage: jest.fn(() => {
Expand Down Expand Up @@ -30,38 +30,16 @@ describe('Test sending message to SQS.', () => {

it('no fifo queue', async () => {
// Arrange
const no_fifo_message: ActionRequestMessage = {
...message,
queueFifo: false,
};
const sqsMessage: SendMessageCommandInput = {
QueueUrl: queueUrl,
MessageBody: JSON.stringify(no_fifo_message),
MessageBody: JSON.stringify(message),
};

// Act
const result = sendActionRequest(no_fifo_message);
const result = sendActionRequest(message);

// Assert
expect(mockSQS.sendMessage).toHaveBeenCalledWith(sqsMessage);
await expect(result).resolves.not.toThrow();
});

it('use a fifo queue', async () => {
// Arrange
const fifo_message: ActionRequestMessage = {
...message,
queueFifo: true,
};
const sqsMessage: SendMessageCommandInput = {
QueueUrl: queueUrl,
MessageBody: JSON.stringify(fifo_message),
};
// Act
const result = sendActionRequest(fifo_message);

// Assert
expect(mockSQS.sendMessage).toHaveBeenCalledWith({ ...sqsMessage, MessageGroupId: String(message.id) });
await expect(result).resolves.not.toThrow();
});
});
5 changes: 0 additions & 5 deletions lambdas/functions/webhook/src/sqs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ export interface ActionRequestMessage {
repositoryOwner: string;
installationId: number;
queueId: string;
queueFifo: boolean;
repoOwnerType: string;
}

Expand All @@ -26,7 +25,6 @@ export interface RunnerMatcherConfig {
matcherConfig: MatcherConfig;
id: string;
arn: string;
fifo: boolean;
}

export interface GithubWorkflowEvent {
Expand All @@ -42,9 +40,6 @@ export const sendActionRequest = async (message: ActionRequestMessage): Promise<
};

logger.debug(`sending message to SQS: ${JSON.stringify(sqsMessage)}`);
if (message.queueFifo) {
sqsMessage.MessageGroupId = String(message.id);
}

await sqs.sendMessage(sqsMessage);
};
1 change: 0 additions & 1 deletion lambdas/functions/webhook/src/webhook/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ function mockSSMResponse() {
{
id: '1',
arn: 'arn:aws:sqs:us-east-1:123456789012:queue1',
fifo: false,
matcherConfig: {
labelMatchers: [['label1', 'label2']],
exactMatch: true,
Expand Down
16 changes: 6 additions & 10 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,11 @@ resource "aws_sqs_queue_policy" "build_queue_policy" {
}

resource "aws_sqs_queue" "queued_builds" {
name = "${var.prefix}-queued-builds${var.enable_fifo_build_queue ? ".fifo" : ""}"
delay_seconds = var.delay_webhook_event
visibility_timeout_seconds = var.runners_scale_up_lambda_timeout
message_retention_seconds = var.job_queue_retention_in_seconds
fifo_queue = var.enable_fifo_build_queue
receive_wait_time_seconds = 0
content_based_deduplication = var.enable_fifo_build_queue
name = "${var.prefix}-queued-builds"
delay_seconds = var.delay_webhook_event
visibility_timeout_seconds = var.runners_scale_up_lambda_timeout
message_retention_seconds = var.job_queue_retention_in_seconds
receive_wait_time_seconds = 0
redrive_policy = var.redrive_build_queue.enabled ? jsonencode({
deadLetterTargetArn = aws_sqs_queue.queued_builds_dlq[0].arn,
maxReceiveCount = var.redrive_build_queue.maxReceiveCount
Expand All @@ -80,12 +78,11 @@ resource "aws_sqs_queue_policy" "build_queue_dlq_policy" {

resource "aws_sqs_queue" "queued_builds_dlq" {
count = var.redrive_build_queue.enabled ? 1 : 0
name = "${var.prefix}-queued-builds_dead_letter${var.enable_fifo_build_queue ? ".fifo" : ""}"
name = "${var.prefix}-queued-builds_dead_letter"

sqs_managed_sse_enabled = var.queue_encryption.sqs_managed_sse_enabled
kms_master_key_id = var.queue_encryption.kms_master_key_id
kms_data_key_reuse_period_seconds = var.queue_encryption.kms_data_key_reuse_period_seconds
fifo_queue = var.enable_fifo_build_queue
tags = var.tags
}

Expand Down Expand Up @@ -114,7 +111,6 @@ module "webhook" {
(aws_sqs_queue.queued_builds.id) = {
id : aws_sqs_queue.queued_builds.id
arn : aws_sqs_queue.queued_builds.arn
fifo : var.enable_fifo_build_queue
matcherConfig : {
labelMatchers : [local.runner_labels]
exactMatch : var.enable_runner_workflow_job_labels_check_all
Expand Down
2 changes: 1 addition & 1 deletion modules/multi-runner/README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions modules/multi-runner/notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
enable_workflow_job_events_queue
17 changes: 7 additions & 10 deletions modules/multi-runner/queues.tf
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ data "aws_iam_policy_document" "deny_unsecure_transport" {
}

resource "aws_sqs_queue" "queued_builds" {
for_each = var.multi_runner_config
name = "${var.prefix}-${each.key}-queued-builds${each.value.fifo ? ".fifo" : ""}"
delay_seconds = each.value.runner_config.delay_webhook_event
visibility_timeout_seconds = var.runners_scale_up_lambda_timeout
message_retention_seconds = each.value.runner_config.job_queue_retention_in_seconds
fifo_queue = each.value.fifo
receive_wait_time_seconds = 0
content_based_deduplication = each.value.fifo
for_each = var.multi_runner_config
name = "${var.prefix}-${each.key}-queued-builds"
delay_seconds = each.value.runner_config.delay_webhook_event
visibility_timeout_seconds = var.runners_scale_up_lambda_timeout
message_retention_seconds = each.value.runner_config.job_queue_retention_in_seconds
receive_wait_time_seconds = 0
redrive_policy = each.value.redrive_build_queue.enabled ? jsonencode({
deadLetterTargetArn = aws_sqs_queue.queued_builds_dlq[each.key].arn,
maxReceiveCount = each.value.redrive_build_queue.maxReceiveCount
Expand All @@ -55,12 +53,11 @@ resource "aws_sqs_queue_policy" "build_queue_policy" {

resource "aws_sqs_queue" "queued_builds_dlq" {
for_each = { for config, values in var.multi_runner_config : config => values if values.redrive_build_queue.enabled }
name = "${var.prefix}-${each.key}-queued-builds_dead_letter${each.value.fifo ? ".fifo" : ""}"
name = "${var.prefix}-${each.key}-queued-builds_dead_letter"

sqs_managed_sse_enabled = var.queue_encryption.sqs_managed_sse_enabled
kms_master_key_id = var.queue_encryption.kms_master_key_id
kms_data_key_reuse_period_seconds = var.queue_encryption.kms_data_key_reuse_period_seconds
fifo_queue = each.value.fifo
tags = var.tags
}

Expand Down
2 changes: 0 additions & 2 deletions modules/multi-runner/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ variable "multi_runner_config" {
exactMatch = optional(bool, false)
priority = optional(number, 999)
})
fifo = optional(bool, false)
redrive_build_queue = optional(object({
enabled = bool
maxReceiveCount = number
Expand Down Expand Up @@ -199,7 +198,6 @@ variable "multi_runner_config" {
exactMatch: "If set to true all labels in the workflow job must match the GitHub labels (os, architecture and `self-hosted`). When false if __any__ workflow label matches it will trigger the webhook."
priority: "If set it defines the priority of the matcher, the matcher with the lowest priority will be evaluated first. Default is 999, allowed values 0-999."
}
fifo: "Enable a FIFO queue to remain the order of events received by the webhook. Suggest to set to true for repo level runners."
redrive_build_queue: "Set options to attach (optional) a dead letter queue to the build queue, the queue between the webhook and the scale up lambda. You have the following options. 1. Disable by setting `enabled` to false. 2. Enable by setting `enabled` to `true`, `maxReceiveCount` to a number of max retries."
}
EOT
Expand Down
Loading

0 comments on commit 2f20a8b

Please sign in to comment.