Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: #888 transfer to admin table #1068

Merged
merged 11 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion server/admin_management/tests/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
TEST_APPLICATION_NAME_FAM = "FAM"

# -------------------- test application admin data ------------------ #
TEST_APPLICATION_ADMIN_ID = 3
MCatherine1994 marked this conversation as resolved.
Show resolved Hide resolved
TEST_NEW_APPLICATION_ADMIN_USER_ID = 1
TEST_NEW_APPLICATION_ADMIN = {
"user_type_code": famConstants.UserType.BCEID,
"user_name": "TEST_USER",
"application_id": TEST_APPLICATION_ID_FAM,
"application_id": TEST_APPLICATION_ADMIN_ID,
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from api.app.repositories.application_admin_repository import ApplicationAdminRepository

from tests.constants import (
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_ADMIN_ID,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
Expand All @@ -17,16 +17,16 @@ def test_create_application_admin_and_get(
):
# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_ADMIN_ID,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
assert new_application_admin.application_id == TEST_APPLICATION_ID_FAM
assert new_application_admin.application_id == TEST_APPLICATION_ADMIN_ID
assert new_application_admin.user_id == TEST_NEW_APPLICATION_ADMIN_USER_ID

# get the new created application admin
application_admin = application_admin_repo.get_application_admin_by_app_and_user_id(
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_ADMIN_ID,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
)
assert new_application_admin.user_id == application_admin.user_id
Expand All @@ -40,32 +40,34 @@ def test_create_application_admin_and_get(
def test_get_application_admin_by_application_id(
application_admin_repo: ApplicationAdminRepository,
):
# find application admin, no data initially
application_admin = application_admin_repo.get_application_admin_by_application_id(
TEST_APPLICATION_ID_FAM
# find application admin and get count
application_admins = application_admin_repo.get_application_admin_by_application_id(
TEST_APPLICATION_ADMIN_ID
)
assert len(application_admin) == 0
assert application_admins is not None
application_admin_count = len(application_admins)

# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_ADMIN_ID,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
assert new_application_admin.application_id == TEST_APPLICATION_ID_FAM
assert new_application_admin.application_id == TEST_APPLICATION_ADMIN_ID
# get the new application admin by application id
application_admin = application_admin_repo.get_application_admin_by_application_id(
TEST_APPLICATION_ID_FAM
application_admins = application_admin_repo.get_application_admin_by_application_id(
TEST_APPLICATION_ADMIN_ID
)
assert application_admin is not None
assert application_admins is not None
assert len(application_admins) == application_admin_count + 1


def test_get_application_admin_by_id(
application_admin_repo: ApplicationAdminRepository,
):
# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_ADMIN_ID,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
Expand All @@ -82,7 +84,7 @@ def test_get_application_admin_by_id(
def test_delete_application_admin(application_admin_repo: ApplicationAdminRepository):
# create a new application admin
new_application_admin = application_admin_repo.create_application_admin(
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_ADMIN_ID,
TEST_NEW_APPLICATION_ADMIN_USER_ID,
TEST_CREATOR,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
TEST_INVALID_USER_TYPE,
TEST_NEW_APPLICATION_ADMIN,
MCatherine1994 marked this conversation as resolved.
Show resolved Hide resolved
TEST_NOT_EXIST_APPLICATION_ID,
TEST_APPLICATION_ID_FAM,
TEST_APPLICATION_ADMIN_ID,
TEST_FOM_DEV_ADMIN_ROLE,
INVALID_APPLICATION_ID,
)
Expand Down Expand Up @@ -134,7 +134,7 @@ def test_get_application_admin_by_application_id(
# test get with invalid role
token = jwt_utils.create_jwt_token(test_rsa_key, [TEST_FOM_DEV_ADMIN_ROLE])
response = test_client_fixture.get(
f"{endPoint}/{TEST_APPLICATION_ID_FAM}/admins", headers=jwt_utils.headers(token)
f"{endPoint}/{TEST_APPLICATION_ADMIN_ID}/admins", headers=jwt_utils.headers(token)
)
assert response.status_code == HTTPStatus.FORBIDDEN
assert response.json() is not None
Expand All @@ -143,7 +143,7 @@ def test_get_application_admin_by_application_id(
# get application admin by application id, get original length
token = jwt_utils.create_jwt_token(test_rsa_key)
response = test_client_fixture.get(
f"{endPoint}/{TEST_APPLICATION_ID_FAM}/admins",
f"{endPoint}/{TEST_APPLICATION_ADMIN_ID}/admins",
headers=jwt_utils.headers(token),
)
assert response.status_code == HTTPStatus.OK
Expand All @@ -161,7 +161,7 @@ def test_get_application_admin_by_application_id(
)
# get the application by application id again, verify length adds one
response = test_client_fixture.get(
f"{endPoint}/{TEST_APPLICATION_ID_FAM}/admins",
f"{endPoint}/{TEST_APPLICATION_ADMIN_ID}/admins",
headers=jwt_utils.headers(token),
)
assert response.status_code == HTTPStatus.OK
Expand Down
39 changes: 39 additions & 0 deletions server/auth_function/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,45 @@ def handle_event(db_connection, event) -> event_type.Event:
for record in cursor:
role_list.append(record[0])

# check if login through FAM
MCatherine1994 marked this conversation as resolved.
Show resolved Hide resolved
query_application = """
SELECT application.application_name
FROM app_fam.fam_application application
JOIN app_fam.fam_application_client client ON
application.application_id = client.application_id
WHERE
client.cognito_client_id = {cognito_client_id};
"""
sql_query_application = sql.SQL(query_application).format(
cognito_client_id=sql.Literal(cognito_client_id),
)
cursor.execute(sql_query_application)
# if login through FAM, check fam app admin and add to role list
for record in cursor:
if record[0] == "FAM":
query_fam_app_admin = """
SELECT application.application_name
FROM app_fam.fam_application_admin app_admin
INNER JOIN app_fam.fam_application application ON
ianliuwk1019 marked this conversation as resolved.
Show resolved Hide resolved
app_admin.application_id = application.application_id
JOIN app_fam.fam_application_client client ON
app_admin.application_id = client.application_id
JOIN app_fam.fam_user fam_user ON
app_admin.user_id = fam_user.user_id
WHERE
fam_user.user_guid = {user_guid}
AND fam_user.user_type_code = {user_type_code}
AND client.cognito_client_id = {cognito_client_id};
"""
sql_query_fam_app_admin = sql.SQL(query_fam_app_admin).format(
user_guid=sql.Literal(user_guid),
user_type_code=sql.Literal(user_type_code),
cognito_client_id=sql.Literal(cognito_client_id),
)
cursor.execute(sql_query_fam_app_admin)
for record in cursor:
role_list.append(f"{record[0]}_ADMIN")

event["response"]["claimsOverrideDetails"] = {
"groupOverrideDetails": {
"groupsToOverride": role_list,
Expand Down
28 changes: 28 additions & 0 deletions server/auth_function/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,34 @@ def create_user_role_xref_record(db_pg_transaction, test_user_properties):
),
)

@pytest.fixture(scope="function")
def create_fam_application_admin_record(db_pg_transaction, test_user_properties):
initial_user = test_user_properties
cursor = db_pg_transaction.cursor()
raw_query = """
insert into app_fam.fam_application_admin
(user_id,
application_id,
create_user,
update_user)
VALUES (
(select user_id from app_fam.fam_user where
user_name = %s
and user_type_code = %s),
(select application_id from app_fam.fam_application
where application_name = 'FAM'),
CURRENT_USER,
CURRENT_USER
)
"""
cursor.execute(
raw_query,
(
initial_user.get("idp_username"),
initial_user.get("idp_type_code")
),
)


@pytest.fixture(scope="function")
def initial_user_without_guid_or_cognito_id(db_pg_transaction, cognito_event):
Expand Down
1 change: 1 addition & 0 deletions server/auth_function/test/constant.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
TEST_ROLE_NAME = "EXPECTED"
TEST_ADMIN_ROLE_NAME = "FAM_ADMIN"
4 changes: 3 additions & 1 deletion server/auth_function/test/lamda_function_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pytest
from psycopg2 import sql
from constant import TEST_ROLE_NAME
from constant import TEST_ROLE_NAME, TEST_ADMIN_ROLE_NAME

modulePath = os.path.join(os.path.dirname(__file__), "..")
sys.path.append(modulePath)
Expand Down Expand Up @@ -143,6 +143,7 @@ def test_direct_role_assignment(
create_test_fam_role,
create_test_fam_cognito_client,
create_user_role_xref_record,
create_fam_application_admin_record
):
"""role doesn't have childreen (ie no forest client roles associated
and the user is getting assigned directly to the role"""
Expand All @@ -157,6 +158,7 @@ def test_direct_role_assignment(
]["groupsToOverride"]
LOGGER.debug(f"override groups: {override_groups}")
assert TEST_ROLE_NAME in override_groups
assert TEST_ADMIN_ROLE_NAME in override_groups


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE UNIQUE INDEX fam_usr_app_admin_uk ON app_fam.fam_application_admin(user_id, application_id);
MCatherine1994 marked this conversation as resolved.
Show resolved Hide resolved

-- migrate the fam roles to fam admin management table
-- first select all fam roles
-- and then select role_id and application_id this role is admin of, based on if the role name contains the application_name
-- -- for example, role name: FAM_ACCESS_ADMIN contains application name: FAM
-- -- role name: FOM_DEV_ACCESS_ADMIN contains application name: FOM_DEV
-- select the user_id and application_id the user is admin of, insert into fam_application_admin
INSERT INTO app_fam.fam_application_admin (user_id, application_id, create_user, create_date)
SELECT user_role_xref.user_id, application.application_id, CURRENT_USER, CURRENT_DATE
FROM app_fam.fam_role role
JOIN app_fam.fam_application application
ON role.role_name LIKE '%' || application.application_name || '%'
ianliuwk1019 marked this conversation as resolved.
Show resolved Hide resolved
JOIN app_fam.fam_user_role_xref user_role_xref
ON role.role_id = user_role_xref.role_id
WHERE role.application_id=1;
ianliuwk1019 marked this conversation as resolved.
Show resolved Hide resolved