From 3e41d03a64356b565e992f416c9e5508279cf1d7 Mon Sep 17 00:00:00 2001 From: MCatherine Date: Tue, 28 Nov 2023 12:01:08 -0800 Subject: [PATCH] feat: #1046 add test for admin management endpoints (#1055) --- .../api/app/repositories/user_repository.py | 4 + .../app/routers/router_application_admin.py | 2 +- .../api/app/services/user_service.py | 3 + server/admin_management/local-dev.env | 1 - server/admin_management/readme.md | 3 +- server/admin_management/tests/conftest.py | 44 ++++ server/admin_management/tests/constants.py | 29 +++ .../test_application_admin_repository.py | 103 ++++++++++ .../test_application_repository.py | 23 +++ .../repositories/test_user_repository.py | 83 ++++++++ .../routers/test_application_admin_router.py | 193 ++++++++++++++++++ .../test_application_admin_service.py | 59 ++++++ .../tests/services/test_user_service.py | 45 ++++ 13 files changed, 589 insertions(+), 3 deletions(-) create mode 100644 server/admin_management/tests/constants.py create mode 100644 server/admin_management/tests/repositories/test_application_admin_repository.py create mode 100644 server/admin_management/tests/repositories/test_application_repository.py create mode 100644 server/admin_management/tests/repositories/test_user_repository.py create mode 100644 server/admin_management/tests/routers/test_application_admin_router.py create mode 100644 server/admin_management/tests/services/test_application_admin_service.py create mode 100644 server/admin_management/tests/services/test_user_service.py diff --git a/server/admin_management/api/app/repositories/user_repository.py b/server/admin_management/api/app/repositories/user_repository.py index 746cc653e..3c9cb6cf4 100644 --- a/server/admin_management/api/app/repositories/user_repository.py +++ b/server/admin_management/api/app/repositories/user_repository.py @@ -1,5 +1,6 @@ import logging from sqlalchemy.orm import Session +from typing import List from api.app.models import model as models from api.app import schemas @@ -35,6 +36,9 @@ def get_user_by_cognito_user_id(self, cognito_user_id: str) -> models.FamUser: .one_or_none() ) + def get_users(self) -> List[models.FamUser]: + return self.db.query(models.FamUser).all() + def create_user(self, fam_user: schemas.FamUser) -> models.FamUser: LOGGER.debug(f"Creating fam user: {fam_user}") diff --git a/server/admin_management/api/app/routers/router_application_admin.py b/server/admin_management/api/app/routers/router_application_admin.py index ddd8444a8..b4e5445ab 100644 --- a/server/admin_management/api/app/routers/router_application_admin.py +++ b/server/admin_management/api/app/routers/router_application_admin.py @@ -144,7 +144,7 @@ def delete_application_admin( status_code=200, dependencies=[Depends(authorize_by_fam_admin)], ) -def get_application_admin_by_applicationid( +def get_application_admin_by_application_id( application_id: int, db: Session = Depends(database.get_db), ): diff --git a/server/admin_management/api/app/services/user_service.py b/server/admin_management/api/app/services/user_service.py index c48c0cea7..bedc8bd8f 100644 --- a/server/admin_management/api/app/services/user_service.py +++ b/server/admin_management/api/app/services/user_service.py @@ -18,6 +18,9 @@ def get_user_by_domain_and_name(self, user_type_code: str, user_name: str): def get_user_by_cognito_user_id(self, cognito_user_id: str): return self.user_repo.get_user_by_cognito_user_id(cognito_user_id) + def get_users(self): + return self.user_repo.get_users() + def find_or_create(self, user_type_code: str, user_name: str, requester: str): LOGGER.debug( f"Request for finding or creating a user with user_type: {user_type_code}, " diff --git a/server/admin_management/local-dev.env b/server/admin_management/local-dev.env index 599e26cce..6e7d8954f 100644 --- a/server/admin_management/local-dev.env +++ b/server/admin_management/local-dev.env @@ -7,4 +7,3 @@ COGNITO_REGION=ca-central-1 COGNITO_USER_POOL_ID=ca-central-1_p8X8GdjKW COGNITO_CLIENT_ID=6jfveou69mgford233or30hmta COGNITO_USER_POOL_DOMAIN=dev-fam-user-pool-domain - diff --git a/server/admin_management/readme.md b/server/admin_management/readme.md index ab786a667..035f864ae 100644 --- a/server/admin_management/readme.md +++ b/server/admin_management/readme.md @@ -18,4 +18,5 @@ Note: This is no longer necessary if running through Docker or running tests thr ``` cd server/admin_management set -o allexport; source local-dev.env; set +o allexport -``` \ No newline at end of file +``` + diff --git a/server/admin_management/tests/conftest.py b/server/admin_management/tests/conftest.py index 0ff6afaab..65cf7108d 100644 --- a/server/admin_management/tests/conftest.py +++ b/server/admin_management/tests/conftest.py @@ -15,6 +15,12 @@ import api.app.jwt_validation as jwt_validation from api.app.main import app +from api.app.repositories.user_repository import UserRepository +from api.app.repositories.application_repository import ApplicationRepository +from api.app.repositories.application_admin_repository import ApplicationAdminRepository +from api.app.services.user_service import UserService +from api.app.services.application_admin_service import ApplicationAdminService + LOGGER = logging.getLogger(__name__) # the folder contains test docker-compose.yml, ours in the root directory COMPOSE_PATH = os.path.join(os.path.dirname(__file__), "../../../") @@ -67,6 +73,23 @@ def test_client_fixture_unit() -> TestClient: return TestClient(app) +@pytest.fixture(scope="function") +def test_client_fixture(db_pg_session) -> TestClient: + """returns a requests object of the current app, + with the objects defined in the model created in it. + + :rtype: starlette.testclient + """ + # reset to default database which points to postgres container + app.dependency_overrides[database.get_db] = lambda: db_pg_session + + yield TestClient(app) + + # reset other dependency override back to app default in each test + # during test case teardown. + app.dependency_overrides = {} + + @pytest.fixture(scope="function") def test_rsa_key(): @@ -110,3 +133,24 @@ def override_get_rsa_key_method_none(): def override_get_rsa_key_none(kid): return None + + +@pytest.fixture(scope="function") +def user_repo(db_pg_session: Session): + return UserRepository(db_pg_session) + +@pytest.fixture(scope="function") +def application_repo(db_pg_session: Session): + return ApplicationRepository(db_pg_session) + +@pytest.fixture(scope="function") +def application_admin_repo(db_pg_session: Session): + return ApplicationAdminRepository(db_pg_session) + +@pytest.fixture(scope="function") +def user_service(db_pg_session: Session): + return UserService(db_pg_session) + +@pytest.fixture(scope="function") +def application_admin_service(db_pg_session: Session): + return ApplicationAdminService(db_pg_session) \ No newline at end of file diff --git a/server/admin_management/tests/constants.py b/server/admin_management/tests/constants.py new file mode 100644 index 000000000..93b23655a --- /dev/null +++ b/server/admin_management/tests/constants.py @@ -0,0 +1,29 @@ +from api.app import constants as famConstants + + +TEST_CREATOR = "TESTER" +TEST_FOM_DEV_ADMIN_ROLE = "FOM_DEV_ACCESS_ADMIN" +INVALID_APPLICATION_ID = "invalid_application_id" + +# ---------------------- test user data ----------------------------- # +TEST_INVALID_USER_TYPE = "NS" +TEST_NON_EXISTS_COGNITO_USER_ID = f"dev-idir_nonexists@idir" + +TEST_NEW_USER = { + "user_type_code": famConstants.UserType.IDIR, + "user_name": "TEST_USER", + "create_user": TEST_CREATOR, +} + +# ---------------------- test application data ---------------------- # +TEST_NOT_EXIST_APPLICATION_ID = 0 +TEST_APPLICATION_ID_FAM = 1 +TEST_APPLICATION_NAME_FAM = "FAM" + +# -------------------- test application admin data ------------------ # +TEST_NEW_APPLICATION_ADMIN_USER_ID = 1 +TEST_NEW_APPLICATION_ADMIN = { + "user_type_code": famConstants.UserType.BCEID, + "user_name": "TEST_USER", + "application_id": TEST_APPLICATION_ID_FAM, +} diff --git a/server/admin_management/tests/repositories/test_application_admin_repository.py b/server/admin_management/tests/repositories/test_application_admin_repository.py new file mode 100644 index 000000000..068dd0ec9 --- /dev/null +++ b/server/admin_management/tests/repositories/test_application_admin_repository.py @@ -0,0 +1,103 @@ +import logging + +from api.app.repositories.application_admin_repository import ApplicationAdminRepository + +from tests.constants import ( + TEST_APPLICATION_ID_FAM, + TEST_NEW_APPLICATION_ADMIN_USER_ID, + TEST_CREATOR, +) + + +LOGGER = logging.getLogger(__name__) + + +def test_create_application_admin_and_get( + application_admin_repo: ApplicationAdminRepository, +): + # create a new application admin + new_application_admin = application_admin_repo.create_application_admin( + TEST_APPLICATION_ID_FAM, + TEST_NEW_APPLICATION_ADMIN_USER_ID, + TEST_CREATOR, + ) + assert new_application_admin.application_id == TEST_APPLICATION_ID_FAM + assert new_application_admin.user_id == TEST_NEW_APPLICATION_ADMIN_USER_ID + + # get the new created application admin + application_admin = application_admin_repo.get_application_admin_by_app_and_user_id( + TEST_APPLICATION_ID_FAM, + TEST_NEW_APPLICATION_ADMIN_USER_ID, + ) + assert new_application_admin.user_id == application_admin.user_id + assert new_application_admin.application_id == application_admin.application_id + assert ( + new_application_admin.application_admin_id + == application_admin.application_admin_id + ) + + +def test_get_application_admin_by_application_id( + application_admin_repo: ApplicationAdminRepository, +): + # find application admin, no data initially + application_admin = application_admin_repo.get_application_admin_by_application_id( + TEST_APPLICATION_ID_FAM + ) + assert len(application_admin) == 0 + + # create a new application admin + new_application_admin = application_admin_repo.create_application_admin( + TEST_APPLICATION_ID_FAM, + TEST_NEW_APPLICATION_ADMIN_USER_ID, + TEST_CREATOR, + ) + assert new_application_admin.application_id == TEST_APPLICATION_ID_FAM + # get the new application admin by application id + application_admin = application_admin_repo.get_application_admin_by_application_id( + TEST_APPLICATION_ID_FAM + ) + assert application_admin is not None + + +def test_get_application_admin_by_id( + application_admin_repo: ApplicationAdminRepository, +): + # create a new application admin + new_application_admin = application_admin_repo.create_application_admin( + TEST_APPLICATION_ID_FAM, + TEST_NEW_APPLICATION_ADMIN_USER_ID, + TEST_CREATOR, + ) + # get the new application admin by id + application_admin = application_admin_repo.get_application_admin_by_id( + new_application_admin.application_admin_id + ) + assert ( + application_admin.application_admin_id + == new_application_admin.application_admin_id + ) + + +def test_delete_application_admin(application_admin_repo: ApplicationAdminRepository): + # create a new application admin + new_application_admin = application_admin_repo.create_application_admin( + TEST_APPLICATION_ID_FAM, + TEST_NEW_APPLICATION_ADMIN_USER_ID, + TEST_CREATOR, + ) + # verify the new application admin is created + application_admin = application_admin_repo.get_application_admin_by_id( + new_application_admin.application_admin_id + ) + assert application_admin is not None + + # remove the application admin + application_admin_repo.delete_application_admin( + new_application_admin.application_admin_id + ) + # verify the application admin cannot be found anymore + application_admin = application_admin_repo.get_application_admin_by_id( + new_application_admin.application_admin_id + ) + assert application_admin is None diff --git a/server/admin_management/tests/repositories/test_application_repository.py b/server/admin_management/tests/repositories/test_application_repository.py new file mode 100644 index 000000000..0e1393a35 --- /dev/null +++ b/server/admin_management/tests/repositories/test_application_repository.py @@ -0,0 +1,23 @@ +import logging + +from api.app.repositories.application_repository import ApplicationRepository + +from tests.constants import ( + TEST_NOT_EXIST_APPLICATION_ID, + TEST_APPLICATION_ID_FAM, + TEST_APPLICATION_NAME_FAM, +) + + +LOGGER = logging.getLogger(__name__) + + +def test_get_application(application_repo: ApplicationRepository): + # test get existing application + app_by_id = application_repo.get_application(TEST_APPLICATION_ID_FAM) + assert app_by_id.application_id == TEST_APPLICATION_ID_FAM + assert app_by_id.application_name == TEST_APPLICATION_NAME_FAM + + # test get non exist application + app_by_id = application_repo.get_application(TEST_NOT_EXIST_APPLICATION_ID) + assert app_by_id is None diff --git a/server/admin_management/tests/repositories/test_user_repository.py b/server/admin_management/tests/repositories/test_user_repository.py new file mode 100644 index 000000000..32e385f4d --- /dev/null +++ b/server/admin_management/tests/repositories/test_user_repository.py @@ -0,0 +1,83 @@ +import logging +import pytest +from sqlalchemy.exc import IntegrityError + +import api.app.schemas as schemas +from api.app.repositories.user_repository import UserRepository + +from tests.constants import ( + TEST_NEW_USER, + TEST_NON_EXISTS_COGNITO_USER_ID, +) +import tests.jwt_utils as jwt_utils + + +LOGGER = logging.getLogger(__name__) + + +def test_get_user_by_domain_and_name(user_repo: UserRepository): + # test not found + fam_user = user_repo.get_user_by_domain_and_name( + TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"] + ) + assert fam_user is None + + # create a new user and find it and verify found + request_user = schemas.FamUser(**TEST_NEW_USER) + new_user = user_repo.create_user(request_user) + fam_user = user_repo.get_user_by_domain_and_name( + TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"] + ) + assert new_user.user_id == fam_user.user_id + assert new_user.user_name == fam_user.user_name + assert new_user.user_type_code == fam_user.user_type_code + + # get user with username lower case + fam_user = user_repo.get_user_by_domain_and_name( + TEST_NEW_USER["user_type_code"], "test_user" + ) + assert new_user.user_id == fam_user.user_id + assert new_user.user_name == fam_user.user_name + assert new_user.user_type_code == fam_user.user_type_code + + +def test_get_user_by_cognito_user_id(user_repo: UserRepository): + # test not found + fam_user = user_repo.get_user_by_cognito_user_id(TEST_NON_EXISTS_COGNITO_USER_ID) + assert fam_user is None + + # test found + fam_user = user_repo.get_user_by_cognito_user_id(jwt_utils.COGNITO_USERNAME) + assert fam_user.cognito_user_id == jwt_utils.COGNITO_USERNAME + + +def test_get_users(user_repo: UserRepository): + users = user_repo.get_users() + assert users is not None + users_count = len(users) + + request_user = schemas.FamUser(**TEST_NEW_USER) + user_repo.create_user(request_user) + users = user_repo.get_users() + assert len(users) == users_count + 1 + + +def test_create_user(user_repo: UserRepository): + request_user = schemas.FamUser(**TEST_NEW_USER) + new_user = user_repo.create_user(request_user) + assert new_user.user_name == TEST_NEW_USER.get("user_name") + assert new_user.user_type_code == TEST_NEW_USER.get("user_type_code") + fam_user = user_repo.get_user_by_domain_and_name( + TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"] + ) + assert new_user.user_id == fam_user.user_id + assert new_user.user_name == fam_user.user_name + assert new_user.user_type_code == fam_user.user_type_code + + # test create duplicate user + with pytest.raises(IntegrityError) as e: + user_repo.create_user(request_user) + assert ( + str(e.value).find('duplicate key value violates unique constraint "fam_usr_uk"') + != -1 + ) diff --git a/server/admin_management/tests/routers/test_application_admin_router.py b/server/admin_management/tests/routers/test_application_admin_router.py new file mode 100644 index 000000000..0fbc88134 --- /dev/null +++ b/server/admin_management/tests/routers/test_application_admin_router.py @@ -0,0 +1,193 @@ +import logging +from http import HTTPStatus +import starlette.testclient + +from api.app.main import apiPrefix +from api.app.jwt_validation import ERROR_PERMISSION_REQUIRED +from api.app.routers.router_guards import ( + ERROR_INVALID_APPLICATION_ID, + ERROR_INVALID_APPLICATION_ADMIN_ID, +) + +from tests.constants import ( + TEST_NEW_APPLICATION_ADMIN, + TEST_INVALID_USER_TYPE, + TEST_NEW_APPLICATION_ADMIN, + TEST_NOT_EXIST_APPLICATION_ID, + TEST_APPLICATION_ID_FAM, + TEST_FOM_DEV_ADMIN_ROLE, + INVALID_APPLICATION_ID, +) +import tests.jwt_utils as jwt_utils + + +LOGGER = logging.getLogger(__name__) +endPoint = f"{apiPrefix}/application_admin" + + +def test_create_application_admin( + test_client_fixture: starlette.testclient.TestClient, test_rsa_key +): + # test create with invalid role + token = jwt_utils.create_jwt_token(test_rsa_key, [TEST_FOM_DEV_ADMIN_ROLE]) + response = test_client_fixture.post( + f"{endPoint}", json=TEST_NEW_APPLICATION_ADMIN, headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.FORBIDDEN + assert response.json() is not None + assert str(response.json()["detail"]).find(ERROR_PERMISSION_REQUIRED) != -1 + + # test create application admin + token = jwt_utils.create_jwt_token( + test_rsa_key + ) # by deafult it will create with FAM admin role + response = test_client_fixture.post( + f"{endPoint}", json=TEST_NEW_APPLICATION_ADMIN, headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.OK + assert response.json() is not None + data = response.json() + assert data.get("application_id") == TEST_NEW_APPLICATION_ADMIN.get( + "application_id" + ) + + # test create duplicate application admin + response = test_client_fixture.post( + f"{endPoint}", json=TEST_NEW_APPLICATION_ADMIN, headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.CONFLICT + assert response.json() is not None + assert str(response.json()["detail"]).find("User is admin already") != -1 + + # test create with invalid user type + response = test_client_fixture.post( + f"{endPoint}", + json={ + "user_type_code": TEST_INVALID_USER_TYPE, + "user_name": TEST_NEW_APPLICATION_ADMIN.get("user_name"), + "application_id": TEST_NEW_APPLICATION_ADMIN.get("application_id"), + }, + headers=jwt_utils.headers(token), + ) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + assert response.json() is not None + assert str(response.json()["detail"]).find("Input should be 'I' or 'B'") != -1 + + # test create with non exists application id + response = test_client_fixture.post( + f"{endPoint}", + json={ + "user_type_code": TEST_NEW_APPLICATION_ADMIN.get("user_type_code"), + "user_name": TEST_NEW_APPLICATION_ADMIN.get("user_name"), + "application_id": TEST_NOT_EXIST_APPLICATION_ID, + }, + headers=jwt_utils.headers(token), + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + assert response.json() is not None + assert str(response.json()["detail"]).find(ERROR_INVALID_APPLICATION_ID) != -1 + + +def test_delete_application_admin( + test_client_fixture: starlette.testclient.TestClient, test_rsa_key +): + # create an application admin first + token = jwt_utils.create_jwt_token(test_rsa_key) + response = test_client_fixture.post( + f"{endPoint}", json=TEST_NEW_APPLICATION_ADMIN, headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.OK + assert response.json() is not None + data = response.json() + assert data.get("application_id") == TEST_NEW_APPLICATION_ADMIN.get( + "application_id" + ) + + # test delete with invalid role + token = jwt_utils.create_jwt_token(test_rsa_key, [TEST_FOM_DEV_ADMIN_ROLE]) + response = test_client_fixture.delete( + f"{endPoint}/{data['application_admin_id']}", headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.FORBIDDEN + assert response.json() is not None + assert str(response.json()["detail"]).find(ERROR_PERMISSION_REQUIRED) != -1 + + # test delete application admin + token = jwt_utils.create_jwt_token(test_rsa_key) + response = test_client_fixture.delete( + f"{endPoint}/{data['application_admin_id']}", headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.OK + + # test delete non exists application admin + response = test_client_fixture.delete( + f"{endPoint}/{data['application_admin_id']}", headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.BAD_REQUEST + assert response.json() is not None + assert str(response.json()["detail"]).find(ERROR_INVALID_APPLICATION_ADMIN_ID) != -1 + + +def test_get_application_admin_by_application_id( + test_client_fixture: starlette.testclient.TestClient, test_rsa_key +): + # test get with invalid role + token = jwt_utils.create_jwt_token(test_rsa_key, [TEST_FOM_DEV_ADMIN_ROLE]) + response = test_client_fixture.get( + f"{endPoint}/{TEST_APPLICATION_ID_FAM}/admins", headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.FORBIDDEN + assert response.json() is not None + assert str(response.json()["detail"]).find(ERROR_PERMISSION_REQUIRED) != -1 + + # get application admin by application id, get original length + token = jwt_utils.create_jwt_token(test_rsa_key) + response = test_client_fixture.get( + f"{endPoint}/{TEST_APPLICATION_ID_FAM}/admins", + headers=jwt_utils.headers(token), + ) + assert response.status_code == HTTPStatus.OK + assert response.json() is not None + origin_admins_length = len(response.json()) + # create an application admin + response = test_client_fixture.post( + f"{endPoint}", json=TEST_NEW_APPLICATION_ADMIN, headers=jwt_utils.headers(token) + ) + assert response.status_code == HTTPStatus.OK + assert response.json() is not None + data = response.json() + assert data.get("application_id") == TEST_NEW_APPLICATION_ADMIN.get( + "application_id" + ) + # get the application by application id again, verify length adds one + response = test_client_fixture.get( + f"{endPoint}/{TEST_APPLICATION_ID_FAM}/admins", + headers=jwt_utils.headers(token), + ) + assert response.status_code == HTTPStatus.OK + assert response.json() is not None + admins_length = len(response.json()) + assert admins_length == origin_admins_length + 1 + + # test get with non exists application id + response = test_client_fixture.get( + f"{endPoint}/{TEST_NOT_EXIST_APPLICATION_ID}/admins", + headers=jwt_utils.headers(token), + ) + assert response.status_code == HTTPStatus.OK + assert response.json() is not None + assert len(response.json()) == 0 + + # test get with invalid application id + response = test_client_fixture.get( + f"{endPoint}/{INVALID_APPLICATION_ID}/admins", + headers=jwt_utils.headers(token), + ) + assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY + assert response.json() is not None + assert ( + str(response.json()["detail"]).find( + "Input should be a valid integer, unable to parse string as an integer" + ) + != -1 + ) diff --git a/server/admin_management/tests/services/test_application_admin_service.py b/server/admin_management/tests/services/test_application_admin_service.py new file mode 100644 index 000000000..d443e1fdf --- /dev/null +++ b/server/admin_management/tests/services/test_application_admin_service.py @@ -0,0 +1,59 @@ +import logging +import pytest +from pydantic import ValidationError +from fastapi import HTTPException + +from api.app import schemas +from api.app.services.application_admin_service import ApplicationAdminService + +from tests.constants import ( + TEST_NEW_APPLICATION_ADMIN, + TEST_CREATOR, + TEST_INVALID_USER_TYPE, +) + + +LOGGER = logging.getLogger(__name__) + + +def test_create_application_admin(application_admin_service: ApplicationAdminService): + # test invalid user type + with pytest.raises(ValidationError) as e: + application_admin_service.create_application_admin( + schemas.FamAppAdminCreate( + **{ + "user_type_code": TEST_INVALID_USER_TYPE, + "user_name": TEST_NEW_APPLICATION_ADMIN.get("user_name"), + "application_id": TEST_NEW_APPLICATION_ADMIN.get("application_id"), + } + ), + TEST_CREATOR, + ) + assert str(e.value).find("Input should be 'I' or 'B'") != -1 + + # test create application admin + new_application_admin = application_admin_service.create_application_admin( + schemas.FamAppAdminCreate(**TEST_NEW_APPLICATION_ADMIN), + TEST_CREATOR, + ) + assert new_application_admin.application_id == TEST_NEW_APPLICATION_ADMIN.get( + "application_id" + ) + # verify the application admin is created + application_admin = application_admin_service.get_application_admin_by_id( + new_application_admin.application_admin_id + ) + assert new_application_admin.user_id == application_admin.user_id + assert new_application_admin.application_id == application_admin.application_id + assert ( + new_application_admin.application_admin_id + == application_admin.application_admin_id + ) + + # test create duplication application admin + with pytest.raises(HTTPException) as e: + application_admin_service.create_application_admin( + schemas.FamAppAdminCreate(**TEST_NEW_APPLICATION_ADMIN), + TEST_CREATOR, + ) + assert str(e._excinfo).find("User is admin already") != -1 diff --git a/server/admin_management/tests/services/test_user_service.py b/server/admin_management/tests/services/test_user_service.py new file mode 100644 index 000000000..5699dd10a --- /dev/null +++ b/server/admin_management/tests/services/test_user_service.py @@ -0,0 +1,45 @@ +import logging + +from api.app.services.user_service import UserService + +from tests.constants import TEST_NEW_USER + + +LOGGER = logging.getLogger(__name__) + + +def test_find_or_create(user_service: UserService): + # verify the new user not exists + found_user = user_service.get_user_by_domain_and_name( + TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"] + ) + assert found_user is None + initial_users_count = user_service.get_users() + + # give the new user + new_user = user_service.find_or_create( + TEST_NEW_USER["user_type_code"], + TEST_NEW_USER["user_name"], + TEST_NEW_USER["create_user"], + ) + assert new_user.user_name == TEST_NEW_USER["user_name"] + assert new_user.user_type_code == TEST_NEW_USER.get("user_type_code") + # verify new user got created + found_user = user_service.get_user_by_domain_and_name( + TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"] + ) + assert new_user.user_id == found_user.user_id + assert new_user.user_name == found_user.user_name + assert new_user.user_type_code == found_user.user_type_code + after_add_users_count = user_service.get_users() + assert len(after_add_users_count) == len(initial_users_count) + 1 + + # give the existing user + user_service.find_or_create( + TEST_NEW_USER["user_type_code"], + TEST_NEW_USER["user_name"], + TEST_NEW_USER["create_user"], + ) + users = user_service.get_users() + # verify no user created + assert len(users) == len(after_add_users_count)