Skip to content

Commit

Permalink
[wip] feat: refactoring code to allow email 2fa option
Browse files Browse the repository at this point in the history
  • Loading branch information
TanookiVerde committed Dec 3, 2024
1 parent dd2b549 commit d9faa0d
Show file tree
Hide file tree
Showing 17 changed files with 452 additions and 499 deletions.
11 changes: 11 additions & 0 deletions app/auth/enums/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from enum import Enum


class LoginStatusEnum(str, Enum):
USER_NOT_FOUND = "user_not_found" # User don't exist in the DB
BAD_CREDENTIALS = "bad_credentials" # User exist but the password is wrong
REQUIRE_2FA = "require_2fa" # User exist and the password is correct, but 2FA is required
BAD_OTP = "bad_otp" # User exist and the password is correct, but the OTP is wrong
INACTIVE_EMPLOYEE = "inactive_employee" # User exist but is not an active employee
SUCCESS = "success" # User exist, password and OTP are correct
EMAIL_QUEUE_ERROR = "email_queue_error" # User exist, error in email queueing
5 changes: 5 additions & 0 deletions app/auth/routers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

from fastapi import APIRouter


router = APIRouter(prefix="/auth", tags=["Autenticação"])
47 changes: 47 additions & 0 deletions app/auth/routers/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
import io
from typing import Annotated

from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm
from fastapi.responses import JSONResponse
from loguru import logger

from app import config
from app.auth.routers import router
from app.types.pydantic_models import Token
from app.utils import authenticate_user, generate_user_token
from app.enums import LoginStatusEnum
from app.types.errors import (
AuthenticationErrorModel
)


@router.post(
"/token",
response_model=Token,
responses={
401: {"model": AuthenticationErrorModel}
}
)
async def login_without_2fa(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:

login_result = await authenticate_user(form_data.username, form_data.password)
logger.info(f"login_result: {login_result['status']}")

if login_result['status'] != LoginStatusEnum.SUCCESS:
return JSONResponse(
status_code=401,
content={
"message": "Something went wrong",
"type": login_result['status'],
},
)

return {
"access_token": generate_user_token(login_result['user']),
"token_type": "bearer",
"token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
}
102 changes: 102 additions & 0 deletions app/auth/routers/email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from loguru import logger

from app import config
from app.auth.enums import LoginStatusEnum
from app.types.pydantic_models import Token
from app.auth.types import AuthenticationErrorModel
from app.auth.routers import router
from app.auth.types import LoginForm, LoginFormWith2FA
from app.auth.utils import (
authenticate_user,
generate_user_token
)
from app.auth.utils.email import (
generate_2fa_code,
store_code,
validate_code,
send_2fa_email_to_user
)


@router.post(
"/2fa/email/generate-code/",
response_model=dict,
responses={
401: {"model": AuthenticationErrorModel}
}
)
async def generate_2fa_code(
login: LoginForm
):
result = await authenticate_user(login.username, login.password)

if result['status'] != LoginStatusEnum.SUCCESS:
raise JSONResponse(
status_code=401,
content=AuthenticationErrorModel(
message="Something went wrong",
type=result['status']
),
)

code = generate_2fa_code()
store_code(result['user'], code)

try:
success = await send_2fa_email_to_user(result['user'], code)
except Exception as e:
logger.error(f"Error during the email sending process. {e}")
success = False

if not success:
raise JSONResponse(
status_code=401,
content=AuthenticationErrorModel(
message="Error during the email queueing process. Try Again.",
type=LoginStatusEnum.EMAIL_QUEUE_ERROR
),
)

return {
"message": "Código enviado com sucesso."
}


@router.post(
"/2fa/email/login/",
response_model=Token,
responses={
401: {"model": AuthenticationErrorModel}
}
)
async def login_with_2fa(
form_data: LoginFormWith2FA,
) -> Token:

login_result = await authenticate_user(
username=form_data.username,
password=form_data.password,
code=form_data.code,
verificator=validate_code,
)
logger.info(f"login_result: {login_result['status']}")

if login_result['status'] == LoginStatusEnum.SUCCESS:
login_result['user'].is_2fa_activated = True
await login_result['user'].save()

return {
"access_token": generate_user_token(login_result['user']),
"token_type": "bearer",
"token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
}
else:
return JSONResponse(
status_code=401,
content={
"message": "Something went wrong",
"type": login_result['status'],
},
)
52 changes: 10 additions & 42 deletions app/routers/auth.py → app/auth/routers/totp.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,25 @@
# -*- coding: utf-8 -*-
import io
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from fastapi import HTTPException, status
from fastapi.responses import StreamingResponse,JSONResponse
from loguru import logger

from app import config
from app.types.frontend import LoginFormWith2FA, LoginForm
from app.types.pydantic_models import Token
from app.utils import authenticate_user, generate_user_token
from app.security import TwoFactorAuth
from app.dependencies import assert_user_is_active
from app.enums import LoginStatusEnum
from app.types.errors import (
AuthenticationErrorModel
)

router = APIRouter(prefix="/auth", tags=["Autenticação"])


@router.post(
"/token",
response_model=Token,
responses={
401: {"model": AuthenticationErrorModel}
}
)
async def login_without_2fa(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:

login_result = await authenticate_user(form_data.username, form_data.password)
logger.info(f"login_result: {login_result['status']}")

if login_result['status'] != LoginStatusEnum.SUCCESS:
return JSONResponse(
status_code=401,
content={
"message": "Something went wrong",
"type": login_result['status'],
},
)

return {
"access_token": generate_user_token(login_result['user']),
"token_type": "bearer",
"token_expire_minutes": int(config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
}
from app.auth.routers import router
from app.auth.utils import authenticate_user, generate_user_token
from app.auth.utils.totp import validate_code


@router.post(
"/2fa/is-2fa-active/",
"/2fa/totp/is-2fa-active/",
response_model=bool,
responses={
401: {"model": AuthenticationErrorModel}
Expand All @@ -61,7 +28,7 @@ async def login_without_2fa(
async def is_2fa_active(
form_data: LoginForm,
) -> bool:
login_result = await authenticate_user(form_data.username, form_data.password)
login_result = await authenticate_user(form_data.username,form_data.password)
logger.info(f"login_result: {login_result['status']}")

if login_result['status'] in [
Expand All @@ -81,7 +48,7 @@ async def is_2fa_active(


@router.post(
"/2fa/login/",
"/2fa/totp/login/",
response_model=Token,
responses={
401: {"model": AuthenticationErrorModel}
Expand All @@ -94,7 +61,8 @@ async def login_with_2fa(
login_result = await authenticate_user(
form_data.username,
form_data.password,
form_data.totp_code,
form_data.code,
verificator=validate_code,
)
logger.info(f"login_result: {login_result['status']}")

Expand All @@ -118,7 +86,7 @@ async def login_with_2fa(


@router.post(
"/2fa/generate-qrcode/",
"/2fa/totp/generate-qrcode/",
response_model=bytes,
responses={
400: {"model": str},
Expand Down
27 changes: 27 additions & 0 deletions app/auth/types/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from pydantic import BaseModel

from app.enums import (
LoginStatusEnum,
AccessErrorEnum,
)


class AuthenticationErrorModel(BaseModel):
message: str
type: LoginStatusEnum


class AccessErrorModel(BaseModel):
message: str
type: AccessErrorEnum


class LoginForm(BaseModel):
username: str
password: str


class LoginFormWith2FA(BaseModel):
username: str
password: str
code: str
10 changes: 10 additions & 0 deletions app/auth/types/email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pydantic import BaseModel


class Request2FACode(BaseModel):
user_id: str
email: str

class Validate2FACode(BaseModel):
user_id: str
code: str
12 changes: 12 additions & 0 deletions app/auth/types/totp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from pydantic import BaseModel


class User2FA(BaseModel):
id: int
username: str
is_2fa_required: bool
is_2fa_activated: bool


class Enable2FA(BaseModel):
secret_key: str
Loading

0 comments on commit d9faa0d

Please sign in to comment.