Skip to content

Commit

Permalink
chore: refactor out in-line import
Browse files: browse the repository at this point in the history
Signed-off-by: Ben Selwyn-Smith <[email protected]>
  • Loading branch information
benmss committed Aug 19, 2024
1 parent 2d3a64a commit d431cab
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 261 deletions.
24 changes: 23 additions & 1 deletion src/macaron/repo_finder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""This package contains the dependency resolvers for Java projects."""


def to_domain_from_known_purl_types(purl_type: str) -> str | None:
"""Return the git service domain from a known web-based purl type.
This method is used to handle cases where the purl type value is not the git domain but a pre-defined
repo-based type in https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst.
Note that this method will be updated when there are new pre-defined types as per the PURL specification.
Parameters
----------
purl_type : str
The type field of the PURL.
Returns
-------
str | None
The git service domain corresponding to the purl type or None if the purl type is unknown.
"""
known_types = {"github": "github.com", "bitbucket": "bitbucket.org"}
return known_types.get(purl_type, None)
3 changes: 1 addition & 2 deletions src/macaron/repo_finder/commit_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from packageurl import PackageURL
from pydriller import Commit, Git

from macaron.repo_finder import repo_finder_deps_dev
from macaron.repo_finder.repo_finder import to_domain_from_known_purl_types
from macaron.repo_finder import repo_finder_deps_dev, to_domain_from_known_purl_types
from macaron.slsa_analyzer.git_service import GIT_SERVICES

logger: logging.Logger = logging.getLogger(__name__)
Expand Down
2 changes: 1 addition & 1 deletion src/macaron/repo_finder/provenance_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@

from macaron.errors import ProvenanceError
from macaron.json_tools import JsonType, json_extract
from macaron.repo_finder import to_domain_from_known_purl_types
from macaron.repo_finder.commit_finder import (
AbstractPurlType,
determine_abstract_purl_type,
extract_commit_from_version,
)
from macaron.repo_finder.repo_finder import to_domain_from_known_purl_types
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV1Payload, InTotoV01Payload

logger: logging.Logger = logging.getLogger(__name__)
Expand Down
206 changes: 178 additions & 28 deletions src/macaron/repo_finder/repo_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,30 @@
import os
from urllib.parse import ParseResult, urlunparse

from git import InvalidGitRepositoryError
from packageurl import PackageURL
from pydriller import Git

from macaron.config.defaults import defaults
from macaron.config.global_config import global_config
from macaron.errors import CloneError, RepoCheckOutError
from macaron.repo_finder import to_domain_from_known_purl_types
from macaron.repo_finder.commit_finder import find_commit
from macaron.repo_finder.repo_finder_base import BaseRepoFinder
from macaron.repo_finder.repo_finder_deps_dev import DepsDevRepoFinder
from macaron.repo_finder.repo_finder_java import JavaRepoFinder
from macaron.slsa_analyzer.git_service import GIT_SERVICES, BaseGitService
from macaron.slsa_analyzer.git_service.base_git_service import NoneGitService
from macaron.slsa_analyzer.git_url import (
GIT_REPOS_DIR,
check_out_repo_target,
get_remote_origin_of_local_repo,
get_remote_vcs_url,
get_repo_dir_name,
is_empty_repo,
is_remote_repo,
resolve_local_path,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -79,28 +96,6 @@ def find_repo(purl: PackageURL) -> str:
return repo_finder.find_repo(purl)


def to_domain_from_known_purl_types(purl_type: str) -> str | None:
"""Return the git service domain from a known web-based purl type.
This method is used to handle cases where the purl type value is not the git domain but a pre-defined
repo-based type in https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst.
Note that this method will be updated when there are new pre-defined types as per the PURL specification.
Parameters
----------
purl_type : str
The type field of the PURL.
Returns
-------
str | None
The git service domain corresponding to the purl type or None if the purl type is unknown.
"""
known_types = {"github": "github.com", "bitbucket": "bitbucket.org"}
return known_types.get(purl_type, None)


def to_repo_path(purl: PackageURL, available_domains: list[str]) -> str | None:
"""Return the repository path from the PURL string.
Expand Down Expand Up @@ -189,12 +184,12 @@ def find_source(purl_string: str, repo: str | None) -> bool:

# Prepare the repo.
logger.debug("Preparing repo: %s", found_repo)
# Importing here to avoid cyclic import problem.
from macaron.slsa_analyzer.analyzer import Analyzer # pylint: disable=import-outside-toplevel, cyclic-import

analyzer = Analyzer(global_config.output_path, global_config.build_log_path)
repo_dir = os.path.join(analyzer.output_path, analyzer.GIT_REPOS_DIR)
git_obj = analyzer.prepare_repo(repo_dir, found_repo, "", "", purl)
repo_dir = os.path.join(global_config.output_path, GIT_REPOS_DIR)
git_obj = prepare_repo(
repo_dir,
found_repo,
purl=purl,
)

if not git_obj:
# TODO expand this message to cover cases where the obj was not created due to lack of correct tag.
Expand All @@ -218,3 +213,158 @@ def find_source(purl_string: str, repo: str | None) -> bool:
logger.info("%s/commit/%s", found_repo, digest)

return True


def prepare_repo(
    target_dir: str,
    repo_path: str,
    branch_name: str = "",
    digest: str = "",
    purl: PackageURL | None = None,
) -> Git | None:
    """Prepare the target repository for analysis.

    A remote ``repo_path`` is cloned into ``{target_dir}/{unique_path}``, where ``unique_path`` is
    derived from the remote url. For example, https://github.com/org/name.git is cloned to
    ``{target_dir}/github_com/org/name``.

    A local ``repo_path`` is resolved against the local repos directory and must point to a valid
    git repository.

    Once the repository is available, the requested branch and/or commit is checked out. When no
    ``digest`` is given but ``purl`` carries a version, the commit is looked up from that version.

    Parameters
    ----------
    target_dir : str
        The directory where all remote repository will be cloned.
    repo_path : str
        The path to the repository, can be either local or remote.
    branch_name : str
        The name of the branch we want to checkout.
    digest : str
        The hash of the commit that we want to checkout in the branch.
    purl : PackageURL | None
        The PURL of the analysis target.

    Returns
    -------
    Git | None
        The pydriller.Git object of the repository or None if error.
    """
    # TODO: separate the logic for handling remote and local repos instead of putting them into this method.
    logger.info(
        "Preparing the repository for the analysis (path=%s, branch=%s, digest=%s)",
        repo_path,
        branch_name,
        digest,
    )

    is_remote = is_remote_repo(repo_path)
    is_local = not is_remote

    # Step 1: work out where the repository lives on disk, cloning it first if it is remote.
    if is_remote:
        logger.info("The path to repo %s is a remote path.", repo_path)
        remote_url = get_remote_vcs_url(repo_path)
        if not remote_url:
            logger.error("The provided path to repo %s is not a valid remote path.", repo_path)
            return None

        git_service = get_git_service(remote_url)
        resolved_local_path = os.path.join(target_dir, get_repo_dir_name(remote_url))
        logger.info("Cloning the repository.")
        try:
            git_service.clone_repo(resolved_local_path, remote_url)
        except CloneError as error:
            logger.error("Cannot clone %s: %s", remote_url, str(error))
            return None
    else:
        logger.info("Checking if the path to repo %s is a local path.", repo_path)
        resolved_local_path = resolve_local_path(get_local_repos_path(), repo_path)

    # Step 2: open the repository and make sure it is usable.
    if not resolved_local_path:
        logger.error("Error happened while preparing the repo.")
        return None

    try:
        git_obj = Git(resolved_local_path)
    except InvalidGitRepositoryError:
        logger.error("No git repo exists at %s.", resolved_local_path)
        return None

    if is_empty_repo(git_obj):
        logger.error("The target repository does not have any commit.")
        return None

    # Step 3: when only a purl version is known, translate it into a commit digest.
    if not digest and purl and purl.version:
        commit_for_version = find_commit(git_obj, purl)
        if not commit_for_version:
            logger.error("Could not map the input purl string to a specific commit in the corresponding repository.")
            return None
        digest = commit_for_version

    # Step 4: check out the requested branch or commit. How this is done depends on the git service
    # the repository uses.
    if is_local:
        # For a user-provided local path, the git service is decided from the actual origin remote URL
        # of the repository.
        origin_remote_url = get_remote_origin_of_local_repo(git_obj)
        if not is_remote_repo(origin_remote_url):
            # The origin is itself a local path (e.g /path/to/local/...). This happens when the target
            # repository is a clone from another local repo or is a clone from a git archive -
            # https://git-scm.com/docs/git-archive. No git service applies, so fall back to the generic
            # function ``git_url.check_out_repo_target``.
            if not check_out_repo_target(git_obj, branch_name, digest, is_local):
                logger.error("Cannot checkout the specific branch or commit of the target repo.")
                return None

            return git_obj

        # The origin is a remote URL (e.g https://host.com/a/b), so the matching git service is
        # obtained with ``get_git_service``.
        git_service = get_git_service(origin_remote_url)

    try:
        git_service.check_out_repo(git_obj, branch_name, digest, is_local)
    except RepoCheckOutError as error:
        logger.error("Failed to check out repository at %s", resolved_local_path)
        logger.error(error)
        return None

    return git_obj


def get_local_repos_path() -> str:
    """Return the directory that holds local repositories, creating it when missing.

    The path configured in ``global_config.local_repos_path`` is used when it is set. Otherwise the
    default ``local_repos`` directory under ``GIT_REPOS_DIR`` in the global output path is used.

    Returns
    -------
    str
        The path to the local repos directory.
    """
    default_path = os.path.join(global_config.output_path, GIT_REPOS_DIR, "local_repos")
    repos_dir = global_config.local_repos_path or default_path

    # Only attempt creation when nothing exists at the path yet.
    if os.path.exists(repos_dir):
        return repos_dir

    os.makedirs(repos_dir, exist_ok=True)
    return repos_dir


def get_git_service(remote_path: str | None) -> BaseGitService:
    """Determine which git service a remote path belongs to.

    Parameters
    ----------
    remote_path : str | None
        The remote path of the repo.

    Returns
    -------
    BaseGitService
        The first service in ``GIT_SERVICES`` that detects the remote path, or a ``NoneGitService``
        instance when the path is empty or no service detects it.
    """
    if not remote_path:
        return NoneGitService()

    # The generator is consumed lazily, so detection stops at the first matching service.
    detected_service = next(
        (candidate for candidate in GIT_SERVICES if candidate.is_detected(remote_path)),
        None,
    )
    if detected_service is None:
        return NoneGitService()
    return detected_service
Loading

0 comments on commit d431cab

Please sign in to comment.