1 parent
f0f573d
commit 5b69940
Showing
19 changed files
with
808 additions
and
5 deletions.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from enum import Enum | ||
|
||
|
||
class UserType(str, Enum): | ||
IDIR = "I" | ||
BCEID = "B" | ||
|
||
|
||
COGNITO_USERNAME_KEY = "username" |
Empty file.
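--- a/server/admin_management/api/app/repositories/application_admin_repository.py
+++ b/server/admin_management/api/app/repositories/application_admin_repository.py
@@ -0,0 +1,78 @@
+import logging
+from sqlalchemy.orm import Session
+from typing import List
+
+from api.app.models import model as models
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ApplicationAdminRepository:
+def __init__(self, db: Session):
+self.db = db
+
+def get_application_admin_by_app_and_user_id(
+self, application_id: int, user_id: int
+) -> models.FamApplicationAdmin:
+return (
+self.db.query(models.FamApplicationAdmin)
+.filter(
+models.FamApplicationAdmin.application_id == application_id,
+models.FamApplicationAdmin.user_id == user_id,
+)
+.one_or_none()
+)
+
+def get_application_admin_by_id(
+self, application_admin_id: int
+) -> models.FamApplicationAdmin:
+return (
+self.db.query(models.FamApplicationAdmin)
+.filter(
+models.FamApplicationAdmin.application_admin_id == application_admin_id
+)
+.one_or_none()
+)
+
+def get_application_admin_by_application_id(
+self, application_id: int
+) -> List[models.FamApplicationAdmin]:
+return (
+self.db.query(models.FamApplicationAdmin)
+.filter(
+models.FamApplicationAdmin.application_id == application_id
+)
+.all()
+)
+
+def create_application_admin(
+self, application_id: int, user_id: int, requester: str
+) -> models.FamApplicationAdmin:
+new_fam_application_admin: models.FamApplicationAdmin = (
+models.FamApplicationAdmin(
+**{
+"user_id": user_id,
+"application_id": application_id,
+"create_user": requester,
+}
+)
+)
+self.db.add(new_fam_application_admin)
+self.db.flush()
+self.db.refresh(new_fam_application_admin)
+LOGGER.debug(
+f"New FamApplicationAdmin added for {new_fam_application_admin.__dict__}"
+)
+return new_fam_application_admin
+
+def delete_application_admin(self, application_admin_id: int):
+record = (
+self.db.query(models.FamApplicationAdmin)
+.filter(
+models.FamApplicationAdmin.application_admin_id == application_admin_id
+)
+.one()
+)
+self.db.delete(record)
+self.db.flush()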
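Review bot: `get_application_admin_by_app_and_user_id` and `get_application_admin_by_id` end in `.one_or_none()` but are annotated `-> models.FamApplicationAdmin`, so the signature hides that they return `None` when no row matches; I would annotate them `-> Optional[models.FamApplicationAdmin]` and extend the import to `from typing import List, Optional`. The same mismatch appears in `ApplicationRepository.get_application` and in both `UserRepository` lookups later in this commit. `delete_application_admin` goes the other way and uses `.one()`, which raises when the id is missing. The router appears to rely on `validate_param_application_admin_id` running first, so a docstring such as `"""Delete the row; raises NoResultFound if application_admin_id does not exist."""` would make that contract explicit.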
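--- a/server/admin_management/api/app/repositories/application_repository.py
+++ b/server/admin_management/api/app/repositories/application_repository.py
@@ -0,0 +1,19 @@
+import logging
+from sqlalchemy.orm import Session
+
+from api.app.models import model as models
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class ApplicationRepository:
+def __init__(self, db: Session):
+self.db = db
+
+def get_application(self, application_id: int) -> models.FamApplication:
+return (
+self.db.query(models.FamApplication)
+.filter(models.FamApplication.application_id == application_id)
+.one_or_none()
+)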
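--- a/server/admin_management/api/app/repositories/user_repository.py
+++ b/server/admin_management/api/app/repositories/user_repository.py
@@ -0,0 +1,45 @@
+import logging
+from sqlalchemy.orm import Session
+
+from api.app.models import model as models
+from api.app import schemas
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class UserRepository:
+def __init__(self, db: Session):
+self.db = db
+
+def get_user_by_domain_and_name(
+self, user_type_code: str, user_name: str
+) -> models.FamUser:
+fam_user: models.FamUser = (
+self.db.query(models.FamUser)
+.filter(
+models.FamUser.user_type_code == user_type_code,
+models.FamUser.user_name.ilike(user_name),
+)
+.one_or_none()
+)
+LOGGER.debug(
+f"fam_user {str(fam_user.user_id) + ' found' if fam_user else 'not found'}."
+)
+return fam_user
+
+def get_user_by_cognito_user_id(self, cognito_user_id: str) -> models.FamUser:
+return (
+self.db.query(models.FamUser)
+.filter(models.FamUser.cognito_user_id == cognito_user_id)
+.one_or_none()
+)
+
+def create_user(self, fam_user: schemas.FamUser) -> models.FamUser:
+LOGGER.debug(f"Creating fam user: {fam_user}")
+
+fam_user_dict = fam_user.model_dump()
+db_item = models.FamUser(**fam_user_dict)
+self.db.add(db_item)
+self.db.flush()
+return db_item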
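Review bot: `get_user_by_domain_and_name` filters with `models.FamUser.user_name.ilike(user_name)`, and `ilike` treats `%` and `_` in its argument as pattern wildcards rather than literal characters. A requested name containing an underscore can therefore resolve to a different stored user, or make `.one_or_none()` raise `MultipleResultsFound`. The router's `create_application_admin` appears to reach this lookup through `UserService` with `application_admin_request.user_name`, so the match decides which account is recorded as the target of an admin grant. An exact case-insensitive comparison avoids this: `func.lower(models.FamUser.user_name) == user_name.lower(),` with `from sqlalchemy import func` added to the imports. If `ilike` is kept, the argument needs its wildcard characters escaped and the `escape=` parameter set.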
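--- a/server/admin_management/api/app/routers/router_application_admin.py
+++ b/server/admin_management/api/app/routers/router_application_admin.py
@@ -0,0 +1,164 @@
+import logging
+from fastapi import APIRouter, Depends, Request, Response, HTTPException
+from sqlalchemy.orm import Session
+from typing import List
+
+
+from api.app.models import model as models
+from api.app.routers.router_guards import (
+get_current_requester,
+authorize_by_fam_admin,
+enforce_self_grant_guard,
+validate_param_application_admin_id,
+validate_param_application_id,
+)
+from api.app import database, jwt_validation, schemas
+from api.app.schemas import Requester
+from api.app.services.application_admin_service import ApplicationAdminService
+from api.app.services.user_service import UserService
+from api.app.services.application_service import ApplicationService
+from api.app.utils.audit_util import AuditEventLog, AuditEventOutcome, AuditEventType
+
+LOGGER = logging.getLogger(__name__)
+
+router = APIRouter()
+
+
+@router.post(
+"",
+response_model=schemas.FamAppAdminGet,
+dependencies=[
+Depends(authorize_by_fam_admin),
+Depends(enforce_self_grant_guard),
+Depends(validate_param_application_id),
+],
+)
+def create_application_admin(
+application_admin_request: schemas.FamAppAdminCreate,
+request: Request,
+db: Session = Depends(database.get_db),
+token_claims: dict = Depends(jwt_validation.authorize),
+requester: Requester = Depends(get_current_requester),
+):
+
+LOGGER.debug(
+f"Executing 'create_application_admin' "
+f"with request: {application_admin_request}, requestor: {token_claims}"
+)
+
+audit_event_log = AuditEventLog(
+request=request,
+event_type=AuditEventType.CREATE_APPLICATION_ADMIN_ACCESS,
+event_outcome=AuditEventOutcome.SUCCESS,
+)
+
+try:
+application_admin_service = ApplicationAdminService(db)
+application_service = ApplicationService(db)
+user_service = UserService(db)
+
+audit_event_log.requesting_user = user_service.get_user_by_cognito_user_id(
+requester.cognito_user_id
+)
+audit_event_log.application = application_service.get_application(
+application_admin_request.application_id
+)
+audit_event_log.target_user = user_service.get_user_by_domain_and_name(
+application_admin_request.user_type_code,
+application_admin_request.user_name,
+)
+
+return application_admin_service.create_application_admin(
+application_admin_request, requester.cognito_user_id
+)
+
+except Exception as e:
+audit_event_log.event_outcome = AuditEventOutcome.FAIL
+audit_event_log.exception = e
+raise e
+
+finally:
+if audit_event_log.target_user is None:
+audit_event_log.target_user = models.FamUser(
+user_type_code=application_admin_request.user_type_code,
+user_name=application_admin_request.user_name,
+user_guid="unknown",
+cognito_user_id="unknown",
+)
+
+audit_event_log.log_event()
+
+
+@router.delete(
+"/{application_admin_id}",
+response_class=Response,
+dependencies=[
+Depends(authorize_by_fam_admin),
+Depends(enforce_self_grant_guard),
+Depends(validate_param_application_admin_id),
+],
+)
+def delete_application_admin(
+application_admin_id: int,
+request: Request,
+db: Session = Depends(database.get_db),
+requester: Requester = Depends(get_current_requester),
+):
+LOGGER.debug(
+f"Executing 'delete_application_admin' with request: {application_admin_id}"
+)
+
+audit_event_log = AuditEventLog(
+request=request,
+event_type=AuditEventType.REMOVE_APPLICATION_ADMIN_ACCESS,
+event_outcome=AuditEventOutcome.SUCCESS,
+)
+
+try:
+application_admin_service = ApplicationAdminService(db)
+user_service = UserService(db)
+
+application_admin = application_admin_service.get_application_admin_by_id(
+application_admin_id
+)
+audit_event_log.requesting_user = user_service.get_user_by_cognito_user_id(
+requester.cognito_user_id
+)
+audit_event_log.application = application_admin.application
+audit_event_log.target_user = application_admin.user
+
+return application_admin_service.delete_application_admin(application_admin_id)
+
+except Exception as e:
+audit_event_log.event_outcome = AuditEventOutcome.FAIL
+audit_event_log.exception = e
+raise e
+
+finally:
+audit_event_log.log_event()
+
+
+@router.get(
+"/{application_id}/admins",
+response_model=List[schemas.FamAppAdminGet],
+status_code=200,
+dependencies=[Depends(authorize_by_fam_admin)],
+)
+def get_application_admin_by_applicationid(
+application_id: int,
+db: Session = Depends(database.get_db),
+):
+LOGGER.debug(
+f"Loading application admin access for application_id: {application_id}"
+)
+application_admin_service = ApplicationAdminService(db)
+application_admin_access = (
+application_admin_service.get_application_admin_by_application_id(
+application_id
+)
+)
+LOGGER.debug(
+f"Finished loading application admin access for application - # of results = {len(application_admin_access)}"
+)
+
+return application_admin_access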
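Review bot: The debug statement at the top of `create_application_admin` interpolates the whole `token_claims` dict, so every claim of the caller's validated JWT lands in the application log whenever debug logging is enabled. `token_claims` is not read anywhere else in the function, so logging the resolved identity is enough: `f"with request: {application_admin_request}, requester: {requester.cognito_user_id}"`. The `token_claims: dict = Depends(jwt_validation.authorize)` parameter can stay so the dependency still runs. Passing the values as lazy `%s` arguments instead of an f-string would also avoid formatting the request object when debug output is off.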