Skip to content

Commit

Permalink
role restrictions
Browse files Browse the repository at this point in the history
  • Loading branch information
dinesh-aot committed Nov 8, 2024
1 parent 176b827 commit 85f9468
Show file tree
Hide file tree
Showing 20 changed files with 433 additions and 129 deletions.
3 changes: 2 additions & 1 deletion compliance-api/src/compliance_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def set_origin():
"COMPLIANCE" in group for group in token_info.get("groups", [])
)
if not is_compliance_in_groups:
raise PermissionDeniedError("Access Denied", HTTPStatus.UNAUTHORIZED)
raise PermissionDeniedError("Access Denied")
# Attempt to validate and decode the token here
g.access_token = auth_header.split(" ")[1]
g.token_info = token_info
else:
Expand Down
52 changes: 10 additions & 42 deletions compliance-api/src/compliance_api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
from flask_jwt_oidc import JwtManager

from compliance_api.exceptions import PermissionDeniedError
from compliance_api.services import CaseFileService, ComplaintService, InspectionService
from compliance_api.utils.constant import GROUP_MAP
from compliance_api.utils.enum import ContextEnum


jwt = (
Expand All @@ -46,45 +44,6 @@ def decorated(*args, **kwargs):

return decorated

@classmethod
def is_allowed(cls, context: ContextEnum, permissions):
"""Check to see if user is allowed to access the function."""

def decorated(f):
@Auth.require
@wraps(f)
def wrapper(*args, **kwargs):
auth_user_guid = g.token_info["preferred_username"]

# Create a context-to-service mapping
context_service_map = {
ContextEnum.INSPECTION: ("inspection_id", InspectionService),
ContextEnum.COMPLAINT: ("complaint_id", ComplaintService),
ContextEnum.CASE_FILE: ("case_file_id", CaseFileService),
}

# Retrieve the corresponding ID and service for the given context
id_field, service = context_service_map.get(context, (None, None))

if id_field and service:
is_allowed = service.is_assigned_user(
kwargs[id_field], auth_user_guid
)
# map the permission enum values to the user groups
mapped_groups = _map_permission_to_groups(permissions)
if not is_allowed and not jwt.contains_role(mapped_groups):
raise PermissionDeniedError(
"Access Denied", HTTPStatus.FORBIDDEN
)
else:
raise PermissionDeniedError("Invalid Context", HTTPStatus.FORBIDDEN)

return f(*args, **kwargs)

return wrapper

return decorated

@classmethod
def has_one_of_roles(cls, permissions):
"""Check that at least one of the realm groups are in the token.
Expand All @@ -101,7 +60,10 @@ def wrapper(*args, **kwargs):
if jwt.contains_role(mapped_groups):
return f(*args, **kwargs)

raise PermissionDeniedError("Access Denied", HTTPStatus.FORBIDDEN)
raise PermissionDeniedError(
"You don't have permission to perform this operation.",
HTTPStatus.FORBIDDEN,
)

return wrapper

Expand All @@ -112,6 +74,12 @@ def has_role(cls, role):
"""Validate the role."""
return jwt.validate_roles(role)

@classmethod
def has_permission(cls, permissions):
"""Check to see if the user has right permissions."""
mapped_groups = _map_permission_to_groups(permissions)
return jwt.contains_role(mapped_groups)


def _map_permission_to_groups(permissions):
"""Map the permissions to user groups in keycloak."""
Expand Down
6 changes: 6 additions & 0 deletions compliance-api/src/compliance_api/resources/agency.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from compliance_api.exceptions import ResourceNotFoundError
from compliance_api.schemas import AgencyCreateSchema, AgencySchema
from compliance_api.services import AgencyService
from compliance_api.utils.enum import PermissionEnum
from compliance_api.utils.util import cors_preflight

from .apihelper import Api as ApiHelper
Expand All @@ -45,6 +46,7 @@ class Agencies(Resource):
@API.response(code=200, description="Success", model=[agency_list_model])
@ApiHelper.swagger_decorators(API, endpoint_description="Fetch all agencies")
@auth.require
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def get():
"""Fetch all agencies."""
agencies = AgencyService.get_all()
Expand All @@ -57,6 +59,7 @@ def get():
@API.expect(agency_request_model)
@API.response(code=201, model=agency_list_model, description="AgencyCreated")
@API.response(400, "Bad Request")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def post():
"""Create a agency."""
agency_data = AgencyCreateSchema().load(API.payload)
Expand All @@ -75,6 +78,7 @@ class Agency(Resource):
@ApiHelper.swagger_decorators(API, endpoint_description="Fetch an agency by id")
@API.response(code=200, model=agency_list_model, description="Success")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def get(agency_id):
"""Fetch an agency by id."""
agency = AgencyService.get_by_id(agency_id)
Expand All @@ -89,6 +93,7 @@ def get(agency_id):
@API.response(code=200, model=agency_list_model, description="Success")
@API.response(400, "Bad Request")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def patch(agency_id):
"""Update an agency by id."""
agency_data = AgencyCreateSchema().load(API.payload)
Expand All @@ -102,6 +107,7 @@ def patch(agency_id):
@ApiHelper.swagger_decorators(API, endpoint_description="Delete an agency by id")
@API.response(code=200, model=agency_list_model, description="Deleted")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def delete(agency_id):
"""Delete an agency by id."""
deleted_agency = AgencyService.delete(agency_id)
Expand Down
2 changes: 2 additions & 0 deletions compliance-api/src/compliance_api/resources/case_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from compliance_api.schemas import (
CaseFileCreateSchema, CaseFileOfficerSchema, CaseFileSchema, CaseFileUpdateSchema, KeyValueSchema, StaffUserSchema)
from compliance_api.services import CaseFileService
from compliance_api.utils.enum import PermissionEnum
from compliance_api.utils.util import cors_preflight

from .apihelper import Api as ApiHelper
Expand Down Expand Up @@ -98,6 +99,7 @@ def get():

@staticmethod
@auth.require
@auth.has_one_of_roles([PermissionEnum.SUPERUSER])
@ApiHelper.swagger_decorators(API, endpoint_description="Create a case file")
@API.expect(case_file_create_model)
@API.response(code=201, model=case_file_list_model, description="CaseFileCreated")
Expand Down
2 changes: 1 addition & 1 deletion compliance-api/src/compliance_api/resources/inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ def get():
return inspection_list_schema.dump(inspections), HTTPStatus.OK

@staticmethod
@auth.require
@ApiHelper.swagger_decorators(API, endpoint_description="Create an inspection")
@API.expect(inspection_create_model)
@API.response(
code=201, model=inspection_list_model, description="InspectionCreated"
)
@API.response(400, "Bad Request")
@auth.require
def post():
"""Create an inspection."""
current_app.logger.info(f"Creating Inspection with payload: {API.payload}")
Expand Down
6 changes: 6 additions & 0 deletions compliance-api/src/compliance_api/resources/staff_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from compliance_api.exceptions import ResourceNotFoundError
from compliance_api.schemas import KeyValueSchema, StaffUserCreateSchema, StaffUserSchema, StaffUserUpdateSchema
from compliance_api.services import StaffUserService
from compliance_api.utils.enum import PermissionEnum
from compliance_api.utils.util import cors_preflight

from .apihelper import Api as ApiHelper
Expand Down Expand Up @@ -51,6 +52,7 @@ class StaffUsers(Resource):
@API.response(code=200, description="Success", model=[user_list_model])
@ApiHelper.swagger_decorators(API, endpoint_description="Fetch all users")
@auth.require
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def get():
"""Fetch all users."""
users = StaffUserService.get_all_staff_users()
Expand All @@ -63,6 +65,7 @@ def get():
@API.expect(user_request_model)
@API.response(code=201, model=user_list_model, description="UserCreated")
@API.response(400, "Bad Request")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def post():
"""Create a user."""
user_data = StaffUserCreateSchema().load(API.payload)
Expand All @@ -82,6 +85,7 @@ class StaffUser(Resource):
@ApiHelper.swagger_decorators(API, endpoint_description="Fetch a user by id")
@API.response(code=200, model=user_list_model, description="Success")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def get(user_id):
"""Fetch a user by id."""
user = StaffUserService.get_user_by_id(user_id)
Expand All @@ -96,6 +100,7 @@ def get(user_id):
@API.response(code=200, model=user_list_model, description="Success")
@API.response(400, "Bad Request")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def patch(user_id):
"""Update a user by id."""
user_data = StaffUserUpdateSchema().load(API.payload)
Expand All @@ -109,6 +114,7 @@ def patch(user_id):
@ApiHelper.swagger_decorators(API, endpoint_description="Delete a user by id")
@API.response(code=200, model=user_list_model, description="Deleted")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def delete(user_id):
"""Delete a user by id."""
deleted_user = StaffUserService.delete_user(user_id)
Expand Down
6 changes: 6 additions & 0 deletions compliance-api/src/compliance_api/resources/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from compliance_api.exceptions import ResourceNotFoundError
from compliance_api.schemas import TopicCreateSchema, TopicSchema
from compliance_api.services import TopicService
from compliance_api.utils.enum import PermissionEnum
from compliance_api.utils.util import cors_preflight

from .apihelper import Api as ApiHelper
Expand All @@ -45,6 +46,7 @@ class Topics(Resource):
@API.response(code=200, description="Success", model=[topic_list_model])
@ApiHelper.swagger_decorators(API, endpoint_description="Fetch all topics")
@auth.require
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def get():
"""Fetch all topics."""
topics = TopicService.get_all()
Expand All @@ -57,6 +59,7 @@ def get():
@API.expect(topic_request_model)
@API.response(code=201, model=topic_list_model, description="topicCreated")
@API.response(400, "Bad Request")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def post():
"""Create a topic."""
topic_data = TopicCreateSchema().load(API.payload)
Expand All @@ -75,6 +78,7 @@ class Topic(Resource):
@ApiHelper.swagger_decorators(API, endpoint_description="Fetch an topic by id")
@API.response(code=200, model=topic_list_model, description="Success")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def get(topic_id):
"""Fetch an topic by id."""
topic = TopicService.get_by_id(topic_id)
Expand All @@ -89,6 +93,7 @@ def get(topic_id):
@API.response(code=200, model=topic_list_model, description="Success")
@API.response(400, "Bad Request")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def patch(topic_id):
"""Update an topic by id."""
topic_data = TopicCreateSchema().load(API.payload)
Expand All @@ -102,6 +107,7 @@ def patch(topic_id):
@ApiHelper.swagger_decorators(API, endpoint_description="Delete an topic by id")
@API.response(code=200, model=topic_list_model, description="Deleted")
@API.response(404, "Not Found")
@auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN])
def delete(topic_id):
"""Delete an topic by id."""
deleted_topic = TopicService.delete(topic_id)
Expand Down
32 changes: 24 additions & 8 deletions compliance-api/src/compliance_api/services/case_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@

from datetime import datetime

from compliance_api.exceptions import ResourceExistsError
from flask import g

from compliance_api.auth import auth
from compliance_api.exceptions import PermissionDeniedError, ResourceExistsError
from compliance_api.models import CaseFile as CaseFileModel
from compliance_api.models import CaseFileInitiationOption as CaseFileInitiationOptionModel
from compliance_api.models import CaseFileOfficer as CaseFileOfficerModel
from compliance_api.models import CaseFileStatusEnum
from compliance_api.models.db import session_scope
from compliance_api.utils.enum import PermissionEnum


class CaseFileService:
Expand Down Expand Up @@ -49,6 +53,7 @@ def create(cls, case_file_data: dict):
@classmethod
def update(cls, case_file_id: int, case_file_data: dict):
"""Update case file."""
_access_check_for_update(case_file_id)
case_file_obj = {
"primary_officer_id": case_file_data.get("primary_officer_id", None)
}
Expand Down Expand Up @@ -104,20 +109,31 @@ def get_by_project(cls, project_id: int):
]

@classmethod
def is_assigned_user(cls, case_file_id, auth_user_guid):
"""Check if the given user is an assigned user of the given case file."""
def is_logged_user_primary_or_officer(cls, case_file_id):
"""Check to see if the given user is primary or other officer in the case file."""
auth_user_guid = g.token_info["preferred_username"]
case_file = CaseFileModel.find_by_id(case_file_id)

if not case_file:
return False

# Check if the user is the primary officer or part of other officers
# The logged in user should be primary or officer in the associated
# case file
return case_file.primary_officer.auth_user_guid == auth_user_guid or any(
officer.officer.auth_user_guid == auth_user_guid
for officer in case_file.case_file_officers
)


def _access_check_for_update(case_file_id):
"""Access check for update."""
auth_user_guid = g.token_info["preferred_username"]
case_file = CaseFileModel.find_by_id(case_file_id)
if (
not auth.has_permission([PermissionEnum.SUPERUSER])
and not case_file.primary_officer.auth_user_guid == auth_user_guid
):
raise PermissionDeniedError(
"You don't have the correct permission to perform this operation."
)


def _create_case_file_object(case_file_data: dict):
"""Create a case file object."""
case_file_obj = {
Expand Down
Loading

0 comments on commit 85f9468

Please sign in to comment.