Skip to content

Commit

Permalink
Refactor for unit testing
Browse files Browse the repository at this point in the history
  • Loading branch information
derekwaters committed Aug 10, 2024
1 parent 3f001d7 commit 5e4a587
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 28 deletions.
35 changes: 21 additions & 14 deletions awx/main/credential_plugins/aws_assumerole.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
'id': 'access_key',
'label': _('AWS Access Key'),
'type': 'string',
'secret': True,
'help_text': _('The optional AWS access key for the user who will assume the role'),
},
{
Expand All @@ -34,7 +35,7 @@
'type': 'string',
'help_text': _('The optional External ID which will be provided to the assume role API'),
},
{'id': 'role_arn', 'label': 'AWS ARN Role Name', 'type': 'string', 'help_text': _('The ARN Role Name to be assumed in AWS')},
{'id': 'role_arn', 'label': 'AWS ARN Role Name', 'type': 'string', 'secret': True, 'help_text': _('The ARN Role Name to be assumed in AWS')},
],
'metadata': [
{
Expand All @@ -48,6 +49,23 @@
}


def aws_assumerole_getcreds(access_key, secret_key, role_arn, external_id):
if (access_key is None or len(access_key) == 0) and (secret_key is None or len(secret_key) == 0):
# Connect using credentials in the EE
connection = boto3.client(service_name="sts")
else:
# Connect to AWS using provided credentials
connection = boto3.client(service_name="sts", aws_access_key_id=access_key, aws_secret_access_key=secret_key)
try:
response = connection.assume_role(RoleArn=role_arn, RoleSessionName='AAP_AWS_Role_Session1', ExternalId=external_id)
except ClientError as ce:
raise ValueError(f'Got a bad client response from AWS: {ce.msg}.')

credentials = response.get("Credentials", {})

return credentials


def aws_assumerole_backend(**kwargs):
"""This backend function actually contacts AWS to assume a given role for the specified user"""
access_key = kwargs.get('access_key')
Expand All @@ -61,7 +79,7 @@ def aws_assumerole_backend(**kwargs):
# separate credentials, and should allow the same user to request
# multiple roles.
#
credential_key_hash = hashlib.sha256((access_key + role_arn).encode('utf-8'))
credential_key_hash = hashlib.sha256((str(access_key or '') + role_arn).encode('utf-8'))
credential_key = credential_key_hash.hexdigest()

credentials = _aws_cred_cache.get(credential_key, None)
Expand All @@ -71,18 +89,7 @@ def aws_assumerole_backend(**kwargs):
#
if (credentials is None) or (credentials['Expiration'] < datetime.datetime.now(credentials['Expiration'].tzinfo)):

if (access_key is None or len(access_key) == 0) and (secret_key is None or len(secret_key) == 0):
# Connect using credentials in the EE
connection = boto3.client(service_name="sts")
else:
# Connect to AWS using provided credentials
connection = boto3.client(service_name="sts", aws_access_key_id=access_key, aws_secret_access_key=secret_key)
try:
response = connection.assume_role(RoleArn=role_arn, RoleSessionName='AAP_AWS_Role_Session1', ExternalId=external_id)
except ClientError as ce:
raise ValueError(f'Got a bad client response from AWS: {ce.msg}.')

credentials = response.get("Credentials", {})
credentials = aws_assumerole_getcreds(access_key, secret_key, role_arn, external_id)

_aws_cred_cache[credential_key] = credentials

Expand Down
60 changes: 46 additions & 14 deletions awx/main/tests/functional/test_credential_plugins.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import datetime
from unittest import mock
from awx.main.credential_plugins import hashivault
from awx.main.credential_plugins import aws_assumerole
Expand Down Expand Up @@ -124,28 +125,59 @@ def test_hashivault_handle_auth_not_enough_args():

def test_aws_assumerole_with_accesssecret():
kwargs = {
'access_key': 'the_access_key',
'secret_key': 'the_secret_key',
'access_key': 'my_access_key',
'secret_key': 'my_secret_key',
'role_arn': 'the_arn',
'identifier': 'the_session_token',
'identifier': 'access_token',
}
with mock.patch.object(aws_assumerole, 'aws_assumerole_backend') as method_mock:
method_mock.return_value = 'the_session_token'
token = aws_assumerole.backend(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param=kwargs)
assert token == 'the_session_token'
method_call_with = [kwargs.get('access_key'), kwargs.get('secret_key'), kwargs.get('role_arn'), None]
with mock.patch.object(aws_assumerole, 'aws_assumerole_getcreds') as method_mock:
method_mock.return_value = {
'access_key': 'the_access_key',
'secret_key': 'the_secret_key',
'access_token': 'the_access_token',
'Expiration': datetime.datetime.today() + datetime.timedelta(days=1),
}
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_called_with(kwargs.get('access_key'), kwargs.get('secret_key'), kwargs.get('role_arn'), None)
assert token == 'the_access_token'
kwargs['identifier'] = 'secret_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_secret_key'
kwargs['identifier'] = 'access_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_access_key'


def test_aws_assumerole_with_arnonly():
kwargs = {
'role_arn': 'the_arn',
'identifier': 'the_session_token',
'identifier': 'access_token',
}
with mock.patch.object(aws_assumerole, 'aws_assumerole_backend') as method_mock:
method_mock.return_value = 'the_session_token'
token = aws_assumerole.backend(**kwargs)
method_mock.assert_called_with(**kwargs, auth_param=kwargs)
assert token == 'the_session_token'
with mock.patch.object(aws_assumerole, 'aws_assumerole_getcreds') as method_mock:
method_mock.return_value = {
'access_key': 'the_access_key',
'secret_key': 'the_secret_key',
'access_token': 'the_access_token',
'Expiration': datetime.datetime.today() + datetime.timedelta(days=1),
}
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_called_with(None, None, kwargs.get('role_arn'), None)
assert token == 'the_access_token'
kwargs['identifier'] = 'secret_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_secret_key'
kwargs['identifier'] = 'access_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_access_key'


class TestDelineaImports:
Expand Down

0 comments on commit 5e4a587

Please sign in to comment.