From eb4697ab04cb35e39c8bfd0e1f78d3d7b932e0d0 Mon Sep 17 00:00:00 2001 From: "tim.reichard" Date: Wed, 30 Nov 2022 12:34:41 -0600 Subject: [PATCH] Add ability to convey account id in sqs funcs --- HISTORY.rst | 5 +++++ aioradio/aws/sqs.py | 47 ++++++++++++++++++++++++++++++--------------- setup.py | 2 +- 3 files changed, 38 insertions(+), 16 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index f90d8c4..c687002 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ History ======= +v0.17.23 (2022-11-30) + +* Add account_id argument to SQS functions for cross account functionality. + + v0.17.22 (2022-11-17) * Fix issue with writing excel data to s3 for data ingestion project. diff --git a/aioradio/aws/sqs.py b/aioradio/aws/sqs.py index 77cab8a..92add1b 100644 --- a/aioradio/aws/sqs.py +++ b/aioradio/aws/sqs.py @@ -26,25 +26,31 @@ async def add_regions(regions: List[str]): @AWS_SERVICE.active -async def create_queue(queue: str, region: str, attributes: Dict[str, str]) -> Dict[str, Any]: +async def create_queue(queue: str, region: str, attributes: Dict[str, str], account_id: str='') -> Dict[str, Any]: """Create SQS queue in region defined. Args: queue (str): sqs queue region (str): AWS region attributes (Dict[str, str]): sqs queue attributes + account_id (str, optional): AWS account ID Returns: Dict[str, str]: response of operation """ - return await SQS[region]['client']['obj'].create_queue(QueueName=queue, Attributes=attributes) + if account_id: + result = await SQS[region]['client']['obj'].create_queue(QueueName=queue, QueueOwnerAWSAccountId=account_id, Attributes=attributes) + else: + result = await SQS[region]['client']['obj'].create_queue(QueueName=queue, Attributes=attributes) + return result @AWS_SERVICE.active async def get_messages( queue: str, region: str, + account_id: str='', wait_time: int=20, max_messages: int=10, visibility_timeout: int=30, @@ -54,6 +60,7 @@ async def get_messages( Args: queue (str): sqs queue region (str): AWS region + account_id (str, optional): AWS account ID wait_time (int, optional): time to wait polling for messages. Defaults to 20. max_messages (int, optional): max messages polled. Defaults to 10. visibility_timeout (int, optional): timeout for when message will return to queue if not deleted. Defaults to 30. @@ -64,7 +71,10 @@ async def get_messages( """ messages = [] - resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) + if account_id: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id) + else: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) queue_url = resp['QueueUrl'] resp = await SQS[region]['client']['obj'].receive_message( QueueUrl=queue_url, @@ -79,22 +89,24 @@ async def get_messages( @AWS_SERVICE.active -async def send_messages( - queue: str, - region: str, - entries: List[Dict[str, str]]) -> Dict[str, list]: +async def send_messages(queue: str, region: str, entries: List[Dict[str, str]], account_id: str='') -> Dict[str, list]: """Send up to 10 messages to an SQS queue. Args: queue (str): sqs queue region (str): AWS region entries (List[Dict[str, str]]): List of dicts containing the keys: Id and MessageBody + account_id (str, optional): AWS account ID Returns: Dict[str, list]: dict with two keys, either Successful or Failed """ - resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) + if account_id: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id) + else: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) + queue_url = resp['QueueUrl'] result = await SQS[region]['client']['obj'].send_message_batch(QueueUrl=queue_url, Entries=entries) @@ -102,22 +114,23 @@ async def send_messages( @AWS_SERVICE.active -async def delete_messages( - queue: str, - region: str, - entries: List[Dict[str, str]]) -> Dict[str, list]: +async def delete_messages(queue: str, region: str, entries: List[Dict[str, str]], account_id: str='') -> Dict[str, list]: """Delete up to 10 messages from an SQS queue. Args: queue (str): sqs queue region (str): AWS region entries (List[Dict[str, str]]): List of dicts containing the keys: Id and ReceiptHandle + account_id (str, optional): AWS account ID Returns: Dict[str, list]: dict with two keys, either Successful or Failed """ - resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) + if account_id: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id) + else: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) queue_url = resp['QueueUrl'] result = await SQS[region]['client']['obj'].delete_message_batch(QueueUrl=queue_url, Entries=entries) @@ -125,12 +138,13 @@ async def delete_messages( @AWS_SERVICE.active -async def purge_messages(queue: str, region: str) -> str: +async def purge_messages(queue: str, region: str, account_id: str='') -> str: """Purge messages from queue in region defined. Args: queue (str): sqs queue region (str): AWS region + account_id (str, optional): AWS account ID Returns: str: error message if any @@ -138,7 +152,10 @@ async def purge_messages(queue: str, region: str) -> str: error = '' try: - resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) + if account_id: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id) + else: + resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue) queue_url = resp['QueueUrl'] await SQS[region]['client']['obj'].purge_queue(QueueUrl=queue_url) except ClientError as err: diff --git a/setup.py b/setup.py index 15dd1db..b29a8c1 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.17.22', + version='0.17.23', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown",