Skip to content

Commit

Permalink
Add ability to convey account id in sqs funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
tim.reichard committed Nov 30, 2022
1 parent e9a515c commit eb4697a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 16 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ History
=======


v0.17.23 (2022-11-30)

* Add account_id argument to SQS functions for cross account functionality.


v0.17.22 (2022-11-17)

* Fix issue with writing excel data to s3 for data ingestion project.
Expand Down
47 changes: 32 additions & 15 deletions aioradio/aws/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,31 @@ async def add_regions(regions: List[str]):


@AWS_SERVICE.active
async def create_queue(queue: str, region: str, attributes: Dict[str, str]) -> Dict[str, Any]:
async def create_queue(queue: str, region: str, attributes: Dict[str, str], account_id: str='') -> Dict[str, Any]:
"""Create SQS queue in region defined.
Args:
queue (str): sqs queue
region (str): AWS region
attributes (Dict[str, str]): sqs queue attributes
account_id (str, optional): AWS account ID
Returns:
Dict[str, str]: response of operation
"""

return await SQS[region]['client']['obj'].create_queue(QueueName=queue, Attributes=attributes)
if account_id:
result = await SQS[region]['client']['obj'].create_queue(QueueName=queue, QueueOwnerAWSAccountId=account_id, Attributes=attributes)
else:
result = await SQS[region]['client']['obj'].create_queue(QueueName=queue, Attributes=attributes)
return result


@AWS_SERVICE.active
async def get_messages(
queue: str,
region: str,
account_id: str='',
wait_time: int=20,
max_messages: int=10,
visibility_timeout: int=30,
Expand All @@ -54,6 +60,7 @@ async def get_messages(
Args:
queue (str): sqs queue
region (str): AWS region
account_id (str, optional): AWS account ID
wait_time (int, optional): time to wait polling for messages. Defaults to 20.
max_messages (int, optional): max messages polled. Defaults to 10.
visibility_timeout (int, optional): timeout for when message will return to queue if not deleted. Defaults to 30.
Expand All @@ -64,7 +71,10 @@ async def get_messages(
"""

messages = []
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)
if account_id:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id)
else:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)
queue_url = resp['QueueUrl']
resp = await SQS[region]['client']['obj'].receive_message(
QueueUrl=queue_url,
Expand All @@ -79,66 +89,73 @@ async def get_messages(


@AWS_SERVICE.active
async def send_messages(
queue: str,
region: str,
entries: List[Dict[str, str]]) -> Dict[str, list]:
async def send_messages(queue: str, region: str, entries: List[Dict[str, str]], account_id: str='') -> Dict[str, list]:
"""Send up to 10 messages to an SQS queue.
Args:
queue (str): sqs queue
region (str): AWS region
entries (List[Dict[str, str]]): List of dicts containing the keys: Id and MessageBody
account_id (str, optional): AWS account ID
Returns:
Dict[str, list]: dict with two keys, either Successful or Failed
"""

resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)
if account_id:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id)
else:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)

queue_url = resp['QueueUrl']
result = await SQS[region]['client']['obj'].send_message_batch(QueueUrl=queue_url, Entries=entries)

return result


@AWS_SERVICE.active
async def delete_messages(
queue: str,
region: str,
entries: List[Dict[str, str]]) -> Dict[str, list]:
async def delete_messages(queue: str, region: str, entries: List[Dict[str, str]], account_id: str='') -> Dict[str, list]:
"""Delete up to 10 messages from an SQS queue.
Args:
queue (str): sqs queue
region (str): AWS region
entries (List[Dict[str, str]]): List of dicts containing the keys: Id and ReceiptHandle
account_id (str, optional): AWS account ID
Returns:
Dict[str, list]: dict with two keys, either Successful or Failed
"""

resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)
if account_id:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id)
else:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)
queue_url = resp['QueueUrl']
result = await SQS[region]['client']['obj'].delete_message_batch(QueueUrl=queue_url, Entries=entries)

return result


@AWS_SERVICE.active
async def purge_messages(queue: str, region: str) -> str:
async def purge_messages(queue: str, region: str, account_id: str='') -> str:
"""Purge messages from queue in region defined.
Args:
queue (str): sqs queue
region (str): AWS region
account_id (str, optional): AWS account ID
Returns:
str: error message if any
"""

error = ''
try:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)
if account_id:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue, QueueOwnerAWSAccountId=account_id)
else:
resp = await SQS[region]['client']['obj'].get_queue_url(QueueName=queue)
queue_url = resp['QueueUrl']
await SQS[region]['client']['obj'].purge_queue(QueueUrl=queue_url)
except ClientError as err:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fileobj.read()

setup(name='aioradio',
version='0.17.22',
version='0.17.23',
description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit eb4697a

Please sign in to comment.