Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor provenance available check #791

Merged
merged 19 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 1 addition & 17 deletions src/macaron/repo_finder/provenance_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,24 +243,18 @@ def _clean_spdx(uri: str) -> str:
return url


def check_if_input_repo_commit_provenance_conflict(
def check_if_input_repo_provenance_conflict(
repo_path_input: str | None,
digest_input: str | None,
provenance_repo_url: str | None,
provenance_commit_digest: str | None,
) -> bool:
"""Test if the input repo and commit match the contents of the provenance.

Parameters
----------
repo_path_input: str | None
The repo URL from input.
digest_input: str | None
The digest from input.
provenance_repo_url: str | None
The repo URL from provenance.
provenance_commit_digest: str | None
The commit digest from provenance.

Returns
-------
Expand All @@ -277,16 +271,6 @@ def check_if_input_repo_commit_provenance_conflict(
)
return True

# Check the provenance commit against the input commit.
if digest_input and provenance_commit_digest and digest_input != provenance_commit_digest:
logger.debug(
"The commit digest from input does not match what exists in the provenance. "
"Input Commit: %s, Provenance Commit: %s.",
digest_input,
provenance_commit_digest,
)
return True

tromai marked this conversation as resolved.
Show resolved Hide resolved
return False


Expand Down
194 changes: 178 additions & 16 deletions src/macaron/repo_finder/provenance_finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@
from functools import partial

from packageurl import PackageURL
from pydriller import Git

from macaron.config.defaults import defaults
from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.checks.provenance_available_check import ProvenanceAvailableException
from macaron.slsa_analyzer.ci_service import GitHubActions
from macaron.slsa_analyzer.ci_service.base_ci_service import NoneCIService
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, JFrogMavenRegistry, NPMRegistry
from macaron.slsa_analyzer.package_registry.npm_registry import NPMAttestationAsset
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload
from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError
from macaron.slsa_analyzer.provenance.loader import load_provenance_payload
from macaron.slsa_analyzer.provenance.slsa import SLSAProvenanceData
from macaron.slsa_analyzer.provenance.witness import is_witness_provenance_payload, load_witness_verifier_config
from macaron.slsa_analyzer.specs.ci_spec import CIInfo

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -49,6 +55,8 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
list[InTotoPayload]
The provenance payload, or an empty list if not found.
"""
logger.debug("Seeking provenance of: %s", purl)

if determine_abstract_purl_type(purl) == AbstractPurlType.REPOSITORY:
# Do not perform default discovery for repository type targets.
return []
Expand Down Expand Up @@ -331,7 +339,8 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
logger.error(msg)
raise ProvenanceAvailableException(msg)

provenance_filepaths = []
provenances = []
witness_verifier_config = load_witness_verifier_config()
try:
with tempfile.TemporaryDirectory() as temp_dir:
for provenance_asset in provenance_assets:
Expand All @@ -342,28 +351,181 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[
provenance_asset.name,
)
continue
provenance_filepaths.append(provenance_filepath)
except OSError as error:
logger.error("Error while storing provenance in the temporary directory: %s", error)

provenances = []
witness_verifier_config = load_witness_verifier_config()

for provenance_filepath in provenance_filepaths:
try:
provenance_payload = load_provenance_payload(provenance_filepath)
except LoadIntotoAttestationError as error:
logger.error("Error while loading provenance: %s", error)
continue
try:
provenance_payload = load_provenance_payload(provenance_filepath)
except LoadIntotoAttestationError as load_error:
logger.error("Error while loading provenance: %s", load_error)
continue

if not is_witness_provenance_payload(provenance_payload, witness_verifier_config.predicate_types):
continue
if not is_witness_provenance_payload(provenance_payload, witness_verifier_config.predicate_types):
continue

provenances.append(provenance_payload)
provenances.append(provenance_payload)
except OSError as error:
logger.error("Error while storing provenance in the temporary directory: %s", error)

if not provenances:
logger.debug("No payloads found in provenance files.")
return []

# We assume that there is only one provenance per GAV.
return provenances[:1]


def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> InTotoPayload | None:
    """Search the CI services of the analyzed repository for a provenance payload.

    Only GitHub Actions is handled. For a GitHub Actions entry, the tag whose commit
    equals the component's commit digest is looked up in ``git_obj``, the release for
    that tag is fetched, and the release assets are filtered by the configured
    provenance extensions. The search returns at the first extension that yields
    assets; if the tag or the release cannot be resolved it returns ``None`` without
    trying the remaining CI services.

    As a side effect, the matching ``CIInfo`` entry is updated: the release payload is
    stored under ``"release"``, the assets found are appended to ``"provenance_assets"``,
    and the download helper is invoked on the entry before ``"provenances"`` is read.

    Parameters
    ----------
    analyze_ctx: AnalyzeContext
        The context of the ongoing analysis.
    git_obj: Git | None
        The Pydriller Git object representing the repository, if any.

    Returns
    -------
    InTotoPayload | None
        The payload of the first loaded provenance, or None if not found.
    """
    extensions = defaults.get_list(
        "slsa.verifier",
        "provenance_extensions",
        fallback=["intoto.jsonl"],
    )
    component = analyze_ctx.component
    ci_entries = analyze_ctx.dynamic_data["ci_services"]

    repository = component.repository
    if not repository:
        logger.debug("Unable to find a provenance because a repository was not found for %s.", component.purl)
        return None

    repo_full_name = repository.full_name
    for ci_info in ci_entries:
        service = ci_info["service"]

        if isinstance(service, NoneCIService):
            continue

        if not isinstance(service, GitHubActions):
            logger.debug("CI service not supported for provenance finding: %s", service.name)
            continue

        # Resolve the release tag that points at the commit being analyzed.
        commit_digest = repository.commit_sha
        release_tag = None
        if git_obj:
            if not commit_digest:
                logger.debug("Cannot retrieve asset provenance without commit digest.")
                return None
            for candidate in git_obj.repo.tags:
                try:
                    candidate_commit = str(candidate.commit)
                except ValueError as error:
                    logger.debug("Commit of tag is a blob or tree: %s", error)
                    continue
                if candidate_commit and candidate_commit == commit_digest:
                    release_tag = str(candidate)
                    break

        if not release_tag:
            logger.debug("Could not find the tag matching commit: %s", commit_digest)
            return None

        # Fetch the release that corresponds to the tag.
        release = service.api_client.get_release_by_tag(repo_full_name, release_tag)
        if not release:
            logger.debug("Failed to find release matching tag: %s", release_tag)
            return None

        # Keep the release data on the entry so other checks can reuse it.
        ci_info["release"] = release

        for extension in extensions:
            assets = service.api_client.fetch_assets(release, ext=extension)
            if not assets:
                continue

            logger.info("Found the following provenance assets:")
            for asset in assets:
                logger.info("* %s", asset.url)

            # Keep the assets on the entry so other checks can reuse them.
            ci_info["provenance_assets"].extend(assets)

            # Download the assets and load their payloads into the entry.
            download_provenances_from_github_actions_ci_service(ci_info)

            # TODO consider how to handle multiple payloads here.
            loaded = ci_info["provenances"]
            return loaded[0].payload if loaded else None

    return None


def download_provenances_from_github_actions_ci_service(ci_info: CIInfo) -> None:
    """Download provenances from GitHub Actions and load their payloads.

    Each asset listed under ``ci_info["provenance_assets"]`` is downloaded into a
    temporary directory and parsed. Assets larger than the configured
    ``max_download_size``, assets that fail to download, and assets whose content
    cannot be loaded as a provenance payload are skipped. The successfully loaded
    provenances replace the value of ``ci_info["provenances"]``.

    An ``OSError`` raised while working with the temporary directory is logged and
    not propagated.

    Parameters
    ----------
    ci_info: CIInfo
        A ``CIInfo`` instance that holds a GitHub Actions git service object.
    """
    ci_service = ci_info["service"]
    prov_assets = ci_info["provenance_assets"]

    # The size limit is the same for every asset, so read the configuration once.
    max_download_size = defaults.getint(
        "slsa.verifier",
        "max_download_size",
        fallback=1000000,
    )

    try:
        with tempfile.TemporaryDirectory() as temp_path:
            downloaded_provs = []
            for prov_asset in prov_assets:
                # Check the size before downloading.
                if prov_asset.size_in_bytes > max_download_size:
                    logger.info(
                        "Skip verifying the provenance %s: asset size too large.",
                        prov_asset.name,
                    )
                    continue

                provenance_filepath = os.path.join(temp_path, prov_asset.name)

                if not ci_service.api_client.download_asset(
                    prov_asset.url,
                    provenance_filepath,
                ):
                    logger.debug(
                        "Could not download the provenance %s. Skip verifying...",
                        prov_asset.name,
                    )
                    continue

                # Read the provenance.
                try:
                    payload = load_provenance_payload(provenance_filepath)
                except LoadIntotoAttestationError as error:
                    logger.error("Error while loading provenance: %s", error)
                    continue

                # Add the provenance file.
                downloaded_provs.append(SLSAProvenanceData(payload=payload, asset=prov_asset))

            # Persist the provenance payloads into the CIInfo object.
            ci_info["provenances"] = downloaded_provs
    except OSError as error:
        logger.error("Error while storing provenance in the temporary directory: %s", error)
2 changes: 1 addition & 1 deletion src/macaron/slsa_analyzer/analyze_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def provenances(self) -> dict[str, list[InTotoV01Statement | InTotoV1Statement]]
result: dict[str, list[InTotoV01Statement | InTotoV1Statement]] = defaultdict(list)
for ci_info in ci_services:
result[ci_info["service"].name].extend(
prov_asset.payload.statement for prov_asset in ci_info["provenances"]
provenance.payload.statement for provenance in ci_info["provenances"]
)
package_registry_entries = self.dynamic_data["package_registries"]
for package_registry_entry in package_registry_entries:
Expand Down
Loading
Loading