diff --git a/nd_okta_auth/auth.py b/nd_okta_auth/auth.py new file mode 100644 index 0000000..7704e0e --- /dev/null +++ b/nd_okta_auth/auth.py @@ -0,0 +1,118 @@ +from __future__ import unicode_literals +import getpass +import logging +import sys +import time +import requests +from builtins import input + +import rainbow_logging_handler + +from nd_okta_auth import okta +from nd_okta_auth import aws +from nd_okta_auth.metadata import __desc__, __version__ + + +def user_input(text): + '''Wraps input() making testing support of py2 and py3 easier''' + return input(text) + + +def setup_logging(): + '''Returns back a pretty color-coded logger''' + logger = logging.getLogger() + logger.setLevel(logging.INFO) + handler = rainbow_logging_handler.RainbowLoggingHandler(sys.stdout) + fmt = '%(asctime)-10s (%(levelname)s) %(message)s' + formatter = logging.Formatter(fmt) + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + +def login(aws_profile: str, + okta_appid: str, + okta_org: str, + username: str, + reup: bool, + debug: bool = False): + # Generate our logger first, and write out our app name and version + log = setup_logging() + log.info('%s v%s' % (__desc__, __version__)) + + if debug: + log.setLevel(logging.DEBUG) + + # Ask the user for their password.. we do this once at the beginning, and + # we keep it in memory for as long as this tool is running. Its never ever + # written out or cached to disk anywhere. + password = getpass.getpass() + + # Generate our initial OktaSaml client and handle any exceptions thrown. + # Generally these are input validation issues. + try: + okta_client = okta.OktaSaml(okta_org, username, password) + except okta.EmptyInput: + log.error('Cannot enter a blank string for any input') + raise + + # Authenticate the Okta client. If necessary, we will ask for MFA input. + try: + okta_client.auth() + except okta.InvalidPassword: + log.error('Invalid Username ({user}) or Password'.format( + user=username)) + raise + except okta.ExhaustedFactors as e: + log.error(e) + raise + + # Once we're authenticated with an OktaSaml client object, we can use that + # object to get a fresh SAMLResponse repeatedly and refresh our AWS + # Credentials. + session = None + role_selection = None + while True: + # If an AWS Session object has been created already, lets check if its + # still valid. If it is, sleep a bit and skip to the next execution of + # the loop. + if session and session.is_valid: + log.debug('Credentials are still valid, sleeping') + time.sleep(15) + continue + + log.info('Getting SAML Assertion from {org}'.format(org=okta_org)) + + try: + assertion = okta_client.get_assertion(appid=okta_appid, + apptype='amazon_aws') + session = aws.Session(assertion, profile=aws_profile) + + # If role_selection is set we're in a reup loop. Re-set the role on + # the session to prevent the user being prompted for the role again + # on each subsequent renewal. + if role_selection is not None: + session.set_role(role_selection) + + session.assume_role() + + except aws.MultipleRoles: + log.warning('Multiple AWS roles found; please select one') + roles = session.available_roles() + for role_index, role in enumerate(roles): + print("[{}] Role: {}".format(role_index, role["role"])) + role_selection = user_input('Select a role from above: ') + session.set_role(role_selection) + session.assume_role() + except requests.exceptions.ConnectionError: + log.warning('Connection error... will retry') + time.sleep(5) + continue + + # If we're not running in re-up mode, once we have the assertion + # and creds, go ahead and quit. + if not reup: + break + + log.info('Reup enabled, sleeping...') + time.sleep(5) diff --git a/nd_okta_auth/main.py b/nd_okta_auth/main.py index 205bccf..cc80900 100644 --- a/nd_okta_auth/main.py +++ b/nd_okta_auth/main.py @@ -13,38 +13,10 @@ # limitations under the License. # # Copyright 2017 Nextdoor.com, Inc - -from __future__ import unicode_literals import argparse -import getpass -import logging -import sys -import time -import requests -from builtins import input - -import rainbow_logging_handler - -from nd_okta_auth import okta -from nd_okta_auth import aws -from nd_okta_auth.metadata import __desc__, __version__ - - -def user_input(text): - '''Wraps input() making testing support of py2 and py3 easier''' - return input(text) - - -def setup_logging(): - '''Returns back a pretty color-coded logger''' - logger = logging.getLogger() - logger.setLevel(logging.INFO) - handler = rainbow_logging_handler.RainbowLoggingHandler(sys.stdout) - fmt = '%(asctime)-10s (%(levelname)s) %(message)s' - formatter = logging.Formatter(fmt) - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger +from future.moves import sys +from nd_okta_auth import auth +from nd_okta_auth.metadata import __version__ def get_config_parser(argv): @@ -118,97 +90,17 @@ def get_config_parser(argv): return config -def main(argv): - # Generate our logger first, and write out our app name and version - log = setup_logging() - log.info('%s v%s' % (__desc__, __version__)) - - # Get our configuration object based on the CLI options. This handles - # parsing arguments and ensuring the user supplied the required params. - config = get_config_parser(argv) - - if config.debug: - log.setLevel(logging.DEBUG) - - # Ask the user for their password.. we do this once at the beginning, and - # we keep it in memory for as long as this tool is running. Its never ever - # written out or cached to disk anywhere. - password = getpass.getpass() - - # Generate our initial OktaSaml client and handle any exceptions thrown. - # Generally these are input validation issues. - try: - okta_client = okta.OktaSaml(config.org, config.username, password) - except okta.EmptyInput: - log.error('Cannot enter a blank string for any input') - sys.exit(1) - - # Authenticate the Okta client. If necessary, we will ask for MFA input. - try: - okta_client.auth() - except okta.InvalidPassword: - log.error('Invalid Username ({user}) or Password'.format( - user=config.username)) - sys.exit(1) - except okta.ExhaustedFactors as e: - log.error(e.message) - sys.exit(1) - - # Once we're authenticated with an OktaSaml client object, we can use that - # object to get a fresh SAMLResponse repeatedly and refresh our AWS - # Credentials. - session = None - role_selection = None - while True: - # If an AWS Session object has been created already, lets check if its - # still valid. If it is, sleep a bit and skip to the next execution of - # the loop. - if session and session.is_valid: - log.debug('Credentials are still valid, sleeping') - time.sleep(15) - continue - - log.info('Getting SAML Assertion from {org}'.format( - org=config.org)) - - try: - assertion = okta_client.get_assertion(appid=config.appid, - apptype='amazon_aws') - session = aws.Session(assertion, profile=config.name) - - # If role_selection is set we're in a reup loop. Re-set the role on - # the session to prevent the user being prompted for the role again - # on each subsequent renewal. - if role_selection is not None: - session.set_role(role_selection) - - session.assume_role() - - except aws.MultipleRoles: - log.warning('Multiple AWS roles found; please select one') - roles = session.available_roles() - for role_index, role in enumerate(roles): - print("[{}] Role: {}".format(role_index, role["role"])) - role_selection = user_input('Select a role from above: ') - session.set_role(role_selection) - session.assume_role() - except requests.exceptions.ConnectionError: - log.warning('Connection error... will retry') - time.sleep(5) - continue - - # If we're not running in re-up mode, once we have the assertion - # and creds, go ahead and quit. - if not config.reup: - break - - log.info('Reup enabled, sleeping...') - time.sleep(5) - - def entry_point(): """Zero-argument entry point for use with setuptools/distribute.""" - raise SystemExit(main(sys.argv)) + config = get_config_parser(sys.argv) + raise SystemExit( + auth.login(aws_profile=config.name, + okta_appid=config.appid, + okta_org=config.org, + username=config.username, + reup=config.reup, + debug=config.debug) + ) if __name__ == '__main__': diff --git a/nd_okta_auth/metadata.py b/nd_okta_auth/metadata.py index 82e91c3..c360e38 100644 --- a/nd_okta_auth/metadata.py +++ b/nd_okta_auth/metadata.py @@ -13,5 +13,5 @@ # Copyright 2017 Nextdoor.com, Inc -__version__ = '1.0.1' +__version__ = '1.0.2' __desc__ = 'Nextdoor Okta Auther' diff --git a/nd_okta_auth/test/auth_test.py b/nd_okta_auth/test/auth_test.py new file mode 100644 index 0000000..9313eed --- /dev/null +++ b/nd_okta_auth/test/auth_test.py @@ -0,0 +1,103 @@ +import logging +import unittest +from unittest import mock + +from nd_okta_auth import auth, aws, okta +from nd_okta_auth.auth import login, user_input + + +class AuthTest(unittest.TestCase): + + def test_setup_logger(self): + # Simple execution test - make sure that the logger code executes and + # returns a root logger. No mocks used here, want to ensure that the + # options passed to the logger are valid. + ret = auth.setup_logging() + self.assertEquals(type(ret), type(logging.getLogger())) + + @mock.patch('nd_okta_auth.auth.user_input') + @mock.patch('nd_okta_auth.aws.Session') + @mock.patch('nd_okta_auth.okta.OktaSaml') + @mock.patch('getpass.getpass') + def test_multirole(self, pass_mock, + okta_mock, aws_mock, input_mock): + # First call to this is the password. Second call is the mis-typed + # passcode. Third call is a valid passcode. + pass_mock.side_effect = ['test_password'] + input_mock.side_effect = '0' + + # Just mock out the entire Okta object, we won't really instantiate it + fake_okta = mock.MagicMock(name='OktaSaml') + okta_mock.return_value = fake_okta + aws_mock.return_value = mock.MagicMock(name='aws_mock') + + # Throw MultipleRoles to validate actions when there are multiple roles + mocked_session = aws_mock.return_value + mocked_session.assume_role.side_effect = [aws.MultipleRoles(), None] + + # Return multiple roles + mocked_session.available_roles = mock.Mock() + roles = [{'role': '1', 'principle': ''}, + {'role': '2', 'principle': ''}] + mocked_session.available_roles.return_value = roles + + _run_auth_login() + + # Ensure that getpass was called once for the password + pass_mock.assert_has_calls([ + mock.call(), + ]) + + # Ensure that user_input was called for the role selection + input_mock.assert_has_calls([ + mock.call('Select a role from above: '), + ]) + + @mock.patch('nd_okta_auth.okta.OktaSaml') + @mock.patch('getpass.getpass') + def test_bad_password(self, pass_mock, okta_mock): + pass_mock.return_value = 'test_password' + + # Just mock out the entire Okta object, we won't really instantiate it + fake_okta = mock.MagicMock(name='fake_okta') + fake_okta.auth.side_effect = okta.InvalidPassword + okta_mock.return_value = fake_okta + + with self.assertRaises(okta.InvalidPassword): + _run_auth_login() + + @mock.patch('nd_okta_auth.okta.OktaSaml') + @mock.patch('getpass.getpass') + def test_exhausted_factors(self, pass_mock, okta_mock): + pass_mock.return_value = 'test_password' + + # Just mock out the entire Okta object, we won't really instantiate it + fake_okta = mock.MagicMock(name='fake_okta') + fake_okta.auth.side_effect = okta.ExhaustedFactors + okta_mock.return_value = fake_okta + + with self.assertRaises(okta.ExhaustedFactors): + _run_auth_login() + + @mock.patch('nd_okta_auth.okta.OktaSaml') + @mock.patch('getpass.getpass') + def test_bad_input(self, pass_mock, okta_mock): + # Pretend that we got some bad input... + pass_mock.return_value = '' + okta_mock.side_effect = okta.EmptyInput + + with self.assertRaises(okta.EmptyInput): + _run_auth_login() + + @mock.patch('nd_okta_auth.auth.input') + def test_input(self, mock_input): + mock_input.return_value = 'test' + self.assertEqual('test', user_input('input test')) + + +def _run_auth_login(): + login(aws_profile='eng', + okta_appid='appid', + okta_org='org', + username='username', + reup=False) diff --git a/nd_okta_auth/test/main_test.py b/nd_okta_auth/test/main_test.py index 660131c..0c4e502 100644 --- a/nd_okta_auth/test/main_test.py +++ b/nd_okta_auth/test/main_test.py @@ -1,10 +1,10 @@ from __future__ import unicode_literals -import unittest -import logging + import sys +import unittest + from nd_okta_auth import main -from nd_okta_auth import aws -from nd_okta_auth import okta + if sys.version_info[0] < 3: # Python 2 import mock else: @@ -13,13 +13,6 @@ class MainTest(unittest.TestCase): - def test_setup_logger(self): - # Simple execution test - make sure that the logger code executes and - # returns a root logger. No mocks used here, want to ensure that the - # options passed to the logger are valid. - ret = main.setup_logging() - self.assertEquals(type(ret), type(logging.getLogger())) - def test_get_config_parser(self): # Simple execution test again - get the argument parser and make sure # it looks reasonably correct. Just validating that this function has @@ -37,116 +30,24 @@ def test_get_config_parser(self): self.assertEquals(ret.appid, 'app/id') self.assertEquals(ret.username, 'test') - @mock.patch('nd_okta_auth.aws.Session') - @mock.patch('nd_okta_auth.okta.OktaSaml') + @mock.patch('nd_okta_auth.auth.login') @mock.patch('nd_okta_auth.main.get_config_parser') - @mock.patch('getpass.getpass') - def test_entry_point(self, pass_mock, config_mock, okta_mock, aws_mock): - # Mock out the password getter and return a simple password - pass_mock.return_value = 'test_password' - - # Just mock out the entire Okta object, we won't really instantiate it - okta_mock.return_value = mock.MagicMock() - aws_mock.return_value = mock.MagicMock() - - # Mock out the arguments that were passed in + def test_entry_point(self, config_mock, auth_login): + # Give fake_parser = mock.MagicMock(name='fake_parser') - fake_parser.org = 'server' - fake_parser.username = 'username' + fake_parser.name = 'eng' + fake_parser.org = 'org' + fake_parser.appid = 'appid' fake_parser.username = 'username' fake_parser.debug = True - fake_parser.reup = 0 - config_mock.return_value = fake_parser - - main.main('test') - - okta_mock.assert_called_with('server', 'username', 'test_password') - - @mock.patch('nd_okta_auth.main.user_input') - @mock.patch('nd_okta_auth.aws.Session') - @mock.patch('nd_okta_auth.okta.OktaSaml') - @mock.patch('nd_okta_auth.main.get_config_parser') - @mock.patch('getpass.getpass') - def test_entry_point_multirole(self, pass_mock, config_mock, - okta_mock, aws_mock, input_mock): - # First call to this is the password. Second call is the mis-typed - # passcode. Third call is a valid passcode. - pass_mock.side_effect = ['test_password'] - input_mock.side_effect = '0' - - # Just mock out the entire Okta object, we won't really instantiate it - fake_okta = mock.MagicMock(name='OktaSaml') - okta_mock.return_value = fake_okta - aws_mock.return_value = mock.MagicMock(name='aws_mock') - - # Throw MultipleRoles to validate actions when there are multiple roles - mocked_session = aws_mock.return_value - mocked_session.assume_role.side_effect = [aws.MultipleRoles(), None] - - # Return multiple roles - mocked_session.available_roles = mock.Mock() - roles = [{'role': '1', 'principle': ''}, - {'role': '2', 'principle': ''}] - mocked_session.available_roles.return_value = roles - - # Make sure we don't get stuck in a loop, always have to mock out the - # reup option. - fake_parser = mock.MagicMock(name='fake_parser') - fake_parser.reup = 0 + fake_parser.reup = False config_mock.return_value = fake_parser - - main.main('test') - - # Ensure that getpass was called once for the password - pass_mock.assert_has_calls([ - mock.call(), - ]) - - # Ensure that user_input was called for the role selection - input_mock.assert_has_calls([ - mock.call('Select a role from above: '), - ]) - - @mock.patch('nd_okta_auth.okta.OktaSaml') - @mock.patch('nd_okta_auth.main.get_config_parser') - @mock.patch('getpass.getpass') - def test_entry_point_bad_password(self, pass_mock, config_mock, okta_mock): - # Mock out the password getter and return a simple password - pass_mock.return_value = 'test_password' - - # Just mock out the entire Okta object, we won't really instantiate it - fake_okta = mock.MagicMock(name='fake_okta') - fake_okta.auth.side_effect = okta.InvalidPassword - okta_mock.return_value = fake_okta - - # Mock out the arguments that were passed in - fake_parser = mock.MagicMock(name='fake_parser') - config_mock.return_value = fake_parser - - with self.assertRaises(SystemExit): - main.main('test') - - @mock.patch('nd_okta_auth.okta.OktaSaml') - @mock.patch('nd_okta_auth.main.get_config_parser') - @mock.patch('getpass.getpass') - def test_entry_point_bad_input(self, pass_mock, config_mock, okta_mock): - # Pretend that we got some bad input... - pass_mock.return_value = '' - okta_mock.side_effect = okta.EmptyInput - - # Mock out the arguments that were passed in - fake_parser = mock.MagicMock(name='fake_parser') - config_mock.return_value = fake_parser - - with self.assertRaises(SystemExit): - main.main('test') - - @mock.patch('nd_okta_auth.main.input') - def test_input(self, mock_input): - mock_input.return_value = 'test' - self.assertEqual('test', main.user_input('input test')) - - @mock.patch('nd_okta_auth.main.main') - def test_entry_point_func(self, main_mock): + # When with self.assertRaises(SystemExit): main.entry_point() + # Then + auth_login.assert_called_with(aws_profile='eng', + okta_appid='appid', + okta_org='org', + username='username', + reup=False, debug=True)