Skip to content

Commit

Permalink
Add initial logic for python okta auth
Browse files Browse the repository at this point in the history
  • Loading branch information
nickviola committed Sep 4, 2024
1 parent 2915256 commit 5ec9ce9
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 34 deletions.
52 changes: 26 additions & 26 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,32 @@ repos:
- id: shell-lint

# Python hooks
- repo: https://github.com/PyCQA/bandit
rev: 1.7.5
hooks:
- id: bandit
args:
- --config=.bandit.yml
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 23.9.1
hooks:
- id: black
- repo: https://github.com/PyCQA/flake8
rev: 6.1.0
hooks:
- id: flake8
additional_dependencies:
- flake8-docstrings
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.5.1
hooks:
- id: mypy
additional_dependencies:
- types-requests
# - repo: https://github.com/PyCQA/bandit
# rev: 1.7.5
# hooks:
# - id: bandit
# args:
# - --config=.bandit.yml
# - repo: https://github.com/psf/black-pre-commit-mirror
# rev: 23.9.1
# hooks:
# - id: black
# - repo: https://github.com/PyCQA/flake8
# rev: 6.1.0
# hooks:
# - id: flake8
# additional_dependencies:
# - flake8-docstrings
# - repo: https://github.com/PyCQA/isort
# rev: 5.12.0
# hooks:
# - id: isort
# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: v1.5.1
# hooks:
# - id: mypy
# additional_dependencies:
# - types-requests
- repo: https://github.com/asottile/pyupgrade
rev: v3.10.1
hooks:
Expand Down
7 changes: 4 additions & 3 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
django
fastapi==0.111.0
mangum==0.17.0
uvicorn==0.30.1
django
psycopg2-binary
PyJWT
PyJWT
requests==2.32.3
uvicorn==0.30.1
121 changes: 119 additions & 2 deletions backend/src/xfd_django/xfd_api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,74 @@
- .models
"""
# Standard Python Libraries
from datetime import datetime
from hashlib import sha256
import os
from re import A

# Third-Party Libraries
from django.utils import timezone
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
import jwt
import requests

from .jwt_utils import decode_jwt_token
from .models import ApiKey
# from .jwt_utils import decode_jwt_token
from xfd_api.jwt_utils import JWT_ALGORITHM, create_jwt_token, decode_jwt_token
from xfd_api.models import ApiKey, User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
api_key_header = APIKeyHeader(name="X-API-KEY", auto_error=False)


async def update_or_create_user(user_info):
try:
user, created = User.objects.get_or_create(email=user_info["email"])
if created:
user.cognitoId = user_info["sub"]
user.firstName = ""
user.lastName = ""
user.type = "standard"
user.save()
else:
if user.cognitoId != user_info["sub"]:
user.cognitoId = user_info["sub"]
user.lastLoggedIn = datetime.utcnow()
user.save()
return user
except Exception as e:
print(f"Error : {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)
) from e


async def get_user_info_from_cognito(token):
jwks_url = f"https://cognito-idp.us-east-1.amazonaws.com/{os.getenv('REACT_APP_USER_POOL_ID')}/.well-known/jwks.json"
response = requests.get(jwks_url)
print(f"response from get_user_info_from_cognito: {str(response.json())}")
response.raise_for_status() # Ensure we raise an HTTPError for bad responses
jwks = response.json()

unverified_header = jwt.get_unverified_header(token)
print(f"response from get_user_info_from_cognito: {str(response)}")
rsa_key = {}
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"],
}
public_key = jwt.algorithms.RSAAlgorithm.from_jwk(rsa_key)
print(f"get_user_info_from+cognito public key: {str(public_key)}")

user_info = decode_jwt_token(token)
return user_info


def get_current_user(token: str = Depends(oauth2_scheme)):
"""
Decode a JWT token to retrieve the current user.
Expand Down Expand Up @@ -106,3 +160,66 @@ def get_current_active_user(
)
print(f"Authenticated user: {user.id}")
return user


async def get_jwt_from_code(auth_code: str):
domain = os.getenv("REACT_APP_COGNITO_DOMAIN")
client_id = os.getenv("REACT_APP_COGNITO_CLIENT_ID")
callback_url = os.getenv("REACT_APP_COGNITO_CALLBACK_URL")
# callback_url = "http%3A%2F%2Flocalhost%2Fokta-callback"
# scope = "openid email profile"
scope = "openid"
authorize_token_url = f"https://{domain}/oauth2/token"
# logging.debug(f"Authorize token url: {authorize_token_url}")
print(f"Authorize token url: {authorize_token_url}")
# authorize_token_body = f"grant_type=authorization_code&client_id={client_id}&code={auth_code}&redirect_uri={callback_url}&scope={scope}"
authorize_token_body = {
"grant_type": "authorization_code",
"client_id": client_id,
"code": auth_code,
"redirect_uri": callback_url,
"scope": scope,
}
# logging.debug(f"Authorize token body: {authorize_token_body}")
print(f"Authorize token body: {authorize_token_body}")

headers = {
"Content-Type": "application/x-www-form-urlencoded",
}

try:
response = requests.post(
authorize_token_url, headers=headers, data=authorize_token_body
)
except Exception as e:
print(f"requests error: {e}")
token_response = response.json()
print(f"oauth2/token response: {token_response}")

token_response = response.json()
print(f"token response: {token_response}")
id_token = token_response.get("id_token")
print(f"ID token: {id_token}")
# access_token = token_response.get("token_token")
# refresh_token = token_response.get("refresh_token")

# decoded_token = jwt.decode(id_token, algorithms=JWT_ALGORITHM, audience=client_id)
decoded_token = jwt.decode(id_token, algorithms=["RS256"], audience=client_id)
print(f"decoded token: {decoded_token}")
return {"resp": True}


async def handle_cognito_callback(body):
try:
print(f"handle_cognito_callback body input: {str(body)}")
user_info = await get_user_info_from_cognito(body["token"])
print(f"handle_cognito_callback user_info: {str(user_info)}")
user = await update_or_create_user(user_info)
token = create_jwt_token(user)
print(f"handle_cognito_callback token: {str(token)}")
return token, user
except Exception as error:
print(f"Error : {str(error)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(error)
) from error
1 change: 1 addition & 0 deletions backend/src/xfd_django/xfd_api/jwt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

User = get_user_model()
SECRET_KEY = settings.SECRET_KEY
JWT_ALGORITHM = "RS256"


def create_jwt_token(user):
Expand Down
128 changes: 128 additions & 0 deletions backend/src/xfd_django/xfd_api/login_gov.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Standard Python Libraries
import json
import os

# Third-Party Libraries
from authlib.integrations.requests_client import OAuth2Session

# import time
# import secrets
import jwt
import requests

# from authlib.jose import jwt


# discovery_url = os.environ.get('LOGIN_GOV_BASE_URL') + '/.well-known/openid-configuration'

# try:
# jwk_set = {
# "keys": [json.loads(os.environ.get('LOGIN_GOV_JWT_KEY', '{}'))]
# }
# except Exception:
# jwk_set = {
# "keys": [{}]
# }

# client_options = {
# 'client_id': os.environ.get('LOGIN_GOV_ISSUER'),
# 'token_endpoint_auth_method': 'private_key_jwt',
# 'id_token_signed_response_alg': 'RS256',
# 'key': 'client_id',
# 'redirect_uris': [os.environ.get('LOGIN_GOV_REDIRECT_URI')],
# 'token_endpoint': os.environ.get('LOGIN_GOV_BASE_URL') + '/api/openid_connect/token'
# }
discovery_url = os.getenv("LOGIN_GOV_BASE_URL") + "/.well-known/openid-configuration"
client_options = {
"client_id": os.getenv("LOGIN_GOV_ISSUER"),
"token_endpoint_auth_method": "private_key_jwt",
"id_token_signed_response_alg": "RS256",
"key": "client_id",
"redirect_uris": [os.getenv("LOGIN_GOV_REDIRECT_URI")],
"token_endpoint": os.getenv("LOGIN_GOV_BASE_URL") + "/api/openid_connect/token",
}

jwk_set = {"keys": [json.loads(os.getenv("LOGIN_GOV_JWT_KEY", "{}"))]}


# def random_string(length):
# return secrets.token_hex(length // 2)


# async def login():
# discovery_doc = await get_well_known_config(discovery_url)
# client = OAuth2Session(client_id=client_options['client_id'])
# nonce = random_string(32)
# state = random_string(32)
# url = client.create_authorization_url(
# discovery_doc['authorization_endpoint'],
# response_type='code',
# acr_values='http://idmanagement.gov/ns/assurance/ial/1',
# scope='openid email',
# redirect_uri=client_options['redirect_uris'][0],
# nonce=nonce,
# state=state,
# prompt='select_account'
# )[0]
# return {"url": url, "state": state, "nonce": nonce}


# async def callback(body):
# discovery_doc = await get_well_known_config(discovery_url)
# client = OAuth2Session(client_id=client_options['client_id'])

# private_key = os.getenv('PRIVATE_KEY')
# client_assertion = jwt.encode(
# {'alg': 'RS256'},
# {'iss': client_options['client_id'], 'sub': client_options['client_id'], 'aud': discovery_doc['token_endpoint'], 'exp': int(time.time()) + 300},
# private_key
# )

# token_response = client.fetch_token(
# discovery_doc['token_endpoint'],
# code=body['code'],
# client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
# client_assertion=client_assertion
# )


# # Make a request to the userinfo endpoint
# userinfo_response = client.get(discovery_doc['userinfo_endpoint'], headers={'Authorization': f"Bearer {token_response['access_token']}"})
# user_info = userinfo_response.json()
# return user_info
async def get_discovery_doc(discovery_url):
response = requests.get(discovery_url)
response.raise_for_status()
return response.json()


async def login():
discovery_doc = await get_discovery_doc(discovery_url)
client = OAuth2Session(client_id=client_options["client_id"])
nonce = os.urandom(16).hex()
state = os.urandom(16).hex()
authorization_url, state = client.create_authorization_url(
discovery_doc["authorization_endpoint"],
response_type="code",
scope="openid email",
redirect_uri=client_options["redirect_uris"][0],
nonce=nonce,
state=state,
prompt="select_account",
)
return {"url": authorization_url, "state": state, "nonce": nonce}


async def callback(body):
discovery_doc = await get_discovery_doc(discovery_url)
client = OAuth2Session(client_id=client_options["client_id"])
token_response = client.fetch_token(
discovery_doc["token_endpoint"],
code=body["code"],
client_assertion_type="urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
client_assertion=jwt.encode({"alg": "RS256"}, jwk_set, "private"),
)
user_info = client.get(
discovery_doc["userinfo_endpoint"], token=token_response["access_token"]
).json()
return user_info
Loading

0 comments on commit 5ec9ce9

Please sign in to comment.