This repository has been archived by the owner on Aug 3, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
1d53c12
commit a658049
Showing
5 changed files
with
252 additions
and
238 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
from __future__ import unicode_literals | ||
import getpass | ||
import logging | ||
import sys | ||
import time | ||
import requests | ||
from builtins import input | ||
|
||
import rainbow_logging_handler | ||
|
||
from nd_okta_auth import okta | ||
from nd_okta_auth import aws | ||
from nd_okta_auth.metadata import __desc__, __version__ | ||
|
||
|
||
def user_input(text): | ||
'''Wraps input() making testing support of py2 and py3 easier''' | ||
return input(text) | ||
|
||
|
||
def setup_logging(): | ||
'''Returns back a pretty color-coded logger''' | ||
logger = logging.getLogger() | ||
logger.setLevel(logging.INFO) | ||
handler = rainbow_logging_handler.RainbowLoggingHandler(sys.stdout) | ||
fmt = '%(asctime)-10s (%(levelname)s) %(message)s' | ||
formatter = logging.Formatter(fmt) | ||
handler.setFormatter(formatter) | ||
logger.addHandler(handler) | ||
return logger | ||
|
||
|
||
def login(aws_profile: str, | ||
okta_appid: str, | ||
okta_org: str, | ||
username: str, | ||
reup: bool, | ||
debug: bool = False): | ||
# Generate our logger first, and write out our app name and version | ||
log = setup_logging() | ||
log.info('%s v%s' % (__desc__, __version__)) | ||
|
||
if debug: | ||
log.setLevel(logging.DEBUG) | ||
|
||
# Ask the user for their password.. we do this once at the beginning, and | ||
# we keep it in memory for as long as this tool is running. Its never ever | ||
# written out or cached to disk anywhere. | ||
password = getpass.getpass() | ||
|
||
# Generate our initial OktaSaml client and handle any exceptions thrown. | ||
# Generally these are input validation issues. | ||
try: | ||
okta_client = okta.OktaSaml(okta_org, username, password) | ||
except okta.EmptyInput: | ||
log.error('Cannot enter a blank string for any input') | ||
raise | ||
|
||
# Authenticate the Okta client. If necessary, we will ask for MFA input. | ||
try: | ||
okta_client.auth() | ||
except okta.InvalidPassword: | ||
log.error('Invalid Username ({user}) or Password'.format( | ||
user=username)) | ||
raise | ||
except okta.ExhaustedFactors as e: | ||
log.error(e) | ||
raise | ||
|
||
# Once we're authenticated with an OktaSaml client object, we can use that | ||
# object to get a fresh SAMLResponse repeatedly and refresh our AWS | ||
# Credentials. | ||
session = None | ||
role_selection = None | ||
while True: | ||
# If an AWS Session object has been created already, lets check if its | ||
# still valid. If it is, sleep a bit and skip to the next execution of | ||
# the loop. | ||
if session and session.is_valid: | ||
log.debug('Credentials are still valid, sleeping') | ||
time.sleep(15) | ||
continue | ||
|
||
log.info('Getting SAML Assertion from {org}'.format(org=okta_org)) | ||
|
||
try: | ||
assertion = okta_client.get_assertion(appid=okta_appid, | ||
apptype='amazon_aws') | ||
session = aws.Session(assertion, profile=aws_profile) | ||
|
||
# If role_selection is set we're in a reup loop. Re-set the role on | ||
# the session to prevent the user being prompted for the role again | ||
# on each subsequent renewal. | ||
if role_selection is not None: | ||
session.set_role(role_selection) | ||
|
||
session.assume_role() | ||
|
||
except aws.MultipleRoles: | ||
log.warning('Multiple AWS roles found; please select one') | ||
roles = session.available_roles() | ||
for role_index, role in enumerate(roles): | ||
print("[{}] Role: {}".format(role_index, role["role"])) | ||
role_selection = user_input('Select a role from above: ') | ||
session.set_role(role_selection) | ||
session.assume_role() | ||
except requests.exceptions.ConnectionError: | ||
log.warning('Connection error... will retry') | ||
time.sleep(5) | ||
continue | ||
|
||
# If we're not running in re-up mode, once we have the assertion | ||
# and creds, go ahead and quit. | ||
if not reup: | ||
break | ||
|
||
log.info('Reup enabled, sleeping...') | ||
time.sleep(5) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import logging | ||
import unittest | ||
from unittest import mock | ||
|
||
from nd_okta_auth import auth, aws, okta | ||
from nd_okta_auth.auth import login, user_input | ||
|
||
|
||
class AuthTest(unittest.TestCase): | ||
|
||
def test_setup_logger(self): | ||
# Simple execution test - make sure that the logger code executes and | ||
# returns a root logger. No mocks used here, want to ensure that the | ||
# options passed to the logger are valid. | ||
ret = auth.setup_logging() | ||
self.assertEquals(type(ret), type(logging.getLogger())) | ||
|
||
@mock.patch('nd_okta_auth.auth.user_input') | ||
@mock.patch('nd_okta_auth.aws.Session') | ||
@mock.patch('nd_okta_auth.okta.OktaSaml') | ||
@mock.patch('getpass.getpass') | ||
def test_multirole(self, pass_mock, | ||
okta_mock, aws_mock, input_mock): | ||
# First call to this is the password. Second call is the mis-typed | ||
# passcode. Third call is a valid passcode. | ||
pass_mock.side_effect = ['test_password'] | ||
input_mock.side_effect = '0' | ||
|
||
# Just mock out the entire Okta object, we won't really instantiate it | ||
fake_okta = mock.MagicMock(name='OktaSaml') | ||
okta_mock.return_value = fake_okta | ||
aws_mock.return_value = mock.MagicMock(name='aws_mock') | ||
|
||
# Throw MultipleRoles to validate actions when there are multiple roles | ||
mocked_session = aws_mock.return_value | ||
mocked_session.assume_role.side_effect = [aws.MultipleRoles(), None] | ||
|
||
# Return multiple roles | ||
mocked_session.available_roles = mock.Mock() | ||
roles = [{'role': '1', 'principle': ''}, | ||
{'role': '2', 'principle': ''}] | ||
mocked_session.available_roles.return_value = roles | ||
|
||
_run_auth_login() | ||
|
||
# Ensure that getpass was called once for the password | ||
pass_mock.assert_has_calls([ | ||
mock.call(), | ||
]) | ||
|
||
# Ensure that user_input was called for the role selection | ||
input_mock.assert_has_calls([ | ||
mock.call('Select a role from above: '), | ||
]) | ||
|
||
@mock.patch('nd_okta_auth.okta.OktaSaml') | ||
@mock.patch('getpass.getpass') | ||
def test_bad_password(self, pass_mock, okta_mock): | ||
pass_mock.return_value = 'test_password' | ||
|
||
# Just mock out the entire Okta object, we won't really instantiate it | ||
fake_okta = mock.MagicMock(name='fake_okta') | ||
fake_okta.auth.side_effect = okta.InvalidPassword | ||
okta_mock.return_value = fake_okta | ||
|
||
with self.assertRaises(okta.InvalidPassword): | ||
_run_auth_login() | ||
|
||
@mock.patch('nd_okta_auth.okta.OktaSaml') | ||
@mock.patch('getpass.getpass') | ||
def test_exhausted_factors(self, pass_mock, okta_mock): | ||
pass_mock.return_value = 'test_password' | ||
|
||
# Just mock out the entire Okta object, we won't really instantiate it | ||
fake_okta = mock.MagicMock(name='fake_okta') | ||
fake_okta.auth.side_effect = okta.ExhaustedFactors | ||
okta_mock.return_value = fake_okta | ||
|
||
with self.assertRaises(okta.ExhaustedFactors): | ||
_run_auth_login() | ||
|
||
@mock.patch('nd_okta_auth.okta.OktaSaml') | ||
@mock.patch('getpass.getpass') | ||
def test_bad_input(self, pass_mock, okta_mock): | ||
# Pretend that we got some bad input... | ||
pass_mock.return_value = '' | ||
okta_mock.side_effect = okta.EmptyInput | ||
|
||
with self.assertRaises(okta.EmptyInput): | ||
_run_auth_login() | ||
|
||
@mock.patch('nd_okta_auth.auth.input') | ||
def test_input(self, mock_input): | ||
mock_input.return_value = 'test' | ||
self.assertEqual('test', user_input('input test')) | ||
|
||
|
||
def _run_auth_login(): | ||
login(aws_profile='eng', | ||
okta_appid='appid', | ||
okta_org='org', | ||
username='username', | ||
reup=False) |
Oops, something went wrong.