diff --git a/compliance-api/src/compliance_api/__init__.py b/compliance-api/src/compliance_api/__init__.py index b716916f..1c25d49a 100644 --- a/compliance-api/src/compliance_api/__init__.py +++ b/compliance-api/src/compliance_api/__init__.py @@ -77,7 +77,8 @@ def set_origin(): "COMPLIANCE" in group for group in token_info.get("groups", []) ) if not is_compliance_in_groups: - raise PermissionDeniedError("Access Denied", HTTPStatus.UNAUTHORIZED) + raise PermissionDeniedError("Access Denied") + # Attempt to validate and decode the token here g.access_token = auth_header.split(" ")[1] g.token_info = token_info else: diff --git a/compliance-api/src/compliance_api/auth.py b/compliance-api/src/compliance_api/auth.py index efb85a46..fa38eca5 100644 --- a/compliance-api/src/compliance_api/auth.py +++ b/compliance-api/src/compliance_api/auth.py @@ -19,9 +19,7 @@ from flask_jwt_oidc import JwtManager from compliance_api.exceptions import PermissionDeniedError -from compliance_api.services import CaseFileService, ComplaintService, InspectionService from compliance_api.utils.constant import GROUP_MAP -from compliance_api.utils.enum import ContextEnum jwt = ( @@ -46,45 +44,6 @@ def decorated(*args, **kwargs): return decorated - @classmethod - def is_allowed(cls, context: ContextEnum, permissions): - """Check to see if user is allowed to access the function.""" - - def decorated(f): - @Auth.require - @wraps(f) - def wrapper(*args, **kwargs): - auth_user_guid = g.token_info["preferred_username"] - - # Create a context-to-service mapping - context_service_map = { - ContextEnum.INSPECTION: ("inspection_id", InspectionService), - ContextEnum.COMPLAINT: ("complaint_id", ComplaintService), - ContextEnum.CASE_FILE: ("case_file_id", CaseFileService), - } - - # Retrieve the corresponding ID and service for the given context - id_field, service = context_service_map.get(context, (None, None)) - - if id_field and service: - is_allowed = service.is_assigned_user( - kwargs[id_field], auth_user_guid - ) - # map the permission enum values to the user groups - mapped_groups = _map_permission_to_groups(permissions) - if not is_allowed and not jwt.contains_role(mapped_groups): - raise PermissionDeniedError( - "Access Denied", HTTPStatus.FORBIDDEN - ) - else: - raise PermissionDeniedError("Invalid Context", HTTPStatus.FORBIDDEN) - - return f(*args, **kwargs) - - return wrapper - - return decorated - @classmethod def has_one_of_roles(cls, permissions): """Check that at least one of the realm groups are in the token. @@ -101,7 +60,10 @@ def wrapper(*args, **kwargs): if jwt.contains_role(mapped_groups): return f(*args, **kwargs) - raise PermissionDeniedError("Access Denied", HTTPStatus.FORBIDDEN) + raise PermissionDeniedError( + "You don't have permission to perform this operation.", + HTTPStatus.FORBIDDEN, + ) return wrapper @@ -112,6 +74,12 @@ def has_role(cls, role): """Validate the role.""" return jwt.validate_roles(role) + @classmethod + def has_permission(cls, permissions): + """Check to see if the user has right permissions.""" + mapped_groups = _map_permission_to_groups(permissions) + return jwt.contains_role(mapped_groups) + def _map_permission_to_groups(permissions): """Map the permissions to user groups in keycloak.""" diff --git a/compliance-api/src/compliance_api/resources/agency.py b/compliance-api/src/compliance_api/resources/agency.py index 4d9d96d3..62a9cc32 100644 --- a/compliance-api/src/compliance_api/resources/agency.py +++ b/compliance-api/src/compliance_api/resources/agency.py @@ -21,6 +21,7 @@ from compliance_api.exceptions import ResourceNotFoundError from compliance_api.schemas import AgencyCreateSchema, AgencySchema from compliance_api.services import AgencyService +from compliance_api.utils.enum import PermissionEnum from compliance_api.utils.util import cors_preflight from .apihelper import Api as ApiHelper @@ -45,6 +46,7 @@ class Agencies(Resource): @API.response(code=200, description="Success", model=[agency_list_model]) @ApiHelper.swagger_decorators(API, endpoint_description="Fetch all agencies") @auth.require + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def get(): """Fetch all agencies.""" agencies = AgencyService.get_all() @@ -57,6 +59,7 @@ def get(): @API.expect(agency_request_model) @API.response(code=201, model=agency_list_model, description="AgencyCreated") @API.response(400, "Bad Request") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def post(): """Create a agency.""" agency_data = AgencyCreateSchema().load(API.payload) @@ -75,6 +78,7 @@ class Agency(Resource): @ApiHelper.swagger_decorators(API, endpoint_description="Fetch an agency by id") @API.response(code=200, model=agency_list_model, description="Success") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def get(agency_id): """Fetch an agency by id.""" agency = AgencyService.get_by_id(agency_id) @@ -89,6 +93,7 @@ def get(agency_id): @API.response(code=200, model=agency_list_model, description="Success") @API.response(400, "Bad Request") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def patch(agency_id): """Update an agency by id.""" agency_data = AgencyCreateSchema().load(API.payload) @@ -102,6 +107,7 @@ def patch(agency_id): @ApiHelper.swagger_decorators(API, endpoint_description="Delete an agency by id") @API.response(code=200, model=agency_list_model, description="Deleted") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def delete(agency_id): """Delete an agency by id.""" deleted_agency = AgencyService.delete(agency_id) diff --git a/compliance-api/src/compliance_api/resources/case_file.py b/compliance-api/src/compliance_api/resources/case_file.py index 17f79f05..9a9b4cc4 100644 --- a/compliance-api/src/compliance_api/resources/case_file.py +++ b/compliance-api/src/compliance_api/resources/case_file.py @@ -23,6 +23,7 @@ from compliance_api.schemas import ( CaseFileCreateSchema, CaseFileOfficerSchema, CaseFileSchema, CaseFileUpdateSchema, KeyValueSchema, StaffUserSchema) from compliance_api.services import CaseFileService +from compliance_api.utils.enum import PermissionEnum from compliance_api.utils.util import cors_preflight from .apihelper import Api as ApiHelper @@ -98,6 +99,7 @@ def get(): @staticmethod @auth.require + @auth.has_one_of_roles([PermissionEnum.SUPERUSER]) @ApiHelper.swagger_decorators(API, endpoint_description="Create a case file") @API.expect(case_file_create_model) @API.response(code=201, model=case_file_list_model, description="CaseFileCreated") diff --git a/compliance-api/src/compliance_api/resources/inspection.py b/compliance-api/src/compliance_api/resources/inspection.py index 1af59401..5af08d50 100644 --- a/compliance-api/src/compliance_api/resources/inspection.py +++ b/compliance-api/src/compliance_api/resources/inspection.py @@ -152,13 +152,13 @@ def get(): return inspection_list_schema.dump(inspections), HTTPStatus.OK @staticmethod - @auth.require @ApiHelper.swagger_decorators(API, endpoint_description="Create an inspection") @API.expect(inspection_create_model) @API.response( code=201, model=inspection_list_model, description="InspectionCreated" ) @API.response(400, "Bad Request") + @auth.require def post(): """Create an inspection.""" current_app.logger.info(f"Creating Inspection with payload: {API.payload}") diff --git a/compliance-api/src/compliance_api/resources/staff_user.py b/compliance-api/src/compliance_api/resources/staff_user.py index 0a4ebb8c..149aacff 100644 --- a/compliance-api/src/compliance_api/resources/staff_user.py +++ b/compliance-api/src/compliance_api/resources/staff_user.py @@ -21,6 +21,7 @@ from compliance_api.exceptions import ResourceNotFoundError from compliance_api.schemas import KeyValueSchema, StaffUserCreateSchema, StaffUserSchema, StaffUserUpdateSchema from compliance_api.services import StaffUserService +from compliance_api.utils.enum import PermissionEnum from compliance_api.utils.util import cors_preflight from .apihelper import Api as ApiHelper @@ -51,6 +52,7 @@ class StaffUsers(Resource): @API.response(code=200, description="Success", model=[user_list_model]) @ApiHelper.swagger_decorators(API, endpoint_description="Fetch all users") @auth.require + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def get(): """Fetch all users.""" users = StaffUserService.get_all_staff_users() @@ -63,6 +65,7 @@ def get(): @API.expect(user_request_model) @API.response(code=201, model=user_list_model, description="UserCreated") @API.response(400, "Bad Request") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def post(): """Create a user.""" user_data = StaffUserCreateSchema().load(API.payload) @@ -82,6 +85,7 @@ class StaffUser(Resource): @ApiHelper.swagger_decorators(API, endpoint_description="Fetch a user by id") @API.response(code=200, model=user_list_model, description="Success") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def get(user_id): """Fetch a user by id.""" user = StaffUserService.get_user_by_id(user_id) @@ -96,6 +100,7 @@ def get(user_id): @API.response(code=200, model=user_list_model, description="Success") @API.response(400, "Bad Request") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def patch(user_id): """Update a user by id.""" user_data = StaffUserUpdateSchema().load(API.payload) @@ -109,6 +114,7 @@ def patch(user_id): @ApiHelper.swagger_decorators(API, endpoint_description="Delete a user by id") @API.response(code=200, model=user_list_model, description="Deleted") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def delete(user_id): """Delete a user by id.""" deleted_user = StaffUserService.delete_user(user_id) diff --git a/compliance-api/src/compliance_api/resources/topic.py b/compliance-api/src/compliance_api/resources/topic.py index 7dad5c02..68966461 100644 --- a/compliance-api/src/compliance_api/resources/topic.py +++ b/compliance-api/src/compliance_api/resources/topic.py @@ -21,6 +21,7 @@ from compliance_api.exceptions import ResourceNotFoundError from compliance_api.schemas import TopicCreateSchema, TopicSchema from compliance_api.services import TopicService +from compliance_api.utils.enum import PermissionEnum from compliance_api.utils.util import cors_preflight from .apihelper import Api as ApiHelper @@ -45,6 +46,7 @@ class Topics(Resource): @API.response(code=200, description="Success", model=[topic_list_model]) @ApiHelper.swagger_decorators(API, endpoint_description="Fetch all topics") @auth.require + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def get(): """Fetch all topics.""" topics = TopicService.get_all() @@ -57,6 +59,7 @@ def get(): @API.expect(topic_request_model) @API.response(code=201, model=topic_list_model, description="topicCreated") @API.response(400, "Bad Request") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def post(): """Create a topic.""" topic_data = TopicCreateSchema().load(API.payload) @@ -75,6 +78,7 @@ class Topic(Resource): @ApiHelper.swagger_decorators(API, endpoint_description="Fetch an topic by id") @API.response(code=200, model=topic_list_model, description="Success") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def get(topic_id): """Fetch an topic by id.""" topic = TopicService.get_by_id(topic_id) @@ -89,6 +93,7 @@ def get(topic_id): @API.response(code=200, model=topic_list_model, description="Success") @API.response(400, "Bad Request") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def patch(topic_id): """Update an topic by id.""" topic_data = TopicCreateSchema().load(API.payload) @@ -102,6 +107,7 @@ def patch(topic_id): @ApiHelper.swagger_decorators(API, endpoint_description="Delete an topic by id") @API.response(code=200, model=topic_list_model, description="Deleted") @API.response(404, "Not Found") + @auth.has_one_of_roles([PermissionEnum.SUPERUSER, PermissionEnum.ADMIN]) def delete(topic_id): """Delete an topic by id.""" deleted_topic = TopicService.delete(topic_id) diff --git a/compliance-api/src/compliance_api/services/case_file.py b/compliance-api/src/compliance_api/services/case_file.py index d4186eb4..27382cb0 100644 --- a/compliance-api/src/compliance_api/services/case_file.py +++ b/compliance-api/src/compliance_api/services/case_file.py @@ -2,12 +2,16 @@ from datetime import datetime -from compliance_api.exceptions import ResourceExistsError +from flask import g + +from compliance_api.auth import auth +from compliance_api.exceptions import PermissionDeniedError, ResourceExistsError from compliance_api.models import CaseFile as CaseFileModel from compliance_api.models import CaseFileInitiationOption as CaseFileInitiationOptionModel from compliance_api.models import CaseFileOfficer as CaseFileOfficerModel from compliance_api.models import CaseFileStatusEnum from compliance_api.models.db import session_scope +from compliance_api.utils.enum import PermissionEnum class CaseFileService: @@ -49,6 +53,7 @@ def create(cls, case_file_data: dict): @classmethod def update(cls, case_file_id: int, case_file_data: dict): """Update case file.""" + _access_check_for_update(case_file_id) case_file_obj = { "primary_officer_id": case_file_data.get("primary_officer_id", None) } @@ -104,20 +109,31 @@ def get_by_project(cls, project_id: int): ] @classmethod - def is_assigned_user(cls, case_file_id, auth_user_guid): - """Check if the given user is an assigned user of the given case file.""" + def is_logged_user_primary_or_officer(cls, case_file_id): + """Check to see if the given user is primary or other officer in the case file.""" + auth_user_guid = g.token_info["preferred_username"] case_file = CaseFileModel.find_by_id(case_file_id) - - if not case_file: - return False - - # Check if the user is the primary officer or part of other officers + # The logged in user should be primary or officer in the associated + # case file return case_file.primary_officer.auth_user_guid == auth_user_guid or any( officer.officer.auth_user_guid == auth_user_guid for officer in case_file.case_file_officers ) +def _access_check_for_update(case_file_id): + """Access check for update.""" + auth_user_guid = g.token_info["preferred_username"] + case_file = CaseFileModel.find_by_id(case_file_id) + if ( + not auth.has_permission([PermissionEnum.SUPERUSER]) + and not case_file.primary_officer.auth_user_guid == auth_user_guid + ): + raise PermissionDeniedError( + "You don't have the correct permission to perform this operation." + ) + + def _create_case_file_object(case_file_data: dict): """Create a case file object.""" case_file_obj = { diff --git a/compliance-api/src/compliance_api/services/complaint.py b/compliance-api/src/compliance_api/services/complaint.py index aa7e38e2..69328a97 100644 --- a/compliance-api/src/compliance_api/services/complaint.py +++ b/compliance-api/src/compliance_api/services/complaint.py @@ -1,6 +1,9 @@ """Service for managing complaint.""" -from compliance_api.exceptions import ResourceNotFoundError, UnprocessableEntityError +from flask import g + +from compliance_api.auth import auth +from compliance_api.exceptions import PermissionDeniedError, ResourceNotFoundError, UnprocessableEntityError from compliance_api.models.complaint import Complaint as ComplaintModel from compliance_api.models.complaint import ComplaintReqEACDetail as ComplaintReqEACDetailModel from compliance_api.models.complaint import ComplaintReqOrderDetail as ComplaintReqOrderDetailModel @@ -15,6 +18,7 @@ from compliance_api.services.case_file import CaseFileService from compliance_api.services.epic_track_service.track_service import TrackService from compliance_api.utils.constant import UNAPPROVED_PROJECT_CODE, UNAPPROVED_PROJECT_NAME +from compliance_api.utils.enum import PermissionEnum class ComplaintService: @@ -86,6 +90,7 @@ def get_by_case_file_id(cls, case_file_id): @classmethod def update(cls, complaint_id: int, complaint_data: dict): """Update complaint.""" + _acces_check_update(complaint_id) complaint_obj = _create_complaint_update_object(complaint_data) with session_scope() as session: complaint = ComplaintModel.find_by_id(complaint_id) @@ -127,6 +132,7 @@ def update(cls, complaint_id: int, complaint_data: dict): @classmethod def create(cls, complaint_data: dict): """Create complaint.""" + _access_check_create(complaint_data) complaint_obj = _create_complaint_object(complaint_data) with session_scope() as session: created_complaint = ComplaintModel.create_complaint(complaint_obj, session) @@ -155,14 +161,30 @@ def create(cls, complaint_data: dict): ) return created_complaint - @classmethod - def is_assigned_user(cls, complaint_id, auth_user_guid): - """Check if the given user is an assigned user of the given complaint.""" - complaint = ComplaintModel.find_by_id(complaint_id) - if not complaint: - return False - return complaint.primary_officer.auth_user_guid == auth_user_guid +def _access_check_create(complaint_data: dict): + """Access check.""" + if not auth.has_permission( + [PermissionEnum.SUPERUSER] + ) and not CaseFileService.is_logged_user_primary_or_officer( + complaint_data.get("case_file_id") + ): + raise PermissionDeniedError( + "You don't have the correct permission to perform this operation." + ) + + +def _acces_check_update(complaint_id): + """Acces check create.""" + auth_user_guid = g.token_info["preferred_username"] + complaint = ComplaintModel.find_by_id(complaint_id) + if ( + not auth.has_permission([PermissionEnum.SUPERUSER]) + and not complaint.primary_officer.auth_user_guid == auth_user_guid + ): + raise PermissionDeniedError( + "You don't have the correct permission to perform this operation." + ) def _remove_existing_requirement_details( diff --git a/compliance-api/src/compliance_api/services/continuation_report.py b/compliance-api/src/compliance_api/services/continuation_report.py index 4400ec41..0ff6bb59 100644 --- a/compliance-api/src/compliance_api/services/continuation_report.py +++ b/compliance-api/src/compliance_api/services/continuation_report.py @@ -1,8 +1,12 @@ """ContinuationReport Service.""" +from compliance_api.auth import auth +from compliance_api.exceptions import PermissionDeniedError from compliance_api.models.continuation_report import ContinuationReport as ContinuationReportModel from compliance_api.models.continuation_report import ContinuationReportKey as ContinuationReportKeyModel from compliance_api.models.db import session_scope +from compliance_api.services.case_file import CaseFileService +from compliance_api.utils.enum import PermissionEnum class ContinuationReportService: @@ -11,6 +15,7 @@ class ContinuationReportService: @classmethod def create(cls, report_entry: dict, system_generated=False): """Create continuation report entry.""" + _access_check(report_entry) report_entry_obj = _create_report_entry(report_entry, system_generated) with session_scope() as session: created_entry = ContinuationReportModel.create_entry( @@ -26,6 +31,18 @@ def get_by_case_file_id(cls, case_file_id): return ContinuationReportModel.get_by_case_file(case_file_id) +def _access_check(report_entry: dict): + """Access check.""" + if not auth.has_permission( + [PermissionEnum.SUPERUSER] + ) and not CaseFileService.is_logged_user_primary_or_officer( + report_entry.get("case_file_id") + ): + raise PermissionDeniedError( + "You don't have the correct permission to perform this operation." + ) + + def _insert_or_update_keys(report_id, keys, session=None): """Insert or update keys for continuatino report.""" if keys: diff --git a/compliance-api/src/compliance_api/services/inspection.py b/compliance-api/src/compliance_api/services/inspection.py index 49938e8f..8638b931 100644 --- a/compliance-api/src/compliance_api/services/inspection.py +++ b/compliance-api/src/compliance_api/services/inspection.py @@ -1,6 +1,9 @@ """Service for managing Inspection.""" -from compliance_api.exceptions import ResourceNotFoundError, UnprocessableEntityError +from flask import g + +from compliance_api.auth import auth +from compliance_api.exceptions import PermissionDeniedError, ResourceNotFoundError, UnprocessableEntityError from compliance_api.models import Inspection as InspectionModel from compliance_api.models import InspectionAgency as InspectionAgencyModel from compliance_api.models import InspectionAttendance as InspectionAttendanceModel @@ -16,6 +19,7 @@ from compliance_api.models.db import session_scope from compliance_api.models.inspection.inspection_enum import InspectionAttendanceOptionEnum, InspectionStatusEnum from compliance_api.utils.constant import UNAPPROVED_PROJECT_CODE, UNAPPROVED_PROJECT_NAME +from compliance_api.utils.enum import PermissionEnum from .case_file import CaseFileService from .epic_track_service.track_service import TrackService @@ -140,6 +144,7 @@ def get_attendance_options(cls, inspection_id): @classmethod def create(cls, inspection_data: dict): """Create inspection.""" + _access_check_create(inspection_data) inspection_obj = _create_inspection_object(inspection_data) with session_scope() as session: created_inspection = InspectionModel.create_inspection( @@ -204,6 +209,7 @@ def create(cls, inspection_data: dict): @classmethod def update(cls, inspection_id: int, inspection_data: dict): """Update inspection.""" + _access_check_update(inspection_id) inspection_obj = _create_inspection_update_obj(inspection_data) with session_scope() as session: updated_case_file = InspectionModel.update_inspection( @@ -257,18 +263,47 @@ def update(cls, inspection_id: int, inspection_data: dict): ) return updated_case_file - @classmethod - def is_assigned_user(cls, inspection_id, auth_user_guid): - """Check if the given user is an assigned user of the given inspection.""" - inspection = InspectionModel.find_by_id(inspection_id) +# def _is_access_allowed(case_file_id, inspection_id=None): +# """Check if the given user is an assigned user of the given inspection.""" +# auth_user_guid = g.token_info["preferred_username"] +# if _is_primary_in_case_file(case_file_id): +# return True +# # or should be primary or other officer of the inspection +# if inspection_id: +# inspection = InspectionModel.find_by_id(inspection_id) +# # Check if the user is the primary officer or part of other officers +# is_primary_or_assigned_on_inspection = ( +# inspection.primary_officer.auth_user_guid == auth_user_guid +# or any( +# officer.officer.auth_user_guid == auth_user_guid +# for officer in inspection.other_officers +# ) +# ) +# if is_primary_or_assigned_on_inspection: +# return True +# return False + + +def _access_check_create(inspection_data: dict): + """Access check.""" + if not auth.has_permission( + [PermissionEnum.SUPERUSER] + ) and not CaseFileService.is_logged_user_primary_or_officer(inspection_data.get("case_file_id")): + raise PermissionDeniedError( + "You don't have the correct permission to perform this operation." + ) - if not inspection: - return False - # Check if the user is the primary officer or part of other officers - return inspection.primary_officer.auth_user_guid == auth_user_guid or any( - officer.officer.auth_user_guid == auth_user_guid - for officer in inspection.other_officers +def _access_check_update(inspection_id: dict): + """Access check for update.""" + auth_user_guid = g.token_info["preferred_username"] + inspection = InspectionModel.find_by_id(inspection_id) + if ( + not auth.has_permission([PermissionEnum.SUPERUSER]) + and not inspection.primary_officer.auth_user_guid == auth_user_guid + ): + raise PermissionDeniedError( + "You don't have the correct permission to perform this operation." ) diff --git a/compliance-api/src/compliance_api/utils/constant.py b/compliance-api/src/compliance_api/utils/constant.py index 7c8945bc..7641e1d2 100644 --- a/compliance-api/src/compliance_api/utils/constant.py +++ b/compliance-api/src/compliance_api/utils/constant.py @@ -10,4 +10,5 @@ PermissionEnum.SUPERUSER: "/COMPLIANCE/SUPERUSER", PermissionEnum.VIEWER: "/COMPLIANCE/VIEWER", PermissionEnum.USER: "/COMPLIANCE/USER", + PermissionEnum.ADMIN: "/COMPLIANCE/ADMIN" } diff --git a/compliance-api/src/compliance_api/utils/enum.py b/compliance-api/src/compliance_api/utils/enum.py index 264fc8d8..0f8e2e65 100644 --- a/compliance-api/src/compliance_api/utils/enum.py +++ b/compliance-api/src/compliance_api/utils/enum.py @@ -40,3 +40,4 @@ class PermissionEnum(Enum): VIEWER = "Viewer" USER = "User" SUPERUSER = "Superuser" + ADMIN = "Admin" diff --git a/compliance-api/tests/conftest.py b/compliance-api/tests/conftest.py index b48f1601..bfc71de9 100644 --- a/compliance-api/tests/conftest.py +++ b/compliance-api/tests/conftest.py @@ -198,6 +198,14 @@ def client_id(): @pytest.fixture() def auth_header(jwt): """Create a basic admin header for tests.""" - default_claims = TokenJWTClaims.default + default_claims = TokenJWTClaims.default.value headers = factory_auth_header(jwt=jwt, claims=default_claims) return headers + + +@pytest.fixture() +def auth_header_super_user(jwt): + """Create a super user header.""" + super_user_claims = TokenJWTClaims.super_user.value + headers = factory_auth_header(jwt=jwt, claims=super_user_claims) + return headers diff --git a/compliance-api/tests/integration/api/test_agency.py b/compliance-api/tests/integration/api/test_agency.py index c4501cfd..6ab21d1e 100644 --- a/compliance-api/tests/integration/api/test_agency.py +++ b/compliance-api/tests/integration/api/test_agency.py @@ -1,4 +1,5 @@ """test suit for agency.""" + import json from http import HTTPStatus from urllib.parse import urljoin @@ -10,66 +11,98 @@ API_BASE_URL = "/api/" -def test_get_agencies(app, client, auth_header): +def test_get_agencies(app, client, auth_header_super_user): """Get agencies.""" url = urljoin(API_BASE_URL, "agencies") # create agencies AgencyScenario.create(AgencyScenario.agency1.value) AgencyScenario.create(AgencyScenario.agency2.value) - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) assert len(result.json) == 8 assert result.status_code == HTTPStatus.OK -def test_get_specific_agency(app, client, auth_header): +def test_get_specific_agency(app, client, auth_header_super_user): """Get agency by id.""" # create agencies created_agency = AgencyScenario.create(AgencyScenario.agency1.value) url = urljoin(API_BASE_URL, f"agencies/{created_agency.id}") - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.OK assert result.json["id"] == created_agency.id assert result.json["name"] == created_agency.name -def test_create_agencies(client, auth_header): +def test_create_agencies(client, auth_header_super_user): """Create agency.""" url = urljoin(API_BASE_URL, "agencies") result = client.post( - url, data=json.dumps(AgencyScenario.default_agency.value), headers=auth_header + url, + data=json.dumps(AgencyScenario.default_agency.value), + headers=auth_header_super_user, ) assert result.json["name"] == AgencyScenario.default_agency.value["name"] assert result.status_code == HTTPStatus.CREATED -def test_create_agencies_with_mandatory_missing(client, auth_header): +def test_create_agencies_with_non_super_user(client, auth_header): + """Create agency.""" + url = urljoin(API_BASE_URL, "agencies") + result = client.post( + url, + data=json.dumps(AgencyScenario.default_agency.value), + headers=auth_header, + ) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_create_agencies_with_mandatory_missing(client, auth_header_super_user): """Create agency.""" url = urljoin(API_BASE_URL, "agencies") create_dict = AgencyScenario.default_agency.value create_dict.pop("name") - result = client.post(url, data=json.dumps(create_dict), headers=auth_header) + result = client.post( + url, data=json.dumps(create_dict), headers=auth_header_super_user + ) response = json.loads(result.json["message"]) assert "name" in response assert result.status_code == HTTPStatus.BAD_REQUEST -def test_update_agency(client, auth_header): +def test_update_agency(client, auth_header_super_user): """Update agency.""" agency = AgencyScenario.create(AgencyScenario.agency1.value) url = urljoin(API_BASE_URL, f"agencies/{agency.id}") update_dict = AgencyScenario.agency1.value update_dict["name"] = "changed" - result = client.patch(url, data=json.dumps(update_dict), headers=auth_header) + result = client.patch( + url, data=json.dumps(update_dict), headers=auth_header_super_user + ) assert result.json["name"] == update_dict["name"] assert result.status_code == HTTPStatus.OK -def test_delete_agency(client, auth_header): +def test_update_agency_with_non_super_user(client, auth_header): + """Update agency.""" + url = urljoin(API_BASE_URL, "agencies/1") + update_dict = AgencyScenario.agency1.value + result = client.patch(url, data=json.dumps(update_dict), headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_delete_agency(client, auth_header_super_user): """Update agency.""" agency = AgencyScenario.create(AgencyScenario.agency1.value) url = urljoin(API_BASE_URL, f"agencies/{agency.id}") - result = client.delete(url, headers=auth_header) + result = client.delete(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.OK agency_get = AgencyModel.find_by_id(agency.id) print(agency.is_deleted) assert agency_get is None + + +def test_delete_agency_with_non_super_user(client, auth_header): + """Update agency.""" + url = urljoin(API_BASE_URL, "agencies/1") + result = client.delete(url, headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN diff --git a/compliance-api/tests/integration/api/test_case_file.py b/compliance-api/tests/integration/api/test_case_file.py index d911a54c..a1ea1e97 100644 --- a/compliance-api/tests/integration/api/test_case_file.py +++ b/compliance-api/tests/integration/api/test_case_file.py @@ -11,7 +11,8 @@ from compliance_api.models.case_file import CaseFileStatusEnum from compliance_api.services.case_file import CaseFileService -from tests.utilities.factory_scenario import CasefileScenario, StaffScenario +from tests.utilities.factory_scenario import CasefileScenario, StaffScenario, TokenJWTClaims +from tests.utilities.factory_utils import factory_auth_header API_BASE_URL = "/api/" @@ -56,20 +57,35 @@ def test_get_case_file_initiation_options(client, auth_header): assert result.status_code == HTTPStatus.OK -def test_create_case_file_without_file_number(client, auth_header, created_staff): +def test_create_case_file_without_file_number( + client, auth_header_super_user, created_staff +): """Create case file with basic fields.""" url = urljoin(API_BASE_URL, "case-files") case_file_data = CasefileScenario.default_value.value case_file_data["primary_officer_id"] = created_staff.id result = client.post( - url, data=json.dumps(CasefileScenario.default_value.value), headers=auth_header + url, + data=json.dumps(CasefileScenario.default_value.value), + headers=auth_header_super_user, ) assert result.status_code == HTTPStatus.CREATED assert result.json["case_file_number"] == f"{datetime.now().year}0001" assert result.json["case_file_status"] == CaseFileStatusEnum.OPEN.value -def test_create_case_file_with_file_number(client, auth_header): +def test_create_case_file_with_non_superuser(client, auth_header, created_staff): + """Create case file with basic fields.""" + url = urljoin(API_BASE_URL, "case-files") + case_file_data = CasefileScenario.default_value.value + case_file_data["primary_officer_id"] = created_staff.id + result = client.post( + url, data=json.dumps(CasefileScenario.default_value.value), headers=auth_header + ) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_create_case_file_with_file_number(client, auth_header_super_user): """Create case file with file number.""" user_data = StaffScenario.default_data.value auth_user_guid = fake.word() @@ -80,7 +96,9 @@ def test_create_case_file_with_file_number(client, auth_header): case_file_data["case_file_number"] = "XYZ" case_file_data["primary_officer_id"] = new_user.id case_file_data["officer_ids"] = [new_user.id] - result = client.post(url, data=json.dumps(case_file_data), headers=auth_header) + result = client.post( + url, data=json.dumps(case_file_data), headers=auth_header_super_user + ) assert result.json["case_file_number"] == "XYZ" assert result.json["case_file_status"] == CaseFileStatusEnum.OPEN.value assert result.json["primary_officer_id"] == new_user.id @@ -92,14 +110,16 @@ def test_create_case_file_with_file_number(client, auth_header): def test_create_case_file_with_existing_case_file_number( - client, auth_header, created_staff + client, auth_header_super_user, created_staff ): """Create case file with basic fields.""" url = urljoin(API_BASE_URL, "case-files") case_file_data = copy.copy(CasefileScenario.default_value.value) case_file_data["case_file_number"] = "XYZ" case_file_data["primary_officer_id"] = created_staff.id - result = client.post(url, data=json.dumps(case_file_data), headers=auth_header) + result = client.post( + url, data=json.dumps(case_file_data), headers=auth_header_super_user + ) print(result.json) assert result.status_code == HTTPStatus.CONFLICT @@ -175,7 +195,7 @@ def test_get_case_file_by_number(client, auth_header): assert result.json["case_file_number"] == case_file_data["case_file_number"] -def test_case_file_update(client, auth_header, created_staff): +def test_case_file_update(client, auth_header_super_user, created_staff): """Update case file.""" # creating case file without officers or primary officer case_file_data = copy.copy(CasefileScenario.default_value.value) @@ -186,8 +206,7 @@ def test_case_file_update(client, auth_header, created_staff): API_BASE_URL, f"case-files/case-file-numbers/{case_file_data['case_file_number']}", ) - result = client.get(url, headers=auth_header) - print(result.json) + result = client.get(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.OK officers = CaseFileService.get_other_officers(result.json["id"]) assert len(officers) == 0 @@ -200,7 +219,9 @@ def test_case_file_update(client, auth_header, created_staff): case_file_data["primary_officer_id"] = new_user.id case_file_data["officer_ids"] = [new_user.id] url = urljoin(API_BASE_URL, f"case-files/{created_case_file.id}") - result = client.patch(url, data=json.dumps(case_file_data), headers=auth_header) + result = client.patch( + url, data=json.dumps(case_file_data), headers=auth_header_super_user + ) assert result.status_code == HTTPStatus.OK assert result.json["primary_officer_id"] == new_user.id @@ -211,9 +232,38 @@ def test_case_file_update(client, auth_header, created_staff): case_file_data["primary_officer_id"] = new_user.id case_file_data["officer_ids"] = [] url = urljoin(API_BASE_URL, f"case-files/{created_case_file.id}") - result = client.patch(url, data=json.dumps(case_file_data), headers=auth_header) + result = client.patch( + url, data=json.dumps(case_file_data), headers=auth_header_super_user + ) assert result.status_code == HTTPStatus.OK assert result.json["primary_officer_id"] == new_user.id officers = CaseFileService.get_other_officers(result.json["id"]) assert len(officers) == 0 + + +def test_case_file_update_viewer_fails(client, auth_header, created_staff): + """Update as Viewer.""" + case_file_data = copy.copy(CasefileScenario.default_value.value) + case_file_data["case_file_number"] = fake.word() + case_file_data["primary_officer_id"] = created_staff.id + created_case_file = CaseFileService.create(case_file_data) + url = urljoin(API_BASE_URL, f"case-files/{created_case_file.id}") + result = client.patch(url, data=json.dumps(case_file_data), headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_case_file_update_with_primary(client, jwt, created_staff): + """Update as primary.""" + case_file_data = copy.copy(CasefileScenario.default_value.value) + case_file_data["case_file_number"] = fake.word() + case_file_data["primary_officer_id"] = created_staff.id + created_case_file = CaseFileService.create(case_file_data) + + header = TokenJWTClaims.default.value + header["preferred_username"] = created_staff.auth_user_guid + headers = factory_auth_header(jwt=jwt, claims=header) + + url = urljoin(API_BASE_URL, f"case-files/{created_case_file.id}") + result = client.patch(url, data=json.dumps(case_file_data), headers=headers) + assert result.status_code == HTTPStatus.OK diff --git a/compliance-api/tests/integration/api/test_staff_user.py b/compliance-api/tests/integration/api/test_staff_user.py index e44eed71..252d04c7 100644 --- a/compliance-api/tests/integration/api/test_staff_user.py +++ b/compliance-api/tests/integration/api/test_staff_user.py @@ -34,7 +34,9 @@ def mock_auth_service(mocker): yield mock_get_user_by_guid, mock_update_user_group -def test_create_staff_user_mandatory(mock_auth_service, mocker, client, auth_header): +def test_create_staff_user_mandatory( + mock_auth_service, mocker, client, auth_header_super_user +): """Create staff user.""" url = urljoin(API_BASE_URL, "staff-users") mock_get_user_by_guid = mocker.patch( @@ -54,7 +56,9 @@ def test_create_staff_user_mandatory(mock_auth_service, mocker, client, auth_hea "position_id": 1, } - result = client.post(url, data=json.dumps(staff_user_data), headers=auth_header) + result = client.post( + url, data=json.dumps(staff_user_data), headers=auth_header_super_user + ) print(result.json) assert result.status_code == HTTPStatus.CREATED assert result.json["auth_user_guid"] == staff_user_data["auth_user_guid"] @@ -64,7 +68,9 @@ def test_create_staff_user_mandatory(mock_auth_service, mocker, client, auth_hea assert result.json["last_name"] == lastname -def test_create_staff_user_all_fields(mock_auth_service, mocker, client, auth_header): +def test_create_staff_user_all_fields( + mock_auth_service, mocker, client, auth_header_super_user +): """Create staff user.""" user_data = StaffScenario.default_data.value auth_user_guid = fake.word() @@ -90,7 +96,9 @@ def test_create_staff_user_all_fields(mock_auth_service, mocker, client, auth_he "supervisor_id": new_user.id, } - result = client.post(url, data=json.dumps(staff_user_data), headers=auth_header) + result = client.post( + url, data=json.dumps(staff_user_data), headers=auth_header_super_user + ) assert result.status_code == HTTPStatus.CREATED assert result.json["auth_user_guid"] == staff_user_data["auth_user_guid"] @@ -102,7 +110,28 @@ def test_create_staff_user_all_fields(mock_auth_service, mocker, client, auth_he assert result.json["last_name"] == lastname -def test_create_existing_user(mock_auth_service, client, auth_header): +def test_create_staff_user_with_non_super_user( + mock_auth_service, mocker, client, auth_header +): + """Create staff user.""" + user_data = StaffScenario.default_data.value + auth_user_guid = fake.word() + user_data["auth_user_guid"] = auth_user_guid + new_user = StaffScenario.create(user_data) + url = urljoin(API_BASE_URL, "staff-users") + username = fake.word() + staff_user_data = { + "auth_user_guid": username, + "permission": "USER", + "position_id": 1, + "deputy_director_id": new_user.id, + "supervisor_id": new_user.id, + } + result = client.post(url, data=json.dumps(staff_user_data), headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_create_existing_user(mock_auth_service, client, auth_header_super_user): """Create an existing user.""" url = urljoin(API_BASE_URL, "staff-users") user_data = StaffScenario.default_data.value @@ -116,12 +145,14 @@ def test_create_existing_user(mock_auth_service, client, auth_header): "position_id": 1, } - result = client.post(url, data=json.dumps(user_payload), headers=auth_header) + result = client.post( + url, data=json.dumps(user_payload), headers=auth_header_super_user + ) assert result.status_code == HTTPStatus.CONFLICT -def test_get_users(mock_auth_service, mocker, client, auth_header): +def test_get_users(mock_auth_service, mocker, client, auth_header_super_user): """Create an existing user.""" url = urljoin(API_BASE_URL, "staff-users") user_data = StaffScenario.default_data.value @@ -140,7 +171,7 @@ def test_get_users(mock_auth_service, mocker, client, auth_header): mock_get_epic_users_by_app.return_value = epic_users - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) print(result.json) filtered_user = next( (user for user in result.json if user["auth_user_guid"] == auth_user_guid1), @@ -152,29 +183,46 @@ def test_get_users(mock_auth_service, mocker, client, auth_header): assert result.status_code == HTTPStatus.OK -def test_get_user_by_id(mock_auth_service, client, auth_header): +def test_get_users_with_non_super_user(mock_auth_service, mocker, client, auth_header): + """Create an existing user.""" + url = urljoin(API_BASE_URL, "staff-users") + + result = client.get(url, headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_get_user_by_id(mock_auth_service, client, auth_header_super_user): """Get user by id.""" staff_data = StaffScenario.default_data.value staff_data["auth_user_guid"] = fake.word() created_user = StaffScenario.create(staff_data) url = urljoin(API_BASE_URL, f"staff-users/{created_user.id}") - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.OK assert result.json["id"] == created_user.id -def test_get_user_by_id_not_found(mock_auth_service, client, auth_header): +def test_get_user_by_id_with_non_super_user(mock_auth_service, client, auth_header): + """Get user by id.""" + url = urljoin(API_BASE_URL, "staff-users/1") + + result = client.get(url, headers=auth_header) + + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_get_user_by_id_not_found(mock_auth_service, client, auth_header_super_user): """Get user by id not found.""" url = urljoin(API_BASE_URL, "staff-users/9999") - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.NOT_FOUND -def test_update_staff(mock_auth_service, client, auth_header): +def test_update_staff(mock_auth_service, client, auth_header_super_user): """Update staff user.""" staff_data = StaffScenario.default_data.value staff_data["auth_user_guid"] = fake.word() @@ -189,14 +237,31 @@ def test_update_staff(mock_auth_service, client, auth_header): "permission": "VIEWER", } - result = client.patch(url, data=json.dumps(update_payload), headers=auth_header) + result = client.patch( + url, data=json.dumps(update_payload), headers=auth_header_super_user + ) assert result.status_code == HTTPStatus.OK assert result.json["deputy_director_id"] == update_payload["deputy_director_id"] assert result.json["supervisor_id"] == update_payload["supervisor_id"] -def test_user_update_non_existing(mock_auth_service, client, auth_header): +def test_update_staff_with_non_super_user(mock_auth_service, client, auth_header): + """Update staff user.""" + url = urljoin(API_BASE_URL, "staff-users/1") + update_payload = { + "position_id": 2, + "deputy_director_id": 1, + "supervisor_id": 1, + "permission": "VIEWER", + } + + result = client.patch(url, data=json.dumps(update_payload), headers=auth_header) + + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_user_update_non_existing(mock_auth_service, client, auth_header_super_user): """Update non-existing user.""" url = urljoin(API_BASE_URL, "staff-users/9999") update_payload = { @@ -205,22 +270,33 @@ def test_user_update_non_existing(mock_auth_service, client, auth_header): "supervisor_id": 1, "permission": "VIEWER", } - result = client.patch(url, data=json.dumps(update_payload), headers=auth_header) + result = client.patch( + url, data=json.dumps(update_payload), headers=auth_header_super_user + ) print(result.json) assert result.status_code == HTTPStatus.NOT_FOUND -def test_delete_user(mock_auth_service, client, auth_header): +def test_delete_user(mock_auth_service, client, auth_header_super_user): """Delete user.""" staff_data = StaffScenario.default_data.value staff_data["auth_user_guid"] = fake.word() created_user = StaffScenario.create(staff_data) url = urljoin(API_BASE_URL, f"staff-users/{created_user.id}") - result = client.delete(url, headers=auth_header) + result = client.delete(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.OK - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.NOT_FOUND + + +def test_delete_user_with_non_super_user(mock_auth_service, client, auth_header): + """Delete user.""" + url = urljoin(API_BASE_URL, "staff-users/1") + + result = client.delete(url, headers=auth_header) + + assert result.status_code == HTTPStatus.FORBIDDEN diff --git a/compliance-api/tests/integration/api/test_topic.py b/compliance-api/tests/integration/api/test_topic.py index fd8f7b21..72df9407 100644 --- a/compliance-api/tests/integration/api/test_topic.py +++ b/compliance-api/tests/integration/api/test_topic.py @@ -1,4 +1,5 @@ """Test suit for topic.""" + import json from http import HTTPStatus from urllib.parse import urljoin @@ -10,66 +11,112 @@ API_BASE_URL = "/api/" -def test_get_topics(app, client, auth_header): +def test_get_topics(app, client, auth_header_super_user): """Get topics.""" url = urljoin(API_BASE_URL, "topics") topics_in_db = TopicModel.get_all() # Create topics TopicScenario.create(TopicScenario.topic1.value) TopicScenario.create(TopicScenario.topic2.value) - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) assert len(result.json) == len(topics_in_db) + 2 assert result.status_code == HTTPStatus.OK -def test_get_specific_topic(app, client, auth_header): +def test_get_topics_with_non_super_user(app, client, auth_header): + """Get topics.""" + url = urljoin(API_BASE_URL, "topics") + result = client.get(url, headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_get_specific_topic(app, client, auth_header_super_user): """Get topic by id.""" # Create a topic created_topic = TopicScenario.create(TopicScenario.topic1.value) url = urljoin(API_BASE_URL, f"topics/{created_topic.id}") - result = client.get(url, headers=auth_header) + result = client.get(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.OK assert result.json["id"] == created_topic.id assert result.json["name"] == created_topic.name -def test_create_topic(client, auth_header): +def test_get_specific_topic_with_non_super_user(app, client, auth_header): + """Get topic by id.""" + url = urljoin(API_BASE_URL, "topics/1") + result = client.get(url, headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_create_topic(client, auth_header_super_user): """Create topic.""" url = urljoin(API_BASE_URL, "topics") result = client.post( - url, data=json.dumps(TopicScenario.default_topic.value), headers=auth_header + url, + data=json.dumps(TopicScenario.default_topic.value), + headers=auth_header_super_user, ) assert result.json["name"] == TopicScenario.default_topic.value["name"] assert result.status_code == HTTPStatus.CREATED -def test_create_topic_with_mandatory_missing(client, auth_header): +def test_create_topic_with_non_super_user(client, auth_header): + """Create topic.""" + url = urljoin(API_BASE_URL, "topics") + result = client.post( + url, + data=json.dumps(TopicScenario.default_topic.value), + headers=auth_header, + ) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_create_topic_with_mandatory_missing(client, auth_header_super_user): """Create topic with missing mandatory field.""" url = urljoin(API_BASE_URL, "topics") create_dict = TopicScenario.default_topic.value create_dict.pop("name") - result = client.post(url, data=json.dumps(create_dict), headers=auth_header) + result = client.post( + url, data=json.dumps(create_dict), headers=auth_header_super_user + ) response = json.loads(result.json["message"]) assert "name" in response assert result.status_code == HTTPStatus.BAD_REQUEST -def test_update_topic(client, auth_header): +def test_update_topic(client, auth_header_super_user): """Update topic.""" topic = TopicScenario.create(TopicScenario.topic1.value) url = urljoin(API_BASE_URL, f"topics/{topic.id}") update_dict = TopicScenario.topic1.value update_dict["name"] = "changed" - result = client.patch(url, data=json.dumps(update_dict), headers=auth_header) + result = client.patch( + url, data=json.dumps(update_dict), headers=auth_header_super_user + ) assert result.json["name"] == update_dict["name"] assert result.status_code == HTTPStatus.OK -def test_delete_topic(client, auth_header): +def test_update_topic_with_non_super_user(client, auth_header): + """Update topic.""" + url = urljoin(API_BASE_URL, "topics/1") + update_dict = TopicScenario.topic1.value + result = client.patch(url, data=json.dumps(update_dict), headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN + + +def test_delete_topic(client, auth_header_super_user): """Delete topic.""" topic = TopicScenario.create(TopicScenario.topic1.value) url = urljoin(API_BASE_URL, f"topics/{topic.id}") - result = client.delete(url, headers=auth_header) + result = client.delete(url, headers=auth_header_super_user) assert result.status_code == HTTPStatus.OK topic_get = TopicModel.find_by_id(topic.id) assert topic_get is None + + +def test_delete_topic_with_non_supser_user(client, auth_header): + """Delete topic.""" + url = urljoin(API_BASE_URL, "topics/1") + result = client.delete(url, headers=auth_header) + assert result.status_code == HTTPStatus.FORBIDDEN diff --git a/compliance-api/tests/utilities/factory_scenario/token_claim.py b/compliance-api/tests/utilities/factory_scenario/token_claim.py index fc11533f..ff0c295a 100644 --- a/compliance-api/tests/utilities/factory_scenario/token_claim.py +++ b/compliance-api/tests/utilities/factory_scenario/token_claim.py @@ -23,3 +23,12 @@ class TokenJWTClaims(dict, Enum): "groups": ["/COMPLIANCE/VIEWER"], "realm_access": {"roles": []}, } + super_user = { + "iss": CONFIG.JWT_OIDC_TEST_ISSUER, + "sub": "f7a4a1d3-73a8-4cbc-a40f-bb1145302065", + "firstname": fake.first_name(), + "lastname": fake.last_name(), + "preferred_username": fake.user_name(), + "groups": ["/COMPLIANCE/SUPERUSER"], + "realm_access": {"roles": []}, + } diff --git a/deployment/charts/compliance-api/templates/deploymentconfig.yaml b/deployment/charts/compliance-api/templates/deploymentconfig.yaml index a8a11765..9fd4a470 100644 --- a/deployment/charts/compliance-api/templates/deploymentconfig.yaml +++ b/deployment/charts/compliance-api/templates/deploymentconfig.yaml @@ -58,7 +58,7 @@ spec: labels: app: {{ .Chart.Name }} annotations: - vault.hashicorp.com/agent-inject: "true" + .hashicorp.com/agent-inject: "true" vault.hashicorp.com/agent-inject-secret-{{ .Values.vault.secretName }}: {{ .Values.vault.path }} vault.hashicorp.com/agent-inject-template-{{ .Values.vault.secretName }}: | {{`{{`}} with secret "{{ .Values.vault.path }}" {{`}}`}} @@ -69,7 +69,7 @@ spec: vault.hashicorp.com/agent-inject-token: "true" vault.hashicorp.com/agent-pre-populate-only: "true" vault.hashicorp.com/auth-path: {{ .Values.vault.authPath }} - vault.hashicorp.com/namespace: {{ .Values.vault.namespace }} + vault.hashicorp.com/namespace: {vault{ .Values.vault.namespace }} vault.hashicorp.com/role: {{ .Values.vault.role }} spec: containers: