diff --git a/app/auth/enums/__init__.py b/app/auth/enums/__init__.py new file mode 100644 index 00000000..79dc493d --- /dev/null +++ b/app/auth/enums/__init__.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class LoginStatusEnum(str, Enum): + USER_NOT_FOUND = "user_not_found" # User don't exist in the DB + BAD_CREDENTIALS = "bad_credentials" # User exist but the password is wrong + REQUIRE_2FA = "require_2fa" # User exist and the password is correct, but 2FA is required + BAD_OTP = "bad_otp" # User exist and the password is correct, but the OTP is wrong + INACTIVE_EMPLOYEE = "inactive_employee" # User exist but is not an active employee + SUCCESS = "success" # User exist, password and OTP are correct + EMAIL_QUEUE_ERROR = "email_queue_error" # User exist, error in email queueing \ No newline at end of file diff --git a/app/auth/routers/__init__.py b/app/auth/routers/__init__.py new file mode 100644 index 00000000..b46221f6 --- /dev/null +++ b/app/auth/routers/__init__.py @@ -0,0 +1,5 @@ + +from fastapi import APIRouter + + +router = APIRouter(prefix="/auth", tags=["Autenticação"]) \ No newline at end of file diff --git a/app/auth/routers/basic.py b/app/auth/routers/basic.py new file mode 100644 index 00000000..ef4a52ab --- /dev/null +++ b/app/auth/routers/basic.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +import io +from typing import Annotated + +from fastapi import Depends +from fastapi.security import OAuth2PasswordRequestForm +from fastapi.responses import JSONResponse +from loguru import logger + +from app import config +from app.auth.routers import router +from app.types.pydantic_models import Token +from app.utils import authenticate_user, generate_user_token +from app.enums import LoginStatusEnum +from app.types.errors import ( + AuthenticationErrorModel +) + + +@router.post( + "/token", + response_model=Token, + responses={ + 401: {"model": AuthenticationErrorModel} + } +) +async def login_without_2fa( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], +) -> Token: + + login_result = await authenticate_user(form_data.username, form_data.password) + logger.info(f"login_result: {login_result['status']}") + + if login_result['status'] != LoginStatusEnum.SUCCESS: + return JSONResponse( + status_code=401, + content={ + "message": "Something went wrong", + "type": login_result['status'], + }, + ) + + return { + "access_token": generate_user_token(login_result['user']), + "token_type": "bearer", + "token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES), + } \ No newline at end of file diff --git a/app/auth/routers/email.py b/app/auth/routers/email.py new file mode 100644 index 00000000..b08f3c60 --- /dev/null +++ b/app/auth/routers/email.py @@ -0,0 +1,102 @@ +from fastapi import HTTPException +from fastapi.responses import JSONResponse +from loguru import logger + +from app import config +from app.auth.enums import LoginStatusEnum +from app.types.pydantic_models import Token +from app.auth.types import AuthenticationErrorModel +from app.auth.routers import router +from app.auth.types import LoginForm, LoginFormWith2FA +from app.auth.utils import ( + authenticate_user, + generate_user_token +) +from app.auth.utils.email import ( + generate_2fa_code, + store_code, + validate_code, + send_2fa_email_to_user +) + + +@router.post( + "/2fa/email/generate-code/", + response_model=dict, + responses={ + 401: {"model": AuthenticationErrorModel} + } +) +async def generate_2fa_code( + login: LoginForm +): + result = await authenticate_user(login.username, login.password) + + if result['status'] != LoginStatusEnum.SUCCESS: + raise JSONResponse( + status_code=401, + content=AuthenticationErrorModel( + message="Something went wrong", + type=result['status'] + ), + ) + + code = generate_2fa_code() + store_code(result['user'], code) + + try: + success = await send_2fa_email_to_user(result['user'], code) + except Exception as e: + logger.error(f"Error during the email sending process. {e}") + success = False + + if not success: + raise JSONResponse( + status_code=401, + content=AuthenticationErrorModel( + message="Error during the email queueing process. Try Again.", + type=LoginStatusEnum.EMAIL_QUEUE_ERROR + ), + ) + + return { + "message": "Código enviado com sucesso." + } + + +@router.post( + "/2fa/email/login/", + response_model=Token, + responses={ + 401: {"model": AuthenticationErrorModel} + } +) +async def login_with_2fa( + form_data: LoginFormWith2FA, +) -> Token: + + login_result = await authenticate_user( + username=form_data.username, + password=form_data.password, + code=form_data.code, + verificator=validate_code, + ) + logger.info(f"login_result: {login_result['status']}") + + if login_result['status'] == LoginStatusEnum.SUCCESS: + login_result['user'].is_2fa_activated = True + await login_result['user'].save() + + return { + "access_token": generate_user_token(login_result['user']), + "token_type": "bearer", + "token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES), + } + else: + return JSONResponse( + status_code=401, + content={ + "message": "Something went wrong", + "type": login_result['status'], + }, + ) \ No newline at end of file diff --git a/app/routers/auth.py b/app/auth/routers/totp.py similarity index 71% rename from app/routers/auth.py rename to app/auth/routers/totp.py index 92255001..e32a52ec 100644 --- a/app/routers/auth.py +++ b/app/auth/routers/totp.py @@ -1,58 +1,25 @@ # -*- coding: utf-8 -*- import io -from typing import Annotated -from fastapi import APIRouter, Depends, HTTPException, status -from fastapi.security import OAuth2PasswordRequestForm +from fastapi import HTTPException, status from fastapi.responses import StreamingResponse,JSONResponse from loguru import logger from app import config from app.types.frontend import LoginFormWith2FA, LoginForm from app.types.pydantic_models import Token -from app.utils import authenticate_user, generate_user_token from app.security import TwoFactorAuth -from app.dependencies import assert_user_is_active from app.enums import LoginStatusEnum from app.types.errors import ( AuthenticationErrorModel ) - -router = APIRouter(prefix="/auth", tags=["Autenticação"]) - - -@router.post( - "/token", - response_model=Token, - responses={ - 401: {"model": AuthenticationErrorModel} - } -) -async def login_without_2fa( - form_data: Annotated[OAuth2PasswordRequestForm, Depends()], -) -> Token: - - login_result = await authenticate_user(form_data.username, form_data.password) - logger.info(f"login_result: {login_result['status']}") - - if login_result['status'] != LoginStatusEnum.SUCCESS: - return JSONResponse( - status_code=401, - content={ - "message": "Something went wrong", - "type": login_result['status'], - }, - ) - - return { - "access_token": generate_user_token(login_result['user']), - "token_type": "bearer", - "token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES), - } +from app.auth.routers import router +from app.auth.utils import authenticate_user, generate_user_token +from app.auth.utils.totp import validate_code @router.post( - "/2fa/is-2fa-active/", + "/2fa/totp/is-2fa-active/", response_model=bool, responses={ 401: {"model": AuthenticationErrorModel} @@ -61,7 +28,7 @@ async def login_without_2fa( async def is_2fa_active( form_data: LoginForm, ) -> bool: - login_result = await authenticate_user(form_data.username, form_data.password) + login_result = await authenticate_user(form_data.username,form_data.password) logger.info(f"login_result: {login_result['status']}") if login_result['status'] in [ @@ -81,7 +48,7 @@ async def is_2fa_active( @router.post( - "/2fa/login/", + "/2fa/totp/login/", response_model=Token, responses={ 401: {"model": AuthenticationErrorModel} @@ -94,7 +61,8 @@ async def login_with_2fa( login_result = await authenticate_user( form_data.username, form_data.password, - form_data.totp_code, + form_data.code, + verificator=validate_code, ) logger.info(f"login_result: {login_result['status']}") @@ -118,7 +86,7 @@ async def login_with_2fa( @router.post( - "/2fa/generate-qrcode/", + "/2fa/totp/generate-qrcode/", response_model=bytes, responses={ 400: {"model": str}, diff --git a/app/auth/types/__init__.py b/app/auth/types/__init__.py new file mode 100644 index 00000000..1ef058a1 --- /dev/null +++ b/app/auth/types/__init__.py @@ -0,0 +1,27 @@ +from pydantic import BaseModel + +from app.enums import ( + LoginStatusEnum, + AccessErrorEnum, +) + + +class AuthenticationErrorModel(BaseModel): + message: str + type: LoginStatusEnum + + +class AccessErrorModel(BaseModel): + message: str + type: AccessErrorEnum + + +class LoginForm(BaseModel): + username: str + password: str + + +class LoginFormWith2FA(BaseModel): + username: str + password: str + code: str \ No newline at end of file diff --git a/app/auth/types/email.py b/app/auth/types/email.py new file mode 100644 index 00000000..9a5a3f6a --- /dev/null +++ b/app/auth/types/email.py @@ -0,0 +1,10 @@ +from pydantic import BaseModel + + +class Request2FACode(BaseModel): + user_id: str + email: str + +class Validate2FACode(BaseModel): + user_id: str + code: str \ No newline at end of file diff --git a/app/auth/types/totp.py b/app/auth/types/totp.py new file mode 100644 index 00000000..117ae604 --- /dev/null +++ b/app/auth/types/totp.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel + + +class User2FA(BaseModel): + id: int + username: str + is_2fa_required: bool + is_2fa_activated: bool + + +class Enable2FA(BaseModel): + secret_key: str diff --git a/app/auth/utils/__init__.py b/app/auth/utils/__init__.py new file mode 100644 index 00000000..f518c9c9 --- /dev/null +++ b/app/auth/utils/__init__.py @@ -0,0 +1,128 @@ +from passlib.context import CryptContext +from datetime import datetime, timedelta +from typing import Optional, Tuple +import jwt + +from app.config import config +from app.models import User +from app.enums import LoginStatusEnum +from app.security import employee_verify, totp_verify + + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + + +def generate_user_token(user: User) -> str: + """ + Generates a JWT access token for a given user. + Args: + user (User): The user object for which the token is being generated. + Returns: + str: The generated JWT access token. + """ + + access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES) + + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + + return access_token + +async def authenticate_user( + username: str, + password: str, + verificator, # Async Callable[User, str] + code: Optional[str] = None, +) -> Tuple[User, bool, str]: + user = await User.get_or_none(username=username).prefetch_related("role") + + # USER EXISTS + if not user: + return { + "user": None, + "status": LoginStatusEnum.USER_NOT_FOUND, + } + + # CORRECT PASSWORD + is_password_correct = password_verify(password, user.password) + if not is_password_correct: + return { + "user": None, + "status": LoginStatusEnum.BAD_CREDENTIALS, + } + + # ERGON VALIDATION + is_employee = await employee_verify(user) + if not is_employee: + return { + "user": user, + "status": LoginStatusEnum.INACTIVE_EMPLOYEE, + } + + # 2FA TOTP SENT + if user.is_2fa_required and not code: + return { + "user": user, + "status": LoginStatusEnum.REQUIRE_2FA, + } + + # 2FA TOTP VALIDATION + if user.is_2fa_required and code: + is_2fa_valid = await verificator(user, code) + if not is_2fa_valid: + return { + "user": user, + "status": LoginStatusEnum.BAD_OTP, + } + + return { + "user": user, + "status": LoginStatusEnum.SUCCESS, + } + + +def create_access_token(data: dict, expires_delta: timedelta | None = None): + """ + Create an access token. + + Args: + data (dict): The data to encode into the token. + expires_delta (timedelta, optional): The expiry time of the token. + + Returns: + str: The encoded token. + """ + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM) + return encoded_jwt + + +def password_hash(password: str) -> str: + """Hash a password. + + Args: + password (str): The password to hash. + + Returns: + str: The hashed password. + """ + return pwd_context.hash(password) + + +def password_verify(password: str, hashed: str) -> bool: + """Verify a password against a hash. + + Args: + password (str): The password to verify. + hashed (str): The hashed password to verify against. + + Returns: + bool: True if the password matches the hash, False otherwise. + """ + return pwd_context.verify(password, hashed) \ No newline at end of file diff --git a/app/auth/utils/email.py b/app/auth/utils/email.py new file mode 100644 index 00000000..dbc03416 --- /dev/null +++ b/app/auth/utils/email.py @@ -0,0 +1,69 @@ +from loguru import logger +import requests +import redis +import secrets + +from app.models import User +from app.config import ( + REDIS_HOST, + REDIS_PASSWORD, + REDIS_PORT, + DATARELAY_URL, + DATARELAY_MAILMAN_TOKEN, + EMAIL_BODY_2FA, + EMAIL_SUBJECT_2FA, +) + + +redis_client = redis.StrictRedis( + host=REDIS_HOST, + port=REDIS_PORT, + password=REDIS_PASSWORD, + db=0 +) + + +def store_code(user: User, code: str, ttl: int = 300): + redis_client.setex(f"2fa:{user.id}", ttl, code) + + +async def validate_code(user: User, code: str): + stored_code = redis_client.get(f"2fa:{user.id}") + + if store_code is None: + return False + + if stored_code.decode() == code: + redis_client.delete(f"2fa:{user.id}") + return True + + return False + + +def generate_2fa_code(): + return f"{secrets.randbelow(1000000):06}" + + +async def send_2fa_email_to_user(user: User, code: str): + logger.info(f"Sending 2FA code {code} to {user.email}") + response = requests.post( + url=DATARELAY_URL, + headers={ + 'x-api-key': DATARELAY_MAILMAN_TOKEN + }, + body={ + "to_addresses": [user.email], + "cc_addresses": [], + "bcc_addresses": [], + "subject": EMAIL_SUBJECT_2FA, + "body": EMAIL_BODY_2FA.format(code=code), + "is_html_body": True + } + ) + if response.status_code == 200: + return True + else: + return False + + + diff --git a/app/auth/utils/totp.py b/app/auth/utils/totp.py new file mode 100644 index 00000000..b3ceb1a0 --- /dev/null +++ b/app/auth/utils/totp.py @@ -0,0 +1,12 @@ +from app.models import User +from app.security import TwoFactorAuth + + +async def validate_code(user: User, code: str) -> bool: + if user.is_2fa_required is False: + return True + + secret_key = await TwoFactorAuth.get_or_create_secret_key(user) + two_factor_auth = TwoFactorAuth(user, secret_key) + + return two_factor_auth.verify_totp_code(code) diff --git a/app/config/base.py b/app/config/base.py index 165a9de7..6cd88af3 100644 --- a/app/config/base.py +++ b/app/config/base.py @@ -26,6 +26,12 @@ getenv_or_action("JWT_ACCESS_TOKEN_EXPIRE_MINUTES", default="30") ) +# 2FA +DATARELAY_URL = getenv_or_action("DATARELAY_URL", action="raise") +DATARELAY_MAILMAN_TOKEN = getenv_or_action("DATARELAY_MAILMAN_TOKEN", action="raise") +EMAIL_SUBJECT_2FA = getenv_or_action("EMAIL_SUBJECT_2FA", action="raise") +EMAIL_BODY_2FA = getenv_or_action("EMAIL_BODY_2FA", action="raise") + # Request Limit Configuration REQUEST_LIMIT_MAX = int(getenv_or_action("REQUEST_LIMIT_MAX", action="raise")) REQUEST_LIMIT_WINDOW_SIZE = int(getenv_or_action("REQUEST_LIMIT_WINDOW_SIZE", action="raise")) diff --git a/app/types/__init__.py b/app/types/__init__.py new file mode 100644 index 00000000..07009cb9 --- /dev/null +++ b/app/types/__init__.py @@ -0,0 +1,12 @@ +from typing import Optional +from pydantic import BaseModel + + +class Token(BaseModel): + access_token: str + token_type: str + token_expire_minutes: int + + +class TokenData(BaseModel): + username: Optional[str] \ No newline at end of file diff --git a/app/types/errors.py b/app/types/errors.py index 38c44239..e6c71058 100644 --- a/app/types/errors.py +++ b/app/types/errors.py @@ -2,22 +2,10 @@ from pydantic import BaseModel from app.enums import ( - LoginStatusEnum, - AccessErrorEnum, AcceptTermsEnum ) -class AuthenticationErrorModel(BaseModel): - message: str - type: LoginStatusEnum - - -class AccessErrorModel(BaseModel): - message: str - type: AccessErrorEnum - - class TermAcceptanceErrorModel(BaseModel): message: str type: AcceptTermsEnum diff --git a/app/types/frontend.py b/app/types/frontend.py index 034406bc..ff62e165 100644 --- a/app/types/frontend.py +++ b/app/types/frontend.py @@ -3,17 +3,6 @@ from pydantic import BaseModel -class LoginForm(BaseModel): - username: str - password: str - - -class LoginFormWith2FA(BaseModel): - username: str - password: str - totp_code: str - - # Clinic Family model class FamilyClinic(BaseModel): cnes: Optional[str] diff --git a/app/types/pydantic_models.py b/app/types/pydantic_models.py index 8727020a..2be1f40f 100644 --- a/app/types/pydantic_models.py +++ b/app/types/pydantic_models.py @@ -4,35 +4,6 @@ from pydantic import BaseModel - -class User2FA(BaseModel): - id: int - username: str - is_2fa_required: bool - is_2fa_activated: bool - - -class AddressModel(BaseModel): - use: Optional[str] - type: Optional[str] - line: str - city: str - country: str - state: str - postal_code: Optional[str] - start: Optional[date] - end: Optional[date] - - -class TelecomModel(BaseModel): - system: Optional[str] - use: Optional[str] - value: str - rank: Optional[int] - start: Optional[date] - end: Optional[date] - - class DataSourceModel(BaseModel): system: str cnes: str @@ -53,21 +24,6 @@ class UserRegisterOutputModel(BaseModel): data_source: DataSourceModel -class CnsModel(BaseModel): - value: str - is_main: bool - - -class Token(BaseModel): - access_token: str - token_type: str - token_expire_minutes: int - - -class TokenData(BaseModel): - username: Optional[str] - - class RawDataModel(BaseModel): id: Optional[int] patient_cpf: str @@ -91,248 +47,3 @@ class BulkInsertOutputModel(BaseModel): count: int datalake_status: Optional[UploadToDatalakeStatusModel] - -class ConditionListModel(BaseModel): - code: str - clinical_status: Optional[str] - category: Optional[str] - date: datetime - - -class PatientConditionListModel(BaseModel): - patient_cpf: str - patient_code: str - conditions: List[ConditionListModel] - - -class PatientModel(BaseModel): - active: Optional[bool] = True - birth_city: Optional[str] - birth_state: Optional[str] - birth_country: Optional[str] - birth_date: date - patient_cpf: str - patient_code: str - deceased: Optional[bool] = False - deceased_date: Optional[date] - father_name: Optional[str] - gender: str - mother_name: Optional[str] - name: str - nationality: Optional[str] - protected_person: Optional[bool] - race: Optional[str] - cns_list: Optional[List[CnsModel]] - telecom_list: Optional[List[TelecomModel]] - address_list: Optional[List[AddressModel]] - - -class CompletePatientModel(BaseModel): - birth_date: date - patient_cpf: str - patient_code: str - gender: str - name: str - cns_list: List[CnsModel] - telecom_list: List[TelecomModel] - address_list: List[AddressModel] - active: Optional[bool] = True - birth_city: Optional[str] - birth_state: Optional[str] - birth_country: Optional[str] - deceased: Optional[bool] = False - deceased_date: Optional[date] - father_name: Optional[str] - mother_name: Optional[str] - nationality: Optional[str] - protected_person: Optional[bool] - race: Optional[str] - - -class StandardizedAddressModel(BaseModel): - use: Optional[str] - type: Optional[str] - line: str - city: str - country: str - state: str - postal_code: Optional[str] - start: Optional[str] - end: Optional[str] - - -class StandardizedTelecomModel(BaseModel): - system: Optional[str] - use: Optional[str] - value: str - rank: Optional[int] - start: Optional[str] - end: Optional[str] - - -class StandardizedPatientRecordModel(BaseModel): - active: Optional[bool] = True - birth_city_cod: Optional[str] - birth_state_cod: Optional[str] - birth_country_cod: Optional[str] - birth_date: date - patient_cpf: str - patient_code: str - deceased: Optional[bool] = False - deceased_date: Optional[date] - father_name: Optional[str] - gender: str - mother_name: Optional[str] - name: str - nationality: Optional[str] - protected_person: Optional[bool] - race: Optional[str] - cns_list: Optional[List[CnsModel]] - address_list: Optional[List[StandardizedAddressModel]] - telecom_list: Optional[List[StandardizedTelecomModel]] - raw_source_id: str - is_valid: Optional[bool] - - -class StandardizedPatientConditionModel(BaseModel): - patient_cpf: str - patient_code: str - cid: str - ciap: Optional[str] - clinical_status: Optional[str] - category: Optional[str] - date: datetime - raw_source_id: str - is_valid: Optional[bool] - - -PatientData = TypeVar( - "PatientData", StandardizedPatientRecordModel, StandardizedPatientConditionModel -) - - -class MergeableRecord(Generic[PatientData], BaseModel): - standardized_record: PatientData - source: DataSourceModel - event_moment: datetime - ingestion_moment: datetime - - -class PatientMergeableRecord(Generic[PatientData], BaseModel): - patient_code: str - mergeable_records: List[MergeableRecord[PatientData]] - - -class RecordListModel(Generic[PatientData], BaseModel): - records: List[PatientData] - - -class StandardizedPatientRecordListModel(BaseModel): - records: List[StandardizedPatientRecordModel] - - -class StandardizedPatientConditionListModel(BaseModel): - conditions: List[StandardizedPatientConditionModel] - - -PageableData = TypeVar("PageableData", PatientMergeableRecord, Any) - - -class Page(Generic[PageableData], BaseModel): - items: List[PageableData] - current_page: int - page_count: int - - -# ====================================== -# MERGE INPUT MODELS -# ====================================== -class MergedPatient(BaseModel): - active: Optional[bool] = True - birth_city: Optional[str] - birth_state: Optional[str] - birth_country: Optional[str] - birth_date: date - patient_cpf: str - patient_code: str - deceased: Optional[bool] = False - deceased_date: Optional[date] - father_name: Optional[str] - gender: str - mother_name: Optional[str] - name: str - nationality: Optional[str] - protected_person: Optional[bool] - race: Optional[str] - - -class MergedPatientAddress(BaseModel): - patient_code: str - line: str - city: str - use: Optional[str] - type: Optional[str] - postal_code: Optional[str] - start: Optional[date] - end: Optional[date] - - -class MergedPatientCns(BaseModel): - patient_code: str - value: str - is_main: bool - - -class MergedPatientTelecom(BaseModel): - patient_code: str - value: str - system: Optional[str] - use: Optional[str] - rank: Optional[int] - start: Optional[date] - end: Optional[date] - - -class OccupationModel(BaseModel): - id_cbo: str - cbo: str - id_cbo_familia: Optional[str] - cbo_familia: Optional[str] - - -class RegistryModel(BaseModel): - id_registro_conselho: str - id_tipo_conselho: Optional[str] - - -class ProfessionalModel(BaseModel): - id_profissional_sus: str - cns: Optional[str] - cpf: Optional[str] - nome: str - cbo: List[OccupationModel] - conselho: List[RegistryModel] - - -class TeamModel(BaseModel): - id_ine: str - nome_referencia: str - id_cnes: str - id_equipe_tipo: str - equipe_tipo_descricao: str - id_area: str - area_descricao: str - telefone: Optional[str] - medicos: List[str] - enfermeiros: List[str] - auxiliares_tecnicos_enfermagem: List[str] - agentes_comunitarios: List[str] - auxiliares_tecnico_saude_bucal: List[str] - dentista: List[str] - outros_profissionais: List[str] - ultima_atualizacao_profissionais: date - ultima_atualizacao_infos_equipe: date - - -class Enable2FA(BaseModel): - secret_key: str diff --git a/app/utils.py b/app/utils.py index 3f3d6b87..74233e48 100644 --- a/app/utils.py +++ b/app/utils.py @@ -1,11 +1,8 @@ # -*- coding: utf-8 -*- -from datetime import datetime, timedelta import hashlib import json import os import base64 -from typing import Optional, Tuple -import jwt from google.cloud import bigquery from google.oauth2 import service_account @@ -13,38 +10,14 @@ from loguru import logger from fastapi import Request from fastapi.responses import JSONResponse -from passlib.context import CryptContext - -from app import config from app.models import User -from app.security import TwoFactorAuth -from app.enums import AccessErrorEnum, LoginStatusEnum +from app.enums import AccessErrorEnum from app.config import ( BIGQUERY_PROJECT, BIGQUERY_PATIENT_HEADER_TABLE_ID, BIGQUERY_ERGON_TABLE_ID, ) -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") - - -def generate_user_token(user: User) -> str: - """ - Generates a JWT access token for a given user. - Args: - user (User): The user object for which the token is being generated. - Returns: - str: The generated JWT access token. - """ - - access_token_expires = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES) - - access_token = create_access_token( - data={"sub": user.username}, expires_delta=access_token_expires - ) - - return access_token - async def employee_verify(user: User) -> bool: if not user.is_ergon_validation_required: @@ -65,123 +38,6 @@ async def employee_verify(user: User) -> bool: return True -async def totp_verify(user: User, totp_code: str) -> bool: - if user.is_2fa_required is False: - return True - - secret_key = await TwoFactorAuth.get_or_create_secret_key(user) - two_factor_auth = TwoFactorAuth(user, secret_key) - - return two_factor_auth.verify_totp_code(totp_code) - - -async def authenticate_user( - username: str, - password: str, - totp_code: Optional[str] = None -) -> Tuple[User, bool, str]: - """Authenticate a user. - - Args: - username (str): The username of the user to authenticate. - password (str): The password of the user to authenticate. - - Returns: - User: The authenticated user. - """ - user = await User.get_or_none(username=username).prefetch_related("role") - - # USER EXISTS - if not user: - return { - "user": None, - "status": LoginStatusEnum.USER_NOT_FOUND, - } - - # CORRECT PASSWORD - is_password_correct = password_verify(password, user.password) - if not is_password_correct: - return { - "user": None, - "status": LoginStatusEnum.BAD_CREDENTIALS, - } - - # ERGON VALIDATION - is_employee = await employee_verify(user) - if not is_employee: - return { - "user": user, - "status": LoginStatusEnum.INACTIVE_EMPLOYEE, - } - - # 2FA TOTP SENT - if user.is_2fa_required and not totp_code: - return { - "user": user, - "status": LoginStatusEnum.REQUIRE_2FA, - } - - # 2FA TOTP VALIDATION - if user.is_2fa_required and totp_code: - is_2fa_valid = await totp_verify(user, totp_code) - if not is_2fa_valid: - return { - "user": user, - "status": LoginStatusEnum.BAD_OTP, - } - - return { - "user": user, - "status": LoginStatusEnum.SUCCESS, - } - - -def create_access_token(data: dict, expires_delta: timedelta | None = None): - """ - Create an access token. - - Args: - data (dict): The data to encode into the token. - expires_delta (timedelta, optional): The expiry time of the token. - - Returns: - str: The encoded token. - """ - to_encode = data.copy() - if expires_delta: - expire = datetime.utcnow() + expires_delta - else: - expire = datetime.utcnow() + timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES) - to_encode.update({"exp": expire}) - encoded_jwt = jwt.encode(to_encode, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM) - return encoded_jwt - - -def password_hash(password: str) -> str: - """Hash a password. - - Args: - password (str): The password to hash. - - Returns: - str: The hashed password. - """ - return pwd_context.hash(password) - - -def password_verify(password: str, hashed: str) -> bool: - """Verify a password against a hash. - - Args: - password (str): The password to verify. - hashed (str): The hashed password to verify against. - - Returns: - bool: True if the password matches the hash, False otherwise. - """ - return pwd_context.verify(password, hashed) - - def generate_dictionary_fingerprint(dict_obj: dict) -> str: """ Generate a fingerprint for a dictionary object.