Skip to content

Commit

Permalink
Add a method to subscribe SQS queue to SNS topic
Browse files Browse the repository at this point in the history
An SQS queue might need to subscribe to an SNS Topic.
The method generates Subscription and QueuePolicy sections for
CloudFormation template
  • Loading branch information
jeromef853 committed Jun 19, 2024
1 parent 18b6e76 commit 9b33369
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 3 deletions.
4 changes: 3 additions & 1 deletion src/e3/aws/troposphere/sns/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ def add_lambda_subscription(
"Endpoint": function.arn,
"Protocol": "lambda",
"TopicArn": self.arn,
"DeliveryPolicy": delivery_policy,
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})

self.optional_resources.extend(
[
sns.SubscriptionResource(
Expand Down
40 changes: 38 additions & 2 deletions src/e3/aws/troposphere/sqs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from e3.aws.troposphere.iam.policy_document import PolicyDocument
from e3.aws.troposphere.iam.policy_statement import Allow

from troposphere import sqs, GetAtt, Ref
from troposphere import sns, sqs, GetAtt, Ref

if TYPE_CHECKING:
from typing import Optional
Expand All @@ -27,6 +27,10 @@ def __init__(
"""Initialize a SQS.
:param name: topic name
:param fifo: Set the queue type to fifo
:param visibility_timeout: set the length of time during which a message will be
unavailable after a message is delivered from the queue
:param dlq_name: dead letter queue name
"""
self.name = name
self.attr = {"QueueName": name, "VisibilityTimeout": visibility_timeout}
Expand All @@ -44,6 +48,7 @@ def __init__(
"deadLetterTargetArn": GetAtt(name_to_id(dlq_name), "Arn"),
"maxReceiveCount": "3",
}
self.optional_resources: list[AWSObject] = []

def allow_service_to_write(
self, service: str, name_suffix: str, condition: Optional[ConditionType] = None
Expand All @@ -64,6 +69,34 @@ def allow_service_to_write(
).as_dict,
)

def subscribe_to_sns_topic(
self, topic_arn: str, delivery_policy: dict | None = None
) -> None:
"""Subscribe to SNS topic.
:param topic_arn: ARN of the topic to subscribe
:param delivery_policy: The delivery policy to assign to the subscription
"""
sub_params = {
"Endpoint": self.arn,
"Protocol": "sqs",
"TopicArn": topic_arn,
}

if delivery_policy:
sub_params.update({"DeliveryPolicy": delivery_policy})

self.optional_resources.extend(
[
sns.SubscriptionResource(name_to_id(f"{self.name}Sub"), **sub_params),
self.allow_service_to_write(
service="sns",
name_suffix="Sub",
condition={"ArnLike": {"aws:SourceArn": topic_arn}},
),
]
)

@property
def arn(self) -> GetAtt:
"""SQS ARN."""
Expand All @@ -76,4 +109,7 @@ def ref(self) -> Ref:

def resources(self, stack: Stack) -> list[AWSObject]:
"""Compute AWS resources for the construct."""
return [sqs.Queue.from_dict(name_to_id(self.name), self.attr)]
return [
sqs.Queue.from_dict(name_to_id(self.name), self.attr),
*self.optional_resources,
]
Empty file.
88 changes: 88 additions & 0 deletions tests/tests_e3_aws/troposphere/sqs/sqs_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

from e3.aws.troposphere import Stack
from e3.aws.troposphere.sqs import Queue

EXPECTED_QUEUE_DEFAULT_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
}
}


EXPECTED_QUEUE_TEMPLATE = {
"Myqueue": {
"Properties": {
"ContentBasedDeduplication": True,
"FifoQueue": True,
"QueueName": "myqueue.fifo",
"RedrivePolicy": {
"deadLetterTargetArn": {"Fn::GetAtt": ["Somedlqname", "Arn"]},
"maxReceiveCount": "3",
},
"VisibilityTimeout": 10,
},
"Type": "AWS::SQS::Queue",
}
}


EXPECTED_SQS_SUBSCRIPTION_TEMPLATE = {
"Myqueue": {
"Properties": {"QueueName": "myqueue", "VisibilityTimeout": 30},
"Type": "AWS::SQS::Queue",
},
"MyqueuePolicySub": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Condition": {"ArnLike": {"aws:SourceArn": "some_topic_arn"}},
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Resource": {"Fn::GetAtt": ["Myqueue", "Arn"]},
}
],
"Version": "2012-10-17",
},
"Queues": [{"Ref": "Myqueue"}],
},
"Type": "AWS::SQS::QueuePolicy",
},
"MyqueueSub": {
"Properties": {
"Endpoint": {"Fn::GetAtt": ["Myqueue", "Arn"]},
"Protocol": "sqs",
"TopicArn": "some_topic_arn",
},
"Type": "AWS::SNS::Subscription",
},
}


def test_queue_default(stack: Stack) -> None:
"""Test Queue default creation."""
stack.add(Queue(name="myqueue"))
assert stack.export()["Resources"] == EXPECTED_QUEUE_DEFAULT_TEMPLATE


def test_queue(stack: Stack) -> None:
"""Test Queue creation."""
stack.add(
Queue(
name="myqueue", fifo=True, visibility_timeout=10, dlq_name="some_dlq_name"
)
)
assert stack.export()["Resources"] == EXPECTED_QUEUE_TEMPLATE


def test_subscribe_to_sns_topic(stack: Stack) -> None:
"""Test sqs subscription to sns topic."""
queue = Queue(name="myqueue")
queue.subscribe_to_sns_topic("some_topic_arn")

stack.add(queue)

assert stack.export()["Resources"] == EXPECTED_SQS_SUBSCRIPTION_TEMPLATE

0 comments on commit 9b33369

Please sign in to comment.