diff --git a/cartography/intel/github/teams.py b/cartography/intel/github/teams.py index 6afebeb824..0f5adc9886 100644 --- a/cartography/intel/github/teams.py +++ b/cartography/intel/github/teams.py @@ -1,4 +1,6 @@ import logging +from collections import namedtuple +from time import sleep from typing import Any from typing import Dict from typing import List @@ -15,6 +17,8 @@ logger = logging.getLogger(__name__) +RepoPermission = namedtuple('RepoPermission', ['repo_url', 'permission']) + @timeit def get_teams(org: str, api_url: str, token: str) -> Tuple[PaginatedGraphqlData, Dict[str, Any]]: @@ -45,26 +49,53 @@ def get_teams(org: str, api_url: str, token: str) -> Tuple[PaginatedGraphqlData, @timeit def _get_team_repos_for_multiple_teams( - team_raw_data: List[Dict[str, Any]], + team_raw_data: list[dict[str, Any]], org: str, api_url: str, token: str, -) -> Dict[str, Any]: - result = {} +) -> dict[str, list[RepoPermission]]: + result: dict[str, list[RepoPermission]] = {} for team in team_raw_data: team_name = team['slug'] repo_count = team['repositories']['totalCount'] - team_repos = _get_team_repos(org, api_url, token, team_name) if repo_count > 0 else None + if repo_count == 0: + # This team has access to no repos so let's move on + result[team_name] = [] + continue repo_urls = [] repo_permissions = [] - if team_repos: - repo_urls = [t['url'] for t in team_repos.nodes] if team_repos.nodes else [] - repo_permissions = [t['permission'] for t in team_repos.edges] if team_repos.edges else [] + + max_tries = 5 + + for current_try in range(1, max_tries + 1): + team_repos = _get_team_repos(org, api_url, token, team_name) + + try: + # The `or []` is because `.nodes` can be None. See: + # https://docs.github.com/en/graphql/reference/objects#teamrepositoryconnection + for repo in team_repos.nodes or []: + repo_urls.append(repo['url']) + + # The `or []` is because `.edges` can be None. + for edge in team_repos.edges or []: + repo_permissions.append(edge['permission']) + # We're done! Break out of the retry loop. + break + + except TypeError: + # Handles issue #1334 + logger.warning( + f"GitHub returned None when trying to find repo or permission data for team {team_name}.", + exc_info=True, + ) + if current_try == max_tries: + raise RuntimeError(f"GitHub returned a None repo url for team {team_name}, retries exhausted.") + sleep(current_try ** 2) # Shape = [(repo_url, 'WRITE'), ...]] - result[team_name] = list(zip(repo_urls, repo_permissions)) + result[team_name] = [RepoPermission(url, perm) for url, perm in zip(repo_urls, repo_permissions)] return result @@ -114,8 +145,8 @@ def _get_team_repos(org: str, api_url: str, token: str, team: str) -> PaginatedG def transform_teams( team_paginated_data: PaginatedGraphqlData, org_data: Dict[str, Any], - team_repo_data: Dict[str, Any], -) -> List[Dict[str, Any]]: + team_repo_data: dict[str, list[RepoPermission]], +) -> list[dict[str, Any]]: result = [] for team in team_paginated_data.nodes: team_name = team['slug'] diff --git a/tests/unit/cartography/intel/github/test_teams.py b/tests/unit/cartography/intel/github/test_teams.py new file mode 100644 index 0000000000..e00e90e53d --- /dev/null +++ b/tests/unit/cartography/intel/github/test_teams.py @@ -0,0 +1,220 @@ +from unittest.mock import MagicMock +from unittest.mock import patch + +import pytest + +from cartography.intel.github.teams import _get_team_repos_for_multiple_teams +from cartography.intel.github.teams import RepoPermission +from cartography.intel.github.teams import transform_teams +from cartography.intel.github.util import PaginatedGraphqlData + +TEST_ORG_DATA = { + 'url': 'https://github.com/testorg', + 'login': 'testorg', +} + + +@patch('cartography.intel.github.teams._get_team_repos') +def test_get_team_repos_empty_team_list(mock_get_team_repos): + # Assert that if we pass in empty data then we get back empty data + assert _get_team_repos_for_multiple_teams( + [], + 'test-org', + 'https://api.github.com', + 'test-token', + ) == {} + mock_get_team_repos.assert_not_called() + + +@patch('cartography.intel.github.teams._get_team_repos') +def test_get_team_repos_team_with_no_repos(mock_get_team_repos): + # Arrange + team_data = [{'slug': 'team1', 'repositories': {'totalCount': 0}}] + + # Assert that we retrieve data where a team has no repos + assert _get_team_repos_for_multiple_teams( + team_data, + 'test-org', + 'https://api.github.com', + 'test-token', + ) == {'team1': []} + mock_get_team_repos.assert_not_called() + + +@patch('cartography.intel.github.teams._get_team_repos') +def test_get_team_repos_happy_path(mock_get_team_repos): + # Arrange + team_data = [{'slug': 'team1', 'repositories': {'totalCount': 2}}] + mock_team_repos = MagicMock() + mock_team_repos.nodes = [{'url': 'https://github.com/org/repo1'}, {'url': 'https://github.com/org/repo2'}] + mock_team_repos.edges = [{'permission': 'WRITE'}, {'permission': 'READ'}] + mock_get_team_repos.return_value = mock_team_repos + + # Act + assert that the returned data is correct + assert _get_team_repos_for_multiple_teams( + team_data, + 'test-org', + 'https://api.github.com', + 'test-token', + ) == { + 'team1': [ + RepoPermission('https://github.com/org/repo1', 'WRITE'), + RepoPermission('https://github.com/org/repo2', 'READ'), + ], + } + + # Assert that we did not retry because this was the happy path + mock_get_team_repos.assert_called_once_with('test-org', 'https://api.github.com', 'test-token', 'team1') + + +@patch('cartography.intel.github.teams._get_team_repos') +@patch('cartography.intel.github.teams.sleep') +def test_get_team_repos_github_returns_none(mock_sleep, mock_get_team_repos): + # Arrange + team_data = [{'slug': 'team1', 'repositories': {'totalCount': 1}}] + mock_team_repos = MagicMock() + # Set up the condition where GitHub returns a None url and None edge as in #1334. + mock_team_repos.nodes = [None] + mock_team_repos.edges = [None] + mock_get_team_repos.return_value = mock_team_repos + + # Assert we raise an exception + with pytest.raises(RuntimeError): + _get_team_repos_for_multiple_teams( + team_data, + 'test-org', + 'https://api.github.com', + 'test-token', + ) + + # Assert that we retry and give up + assert mock_get_team_repos.call_count == 5 + assert mock_sleep.call_count == 4 + + +def test_transform_teams_empty_team_data(): + # Arrange + team_paginated_data = PaginatedGraphqlData(nodes=[], edges=[]) + team_repo_data: dict[str, list[RepoPermission]] = {} + + # Act + assert + assert transform_teams(team_paginated_data, TEST_ORG_DATA, team_repo_data) == [] + + +def test_transform_teams_team_with_no_repos(): + # Arrange + team_paginated_data = PaginatedGraphqlData( + nodes=[ + { + 'slug': 'team1', + 'url': 'https://github.com/testorg/team1', + 'description': 'Test Team 1', + 'repositories': {'totalCount': 0}, + }, + ], + edges=[], + ) + team_repo_data = {'team1': []} + + # Act + Assert + assert transform_teams(team_paginated_data, TEST_ORG_DATA, team_repo_data) == [ + { + 'name': 'team1', + 'url': 'https://github.com/testorg/team1', + 'description': 'Test Team 1', + 'repo_count': 0, + 'org_url': 'https://github.com/testorg', + 'org_login': 'testorg', + }, + ] + + +def test_transform_teams_team_with_repos(): + # Arrange + team_paginated_data = PaginatedGraphqlData( + nodes=[ + { + 'slug': 'team1', + 'url': 'https://github.com/testorg/team1', + 'description': 'Test Team 1', + 'repositories': {'totalCount': 2}, + }, + ], + edges=[], + ) + team_repo_data = { + 'team1': [ + RepoPermission('https://github.com/testorg/repo1', 'READ'), + RepoPermission('https://github.com/testorg/repo2', 'WRITE'), + ], + } + + # Act + assert transform_teams(team_paginated_data, TEST_ORG_DATA, team_repo_data) == [ + { + 'name': 'team1', + 'url': 'https://github.com/testorg/team1', + 'description': 'Test Team 1', + 'repo_count': 2, + 'org_url': 'https://github.com/testorg', + 'org_login': 'testorg', + 'READ': 'https://github.com/testorg/repo1', + }, + { + 'name': 'team1', + 'url': 'https://github.com/testorg/team1', + 'description': 'Test Team 1', + 'repo_count': 2, + 'org_url': 'https://github.com/testorg', + 'org_login': 'testorg', + 'WRITE': 'https://github.com/testorg/repo2', + }, + ] + + +def test_transform_teams_multiple_teams(): + # Arrange + team_paginated_data = PaginatedGraphqlData( + nodes=[ + { + 'slug': 'team1', + 'url': 'https://github.com/testorg/team1', + 'description': 'Test Team 1', + 'repositories': {'totalCount': 1}, + }, + { + 'slug': 'team2', + 'url': 'https://github.com/testorg/team2', + 'description': 'Test Team 2', + 'repositories': {'totalCount': 0}, + }, + ], + edges=[], + ) + team_repo_data = { + 'team1': [ + RepoPermission('https://github.com/testorg/repo1', 'ADMIN'), + ], + 'team2': [], + } + + # Act + assert + assert transform_teams(team_paginated_data, TEST_ORG_DATA, team_repo_data) == [ + { + 'name': 'team1', + 'url': 'https://github.com/testorg/team1', + 'description': 'Test Team 1', + 'repo_count': 1, + 'org_url': 'https://github.com/testorg', + 'org_login': 'testorg', + 'ADMIN': 'https://github.com/testorg/repo1', + }, + { + 'name': 'team2', + 'url': 'https://github.com/testorg/team2', + 'description': 'Test Team 2', + 'repo_count': 0, + 'org_url': 'https://github.com/testorg', + 'org_login': 'testorg', + }, + ]