Skip to content

Commit

Permalink
feat: #1046 add test for admin management endpoints (#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
MCatherine1994 authored Nov 28, 2023
1 parent 477277f commit 7e39737
Show file tree
Hide file tree
Showing 13 changed files with 589 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from sqlalchemy.orm import Session
from typing import List

from api.app.models import model as models
from api.app import schemas
Expand Down Expand Up @@ -35,6 +36,9 @@ def get_user_by_cognito_user_id(self, cognito_user_id: str) -> models.FamUser:
.one_or_none()
)

def get_users(self) -> List[models.FamUser]:
return self.db.query(models.FamUser).all()

def create_user(self, fam_user: schemas.FamUser) -> models.FamUser:
LOGGER.debug(f"Creating fam user: {fam_user}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def delete_application_admin(
status_code=200,
dependencies=[Depends(authorize_by_fam_admin)],
)
def get_application_admin_by_applicationid(
def get_application_admin_by_application_id(
application_id: int,
db: Session = Depends(database.get_db),
):
Expand Down
3 changes: 3 additions & 0 deletions server/admin_management/api/app/services/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def get_user_by_domain_and_name(self, user_type_code: str, user_name: str):
def get_user_by_cognito_user_id(self, cognito_user_id: str):
return self.user_repo.get_user_by_cognito_user_id(cognito_user_id)

def get_users(self):
return self.user_repo.get_users()

def find_or_create(self, user_type_code: str, user_name: str, requester: str):
LOGGER.debug(
f"Request for finding or creating a user with user_type: {user_type_code}, "
Expand Down
1 change: 0 additions & 1 deletion server/admin_management/local-dev.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ COGNITO_REGION=ca-central-1
COGNITO_USER_POOL_ID=ca-central-1_p8X8GdjKW
COGNITO_CLIENT_ID=6jfveou69mgford233or30hmta
COGNITO_USER_POOL_DOMAIN=dev-fam-user-pool-domain

3 changes: 2 additions & 1 deletion server/admin_management/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ Note: This is no longer necessary if running through Docker or running tests thr
```
cd server/admin_management
set -o allexport; source local-dev.env; set +o allexport
```
```

44 changes: 44 additions & 0 deletions server/admin_management/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
import api.app.jwt_validation as jwt_validation
from api.app.main import app

from api.app.repositories.user_repository import UserRepository
from api.app.repositories.application_repository import ApplicationRepository
from api.app.repositories.application_admin_repository import ApplicationAdminRepository
from api.app.services.user_service import UserService
from api.app.services.application_admin_service import ApplicationAdminService

LOGGER = logging.getLogger(__name__)
# the folder contains test docker-compose.yml, ours in the root directory
COMPOSE_PATH = os.path.join(os.path.dirname(__file__), "../../../")
Expand Down Expand Up @@ -67,6 +73,23 @@ def test_client_fixture_unit() -> TestClient:
return TestClient(app)


@pytest.fixture(scope="function")
def test_client_fixture(db_pg_session) -> TestClient:
"""returns a requests object of the current app,
with the objects defined in the model created in it.
:rtype: starlette.testclient
"""
# reset to default database which points to postgres container
app.dependency_overrides[database.get_db] = lambda: db_pg_session

yield TestClient(app)

# reset other dependency override back to app default in each test
# during test case teardown.
app.dependency_overrides = {}


@pytest.fixture(scope="function")
def test_rsa_key():

Expand Down Expand Up @@ -110,3 +133,24 @@ def override_get_rsa_key_method_none():

def override_get_rsa_key_none(kid):
return None


@pytest.fixture(scope="function")
def user_repo(db_pg_session: Session):
return UserRepository(db_pg_session)

@pytest.fixture(scope="function")
def application_repo(db_pg_session: Session):
return ApplicationRepository(db_pg_session)

@pytest.fixture(scope="function")
def application_admin_repo(db_pg_session: Session):
return ApplicationAdminRepository(db_pg_session)

@pytest.fixture(scope="function")
def user_service(db_pg_session: Session):
return UserService(db_pg_session)

@pytest.fixture(scope="function")
def application_admin_service(db_pg_session: Session):
return ApplicationAdminService(db_pg_session)
29 changes: 29 additions & 0 deletions server/admin_management/tests/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from api.app import constants as famConstants


TEST_CREATOR = "TESTER"
TEST_FOM_DEV_ADMIN_ROLE = "FOM_DEV_ACCESS_ADMIN"
INVALID_APPLICATION_ID = "invalid_application_id"

# ---------------------- test user data ----------------------------- #
TEST_INVALID_USER_TYPE = "NS"
TEST_NON_EXISTS_COGNITO_USER_ID = f"dev-idir_nonexists@idir"

TEST_NEW_USER = {
"user_type_code": famConstants.UserType.IDIR,
"user_name": "TEST_USER",
"create_user": TEST_CREATOR,
}

# ---------------------- test application data ---------------------- #
TEST_NOT_EXIST_APPLICATION_ID = 0
TEST_APPLICATION_ID_FAM = 1
TEST_APPLICATION_NAME_FAM = "FAM"

# -------------------- test application admin data ------------------ #
TEST_NEW_APPLICATION_ADMIN_USER_ID = 1
TEST_NEW_APPLICATION_ADMIN = {
"user_type_code": famConstants.UserType.BCEID,
"user_name": "TEST_USER",
"application_id": TEST_APPLICATION_ID_FAM,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import logging

from api.app.repositories.application_admin_repository import ApplicationAdminRepository

from tests.constants import (
TEST_APPLICATION_ID_FAM,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)


LOGGER = logging.getLogger(__name__)


def test_create_application_admin_and_get(
application_admin_repo: ApplicationAdminRepository,
):
# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
assert new_application_admin.application_id == TEST_APPLICATION_ID_FAM
assert new_application_admin.user_id == TEST_NEW_APPLICATION_ADMIN_USER_ID

# get the new created application admin
application_admin = application_admin_repo.get_application_admin_by_app_and_user_id(
TEST_APPLICATION_ID_FAM,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
)
assert new_application_admin.user_id == application_admin.user_id
assert new_application_admin.application_id == application_admin.application_id
assert (
new_application_admin.application_admin_id
== application_admin.application_admin_id
)


def test_get_application_admin_by_application_id(
application_admin_repo: ApplicationAdminRepository,
):
# find application admin, no data initially
application_admin = application_admin_repo.get_application_admin_by_application_id(
TEST_APPLICATION_ID_FAM
)
assert len(application_admin) == 0

# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
assert new_application_admin.application_id == TEST_APPLICATION_ID_FAM
# get the new application admin by application id
application_admin = application_admin_repo.get_application_admin_by_application_id(
TEST_APPLICATION_ID_FAM
)
assert application_admin is not None


def test_get_application_admin_by_id(
application_admin_repo: ApplicationAdminRepository,
):
# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
# get the new application admin by id
application_admin = application_admin_repo.get_application_admin_by_id(
new_application_admin.application_admin_id
)
assert (
application_admin.application_admin_id
== new_application_admin.application_admin_id
)


def test_delete_application_admin(application_admin_repo: ApplicationAdminRepository):
# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
# verify the new application admin is created
application_admin = application_admin_repo.get_application_admin_by_id(
new_application_admin.application_admin_id
)
assert application_admin is not None

# remove the application admin
application_admin_repo.delete_application_admin(
new_application_admin.application_admin_id
)
# verify the application admin cannot be found anymore
application_admin = application_admin_repo.get_application_admin_by_id(
new_application_admin.application_admin_id
)
assert application_admin is None
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging

from api.app.repositories.application_repository import ApplicationRepository

from tests.constants import (
TEST_NOT_EXIST_APPLICATION_ID,
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_NAME_FAM,
)


LOGGER = logging.getLogger(__name__)


def test_get_application(application_repo: ApplicationRepository):
# test get existing application
app_by_id = application_repo.get_application(TEST_APPLICATION_ID_FAM)
assert app_by_id.application_id == TEST_APPLICATION_ID_FAM
assert app_by_id.application_name == TEST_APPLICATION_NAME_FAM

# test get non exist application
app_by_id = application_repo.get_application(TEST_NOT_EXIST_APPLICATION_ID)
assert app_by_id is None
83 changes: 83 additions & 0 deletions server/admin_management/tests/repositories/test_user_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import logging
import pytest
from sqlalchemy.exc import IntegrityError

import api.app.schemas as schemas
from api.app.repositories.user_repository import UserRepository

from tests.constants import (
TEST_NEW_USER,
TEST_NON_EXISTS_COGNITO_USER_ID,
)
import tests.jwt_utils as jwt_utils


LOGGER = logging.getLogger(__name__)


def test_get_user_by_domain_and_name(user_repo: UserRepository):
# test not found
fam_user = user_repo.get_user_by_domain_and_name(
TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"]
)
assert fam_user is None

# create a new user and find it and verify found
request_user = schemas.FamUser(**TEST_NEW_USER)
new_user = user_repo.create_user(request_user)
fam_user = user_repo.get_user_by_domain_and_name(
TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"]
)
assert new_user.user_id == fam_user.user_id
assert new_user.user_name == fam_user.user_name
assert new_user.user_type_code == fam_user.user_type_code

# get user with username lower case
fam_user = user_repo.get_user_by_domain_and_name(
TEST_NEW_USER["user_type_code"], "test_user"
)
assert new_user.user_id == fam_user.user_id
assert new_user.user_name == fam_user.user_name
assert new_user.user_type_code == fam_user.user_type_code


def test_get_user_by_cognito_user_id(user_repo: UserRepository):
# test not found
fam_user = user_repo.get_user_by_cognito_user_id(TEST_NON_EXISTS_COGNITO_USER_ID)
assert fam_user is None

# test found
fam_user = user_repo.get_user_by_cognito_user_id(jwt_utils.COGNITO_USERNAME)
assert fam_user.cognito_user_id == jwt_utils.COGNITO_USERNAME


def test_get_users(user_repo: UserRepository):
users = user_repo.get_users()
assert users is not None
users_count = len(users)

request_user = schemas.FamUser(**TEST_NEW_USER)
user_repo.create_user(request_user)
users = user_repo.get_users()
assert len(users) == users_count + 1


def test_create_user(user_repo: UserRepository):
request_user = schemas.FamUser(**TEST_NEW_USER)
new_user = user_repo.create_user(request_user)
assert new_user.user_name == TEST_NEW_USER.get("user_name")
assert new_user.user_type_code == TEST_NEW_USER.get("user_type_code")
fam_user = user_repo.get_user_by_domain_and_name(
TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"]
)
assert new_user.user_id == fam_user.user_id
assert new_user.user_name == fam_user.user_name
assert new_user.user_type_code == fam_user.user_type_code

# test create duplicate user
with pytest.raises(IntegrityError) as e:
user_repo.create_user(request_user)
assert (
str(e.value).find('duplicate key value violates unique constraint "fam_usr_uk"')
!= -1
)
Loading

0 comments on commit 7e39737

Please sign in to comment.