diff --git a/data/model/team.py b/data/model/team.py index a464741c7b..0afa492487 100644 --- a/data/model/team.py +++ b/data/model/team.py @@ -65,13 +65,13 @@ def create_team(name, org_obj, team_role_name, description=""): def add_user_to_team(user_obj, team): - try: - return TeamMember.create(user=user_obj, team=team) - except Exception: + if user_exists_in_team(user_obj, team): raise UserAlreadyInTeam( "User %s is already a member of team %s" % (user_obj.username, team.name) ) + return TeamMember.create(user=user_obj, team=team) + def remove_user_from_team(org_name, team_name, username, removed_by_username): Org = User.alias() @@ -608,14 +608,18 @@ def get_oidc_team_from_groupname(group_name, login_service_name): Fetch TeamSync row synced with login_service_name from `group_name` in TeamSync.config """ response = [] - with db_transaction(): - query_result = ( - TeamSync.select() - .join(LoginService) - .where(TeamSync.config.contains(group_name), LoginService.name == login_service_name) - ) - for row in query_result: - if json.loads(row.config).get("group_name", None) == group_name: - response.append(row) + query_result = ( + TeamSync.select() + .join(LoginService) + .where(TeamSync.config.contains(group_name), LoginService.name == login_service_name) + ) + + for row in query_result: + if json.loads(row.config).get("group_name", None) == group_name: + response.append(row) return response + + +def user_exists_in_team(user_obj, team): + return TeamMember.select().where(TeamMember.user == user_obj, TeamMember.team == team).exists() diff --git a/data/model/test/test_team.py b/data/model/test/test_team.py index cffc1db52d..cca002bc70 100644 --- a/data/model/test/test_team.py +++ b/data/model/test/test_team.py @@ -1,7 +1,9 @@ +import json + import pytest from data.database import TeamMember -from data.model import DataModelException +from data.model import DataModelException, UserAlreadyInTeam from data.model.organization import create_organization from data.model.team import ( __get_user_admin_teams, @@ -11,10 +13,12 @@ create_team, delete_all_team_members, get_federated_user_teams, + get_oidc_team_from_groupname, list_team_users, remove_team, remove_user_from_team, set_team_syncing, + user_exists_in_team, validate_team_name, ) from data.model.user import create_user_noverify, get_user @@ -166,3 +170,38 @@ def test_get_federated_user_teams(login_service_name, initialized_db): assert len(user_teams) == 2 elif login_service_name == "ldap": assert len(user_teams) == 1 + + +def test_user_exists_in_team(initialized_db): + dev_user = get_user("devtable") + new_org = create_organization("testorg", "testorg" + "@example.com", dev_user) + + team_1 = create_team("team_1", new_org, "member") + assert add_user_to_team(dev_user, team_1) + assert user_exists_in_team(dev_user, team_1) is True + + # add user to team already part of + with pytest.raises(UserAlreadyInTeam): + add_user_to_team(dev_user, team_1) + + team_2 = create_team("team_2", new_org, "member") + assert user_exists_in_team(dev_user, team_2) is False + + +def test_get_oidc_team_from_groupname(initialized_db): + dev_user = get_user("devtable") + new_org = create_organization("testorg", "testorg" + "@example.com", dev_user) + + team_1 = create_team("team_1", new_org, "member") + assert add_user_to_team(dev_user, team_1) + assert set_team_syncing(team_1, "oidc", {"group_name": "grp1"}) + response = get_oidc_team_from_groupname(group_name="grp1", login_service_name="oidc") + assert len(response) == 1 + assert response[0].team.name == "team_1" + assert json.loads(response[0].config).get("group_name") == "grp1" + + response = get_oidc_team_from_groupname(group_name="team_1", login_service_name="ldap") + assert len(response) == 0 + + response = get_oidc_team_from_groupname(group_name="team_1", login_service_name="ldap") + assert len(response) == 0 diff --git a/test/test_external_oidc.py b/test/test_external_oidc.py index d035c51519..bcf744b80f 100644 --- a/test/test_external_oidc.py +++ b/test/test_external_oidc.py @@ -179,6 +179,12 @@ def test_sync_for_non_empty_oidc_groups(self): assert user_teams_before_sync + 2 == user_teams_after_sync + # attempt to sync already synced groups + self.oidc_instance.sync_oidc_groups(user_groups, user_obj) + user_teams_after_sync = TeamMember.select().where(TeamMember.user == user_obj).count() + + assert user_teams_before_sync + 2 == user_teams_after_sync + def test_resync_for_empty_quay_teams(self): user_obj = model.user.get_user("devtable")