From 1f8ce7abbae3d064af2fc9242921a98570a19ecd Mon Sep 17 00:00:00 2001 From: Slava Markeyev Date: Wed, 24 Oct 2018 13:38:18 -0700 Subject: [PATCH] Add U2F 2FA support (#22) * Add U2F 2FA support * Continue on passcode too short * Fix imports and quotes * Cleanup class registration * Update MFA docs --- .circleci/config.yml | 4 +- README.md | 19 +- nd_okta_auth/base_client.py | 50 ++++ nd_okta_auth/factor/__init__.py | 42 ++++ nd_okta_auth/factor/push.py | 46 ++++ nd_okta_auth/factor/tests/push_test.py | 114 +++++++++ nd_okta_auth/factor/tests/totp_test.py | 103 ++++++++ nd_okta_auth/factor/tests/u2f_test.py | 257 ++++++++++++++++++++ nd_okta_auth/factor/totp.py | 55 +++++ nd_okta_auth/factor/u2f.py | 88 +++++++ nd_okta_auth/main.py | 11 +- nd_okta_auth/okta.py | 192 ++++----------- nd_okta_auth/test/main_test.py | 58 ----- nd_okta_auth/test/okta_test.py | 315 +++++++++++-------------- requirements.test.txt | 2 +- requirements.txt | 1 + 16 files changed, 959 insertions(+), 398 deletions(-) create mode 100644 nd_okta_auth/base_client.py create mode 100644 nd_okta_auth/factor/__init__.py create mode 100644 nd_okta_auth/factor/push.py create mode 100644 nd_okta_auth/factor/tests/push_test.py create mode 100644 nd_okta_auth/factor/tests/totp_test.py create mode 100644 nd_okta_auth/factor/tests/u2f_test.py create mode 100644 nd_okta_auth/factor/totp.py create mode 100644 nd_okta_auth/factor/u2f.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 4f98031..700c4f5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -30,10 +30,10 @@ jobs: . venv/bin/activate python setup.py pep8 - run: - name: pyflakes + name: flake8 command: | . venv/bin/activate - python setup.py pyflakes + python setup.py flake8 test-3.5: <<: *test-template docker: diff --git a/README.md b/README.md index df38d44..3d3d1f1 100644 --- a/README.md +++ b/README.md @@ -10,20 +10,22 @@ credentials that can be used for any of the Amazon SDK libraries or CLI tools. # Features We have support for logging into Okta, optionally handling MFA Authentication, -and then generating new SAML authenticated AWS sessions. In paritcular, this +and then generating new SAML authenticated AWS sessions. In particular, this tool has a few core features. ## Optional MFA Authentication -If you organization requires MFA for the _[initial login into Okta][okta_mfa]_, +If your organization requires MFA for the _[initial login into Okta][okta_mfa]_, we will automatically detect that requirement on a per-user basis and prompt -the user to complete the Multi Factor Authentication. +the user to complete the Multi Factor Authentication. The following factors +are supported by _nd\_okta\_auth_: -In paritcular, there is support for standard passcode based auth, as well as -support for [Okta Verify with Push][okta_verify]. If both are available, -Okta Verify with Push will be prioritized and a push notification is -_automatically sent to the user_. If the user declines the validation, then -optionally the Passcode can be entered in manually. +- [FIDO U2F][okta_u2f] (eg yubikey) +- [Okta Verify with Push][okta_verify] +- TOTP (Okta Verify, Duo, and Google Authenticator) + +If a user has multiple factors they will be prompted in the above order. A +user can hit Control-C to skip a factor. ## Re-Up Mode .. Automatic Credential Re-Generation @@ -96,4 +98,5 @@ Python 2.7.1+ and Python 3.5.0+ are supported [okta_aws_guide]: https://support.okta.com/help/servlet/fileField?retURL=%2Fhelp%2Farticles%2FKnowledge_Article%2FAmazon-Web-Services-and-Okta-Integration-Guide&entityId=ka0F0000000MeyyIAC&field=File_Attachment__Body__s [okta_mfa]: https://www.okta.com/products/adaptive-multi-factor-authentication/ [okta_verify]: https://www.okta.com/blog/tag/okta-verify-with-push/ +[okta_u2f]: https://support.okta.com/help/s/article/Using-YubiKey-Authentication-in-Okta [aws_saml]: http://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRoleWithSAML.html diff --git a/nd_okta_auth/base_client.py b/nd_okta_auth/base_client.py new file mode 100644 index 0000000..7a0101b --- /dev/null +++ b/nd_okta_auth/base_client.py @@ -0,0 +1,50 @@ +import logging +import requests +import sys + +if sys.version_info[0] < 3: # Python 2 + from exceptions import Exception + +log = logging.getLogger(__name__) + +BASE_URL = 'https://{organization}.okta.com' + + +class BaseException(Exception): + '''Base Exception for Okta Auth''' + + +class BaseOktaClient(object): + def __init__(self, organization): + self.base_url = BASE_URL.format(organization=organization) + self.session = requests.Session() + + def _request(self, path, data=None): + '''Basic URL Fetcher for Okta + + Any HTTPError is raised immediately, otherwise the response is parsed + as JSON and passed back as a dictionary. + + Args: + path: The path at the base url to call + data: Optional data to pass in as Post parameters + + Returns: + The response in dict form. + ''' + headers = {'Accept': 'application/json', + 'Content-Type': 'application/json'} + + if path.startswith('http'): + url = path + else: + url = '{base}/api/v1{path}'.format(base=self.base_url, path=path) + + resp = self.session.post(url=url, headers=headers, json=data, + allow_redirects=False) + + resp_obj = resp.json() + log.debug(resp_obj) + + resp.raise_for_status() + return resp_obj diff --git a/nd_okta_auth/factor/__init__.py b/nd_okta_auth/factor/__init__.py new file mode 100644 index 0000000..601e346 --- /dev/null +++ b/nd_okta_auth/factor/__init__.py @@ -0,0 +1,42 @@ +import abc +import logging + +from nd_okta_auth import base_client + + +class FactorVerificationFailed(base_client.BaseException): + '''Failed to authenticate with second factor''' + + +log = logging.getLogger(__name__) + + +class Factor(base_client.BaseOktaClient): + __metaclass__ = abc.ABCMeta + + verify_path = '/authn/factors/{fid}/verify' + + def __init__(self, organization): + base_client.BaseOktaClient.__init__(self, organization) + + @abc.abstractmethod + def name(self): + """Name of the second factor. Must be same as Okta's `factorType`""" + return + + @abc.abstractmethod + def verify(self, fid, state_token, sleep): + """Verify a user with a second factor.""" + return + + +def factors(organization): + from nd_okta_auth.factor.u2f import U2fFactor # noqa: F401 + from nd_okta_auth.factor.push import PushFactor # noqa: F401 + from nd_okta_auth.factor.totp import TotpFactor # noqa: F401 + + return [ + U2fFactor(organization), + PushFactor(organization), + TotpFactor(organization) + ] diff --git a/nd_okta_auth/factor/push.py b/nd_okta_auth/factor/push.py new file mode 100644 index 0000000..cba3275 --- /dev/null +++ b/nd_okta_auth/factor/push.py @@ -0,0 +1,46 @@ +import logging +import time + +from nd_okta_auth.factor import Factor, FactorVerificationFailed + +log = logging.getLogger(__name__) + + +class PushFactor(Factor): + + def name(self): + return 'push' + + def verify(self, fid, state_token, sleep): + '''Triggers an Okta Push Verification and waits. + + This method is meant to be called by self.auth() if a Login session + requires MFA, and the users profile supports Okta Push with Verify. + + We trigger the push, and then immediately go into a wait loop. Each + time we loop around, we pull the latest status for that push event. If + its Declined, we will throw an error. If its accepted, we write out our + SessionToken. + + Args: + fid: Okta Factor ID used to trigger the push + state_token: State Token allowing us to trigger the push + sleep: amount of time to sleep between checking for push status + ''' + log.warning('Okta Verify Push being sent...') + path = self.verify_path.format(fid=fid) + data = {'fid': fid, + 'stateToken': state_token} + ret = self._request(path, data) + + while ret['status'] != 'SUCCESS': + log.info('Waiting for Okta Verification...') + time.sleep(sleep) + + if ret.get('factorResult', 'REJECTED') == 'REJECTED': + raise FactorVerificationFailed('Okta Verify Push REJECTED') + + links = ret.get('_links') + ret = self._request(links['next']['href'], data) + + return ret diff --git a/nd_okta_auth/factor/tests/push_test.py b/nd_okta_auth/factor/tests/push_test.py new file mode 100644 index 0000000..5d42dcd --- /dev/null +++ b/nd_okta_auth/factor/tests/push_test.py @@ -0,0 +1,114 @@ +from __future__ import unicode_literals + +import sys +import unittest + +import requests + +from nd_okta_auth.factor import push as factor + +if sys.version_info[0] < 3: # Python 2 + import mock +else: + from unittest import mock + +# Successful response message from Okta when you have fully logged in +SUCCESS_RESPONSE = { + 'status': 'SUCCESS', + 'expiresAt': '2017-07-24T17:05:59.000Z', + '_embedded': { + 'user': { + 'profile': { + 'locale': 'en', + 'lastName': 'Foo', + 'login': 'bob@foobar.com', + 'firstName': 'Bob', 'timeZone': + 'America/Los_Angeles'}, + 'id': 'XXXIDXXX' + } + }, + 'sessionToken': 'XXXTOKENXXX' +} + +MFA_CHALLENGE_RESPONSE_OKTA_VERIFY = { + 'status': 'MFA_REQUIRED', + '_embedded': { + 'factors': [ + { + 'factorType': 'push', + 'id': 'abcd', + } + ] + }, + 'stateToken': 'token', +} +MFA_WAITING_RESPONSE = { + 'status': 'MFA_CHALLENGE', + 'factorResult': 'WAITING', + '_links': { + 'next': { + 'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', + } + }, + 'stateToken': 'token', +} +MFA_REJECTED_RESPONSE = { + 'status': 'MFA_CHALLENGE', + 'factorResult': 'REJECTED', + '_links': { + 'next': { + 'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', + } + }, + 'stateToken': 'token', +} + + +class OktaTest(unittest.TestCase): + def test_push_name(self): + push_factor = factor.PushFactor('foobar') + self.assertEqual('push', push_factor.name()) + + def test_push_success(self): + push_factor = factor.PushFactor('https://foobar.okta.com') + push_factor._request = mock.MagicMock(name='_request') + + push_factor._request.side_effect = [ + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + SUCCESS_RESPONSE, + ] + + ret = push_factor.verify('123', 'token', 0.1) + self.assertEqual(ret, SUCCESS_RESPONSE) + + def test_push_rejected(self): + push_factor = factor.PushFactor('https://foobar.okta.com') + push_factor._request = mock.MagicMock(name='_request') + + push_factor._request.side_effect = [ + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_WAITING_RESPONSE, + MFA_REJECTED_RESPONSE, + ] + + with self.assertRaises(factor.FactorVerificationFailed): + push_factor.verify('123', 'token', 0.1) + + def test_push_unknown_failure(self): + push_factor = factor.PushFactor('https://foobar.okta.com') + push_factor.get_passcode = mock.MagicMock(name='get_passcode') + push_factor.get_passcode.return_value = 123456 + + push_factor._request = mock.MagicMock(name='_request') + + resp = requests.Response() + resp.status_code = 500 + resp.body = "Internal Server Error" + push_factor._request.side_effect = requests.exceptions.HTTPError( + response=resp) + + with self.assertRaises(requests.exceptions.HTTPError): + push_factor.verify('123', 'token', 0.1) diff --git a/nd_okta_auth/factor/tests/totp_test.py b/nd_okta_auth/factor/tests/totp_test.py new file mode 100644 index 0000000..7855852 --- /dev/null +++ b/nd_okta_auth/factor/tests/totp_test.py @@ -0,0 +1,103 @@ +from __future__ import unicode_literals + +import sys +import unittest + +import requests + +from nd_okta_auth.factor import totp as factor + +if sys.version_info[0] < 3: # Python 2 + import mock +else: + from unittest import mock + +# Successful response message from Okta when you have fully logged in +SUCCESS_RESPONSE = { + 'status': 'SUCCESS', + 'expiresAt': '2017-07-24T17:05:59.000Z', + '_embedded': { + 'user': { + 'profile': { + 'locale': 'en', + 'lastName': 'Foo', + 'login': 'bob@foobar.com', + 'firstName': 'Bob', 'timeZone': + 'America/Los_Angeles'}, + 'id': 'XXXIDXXX' + } + }, + 'sessionToken': 'XXXTOKENXXX'} + +TOTP_REJECTED_RESPONSE = { + 'status': 'MFA_CHALLENGE', + 'factorResult': 'REJECTED', + '_links': { + 'next': { + 'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', + } + }, + 'stateToken': 'token' +} + + +class OktaTest(unittest.TestCase): + def test_totp_name(self): + totp_factor = factor.TotpFactor('foobar') + self.assertEqual('token:software:totp', totp_factor.name()) + + @mock.patch('nd_okta_auth.factor.totp.user_input') + def test_get_passcode(self, input_mock): + totp_factor = factor.TotpFactor('foobar') + input_mock.return_value = 123456 + + totp_factor._request = mock.MagicMock(name='_request') + totp_factor._request.side_effect = [ + SUCCESS_RESPONSE, + ] + + totp_factor.verify('123', 'token', 0.1) + input_mock.assert_called_with('Time-based one-time passcode: ') + + def test_totp_success(self): + totp_factor = factor.TotpFactor('foobar') + totp_factor.get_passcode = mock.MagicMock(name='get_passcode') + totp_factor.get_passcode.return_value = 123456 + + totp_factor._request = mock.MagicMock(name='_request') + totp_factor._request.side_effect = [ + SUCCESS_RESPONSE, + ] + + ret = totp_factor.verify('123', 'token', 0.1) + self.assertEqual(ret, SUCCESS_RESPONSE) + + def test_totp_unknown_failure(self): + totp_factor = factor.TotpFactor('foobar') + totp_factor.get_passcode = mock.MagicMock(name='get_passcode') + totp_factor.get_passcode.return_value = 123456 + + totp_factor._request = mock.MagicMock(name='_request') + + resp = requests.Response() + resp.status_code = 500 + resp.body = "Internal Server Error" + totp_factor._request.side_effect = requests.exceptions.HTTPError( + response=resp) + + with self.assertRaises(requests.exceptions.HTTPError): + totp_factor.verify('123', 'token', 0.1) + + def test_totp_try_again(self): + totp_factor = factor.TotpFactor('foobar') + totp_factor.get_passcode = mock.MagicMock(name='get_passcode') + totp_factor.get_passcode.side_effect = [123, 123456, 654321] + + totp_factor._request = mock.MagicMock(name='_request') + + resp = requests.Response() + resp.status_code = 403 + totp_factor._request.side_effect = [requests.exceptions.HTTPError( + response=resp), SUCCESS_RESPONSE] + + totp_factor.verify('123', 'token', 0.1) diff --git a/nd_okta_auth/factor/tests/u2f_test.py b/nd_okta_auth/factor/tests/u2f_test.py new file mode 100644 index 0000000..b236eb2 --- /dev/null +++ b/nd_okta_auth/factor/tests/u2f_test.py @@ -0,0 +1,257 @@ +from __future__ import unicode_literals + +import sys +import unittest + +import fido2 + +from nd_okta_auth.factor import u2f as factor + +if sys.version_info[0] < 3: # Python 2 + import mock +else: + from unittest import mock + +# Successful response message from Okta when you have fully logged in +SUCCESS_RESPONSE = { + 'status': 'SUCCESS', + 'expiresAt': '2017-07-24T17:05:59.000Z', + '_embedded': { + 'user': { + 'profile': { + 'locale': 'en', + 'lastName': 'Foo', + 'login': 'bob@foobar.com', + 'firstName': 'Bob', 'timeZone': + 'America/Los_Angeles'}, + 'id': 'XXXIDXXX' + } + }, + 'sessionToken': 'XXXTOKENXXX' +} + +CHALLENGE_RESPONSE = { + 'status': 'MFA_CHALLENGE', + '_embedded': { + 'factor': { + 'profile': { + 'credentialId': 'asfodiuhdwacdas', + 'version': 'U2F_V2', + 'appId': 'https://foobar.okta.com' + }, + "_embedded": { + "challenge": { + 'nonce': 'anonce', + 'timeoutSeconds': 20 + } + }, + 'id': '123', + 'factorType': 'u2f', + 'provider': 'FIDO', + 'vendorName': 'FIDO' + } + }, + 'expiresAt': '2017-07-24T17:05:59.000Z', + 'stateToken': 'XXXTOKENXXX', +} + +REJECTED_RESPONSE = { + 'status': 'REJECTED', + '_links': { + 'next': { + 'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', + } + }, + 'stateToken': 'token', +} + + +class OktaTest(unittest.TestCase): + def test_push_name(self): + u2f_factor = factor.U2fFactor('foobar') + self.assertEqual('u2f', u2f_factor.name()) + + def test_u2f_success(self): + u2f_factor = factor.U2fFactor('foobar') + + # Mock out the U2F device + mock_device = mock.MagicMock(name='mock_device') + u2f_factor._get_devices = mock.MagicMock(name='_get_devices') + u2f_factor._get_devices.return_value = [mock_device].__iter__() + + # Mock out U2F client + mock_client = mock.MagicMock(name='mock_client') + u2f_factor._get_client = mock.MagicMock(name='_get_client') + u2f_factor._get_client.return_value = mock_client + mock_client.sign.return_value = {"clientData": "foo", + "signatureData": "bar"} + + # Mock call to Okta API + u2f_factor._request = mock.MagicMock(name='_request') + u2f_factor._request.side_effect = [ + CHALLENGE_RESPONSE, + SUCCESS_RESPONSE, + ] + + # Run code + ret = u2f_factor.verify('123', 'XXXTOKENXXX', 0.1) + + # Check results + self.assertEqual(ret, SUCCESS_RESPONSE) + + u2f_factor._get_client.assert_called_once_with(mock_device, + 'https://foobar' + '.okta.com') + registered_keys = [ + {'version': 'U2F_V2', 'keyHandle': 'asfodiuhdwacdas'}] + mock_client.sign.assert_called_once_with('https://foobar.okta.com', + 'anonce', registered_keys) + + calls = [ + mock.call('/authn/factors/123/verify', + {'fid': '123', 'stateToken': 'XXXTOKENXXX'}), + + mock.call('/authn/factors/123/verify', + {'stateToken': 'XXXTOKENXXX', + 'clientData': 'foo', + 'signatureData': 'bar'}) + ] + u2f_factor._request.assert_has_calls(calls) + + def test_u2f_wait(self): + u2f_factor = factor.U2fFactor('foobar') + + # Mock out the U2F device + mock_device = mock.MagicMock(name='mock_device') + u2f_factor._get_devices = mock.MagicMock(name='_get_devices') + u2f_factor._get_devices.return_value = [None, None, None, None, + mock_device].__iter__() + + # Mock out U2F client + mock_client = mock.MagicMock(name='mock_client') + u2f_factor._get_client = mock.MagicMock(name='_get_client') + u2f_factor._get_client.return_value = mock_client + mock_client.sign.return_value = {"clientData": "foo", + "signatureData": "bar"} + + # Mock call to Okta API + u2f_factor._request = mock.MagicMock(name='_request') + u2f_factor._request.side_effect = [ + CHALLENGE_RESPONSE, + SUCCESS_RESPONSE, + ] + + # Run code + ret = u2f_factor.verify('123', 'XXXTOKENXXX', 0.1) + + # Check results + self.assertEqual(ret, SUCCESS_RESPONSE) + + def test_u2f_client_error(self): + u2f_factor = factor.U2fFactor('foobar') + + # Mock out the U2F device + mock_device = mock.MagicMock(name='mock_device') + u2f_factor._get_devices = mock.MagicMock(name='_get_devices') + u2f_factor._get_devices.return_value = [mock_device].__iter__() + + # Mock out U2F client + mock_client = mock.MagicMock(name='mock_client') + u2f_factor._get_client = mock.MagicMock(name='_get_client') + u2f_factor._get_client.return_value = mock_client + mock_client.sign.side_effect = fido2.client.ClientError(4) + + # Mock call to Okta API + u2f_factor._request = mock.MagicMock(name='_request') + u2f_factor._request.side_effect = [ + CHALLENGE_RESPONSE + ] + + # Run code + with self.assertRaises(factor.FactorVerificationFailed): + u2f_factor.verify('123', 'XXXTOKENXXX', 0.1) + + # Check results + u2f_factor._get_client.assert_called_once_with(mock_device, + 'https://foobar' + '.okta.com') + calls = [ + mock.call('/authn/factors/123/verify', + {'fid': '123', 'stateToken': 'XXXTOKENXXX'}) + ] + u2f_factor._request.assert_has_calls(calls) + + def test_u2f_rejected(self): + u2f_factor = factor.U2fFactor('foobar') + + # Mock out the U2F device + mock_device = mock.MagicMock(name='mock_device') + u2f_factor._get_devices = mock.MagicMock(name='_get_devices') + u2f_factor._get_devices.return_value = [mock_device].__iter__() + + # Mock out U2F client + mock_client = mock.MagicMock(name='mock_client') + u2f_factor._get_client = mock.MagicMock(name='_get_client') + u2f_factor._get_client.return_value = mock_client + mock_client.sign.return_value = {"clientData": "foo", + "signatureData": "bar"} + + # Mock call to Okta API + u2f_factor._request = mock.MagicMock(name='_request') + u2f_factor._request.side_effect = [ + CHALLENGE_RESPONSE, + REJECTED_RESPONSE, + ] + + # Run code + with self.assertRaises(factor.FactorVerificationFailed): + ret = u2f_factor.verify('123', 'XXXTOKENXXX', 0.1) + self.assertEqual(ret, REJECTED_RESPONSE) + + # Check results + u2f_factor._get_client.assert_called_once_with(mock_device, + 'https://foobar.' + 'okta.com') + registered_keys = [ + {'version': 'U2F_V2', 'keyHandle': 'asfodiuhdwacdas'}] + mock_client.sign.assert_called_once_with('https://foobar.okta.com', + 'anonce', registered_keys) + + calls = [ + mock.call('/authn/factors/123/verify', {'fid': '123', + 'stateToken': 'XXXTOKENXXX' + } + ), + + mock.call('/authn/factors/123/verify', + {'stateToken': 'XXXTOKENXXX', + 'clientData': 'foo', + 'signatureData': 'bar'}) + ] + u2f_factor._request.assert_has_calls(calls) + + def test_unexpected_status(self): + u2f_factor = factor.U2fFactor('foobar') + + # Mock out the U2F device + mock_device = mock.MagicMock(name='mock_device') + u2f_factor._get_devices = mock.MagicMock(name='_get_devices') + u2f_factor._get_devices.return_value = [mock_device].__iter__() + + # Mock out U2F client + mock_client = mock.MagicMock(name='mock_client') + u2f_factor._get_client = mock.MagicMock(name='_get_client') + u2f_factor._get_client.return_value = mock_client + mock_client.sign.return_value = {"clientData": "foo", + "signatureData": "bar"} + + # Mock call to Okta API + u2f_factor._request = mock.MagicMock(name='_request') + u2f_factor._request.side_effect = [ + SUCCESS_RESPONSE, + ] + + # Run code + with self.assertRaises(factor.FactorVerificationFailed): + ret = u2f_factor.verify('123', 'XXXTOKENXXX', 0.1) + self.assertEqual(ret, REJECTED_RESPONSE) diff --git a/nd_okta_auth/factor/totp.py b/nd_okta_auth/factor/totp.py new file mode 100644 index 0000000..317bc3c --- /dev/null +++ b/nd_okta_auth/factor/totp.py @@ -0,0 +1,55 @@ +import logging + +import requests + +from nd_okta_auth.factor import Factor + +log = logging.getLogger(__name__) + + +def user_input(text): + '''Wraps input() making testing support of py2 and py3 easier''' + return input(text) + + +class TotpFactor(Factor): + + def get_passcode(self): + return user_input('Time-based one-time passcode: ') + + def name(self): + return 'token:software:totp' + + def verify(self, fid, state_token, sleep): + '''Validates an Okta user with Passcode-based MFA. + + Takes in the supplied Factor ID (fid), State Token and user supplied + Passcode, and validates the auth. If successful, sets the session + token. If invalid, raises an exception. + + Args: + fid: Okta Factor ID (returned in the PasscodeRequired exception) + state_token: State Tken (returned in the PasscodeRequired + exception) + sleep: not used + Returns: + Response from okta + ''' + + while True: + passcode = str(self.get_passcode()) + if len(passcode) != 6: + log.error('Passcodes must be 6 digits') + continue + + path = '/authn/factors/{fid}/verify'.format(fid=fid) + data = {'fid': fid, + 'stateToken': state_token, + 'passCode': passcode} + try: + return self._request(path, data) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 403: + log.error('Invalid Passcode Detected') + continue + raise e diff --git a/nd_okta_auth/factor/u2f.py b/nd_okta_auth/factor/u2f.py new file mode 100644 index 0000000..653ca2b --- /dev/null +++ b/nd_okta_auth/factor/u2f.py @@ -0,0 +1,88 @@ +import logging +import time + +from fido2.client import U2fClient, ClientError +from fido2.hid import CtapHidDevice + +from nd_okta_auth.factor import Factor, FactorVerificationFailed + +log = logging.getLogger(__name__) + + +class U2fFactor(Factor): + + def name(self): + return 'u2f' + + def _get_devices(self): + return CtapHidDevice.list_devices() + + def _get_client(self, dev, appId): + return U2fClient(dev, appId) + + def verify(self, fid, state_token, sleep): + '''Validates user with Okta using user's U2F hardware device. + + This method is meant to be called by self.auth() if a Login session + requires MFA, and the users profile supports U2F. + + We wait for a U2F device to be plugged into USB, request a challenge + nonce from Okta, pass the challenge to the U2F device, and finally + send back the challenge response to Okta. If its accepted, we write + out our SessionToken. + + Args: + fid: Okta Factor ID used to trigger the push + state_token: State Token allowing us to trigger the push + sleep: amount of seconds to wait between checking for U2F hardware + keep to be plugged in. + ''' + path = self.verify_path.format(fid=fid) + + # Wait for U2F device to be plugged in before continuing on + dev = None + while True: + dev = next(self._get_devices(), None) + if dev: + break + log.info('Waiting for FIDO U2F device to be plugged in.') + time.sleep(sleep) + + # Request U2F nonce/challenge from Okta + log.info('Requesting U2F challenge nonce from Okta') + data = {'fid': fid, + 'stateToken': state_token} + ret = self._request(path, data) + + if ret['status'] != 'MFA_CHALLENGE': + raise FactorVerificationFailed('Expected MFA challenge') + + # Get U2F device to sign nonce/challenge + appId = self.base_url + client = self._get_client(dev, appId) + + nonce = ret['_embedded']['factor']['_embedded']['challenge']['nonce'] + credentialId = ret['_embedded']['factor']['profile']['credentialId'] + registered_keys = [{'version': 'U2F_V2', 'keyHandle': credentialId}] + + log.warning('Touch your authenticator device now...') + + try: + r = client.sign(appId, nonce, registered_keys) + except ClientError: + raise FactorVerificationFailed('U2F devices failed to ' + 'sign request. Have you ' + 'registered it with "{}"?' + .format(appId)) + + # Send challenge response back to Okta + data = {'stateToken': state_token, + 'clientData': r.get("clientData"), + 'signatureData': r.get("signatureData")} + + ret = self._request(path, data) + + if ret.get('status') != 'SUCCESS': + raise FactorVerificationFailed() + + return ret diff --git a/nd_okta_auth/main.py b/nd_okta_auth/main.py index cc684b9..205bccf 100644 --- a/nd_okta_auth/main.py +++ b/nd_okta_auth/main.py @@ -150,12 +150,9 @@ def main(argv): log.error('Invalid Username ({user}) or Password'.format( user=config.username)) sys.exit(1) - except okta.PasscodeRequired as e: - log.warning('MFA Requirement Detected - Enter your passcode here') - verified = False - while not verified: - passcode = user_input('MFA Passcode: ') - verified = okta_client.validate_mfa(e.fid, e.state_token, passcode) + except okta.ExhaustedFactors as e: + log.error(e.message) + sys.exit(1) # Once we're authenticated with an OktaSaml client object, we can use that # object to get a fresh SAMLResponse repeatedly and refresh our AWS @@ -195,7 +192,7 @@ def main(argv): role_selection = user_input('Select a role from above: ') session.set_role(role_selection) session.assume_role() - except requests.exceptions.ConnectionError as e: + except requests.exceptions.ConnectionError: log.warning('Connection error... will retry') time.sleep(5) continue diff --git a/nd_okta_auth/okta.py b/nd_okta_auth/okta.py index a243f12..9d870b4 100644 --- a/nd_okta_auth/okta.py +++ b/nd_okta_auth/okta.py @@ -7,50 +7,39 @@ ''' from __future__ import unicode_literals + import base64 import logging -import time import sys + import bs4 import requests + +from nd_okta_auth import factor, base_client + if sys.version_info[0] < 3: # Python 2 from exceptions import Exception log = logging.getLogger(__name__) -BASE_URL = 'https://{organization}.okta.com' - - -class BaseException(Exception): - '''Base Exception for Okta Auth''' - class UnknownError(Exception): '''Some Expected Return Was Received''' -class EmptyInput(BaseException): +class EmptyInput(base_client.BaseException): '''Invalid Input - Empty String Detected''' -class InvalidPassword(BaseException): +class InvalidPassword(base_client.BaseException): '''Invalid Password''' -class PasscodeRequired(BaseException): - '''A 2FA Passcode Must Be Entered''' - - def __init__(self, fid, state_token): - self.fid = fid - self.state_token = state_token - - -class OktaVerifyRequired(BaseException): - '''OktaVerify Authentication Is Required''' +class ExhaustedFactors(base_client.BaseException): + '''Failed to authenticate user with any factor''' -class Okta(object): - +class Okta(base_client.BaseOktaClient): '''Base Okta Login Object with MFA handling. This base login object handles connecting to Okta, authenticating a user, @@ -62,7 +51,7 @@ class Okta(object): ''' def __init__(self, organization, username, password): - self.base_url = BASE_URL.format(organization=organization) + base_client.BaseOktaClient.__init__(self, organization) log.debug('Base URL Set to: {url}'.format(url=self.base_url)) # Validate the inputs are reasonably sane @@ -72,37 +61,8 @@ def __init__(self, organization, username, password): self.username = username self.password = password - self.session = requests.Session() - - def _request(self, path, data=None): - '''Basic URL Fetcher for Okta - - Any HTTPError is raised immediately, otherwise the response is parsed - as JSON and passed back as a dictionary. - - Args: - path: The path at the base url to call - data: Optional data to pass in as Post parameters - - Returns: - The response in dict form. - ''' - headers = {'Accept': 'application/json', - 'Content-Type': 'application/json'} - - if path.startswith('http'): - url = path - else: - url = '{base}/api/v1{path}'.format(base=self.base_url, path=path) - - resp = self.session.post(url=url, headers=headers, json=data, - allow_redirects=False) - - resp_obj = resp.json() - log.debug(resp_obj) - - resp.raise_for_status() - return resp_obj + self.supported_factors = factor.factors(organization) + self.session_token = None def set_token(self, ret): '''Parses an authentication response and stores the token. @@ -120,76 +80,6 @@ def set_token(self, ret): firstName=firstName, lastName=lastName)) self.session_token = ret['sessionToken'] - def validate_mfa(self, fid, state_token, passcode): - '''Validates an Okta user with Passcode-based MFA. - - Takes in the supplied Factor ID (fid), State Token and user supplied - Passcode, and validates the auth. If successful, sets the session - token. If invalid, raises an exception. - - Args: - fid: Okta Factor ID (returned in the PasscodeRequired exception) - state_token: State Tken (returned in the PasscodeRequired - exception) - passcode: The user-supplied Passcode to verify - - Returns: - True/False whether or not authentication was successful - ''' - if len(passcode) != 6: - log.error('Passcodes must be 6 digits') - return False - - path = '/authn/factors/{fid}/verify'.format(fid=fid) - data = {'fid': fid, - 'stateToken': state_token, - 'passCode': passcode} - try: - ret = self._request(path, data) - except requests.exceptions.HTTPError as e: - if e.response.status_code == 403: - log.error('Invalid Passcode Detected') - return False - raise UnknownError(e.response.body) - - self.set_token(ret) - return True - - def okta_verify_with_push(self, fid, state_token, sleep=1): - '''Triggers an Okta Push Verification and waits. - - This metho is meant to be called by self.auth() if a Login session - requires MFA, and the users profile supports Okta Push with Verify. - - We trigger the push, and then immediately go into a wait loop. Each - time we loop around, we pull the latest status for that push event. If - its Declined, we will throw an error. If its accepted, we write out our - SessionToken. - - Args: - fid: Okta Factor ID used to trigger the push - state_token: State Token allowing us to trigger the push - ''' - log.warning('Okta Verify Push being sent...') - path = '/authn/factors/{fid}/verify'.format(fid=fid) - data = {'fid': fid, - 'stateToken': state_token} - ret = self._request(path, data) - - while ret['status'] != 'SUCCESS': - log.info('Waiting for Okta Verification...') - time.sleep(sleep) - - if ret.get('factorResult', 'REJECTED') == 'REJECTED': - log.error('Okta Verify Push REJECTED') - return False - - links = ret.get('_links') - ret = self._request(links['next']['href'], data) - - self.set_token(ret) - return True - def auth(self): '''Performs an initial authentication against Okta. @@ -230,24 +120,44 @@ def auth(self): raise UnknownError() if status == 'MFA_REQUIRED' or status == 'MFA_CHALLENGE': - for factor in ret['_embedded']['factors']: - if factor['factorType'] == 'push': - try: - if self.okta_verify_with_push(factor['id'], - ret['stateToken']): - return - except KeyboardInterrupt: - # Allow users to use MFA Passcode by - # breaking out of waiting for the push. - break - - for factor in ret['_embedded']['factors']: - if factor['factorType'] == 'token:software:totp': - raise PasscodeRequired( - fid=factor['id'], - state_token=ret['stateToken']) - - raise UnknownError(status) + # Factors enabled by the user + enabled_factors = {} + for enabled_factor in ret['_embedded']['factors']: + enabled_factors[enabled_factor['factorType']] = enabled_factor + + # Loop through locally supported factors + for supported_factor in self.supported_factors: + enabled_factor = enabled_factors.get(supported_factor.name(), + None) + + if enabled_factor is None: + continue + + log.info('Authenticating with factor: {}'.format( + supported_factor.name())) + + try: + ret = supported_factor.verify(enabled_factor['id'], + ret['stateToken'], sleep=1) + self.set_token(ret) + return + except KeyboardInterrupt: + # Allow users to use MFA Push by breaking + # out of waiting for U2F device. + log.info('User skipping factor: {}'.format( + supported_factor.name())) + continue + except factor.FactorVerificationFailed as e: + # Non fatal error that a factor failed to + # be verified. + log.error(e) + continue + except requests.exceptions.ReadTimeout: + log.error('HTTP timeout contacting Okta at {}'.format( + self.base_url)) + continue + + raise ExhaustedFactors('Failed to verify with any MFA factor') class OktaSaml(Okta): diff --git a/nd_okta_auth/test/main_test.py b/nd_okta_auth/test/main_test.py index 5d26605..660131c 100644 --- a/nd_okta_auth/test/main_test.py +++ b/nd_okta_auth/test/main_test.py @@ -62,64 +62,6 @@ def test_entry_point(self, pass_mock, config_mock, okta_mock, aws_mock): okta_mock.assert_called_with('server', 'username', 'test_password') - @mock.patch('nd_okta_auth.main.user_input') - @mock.patch('nd_okta_auth.aws.Session') - @mock.patch('nd_okta_auth.okta.OktaSaml') - @mock.patch('nd_okta_auth.main.get_config_parser') - @mock.patch('getpass.getpass') - def test_entry_point_mfa(self, pass_mock, config_mock, - okta_mock, aws_mock, input_mock): - # First call to this is the password. Second call is the mis-typed - # passcode. Third call is a valid passcode. - pass_mock.side_effect = ['test_password'] - input_mock.side_effect = ['123', '123456'] - - # Just mock out the entire Okta object, we won't really instantiate it - fake_okta = mock.MagicMock(name='OktaSaml') - okta_mock.return_value = fake_okta - aws_mock.return_value = mock.MagicMock() - - # Make sure we don't get stuck in a loop, always have to mock out the - # reup option. - fake_parser = mock.MagicMock(name='fake_parser') - fake_parser.reup = 0 - config_mock.return_value = fake_parser - - # Now, when we auth() throw a okta.PasscodeRequired exception to - # trigger the MFA requirement. Note, this is only the manually entered - # in passcode MFA req. OktaSaml client automatically handles Okta - # Verify with Push MFA reqs. - fake_okta.auth.side_effect = okta.PasscodeRequired( - fid='test_factor_id', - state_token='test_token') - - # Pretend that the validate_mfa() call fails the first time, and - # succeeds the second time. This simulates a typo on the MFA code. - fake_okta.validate_mfa.side_effect = [False, True] - - main.main('test') - - # Ensure that getpass was called once for the password - pass_mock.assert_has_calls([ - mock.call(), - ]) - - # Ensure that we called auth, then called validate_mfa() twice - each - # with different passcodes. Validating that the user was indeed asked - # for a passcode on each iteration. - fake_okta.assert_has_calls([ - mock.call.auth(), - mock.call.validate_mfa('test_factor_id', 'test_token', '123'), - mock.call.validate_mfa('test_factor_id', 'test_token', '123456'), - ]) - - # Ensure that user_input was called twice; once for the bad input and - # once for the retry - input_mock.assert_has_calls([ - mock.call('MFA Passcode: '), - mock.call('MFA Passcode: '), - ]) - @mock.patch('nd_okta_auth.main.user_input') @mock.patch('nd_okta_auth.aws.Session') @mock.patch('nd_okta_auth.okta.OktaSaml') diff --git a/nd_okta_auth/test/okta_test.py b/nd_okta_auth/test/okta_test.py index 36f598d..b145ec3 100644 --- a/nd_okta_auth/test/okta_test.py +++ b/nd_okta_auth/test/okta_test.py @@ -1,14 +1,18 @@ from __future__ import unicode_literals + +import sys import unittest + import requests -import sys -from nd_okta_auth import okta + +from nd_okta_auth.okta import EmptyInput +from nd_okta_auth import factor, okta + if sys.version_info[0] < 3: # Python 2 import mock else: from unittest import mock - # Successful response message from Okta when you have fully logged in SUCCESS_RESPONSE = { 'status': 'SUCCESS', @@ -20,7 +24,7 @@ 'lastName': 'Foo', 'login': 'bob@foobar.com', 'firstName': 'Bob', 'timeZone': - 'America/Los_Angeles'}, + 'America/Los_Angeles'}, 'id': 'XXXIDXXX' } }, @@ -32,61 +36,161 @@ 'status': 'MFA_ENROLL', 'stateToken': 'token', } -MFA_CHALLENGE_RESPONSE_OKTA_VERIFY = { + +MFA_REQUIRED_RESPONSE = { 'status': 'MFA_REQUIRED', '_embedded': { 'factors': [ { - 'factorType': 'push', + 'factorType': 'test_factor', 'id': 'abcd', } ] }, 'stateToken': 'token', } -MFA_CHALLENGE_RESPONSE_PASSCODE = { + +MFA_REQUIRED_RESPONSE_TWOFACTORS = { 'status': 'MFA_REQUIRED', '_embedded': { 'factors': [ { - 'factorType': 'token:software:totp', - 'id': 'abcd', + 'factorType': 'factor_one', + 'id': '1', + }, + { + 'factorType': 'factor_two', + 'id': '2', } ] }, 'stateToken': 'token', } -MFA_WAITING_RESPONSE = { - 'status': 'MFA_CHALLENGE', - 'factorResult': 'WAITING', - '_links': { - 'next': { - 'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', - } - }, - 'stateToken': 'token', -} -MFA_REJECTED_RESPONSE = { - 'status': 'MFA_CHALLENGE', - 'factorResult': 'REJECTED', - '_links': { - 'next': { - 'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', - } - }, - 'stateToken': 'token', -} class OktaTest(unittest.TestCase): def test_init_blank_inputs(self): - with self.assertRaises(okta.EmptyInput): + with self.assertRaises(EmptyInput): okta.Okta(organization='', username='test', password='test') with self.assertRaises(okta.EmptyInput): okta.Okta(organization=None, username='test', password='test') + def test_auth_bad_password(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='_request') + + resp = requests.Response() + resp.status_code = 401 + resp.body = 'Bad Password' + client._request.side_effect = requests.exceptions.HTTPError( + response=resp) + + with self.assertRaises(okta.InvalidPassword): + client.auth() + + def test_set_token(self): + client = okta.Okta('organization', 'username', 'password') + client.session = mock.MagicMock(name='session') + client.set_token(SUCCESS_RESPONSE) + self.assertEquals(client.session_token, 'XXXTOKENXXX') + + def test_auth(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='request') + client._request.side_effect = [SUCCESS_RESPONSE] + + ret = client.auth() + self.assertEquals(ret, None) + + def test_auth_requires_mfa_enroll(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='request') + client._request.side_effect = [MFA_ENROLL_RESPONSE] + + with self.assertRaises(okta.UnknownError): + client.auth() + + def test_auth_mfa_verify(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='request') + client._request.side_effect = [MFA_REQUIRED_RESPONSE] + + test_factor = mock.MagicMock(name='test_factor') + test_factor.name.return_value = 'test_factor' + test_factor.verify.return_value = SUCCESS_RESPONSE + + client.supported_factors = [test_factor] + client.auth() + + test_factor.verify.assert_called_with('abcd', 'token', sleep=1) + self.assertEquals(client.session_token, 'XXXTOKENXXX') + + def test_auth_mfa_verify_fail(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='request') + client._request.side_effect = [MFA_REQUIRED_RESPONSE] + + test_factor = mock.MagicMock(name='test_factor') + test_factor.name.return_value = 'test_factor' + test_factor.verify.side_effect = factor.FactorVerificationFailed + + client.supported_factors = [test_factor] + with self.assertRaises(okta.ExhaustedFactors): + client.auth() + + self.assertEquals(client.session_token, None) + + def test_verify_no_supported_factors(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='request') + client._request.side_effect = [MFA_REQUIRED_RESPONSE] + + test_factor = mock.MagicMock(name='test_factor') + test_factor.name.return_value = 'phone' + test_factor.verify.side_effect = factor.FactorVerificationFailed + + client.supported_factors = [test_factor] + with self.assertRaises(okta.ExhaustedFactors): + client.auth() + + self.assertEquals(client.session_token, None) + + def test_auth_http_timeout(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='request') + client._request.side_effect = [MFA_REQUIRED_RESPONSE] + + test_factor = mock.MagicMock(name='test_factor') + test_factor.name.return_value = 'test_factor' + test_factor.verify.side_effect = requests.exceptions.ReadTimeout + + client.supported_factors = [test_factor] + with self.assertRaises(okta.ExhaustedFactors): + client.auth() + + self.assertEquals(client.session_token, None) + + def test_auth_verify_interrupt(self): + client = okta.Okta('organization', 'username', 'password') + client._request = mock.MagicMock(name='request') + client._request.side_effect = [MFA_REQUIRED_RESPONSE_TWOFACTORS] + + factor_one = mock.MagicMock(name='factor_one') + factor_one.name.return_value = 'factor_one' + factor_one.verify.side_effect = KeyboardInterrupt + + factor_two = mock.MagicMock(name='factor_two') + factor_two.name.return_value = 'factor_two' + factor_two.verify.return_value = SUCCESS_RESPONSE + + client.supported_factors = [factor_one, factor_two] + client.auth() + + factor_two.verify.assert_called_with('2', 'token', sleep=1) + self.assertEquals(client.session_token, 'XXXTOKENXXX') + def test_request_good_response(self): client = okta.Okta('organization', 'username', 'password') client.session = mock.MagicMock(name='session') @@ -152,154 +256,3 @@ class TestExc(Exception): client.session.post.return_value = fake_response_object with self.assertRaises(TestExc): client._request('/test', {'test': True}) - - def test_set_token(self): - client = okta.Okta('organization', 'username', 'password') - client.session = mock.MagicMock(name='session') - client.set_token(SUCCESS_RESPONSE) - self.assertEquals(client.session_token, 'XXXTOKENXXX') - - def test_validate_mfa_too_short(self): - client = okta.Okta('organization', 'username', 'password') - ret = client.validate_mfa('fid', 'token', '123') - self.assertEquals(False, ret) - - def test_validate_mfa_invalid_token(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - resp = requests.Response() - resp.status_code = 403 - client._request.side_effect = requests.exceptions.HTTPError( - response=resp) - - ret = client.validate_mfa('fid', 'token', '123456') - self.assertEquals(False, ret) - - client._request.assert_has_calls([ - mock.call( - '/authn/factors/fid/verify', - {'fid': 'fid', 'stateToken': 'token', 'passCode': '123456'}) - ]) - - def test_validate_mfa_unknown_error(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - resp = requests.Response() - resp.status_code = 500 - resp.body = 'Something bad happened' - client._request.side_effect = requests.exceptions.HTTPError( - response=resp) - - with self.assertRaises(okta.UnknownError): - client.validate_mfa('fid', 'token', '123456') - - def test_validate_mfa(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - client._request.return_value = SUCCESS_RESPONSE - ret = client.validate_mfa('fid', 'token', '123456') - self.assertEquals(ret, True) - self.assertEquals(client.session_token, 'XXXTOKENXXX') - - def test_okta_verify_with_push(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - SUCCESS_RESPONSE, - ] - - ret = client.okta_verify_with_push('123', 'token', sleep=0.1) - self.assertEquals(ret, True) - - def test_okta_verify_with_push_rejected(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - client._request.side_effect = [ - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_WAITING_RESPONSE, - MFA_REJECTED_RESPONSE, - ] - - ret = client.okta_verify_with_push('123', 'token', sleep=0.1) - self.assertEquals(ret, False) - - def test_auth_bad_password(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - resp = requests.Response() - resp.status_code = 401 - resp.body = 'Bad Password' - client._request.side_effect = requests.exceptions.HTTPError( - response=resp) - - with self.assertRaises(okta.InvalidPassword): - client.auth() - - def test_auth(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - client._request.side_effect = [SUCCESS_RESPONSE] - - ret = client.auth() - self.assertEquals(ret, None) - - def test_auth_requires_mfa_enroll(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - client._request.side_effect = [MFA_ENROLL_RESPONSE] - - with self.assertRaises(okta.UnknownError): - client.auth() - - def test_auth_trigger_okta_verify(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - client.okta_verify_with_push = mock.MagicMock( - name='okta_verify_with_push') - - client._request.side_effect = [MFA_CHALLENGE_RESPONSE_OKTA_VERIFY] - - ret = client.auth() - self.assertEquals(ret, None) - client.okta_verify_with_push.assert_has_calls([ - mock.call('abcd', 'token') - ]) - - def test_auth_trigger_okta_verify_canceled(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - client.okta_verify_with_push = mock.MagicMock( - name='okta_verify_with_push') - client.okta_verify_with_push.side_effect = KeyboardInterrupt - - client._request.side_effect = [MFA_CHALLENGE_RESPONSE_OKTA_VERIFY] - - with self.assertRaises(okta.UnknownError): - client.auth() - - def test_auth_throws_passcode_required(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - client._request.side_effect = [MFA_CHALLENGE_RESPONSE_PASSCODE] - - with self.assertRaises(okta.PasscodeRequired): - client.auth() - - def test_auth_with_unexpected_response(self): - client = okta.Okta('organization', 'username', 'password') - client._request = mock.MagicMock(name='_request') - - client._request.side_effect = [{}] - - with self.assertRaises(okta.UnknownError): - client.auth() diff --git a/requirements.test.txt b/requirements.test.txt index 0e962dc..266870d 100644 --- a/requirements.test.txt +++ b/requirements.test.txt @@ -2,4 +2,4 @@ nose>=1.3.7 coverage mock pep8 -pyflakes +flake8 diff --git a/requirements.txt b/requirements.txt index 637f2ac..be07b46 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ requests>=2.10.0 boto3>=1.4.0 future==0.16.0 configparser==3.5.0 +fido2==0.3.0