chore: refactor provenance available check #791

Merged: 19 commits, Aug 14, 2024
Changes from 10 commits
159 changes: 159 additions & 0 deletions src/macaron/repo_finder/provenance_finder.py
@@ -8,16 +8,22 @@
from functools import partial

from packageurl import PackageURL
from pydriller import Git

from macaron.config.defaults import defaults
from macaron.repo_finder.commit_finder import AbstractPurlType, determine_abstract_purl_type
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.checks.provenance_available_check import ProvenanceAvailableException
from macaron.slsa_analyzer.ci_service import GitHubActions
from macaron.slsa_analyzer.ci_service.base_ci_service import NoneCIService
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, JFrogMavenRegistry, NPMRegistry
from macaron.slsa_analyzer.package_registry.npm_registry import NPMAttestationAsset
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload
from macaron.slsa_analyzer.provenance.intoto.errors import LoadIntotoAttestationError
from macaron.slsa_analyzer.provenance.loader import load_provenance_payload
from macaron.slsa_analyzer.provenance.slsa import SLSAProvenanceData
from macaron.slsa_analyzer.provenance.witness import is_witness_provenance_payload, load_witness_verifier_config
from macaron.slsa_analyzer.specs.ci_spec import CIInfo

logger: logging.Logger = logging.getLogger(__name__)

@@ -49,6 +55,8 @@ def find_provenance(self, purl: PackageURL) -> list[InTotoPayload]:
list[InTotoPayload]
The provenance payload, or an empty list if not found.
"""
logger.debug("Seeking provenance of: %s", purl)

if determine_abstract_purl_type(purl) == AbstractPurlType.REPOSITORY:
# Do not perform default discovery for repository type targets.
return []
@@ -367,3 +375,154 @@ def find_gav_provenance(purl: PackageURL, registry: JFrogMavenRegistry) -> list[

# We assume that there is only one provenance per GAV.
return provenances[:1]


def find_provenance_from_ci(analyze_ctx: AnalyzeContext, git_obj: Git | None) -> InTotoPayload | None:
"""Try to find provenance from CI services of the repository.

Note that we stop going through the CI services once we encounter a CI service
that does host provenance assets.

This method also loads the provenance payloads into the ``CIInfo`` object where
the provenance assets are found.

Parameters
----------
analyze_ctx: AnalyzeContext
The context of the ongoing analysis.
git_obj: Git | None
The Pydriller Git object representing the repository, if any.

Returns
-------
InTotoPayload | None
The provenance payload, or None if not found.
"""
provenance_extensions = defaults.get_list(
"slsa.verifier",
"provenance_extensions",
fallback=["intoto.jsonl"],
)
component = analyze_ctx.component
ci_info_entries = analyze_ctx.dynamic_data["ci_services"]

if not component.repository:
logger.debug("Unable to find a provenance because a repository was not found for %s.", component.purl)
return None

repo_full_name = component.repository.full_name
for ci_info in ci_info_entries:
ci_service = ci_info["service"]

if isinstance(ci_service, NoneCIService):
continue

if isinstance(ci_service, GitHubActions):
# Find the release for the software component version being analyzed.

digest = component.repository.commit_sha
tag = None
if git_obj:
# Use the software component commit to find the tag.
if not digest:
logger.debug("Cannot retrieve asset provenance without commit digest.")
return None
tags = git_obj.repo.tags
for _tag in tags:
if _tag.commit and str(_tag.commit) == digest:
tag = str(_tag)
break

if not tag:
logger.debug("Could not find the tag matching commit: %s", digest)
return None

# Get the correct release using the tag.
release_payload = ci_service.api_client.get_release_by_tag(repo_full_name, tag)
if not release_payload:
logger.debug("Failed to find release matching tag: %s", tag)
return None

# Store the release data for other checks.
ci_info["release"] = release_payload

# Get the provenance assets.
for prov_ext in provenance_extensions:
provenance_assets = ci_service.api_client.fetch_assets(
release_payload,
ext=prov_ext,
)
if not provenance_assets:
continue

logger.info("Found the following provenance assets:")
for provenance_asset in provenance_assets:
logger.info("* %s", provenance_asset.url)

# Store the provenance assets for other checks.
ci_info["provenance_assets"].extend(provenance_assets)

# Download the provenance assets and load the provenance payloads.
download_provenances_from_github_actions_ci_service(
ci_info,
)

# TODO consider how to handle multiple payloads here.
return ci_info["provenances"][0].payload if ci_info["provenances"] else None

return None


def download_provenances_from_github_actions_ci_service(ci_info: CIInfo) -> None:
"""Download provenances from GitHub Actions.

Parameters
----------
ci_info: CIInfo
A ``CIInfo`` instance that holds a GitHub Actions CI service object.
"""
ci_service = ci_info["service"]
prov_assets = ci_info["provenance_assets"]

try:
with tempfile.TemporaryDirectory() as temp_path:
downloaded_provs = []
for prov_asset in prov_assets:
# Check the size before downloading.
if prov_asset.size_in_bytes > defaults.getint(
"slsa.verifier",
"max_download_size",
fallback=1000000,
):
logger.info(
"Skip verifying the provenance %s: asset size too large.",
prov_asset.name,
)
continue

provenance_filepath = os.path.join(temp_path, prov_asset.name)

if not ci_service.api_client.download_asset(
prov_asset.url,
provenance_filepath,
):
logger.debug(
"Could not download the provenance %s. Skip verifying...",
prov_asset.name,
)
continue

# Read the provenance.
try:
payload = load_provenance_payload(provenance_filepath)
except LoadIntotoAttestationError as error:
logger.error("Error logging provenance: %s", error)
continue

# Add the provenance file.
downloaded_provs.append(SLSAProvenanceData(payload=payload, asset=prov_asset))

# Persist the provenance payloads into the CIInfo object.
ci_info["provenances"] = downloaded_provs
except OSError as error:
logger.error("Error while storing provenance in the temporary directory: %s", error)
2 changes: 1 addition & 1 deletion src/macaron/slsa_analyzer/analyze_context.py
@@ -157,7 +157,7 @@ def provenances(self) -> dict[str, list[InTotoV01Statement | InTotoV1Statement]]
result: dict[str, list[InTotoV01Statement | InTotoV1Statement]] = defaultdict(list)
for ci_info in ci_services:
result[ci_info["service"].name].extend(
prov_asset.payload.statement for prov_asset in ci_info["provenances"]
provenance.payload.statement for provenance in ci_info["provenances"]
)
package_registry_entries = self.dynamic_data["package_registries"]
for package_registry_entry in package_registry_entries:
143 changes: 137 additions & 6 deletions src/macaron/slsa_analyzer/analyzer.py
@@ -41,7 +41,7 @@
check_if_input_repo_commit_provenance_conflict,
extract_repo_and_commit_from_provenance,
)
from macaron.repo_finder.provenance_finder import ProvenanceFinder
from macaron.repo_finder.provenance_finder import ProvenanceFinder, find_provenance_from_ci
from macaron.slsa_analyzer import git_url
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.asset import VirtualReleaseAsset
@@ -323,7 +323,7 @@ def run_single(
)

provenance_is_verified = False
if not provenance_payload and parsed_purl and not config.get_value("path"):
if not provenance_payload and parsed_purl:
# Try to find the provenance file for the parsed PURL.
provenance_finder = ProvenanceFinder()
provenances = provenance_finder.find_provenance(parsed_purl)
@@ -350,7 +350,7 @@
):
return Record(
record_id=repo_id,
description="Input mismatch between repo/commit and provenance.",
description="Input mismatch between repo and provenance.",
pre_config=config,
status=SCMStatus.ANALYSIS_FAILED,
)
@@ -433,6 +433,41 @@ def run_single(
analyze_ctx.dynamic_data["expectation"] = self.expectations.get_expectation_for_target(
analyze_ctx.component.purl.split("@")[0]
)

git_service = self._determine_git_service(analyze_ctx)
self._determine_ci_services(analyze_ctx, git_service)
self._determine_build_tools(analyze_ctx, git_service)
self._determine_package_registries(analyze_ctx)

if not provenance_payload:
# Look for provenance using the CI.
provenance_payload = find_provenance_from_ci(analyze_ctx, git_obj)
# If found, verify analysis target against new provenance
if provenance_payload:
# If repository URL was not provided as input, check the one found during analysis.
if not repo_path_input and component.repository:
repo_path_input = component.repository.remote_path

# Extract the digest and repository URL from provenance.
provenance_repo_url = provenance_commit_digest = None
try:
provenance_repo_url, provenance_commit_digest = extract_repo_and_commit_from_provenance(
provenance_payload
)
except ProvenanceError as error:
logger.debug("Failed to extract repo or commit from provenance: %s", error)

# Try to validate the input repo and/or commit against provenance contents.
if (provenance_repo_url or provenance_commit_digest) and check_if_input_repo_commit_provenance_conflict(
repo_path_input, digest_input, provenance_repo_url, provenance_commit_digest
):
return Record(
record_id=repo_id,
description="Input mismatch between repo/commit and provenance.",
pre_config=config,
status=SCMStatus.ANALYSIS_FAILED,
)

analyze_ctx.dynamic_data["provenance"] = provenance_payload
if provenance_payload:
analyze_ctx.dynamic_data["is_inferred_prov"] = False
@@ -986,6 +1021,103 @@ def _resolve_local_path(start_dir: str, local_path: str) -> str:
logger.error(error)
return ""

def _determine_git_service(self, analyze_ctx: AnalyzeContext) -> BaseGitService:
"""Determine the Git service used by the software component."""
remote_path = analyze_ctx.component.repository.remote_path if analyze_ctx.component.repository else None
git_service = self.get_git_service(remote_path)

if isinstance(git_service, NoneGitService):
logger.info("Unable to find repository or unsupported git service for %s", analyze_ctx.component.purl)
else:
logger.info(
"Detected git service %s for %s.", git_service.name, analyze_ctx.component.repository.complete_name
)
analyze_ctx.dynamic_data["git_service"] = git_service

return git_service

def _determine_build_tools(self, analyze_ctx: AnalyzeContext, git_service: BaseGitService) -> None:
"""Determine the build tools that match the software component's PURL type."""
for build_tool in BUILD_TOOLS:
build_tool.load_defaults()
if build_tool.purl_type == analyze_ctx.component.type:
logger.debug(
"Found %s build tool based on the %s PackageURL.", build_tool.name, analyze_ctx.component.purl
)
analyze_ctx.dynamic_data["build_spec"]["purl_tools"].append(build_tool)

if isinstance(git_service, NoneGitService):
continue

if not analyze_ctx.component.repository:
continue

logger.info(
"Checking if the repo %s uses build tool %s",
analyze_ctx.component.repository.complete_name,
build_tool.name,
)

if build_tool.is_detected(analyze_ctx.component.repository.fs_path):
logger.info("The repo uses %s build tool.", build_tool.name)
analyze_ctx.dynamic_data["build_spec"]["tools"].append(build_tool)

if not analyze_ctx.dynamic_data["build_spec"]["tools"]:
if analyze_ctx.component.repository:
logger.info(
"Unable to discover any build tools for repository %s or the build tools are not supported.",
analyze_ctx.component.repository.complete_name,
)
else:
logger.info("Unable to discover build tools because repository is None.")

def _determine_ci_services(self, analyze_ctx: AnalyzeContext, git_service: BaseGitService) -> None:
"""Determine the CI services used by the software component."""
if isinstance(git_service, NoneGitService):
return

# Determine the CI services.
for ci_service in CI_SERVICES:
ci_service.load_defaults()
ci_service.set_api_client()

if ci_service.is_detected(
repo_path=analyze_ctx.component.repository.fs_path,
git_service=analyze_ctx.dynamic_data["git_service"],
):
logger.info("The repo uses %s CI service.", ci_service.name)

# Parse configuration files and generate IRs.
# Add the bash commands to the context object to be used by other checks.
callgraph = ci_service.build_call_graph(
analyze_ctx.component.repository.fs_path,
os.path.relpath(analyze_ctx.component.repository.fs_path, analyze_ctx.output_dir),
)
analyze_ctx.dynamic_data["ci_services"].append(
CIInfo(
service=ci_service,
callgraph=callgraph,
provenance_assets=[],
release={},
provenances=[
SLSAProvenanceData(
payload=InTotoV01Payload(statement=Provenance().payload),
asset=VirtualReleaseAsset(name="No_ASSET", url="NO_URL", size_in_bytes=0),
)
],
)
)

def _determine_package_registries(self, analyze_ctx: AnalyzeContext) -> None:
"""Determine the package registries used by the software component based on its build tools."""
build_tools = analyze_ctx.dynamic_data["build_spec"]["tools"]
for package_registry in PACKAGE_REGISTRIES:
for build_tool in build_tools:
if package_registry.is_detected(build_tool):
analyze_ctx.dynamic_data["package_registries"].append(
PackageRegistryInfo(build_tool=build_tool, package_registry=package_registry)
)

def perform_checks(self, analyze_ctx: AnalyzeContext) -> dict[str, CheckResult]:
"""Run the analysis on the target repo and return the results.

@@ -1060,7 +1192,7 @@ def perform_checks(self, analyze_ctx: AnalyzeContext) -> dict[str, CheckResult]:
service=ci_service,
callgraph=callgraph,
provenance_assets=[],
latest_release={},
release={},
provenances=[
SLSAProvenanceData(
payload=InTotoV01Payload(statement=Provenance().payload),
@@ -1085,8 +1217,7 @@ def perform_checks(self, analyze_ctx: AnalyzeContext) -> dict[str, CheckResult]:
)
)

results = registry.scan(analyze_ctx)
return results
return registry.scan(analyze_ctx)


class DuplicateCmpError(DuplicateError):
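Taken together, the refactor in `analyzer.py` moves the per-target service discovery out of `perform_checks` and runs it in `run_single` before CI provenance discovery. A hedged sketch of the resulting call order, using the private helper names added in this diff; the `analyzer` argument stands in for the analyzer instance and is assumed to exist, and the wrapper function itself is only illustrative:

```python
from pydriller import Git

from macaron.repo_finder.provenance_finder import find_provenance_from_ci
from macaron.slsa_analyzer.analyze_context import AnalyzeContext
from macaron.slsa_analyzer.provenance.intoto import InTotoPayload


def discover_services_then_provenance(
    analyzer, analyze_ctx: AnalyzeContext, git_obj: Git | None
) -> InTotoPayload | None:
    """Sketch of the ordering introduced in run_single by this PR."""
    # 1. Resolve the git service for the component's repository
    #    (may be a NoneGitService if the repo is missing or unsupported).
    git_service = analyzer._determine_git_service(analyze_ctx)

    # 2. Populate analyze_ctx.dynamic_data with CI services, build tools,
    #    and package registries; later steps and checks read from there.
    analyzer._determine_ci_services(analyze_ctx, git_service)
    analyzer._determine_build_tools(analyze_ctx, git_service)
    analyzer._determine_package_registries(analyze_ctx)

    # 3. Only after the CI services are known can provenance be discovered
    #    from CI releases and validated against the input repo/commit.
    return find_provenance_from_ci(analyze_ctx, git_obj)
```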