This repository has been archived by the owner on Aug 3, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Add U2F 2FA support * Continue on passcode too short * Fix imports and quotes * Cleanup class registration * Update MFA docs
- Loading branch information
Showing
16 changed files
with
959 additions
and
398 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import logging | ||
import requests | ||
import sys | ||
|
||
if sys.version_info[0] < 3: # Python 2 | ||
from exceptions import Exception | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
BASE_URL = 'https://{organization}.okta.com' | ||
|
||
|
||
class BaseException(Exception): | ||
'''Base Exception for Okta Auth''' | ||
|
||
|
||
class BaseOktaClient(object): | ||
def __init__(self, organization): | ||
self.base_url = BASE_URL.format(organization=organization) | ||
self.session = requests.Session() | ||
|
||
def _request(self, path, data=None): | ||
'''Basic URL Fetcher for Okta | ||
Any HTTPError is raised immediately, otherwise the response is parsed | ||
as JSON and passed back as a dictionary. | ||
Args: | ||
path: The path at the base url to call | ||
data: Optional data to pass in as Post parameters | ||
Returns: | ||
The response in dict form. | ||
''' | ||
headers = {'Accept': 'application/json', | ||
'Content-Type': 'application/json'} | ||
|
||
if path.startswith('http'): | ||
url = path | ||
else: | ||
url = '{base}/api/v1{path}'.format(base=self.base_url, path=path) | ||
|
||
resp = self.session.post(url=url, headers=headers, json=data, | ||
allow_redirects=False) | ||
|
||
resp_obj = resp.json() | ||
log.debug(resp_obj) | ||
|
||
resp.raise_for_status() | ||
return resp_obj |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import abc | ||
import logging | ||
|
||
from nd_okta_auth import base_client | ||
|
||
|
||
class FactorVerificationFailed(base_client.BaseException): | ||
'''Failed to authenticate with second factor''' | ||
|
||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class Factor(base_client.BaseOktaClient): | ||
__metaclass__ = abc.ABCMeta | ||
|
||
verify_path = '/authn/factors/{fid}/verify' | ||
|
||
def __init__(self, organization): | ||
base_client.BaseOktaClient.__init__(self, organization) | ||
|
||
@abc.abstractmethod | ||
def name(self): | ||
"""Name of the second factor. Must be same as Okta's `factorType`""" | ||
return | ||
|
||
@abc.abstractmethod | ||
def verify(self, fid, state_token, sleep): | ||
"""Verify a user with a second factor.""" | ||
return | ||
|
||
|
||
def factors(organization): | ||
from nd_okta_auth.factor.u2f import U2fFactor # noqa: F401 | ||
from nd_okta_auth.factor.push import PushFactor # noqa: F401 | ||
from nd_okta_auth.factor.totp import TotpFactor # noqa: F401 | ||
|
||
return [ | ||
U2fFactor(organization), | ||
PushFactor(organization), | ||
TotpFactor(organization) | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import logging | ||
import time | ||
|
||
from nd_okta_auth.factor import Factor, FactorVerificationFailed | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
class PushFactor(Factor): | ||
|
||
def name(self): | ||
return 'push' | ||
|
||
def verify(self, fid, state_token, sleep): | ||
'''Triggers an Okta Push Verification and waits. | ||
This method is meant to be called by self.auth() if a Login session | ||
requires MFA, and the users profile supports Okta Push with Verify. | ||
We trigger the push, and then immediately go into a wait loop. Each | ||
time we loop around, we pull the latest status for that push event. If | ||
its Declined, we will throw an error. If its accepted, we write out our | ||
SessionToken. | ||
Args: | ||
fid: Okta Factor ID used to trigger the push | ||
state_token: State Token allowing us to trigger the push | ||
sleep: amount of time to sleep between checking for push status | ||
''' | ||
log.warning('Okta Verify Push being sent...') | ||
path = self.verify_path.format(fid=fid) | ||
data = {'fid': fid, | ||
'stateToken': state_token} | ||
ret = self._request(path, data) | ||
|
||
while ret['status'] != 'SUCCESS': | ||
log.info('Waiting for Okta Verification...') | ||
time.sleep(sleep) | ||
|
||
if ret.get('factorResult', 'REJECTED') == 'REJECTED': | ||
raise FactorVerificationFailed('Okta Verify Push REJECTED') | ||
|
||
links = ret.get('_links') | ||
ret = self._request(links['next']['href'], data) | ||
|
||
return ret |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
from __future__ import unicode_literals | ||
|
||
import sys | ||
import unittest | ||
|
||
import requests | ||
|
||
from nd_okta_auth.factor import push as factor | ||
|
||
if sys.version_info[0] < 3: # Python 2 | ||
import mock | ||
else: | ||
from unittest import mock | ||
|
||
# Successful response message from Okta when you have fully logged in | ||
SUCCESS_RESPONSE = { | ||
'status': 'SUCCESS', | ||
'expiresAt': '2017-07-24T17:05:59.000Z', | ||
'_embedded': { | ||
'user': { | ||
'profile': { | ||
'locale': 'en', | ||
'lastName': 'Foo', | ||
'login': '[email protected]', | ||
'firstName': 'Bob', 'timeZone': | ||
'America/Los_Angeles'}, | ||
'id': 'XXXIDXXX' | ||
} | ||
}, | ||
'sessionToken': 'XXXTOKENXXX' | ||
} | ||
|
||
MFA_CHALLENGE_RESPONSE_OKTA_VERIFY = { | ||
'status': 'MFA_REQUIRED', | ||
'_embedded': { | ||
'factors': [ | ||
{ | ||
'factorType': 'push', | ||
'id': 'abcd', | ||
} | ||
] | ||
}, | ||
'stateToken': 'token', | ||
} | ||
MFA_WAITING_RESPONSE = { | ||
'status': 'MFA_CHALLENGE', | ||
'factorResult': 'WAITING', | ||
'_links': { | ||
'next': { | ||
'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', | ||
} | ||
}, | ||
'stateToken': 'token', | ||
} | ||
MFA_REJECTED_RESPONSE = { | ||
'status': 'MFA_CHALLENGE', | ||
'factorResult': 'REJECTED', | ||
'_links': { | ||
'next': { | ||
'href': 'https://foobar.okta.com/api/v1/authn/factors/X/verify', | ||
} | ||
}, | ||
'stateToken': 'token', | ||
} | ||
|
||
|
||
class OktaTest(unittest.TestCase): | ||
def test_push_name(self): | ||
push_factor = factor.PushFactor('foobar') | ||
self.assertEqual('push', push_factor.name()) | ||
|
||
def test_push_success(self): | ||
push_factor = factor.PushFactor('https://foobar.okta.com') | ||
push_factor._request = mock.MagicMock(name='_request') | ||
|
||
push_factor._request.side_effect = [ | ||
MFA_WAITING_RESPONSE, | ||
MFA_WAITING_RESPONSE, | ||
MFA_WAITING_RESPONSE, | ||
SUCCESS_RESPONSE, | ||
] | ||
|
||
ret = push_factor.verify('123', 'token', 0.1) | ||
self.assertEqual(ret, SUCCESS_RESPONSE) | ||
|
||
def test_push_rejected(self): | ||
push_factor = factor.PushFactor('https://foobar.okta.com') | ||
push_factor._request = mock.MagicMock(name='_request') | ||
|
||
push_factor._request.side_effect = [ | ||
MFA_WAITING_RESPONSE, | ||
MFA_WAITING_RESPONSE, | ||
MFA_WAITING_RESPONSE, | ||
MFA_REJECTED_RESPONSE, | ||
] | ||
|
||
with self.assertRaises(factor.FactorVerificationFailed): | ||
push_factor.verify('123', 'token', 0.1) | ||
|
||
def test_push_unknown_failure(self): | ||
push_factor = factor.PushFactor('https://foobar.okta.com') | ||
push_factor.get_passcode = mock.MagicMock(name='get_passcode') | ||
push_factor.get_passcode.return_value = 123456 | ||
|
||
push_factor._request = mock.MagicMock(name='_request') | ||
|
||
resp = requests.Response() | ||
resp.status_code = 500 | ||
resp.body = "Internal Server Error" | ||
push_factor._request.side_effect = requests.exceptions.HTTPError( | ||
response=resp) | ||
|
||
with self.assertRaises(requests.exceptions.HTTPError): | ||
push_factor.verify('123', 'token', 0.1) |
Oops, something went wrong.