diff --git a/pyproject.toml b/pyproject.toml
index 1d697f32e..529587b4e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,6 +29,7 @@ dependencies = [
     "jinja2 >=3.1.2,<4.0.0",
     "SQLAlchemy >=2.0.0,<3.0.0",
     "defusedxml >=0.7.1,<1.0.0",
+    "problog >=2.2.4,<3.0.0"
 ]
 keywords = []
 # https://pypi.org/classifiers/
@@ -187,6 +188,7 @@ module = [
     "gitdb.*",
     "yamale.*",
     "defusedxml.*",
+    "problog.*"
 ]
 ignore_missing_imports = true
diff --git a/src/macaron/config/defaults.ini b/src/macaron/config/defaults.ini
index 355dfea56..80c818be8 100644
--- a/src/macaron/config/defaults.ini
+++ b/src/macaron/config/defaults.ini
@@ -237,6 +237,7 @@ publisher =
     twine
     flit
     conda
+    tox
 # These are the Python interpreters that may be used to load modules.
 interpreter = python
@@ -250,6 +251,7 @@ build_arg =
 deploy_arg = publish
     upload
+    release
 
 [builder.pip.ci.deploy]
 github_actions = pypa/gh-action-pypi-publish
diff --git a/src/macaron/parsers/bashparser.py b/src/macaron/parsers/bashparser.py
index f7b03d9f5..7f44d52be 100644
--- a/src/macaron/parsers/bashparser.py
+++ b/src/macaron/parsers/bashparser.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 - 2022, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2022 - 2023, Oracle and/or its affiliates. All rights reserved.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
 
 """This module is a Python wrapper for the compiled bashparser binary.
@@ -33,6 +33,7 @@ class BashCommands(TypedDict):
     """CI service type."""
     commands: list[list[str]]
     """Parsed bash commands."""
+    workflow_info: dict
 
 
 def parse_file(file_path: str, macaron_path: str = "") -> dict:
@@ -115,6 +116,7 @@ def extract_bash_from_ci(
     bash_content: str,
     ci_file: str,
     ci_type: str,
+    workflow_info: dict,
     macaron_path: str = "",
     recursive: bool = False,
     repo_path: str = "",
@@ -152,7 +154,9 @@ def extract_bash_from_ci(
     parsed_parent = parse(bash_content)
     caller_commands = parsed_parent.get("commands", [])
    if caller_commands:
-        yield BashCommands(caller_path=ci_file, CI_path=ci_file, CI_type=ci_type, commands=caller_commands)
+        yield BashCommands(
+            caller_path=ci_file, CI_path=ci_file, CI_type=ci_type, commands=caller_commands, workflow_info=workflow_info
+        )
 
     # Parse the bash script files called from the current script.
     if recursive and repo_path:
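With this change, every `BashCommands` entry produced by the parser carries the parsed workflow step that triggered it. A minimal sketch of what a yielded entry might look like; the `workflow_info` payload here is a hypothetical parsed-step fragment, not real parser output:

```python
from typing import TypedDict


class BashCommands(TypedDict):
    """Mirror of the TypedDict above, repeated here so the sketch is self-contained."""

    caller_path: str
    CI_path: str
    CI_type: str
    commands: list[list[str]]
    workflow_info: dict


# Hypothetical entry for a workflow step that runs `twine upload dist/*`.
entry = BashCommands(
    caller_path=".github/workflows/release.yaml",
    CI_path=".github/workflows/release.yaml",
    CI_type="github_actions",
    commands=[["twine", "upload", "dist/*"]],
    workflow_info={"Exec": {"Run": {"Value": "twine upload dist/*"}}},
)
print(entry["workflow_info"])
```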
@@ -171,4 +175,5 @@ def extract_bash_from_ci(
                 CI_path=ci_file,
                 CI_type=ci_type,
                 commands=callee_commands,
+                workflow_info=workflow_info,
             )
diff --git a/src/macaron/slsa_analyzer/build_tool/base_build_tool.py b/src/macaron/slsa_analyzer/build_tool/base_build_tool.py
index 2235b8e5d..64e93d76f 100644
--- a/src/macaron/slsa_analyzer/build_tool/base_build_tool.py
+++ b/src/macaron/slsa_analyzer/build_tool/base_build_tool.py
@@ -79,6 +79,7 @@ def __init__(self, name: str) -> None:
         }
         self.build_log: list[str] = []
         self.wrapper_files: list[str] = []
+        self.project_name: str = ""
 
     def __str__(self) -> str:
         return self.name
diff --git a/src/macaron/slsa_analyzer/build_tool/pip.py b/src/macaron/slsa_analyzer/build_tool/pip.py
index 4abdbd09b..0c9865338 100644
--- a/src/macaron/slsa_analyzer/build_tool/pip.py
+++ b/src/macaron/slsa_analyzer/build_tool/pip.py
@@ -7,6 +7,8 @@
 """
 
 import logging
+import os
+import tomllib
 
 from macaron.config.defaults import defaults
 from macaron.dependency_analyzer import DependencyAnalyzer, NoneDependencyAnalyzer
@@ -49,6 +51,23 @@ def is_detected(self, repo_path: str) -> bool:
         """
         for file in self.build_configs:
             if file_exists(repo_path, file):
+                # Find the project name value from the config file.
+                # TODO: improve this approach, support setup.py
+                file_path = os.path.join(repo_path, file)
+                if file == "pyproject.toml":
+                    try:
+                        with open(file_path, "rb") as toml_file:
+                            try:
+                                data = tomllib.load(toml_file)
+                                project = data.get("project", {})
+                                if project:
+                                    # Store the project name.
+                                    self.project_name = project.get("name", "")
+                                    logger.info("Package name: %s", self.project_name)
+                            except tomllib.TOMLDecodeError:
+                                logger.debug("Failed to read the %s file: invalid toml file.", file)
+                    except FileNotFoundError:
+                        logger.debug("Failed to read the %s file.", file)
                 return True
         return False
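The detection logic above leans on the standard-library `tomllib` (Python 3.11+). A standalone sketch of the same name lookup, using hypothetical `pyproject.toml` content:

```python
import tomllib

# Hypothetical pyproject.toml content for illustration.
toml_content = """
[project]
name = "example-package"
version = "1.0.0"
"""

data = tomllib.loads(toml_content)
project = data.get("project", {})
print(project.get("name", ""))  # example-package
```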
- if ("tool" in data) and ("poetry" in data["tool"]): + poetry_tool = data.get("tool", {}).get("poetry", {}) + if poetry_tool: + # Store the project name + self.project_name = poetry_tool.get("name") return True except tomllib.TOMLDecodeError: logger.error("Failed to read the %s file: invalid toml file.", conf) - return False - return False except FileNotFoundError: logger.error("Failed to read the %s file.", conf) - return False - + if package_lock_exists: + return True return False def prepare_config_files(self, wrapper_path: str, build_dir: str) -> bool: diff --git a/src/macaron/slsa_analyzer/checks/base_check.py b/src/macaron/slsa_analyzer/checks/base_check.py index 432a61158..3a43cf635 100644 --- a/src/macaron/slsa_analyzer/checks/base_check.py +++ b/src/macaron/slsa_analyzer/checks/base_check.py @@ -86,6 +86,7 @@ def run(self, target: AnalyzeContext, skipped_info: Optional[SkippedInfo] = None justification=[], result_type=CheckResultType.SKIPPED, result_tables=[], + confidence_score=0.0, ) if skipped_info: diff --git a/src/macaron/slsa_analyzer/checks/build_as_code_check.py b/src/macaron/slsa_analyzer/checks/build_as_code_check.py index e25768049..40d5b7401 100644 --- a/src/macaron/slsa_analyzer/checks/build_as_code_check.py +++ b/src/macaron/slsa_analyzer/checks/build_as_code_check.py @@ -4,24 +4,21 @@ """This module contains the BuildAsCodeCheck class.""" import logging -import os +from problog import get_evaluatable +from problog.program import PrologString, Term from sqlalchemy.orm import Mapped, mapped_column -from sqlalchemy.sql.sqltypes import String +from sqlalchemy.sql.sqltypes import Float, String -from macaron.config.defaults import defaults from macaron.database.database_manager import ORMBase from macaron.database.table_definitions import CheckFactsTable from macaron.slsa_analyzer.analyze_context import AnalyzeContext -from macaron.slsa_analyzer.build_tool.base_build_tool import BaseBuildTool, NoneBuildTool +from macaron.slsa_analyzer.build_tool.base_build_tool import NoneBuildTool +from macaron.slsa_analyzer.checks import build_as_code_subchecks from macaron.slsa_analyzer.checks.base_check import BaseCheck +from macaron.slsa_analyzer.checks.build_as_code_subchecks import BuildAsCodeSubchecks, DeploySubcheckResults from macaron.slsa_analyzer.checks.check_result import CheckResult, CheckResultType from macaron.slsa_analyzer.ci_service.base_ci_service import NoneCIService -from macaron.slsa_analyzer.ci_service.circleci import CircleCI -from macaron.slsa_analyzer.ci_service.github_actions import GHWorkflowType -from macaron.slsa_analyzer.ci_service.gitlab_ci import GitLabCI -from macaron.slsa_analyzer.ci_service.jenkins import Jenkins -from macaron.slsa_analyzer.ci_service.travis import Travis from macaron.slsa_analyzer.registry import registry from macaron.slsa_analyzer.slsa_req import ReqName @@ -37,6 +34,8 @@ class BuildAsCodeTable(CheckFactsTable, ORMBase): build_trigger: Mapped[str] = mapped_column(String, nullable=True) deploy_command: Mapped[str] = mapped_column(String, nullable=True) build_status_url: Mapped[str] = mapped_column(String, nullable=True) + confidence_score: Mapped[float] = mapped_column(Float, nullable=True) + evidence: Mapped[str] = mapped_column(String, nullable=True) class BuildAsCodeCheck(BaseCheck): @@ -56,6 +55,8 @@ def __init__(self) -> None: ("mcn_trusted_builder_level_three_1", CheckResultType.FAILED), ] eval_reqs = [ReqName.BUILD_AS_CODE] + self.confidence_score_threshold = 0.7 + super().__init__( check_id="mcn_build_as_code_1", 
             description=description,
@@ -64,50 +65,6 @@ def __init__(self) -> None:
             result_on_skip=CheckResultType.PASSED,
         )
 
-    def _has_deploy_command(self, commands: list[list[str]], build_tool: BaseBuildTool) -> str:
-        """Check if the bash command is a build and deploy command."""
-        # Account for Python projects having separate tools for packaging and publishing.
-        deploy_tool = build_tool.publisher if build_tool.publisher else build_tool.builder
-        for com in commands:
-
-            # Check for empty or invalid commands.
-            if not com or not com[0]:
-                continue
-            # The first argument in a bash command is the program name.
-            # So first check that the program name is a supported build tool name.
-            # We need to handle cases where the first argument is a path to the program.
-            cmd_program_name = os.path.basename(com[0])
-            if not cmd_program_name:
-                logger.debug("Found invalid program name %s.", com[0])
-                continue
-
-            check_build_commands = any(build_cmd for build_cmd in deploy_tool if build_cmd == cmd_program_name)
-
-            # Support the use of interpreters like Python that load modules, i.e., 'python -m pip install'.
-            check_module_build_commands = any(
-                interpreter == cmd_program_name
-                and com[1]
-                and com[1] in build_tool.interpreter_flag
-                and com[2]
-                and com[2] in deploy_tool
-                for interpreter in build_tool.interpreter
-            )
-            prog_name_index = 2 if check_module_build_commands else 0
-
-            if check_build_commands or check_module_build_commands:
-                # Check the arguments in the bash command for the deploy goals.
-                # If there are no deploy args for this build tool, accept as deploy command.
-                if not build_tool.deploy_arg:
-                    logger.info("No deploy arguments required. Accept %s as deploy command.", str(com))
-                    return str(com)
-
-                for word in com[(prog_name_index + 1) :]:
-                    # TODO: allow plugin versions in arguments, e.g., maven-plugin:1.6.8:deploy.
-                    if word in build_tool.deploy_arg:
-                        logger.info("Found deploy command %s.", str(com))
-                        return str(com)
-        return ""
-
     def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResultType:
         """Implement the check in this method.
 
@@ -130,164 +87,121 @@ def run_check(self, ctx: AnalyzeContext, check_result: CheckResult) -> CheckResu
         # Checking if a build tool is discovered for this repo.
         if build_tool and not isinstance(build_tool, NoneBuildTool):
             for ci_info in ci_services:
+                confidence_score = 0.0
                 ci_service = ci_info["service"]
 
                 # Checking if a CI service is discovered for this repo.
                 if isinstance(ci_service, NoneCIService):
                     continue
-                trusted_deploy_actions = defaults.get_list("builder.pip.ci.deploy", "github_actions", fallback=[])
-
-                # Check for use of a trusted Github Actions workflow to publish/deploy.
-                # TODO: verify that deployment is legitimate and not a test
-                if trusted_deploy_actions:
-                    for callee in ci_info["callgraph"].bfs():
-                        workflow_name = callee.name.split("@")[0]
-
-                        if not workflow_name or callee.node_type not in [
-                            GHWorkflowType.EXTERNAL,
-                            GHWorkflowType.REUSABLE,
-                        ]:
-                            logger.debug("Workflow %s is not relevant. Skipping...", callee.name)
Skipping...", callee.name) - continue - if workflow_name in trusted_deploy_actions: - trigger_link = ci_service.api_client.get_file_link( - ctx.repo_full_name, - ctx.commit_sha, - ci_service.api_client.get_relative_path_of_workflow( - os.path.basename(callee.caller_path) - ), - ) - deploy_action_source_link = ci_service.api_client.get_file_link( - ctx.repo_full_name, ctx.commit_sha, callee.caller_path - ) - - html_url = ci_service.has_latest_run_passed( - ctx.repo_full_name, - ctx.branch_name, - ctx.commit_sha, - ctx.commit_date, - os.path.basename(callee.caller_path), - ) - - # TODO: include in the justification multiple cases of external action usage - justification: list[str | dict[str, str]] = [ - { - f"The target repository uses build tool {build_tool.name}" - " to deploy": deploy_action_source_link, - "The build is triggered by": trigger_link, - }, - f"Deploy action: {workflow_name}", - {"The status of the build can be seen at": html_url} - if html_url - else "However, could not find a passing workflow run.", - ] - check_result["justification"].extend(justification) - if ctx.dynamic_data["is_inferred_prov"] and ci_info["provenances"]: - predicate = ci_info["provenances"][0]["predicate"] - predicate["buildType"] = f"Custom {ci_service.name}" - predicate["builder"]["id"] = deploy_action_source_link - predicate["invocation"]["configSource"][ - "uri" - ] = f"{ctx.remote_path}@refs/heads/{ctx.branch_name}" - predicate["invocation"]["configSource"]["digest"]["sha1"] = ctx.commit_sha - predicate["invocation"]["configSource"]["entryPoint"] = trigger_link - predicate["metadata"]["buildInvocationId"] = html_url - check_result["result_tables"] = [ - BuildAsCodeTable( - build_tool_name=build_tool.name, - ci_service_name=ci_service.name, - build_trigger=trigger_link, - deploy_command=workflow_name, - build_status_url=html_url, - ) - ] - return CheckResultType.PASSED - - for bash_cmd in ci_info["bash_commands"]: - deploy_cmd = self._has_deploy_command(bash_cmd["commands"], build_tool) - if deploy_cmd: - # Get the permalink and HTML hyperlink tag of the CI file that triggered the bash command. - trigger_link = ci_service.api_client.get_file_link( - ctx.repo_full_name, - ctx.commit_sha, - ci_service.api_client.get_relative_path_of_workflow(os.path.basename(bash_cmd["CI_path"])), - ) - # Get the permalink of the source file of the bash command. - bash_source_link = ci_service.api_client.get_file_link( - ctx.repo_full_name, ctx.commit_sha, bash_cmd["caller_path"] - ) - - html_url = ci_service.has_latest_run_passed( - ctx.repo_full_name, - ctx.branch_name, - ctx.commit_sha, - ctx.commit_date, - os.path.basename(bash_cmd["CI_path"]), - ) - - justification_cmd: list[str | dict[str, str]] = [ - { - f"The target repository uses build tool {build_tool.name} to deploy": bash_source_link, - "The build is triggered by": trigger_link, - }, - f"Deploy command: {deploy_cmd}", - {"The status of the build can be seen at": html_url} - if html_url - else "However, could not find a passing workflow run.", - ] - check_result["justification"].extend(justification_cmd) + # Initialize the BuildAsCodeSubchecks object with the AnalyzeContext. + build_as_code_subchecks.build_as_code_subcheck_results = BuildAsCodeSubchecks(ctx=ctx, ci_info=ci_info) + + # ProbLog rules to be evaluated. + prolog_string = PrologString( + """ + :- use_module('src/macaron/slsa_analyzer/checks/problog_predicates.py'). + + A :: ci_parsed :- ci_parsed_check(A). + B :: deploy_action :- deploy_action_check(B). 
+                    C :: deploy_command :- deploy_command_check(C).
+                    D :: deploy_kws :- deploy_kws_check(D).
+                    E :: release_workflow_trigger_deploy_command :- release_workflow_trigger_deploy_command_check(E).
+                    F :: release_workflow_trigger_deploy_action :- release_workflow_trigger_deploy_action_check(F).
+                    G :: tested_deploy_action :- tested_deploy_action_check(G).
+                    H :: publishing_workflow_deploy_command :- publishing_workflow_deploy_command_check(H).
+                    I :: publishing_workflow_deploy_action :- publishing_workflow_deploy_action_check(I).
+                    J :: step_uses_secrets_deploy_action :- step_uses_secrets_deploy_action_check(J).
+                    K :: step_uses_secrets_deploy_command :- step_uses_secrets_deploy_command_check(K).
+
+                    0.8 :: deploy_action_certainty :- deploy_action.
+                    0.10 :: deploy_action_certainty :- tested_deploy_action.
+                    0.85 :: deploy_action_certainty :- release_workflow_trigger_deploy_action.
+                    %0.95 :: deploy_action_certainty :- publishing_workflow_deploy_action.
+                    0.65 :: deploy_action_certainty :- step_uses_secrets_deploy_action.
+
+                    0.75 :: deploy_command_certainty :- deploy_command.
+                    0.85 :: deploy_command_certainty :- release_workflow_trigger_deploy_command.
+                    %0.95 :: deploy_command_certainty :- publishing_workflow_deploy_command.
+                    0.65 :: deploy_command_certainty :- step_uses_secrets_deploy_command.
+
+                    0.70 :: deploy_kws_certainty :- deploy_kws.
+
+                    query(deploy_command_certainty).
+                    query(deploy_action_certainty).
+                    query(deploy_kws_certainty).
+                    """
+                )
+                # TODO: we want all the logic to be happening inside the rules;
+                # can we make decisions there instead of intermediate querying?
+
+                # Convert the result dictionary from Term:float to str:float.
+                term_result: dict[Term, float] = get_evaluatable().create_from(prolog_string).evaluate()
+                result: dict[str, float] = {str(key): value for key, value in term_result.items()}
+                deploy_methods = {
+                    "deploy_command": result["deploy_command_certainty"],
+                    "deploy_action": result["deploy_action_certainty"],
+                    "deploy_kws": result["deploy_kws_certainty"],
+                }
+
+                deploy_methods_valid = {key: value for key, value in deploy_methods.items() if value != 0}
+
+                if deploy_methods_valid.values():
+                    # Determine the deployment method with the highest certainty score.
+                    highest_certainty = max(deploy_methods_valid, key=deploy_methods_valid.__getitem__)
+                    highest_certainty_score = deploy_methods[highest_certainty]
+                    deploy_method = build_as_code_subchecks.build_as_code_subcheck_results.get_subcheck_results(
+                        highest_certainty
+                    )
+
+                    if isinstance(deploy_method, DeploySubcheckResults):
                         if ctx.dynamic_data["is_inferred_prov"] and ci_info["provenances"]:
+                            # Store the values for the inferred provenance representation.
                             predicate = ci_info["provenances"][0]["predicate"]
                             predicate["buildType"] = f"Custom {ci_service.name}"
-                            predicate["builder"]["id"] = bash_source_link
                             predicate["invocation"]["configSource"][
                                 "uri"
                             ] = f"{ctx.remote_path}@refs/heads/{ctx.branch_name}"
                             predicate["invocation"]["configSource"]["digest"]["sha1"] = ctx.commit_sha
-                            predicate["invocation"]["configSource"]["entryPoint"] = trigger_link
-                            predicate["metadata"]["buildInvocationId"] = html_url
-                            check_result["result_tables"] = [
-                                BuildAsCodeTable(
-                                    build_tool_name=build_tool.name,
-                                    ci_service_name=ci_service.name,
-                                    build_trigger=trigger_link,
-                                    deploy_command=deploy_cmd,
-                                    build_status_url=html_url,
-                                )
-                            ]
-                            return CheckResultType.PASSED
-
-                # We currently don't parse these CI configuration files.
-                # We just look for a keyword for now.
-                for unparsed_ci in (Jenkins, Travis, CircleCI, GitLabCI):
-                    if isinstance(ci_service, unparsed_ci):
-                        if build_tool.ci_deploy_kws[ci_service.name]:
-                            deploy_kw, config_name = ci_service.has_kws_in_config(
-                                build_tool.ci_deploy_kws[ci_service.name], repo_path=ctx.repo_path
-                            )
-                            if not config_name:
-                                break
-                            check_result["justification"].append(
-                                f"The target repository uses build tool {build_tool.name}"
-                                + f" in {ci_service.name} using {deploy_kw} to deploy."
+
+                            predicate["metadata"]["buildInvocationId"] = deploy_method.html_url
+                            predicate["builder"]["id"] = deploy_method.source_link
+                            predicate["invocation"]["configSource"]["entryPoint"] = deploy_method.trigger_link
+
+                            if highest_certainty == "deploy_kws":
+                                predicate["builder"]["id"] = deploy_method.config_name
+                                predicate["invocation"]["configSource"]["entryPoint"] = deploy_method.config_name
+
+                    logger.info(build_as_code_subchecks.build_as_code_subcheck_results.check_results.values())
+
+                    all_evidence = build_as_code_subchecks.build_as_code_subcheck_results.evidence
+
+                    distinct_evidence = [*set(all_evidence)]
+                    ev_string = ", ".join(distinct_evidence)
+                    logger.info("Evidence vals %s", ev_string)
+
+                    confidence_score = round(highest_certainty_score, 4)
+                    check_result["result_tables"] = [
+                        BuildAsCodeTable(
+                            build_tool_name=build_tool.name,
+                            ci_service_name=ci_service.name,
+                            build_trigger=deploy_method.trigger_link,
+                            deploy_command=deploy_method.deploy_cmd,
+                            build_status_url=deploy_method.html_url,
+                            confidence_score=confidence_score,
+                            evidence=ev_string,
                         )
-                        if ctx.dynamic_data["is_inferred_prov"] and ci_info["provenances"]:
-                            predicate = ci_info["provenances"][0]["predicate"]
-                            predicate["buildType"] = f"Custom {ci_service.name}"
-                            predicate["builder"]["id"] = config_name
-                            predicate["invocation"]["configSource"][
-                                "uri"
-                            ] = f"{ctx.remote_path}@refs/heads/{ctx.branch_name}"
-                            predicate["invocation"]["configSource"]["digest"]["sha1"] = ctx.commit_sha
-                            predicate["invocation"]["configSource"]["entryPoint"] = config_name
-                            check_result["result_tables"] = [
-                                BuildAsCodeTable(
-                                    build_tool_name=build_tool.name,
-                                    ci_service_name=ci_service.name,
-                                    deploy_command=deploy_kw,
-                                )
-                            ]
-                        return CheckResultType.PASSED
+                    ]
+                    check_result["confidence_score"] = confidence_score
+
+                    # TODO: compile all justifications
+                    # check_result["justification"].append()
+
+                    # TODO: Investigate using proofs
+                    logger.info("The certainty of this check passing is: %s", confidence_score)
+
+                # Check whether the confidence score is greater than the minimum threshold for this check.
+                if confidence_score >= self.confidence_score_threshold:
+                    return CheckResultType.PASSED
 
             pass_msg = f"The target repository does not use {build_tool.name} to deploy."
             check_result["justification"].append(pass_msg)
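The certainty propagation above follows standard ProbLog semantics: a rule's annotated probability multiplies the probability of its body. A self-contained sketch with fixed evidence probabilities standing in for the Python predicates (the 0.9 and 0.75 figures are illustrative only, not values from the patch):

```python
from problog import get_evaluatable
from problog.program import PrologString

# Fixed evidence probability in place of the problog_predicates bridge.
model = PrologString(
    """
    0.9 :: deploy_command.
    0.75 :: deploy_command_certainty :- deploy_command.
    query(deploy_command_certainty).
    """
)
result = get_evaluatable().create_from(model).evaluate()
for term, probability in result.items():
    print(term, probability)  # deploy_command_certainty 0.675 (= 0.9 * 0.75)
```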
+ +"""This module contains a class to store results from the BuildAsCodeCheck subchecks.""" + +import logging +import os +import re + +from attr import dataclass + +from macaron.config.defaults import defaults +from macaron.slsa_analyzer.analyze_context import AnalyzeContext +from macaron.slsa_analyzer.build_tool.base_build_tool import BaseBuildTool +from macaron.slsa_analyzer.build_tool.pip import Pip +from macaron.slsa_analyzer.build_tool.poetry import Poetry +from macaron.slsa_analyzer.ci_service.circleci import CircleCI +from macaron.slsa_analyzer.ci_service.github_actions import GHWorkflowType +from macaron.slsa_analyzer.ci_service.gitlab_ci import GitLabCI +from macaron.slsa_analyzer.ci_service.jenkins import Jenkins +from macaron.slsa_analyzer.ci_service.travis import Travis +from macaron.slsa_analyzer.registry_service.api_client import PyPIAPIClient +from macaron.slsa_analyzer.specs.ci_spec import CIInfo + +logger: logging.Logger = logging.getLogger(__name__) + + +def has_deploy_command(commands: list[list[str]], build_tool: BaseBuildTool) -> str: + """Check if the bash command is a build and deploy command.""" + # Account for Python projects having separate tools for packaging and publishing. + deploy_tool = build_tool.publisher if build_tool.publisher else build_tool.builder + for com in commands: + + # Check for empty or invalid commands. + if not com or not com[0]: + continue + # The first argument in a bash command is the program name. + # So first check that the program name is a supported build tool name. + # We need to handle cases where the first argument is a path to the program. + cmd_program_name = os.path.basename(com[0]) + if not cmd_program_name: + logger.debug("Found invalid program name %s.", com[0]) + continue + + check_build_commands = any(build_cmd for build_cmd in deploy_tool if build_cmd == cmd_program_name) + + # Support the use of interpreters like Python that load modules, i.e., 'python -m pip install'. + check_module_build_commands = any( + interpreter == cmd_program_name + and com[1] + and com[1] in build_tool.interpreter_flag + and com[2] + and com[2] in deploy_tool + for interpreter in build_tool.interpreter + ) + prog_name_index = 2 if check_module_build_commands else 0 + + if check_build_commands or check_module_build_commands: + # Check the arguments in the bash command for the deploy goals. + # If there are no deploy args for this build tool, accept as deploy command. + if not build_tool.deploy_arg: + logger.info("No deploy arguments required. Accept %s as deploy command.", str(com)) + return str(com) + + for word in com[(prog_name_index + 1) :]: + # TODO: allow plugin versions in arguments, e.g., maven-plugin:1.6.8:deploy. 
+
+
+@dataclass
+class DeploySubcheckResults:
+    """DataClass containing information required from deploy command subchecks."""
+
+    certainty: float = 0.0
+    justification: list[str | dict[str, str]] = [""]
+    deploy_cmd: str = ""
+    trigger_link: str = ""
+    source_link: str = ""
+    html_url: str = ""
+    config_name: str = ""
+    workflow_name: str = ""
+    workflow_file: str = ""
+    workflow_info: dict = {}
+
+
+class BuildAsCodeSubchecks:
+    """Class for storing the results from the BuildAsCodeCheck subchecks."""
+
+    # Store the analyze context.
+    def __init__(self, ctx: AnalyzeContext, ci_info: CIInfo) -> None:
+        self.ctx = ctx
+        self.build_tool: BaseBuildTool = ctx.dynamic_data["build_spec"].get("tool")  # type: ignore
+        self.ci_services = ctx.dynamic_data["ci_services"]
+        self.check_results: dict[str, DeploySubcheckResults] = {}  # Update this with each check.
+        self.ci_info = ci_info
+        self.ci_service = ci_info["service"]
+        # Certainty value to be returned if a subcheck fails.
+        self.failed_check = 0.0
+        self.evidence: list[str] = []
+
+    # TODO: Make subcheck functions available to other checks.
+
+    # TODO: Before each check is run, check whether a certainty result already exists in self.check_results
+    # to avoid re-running unnecessarily.
+
+    def ci_parsed(self) -> float:
+        """Check whether parsing is supported for this CI service's CI config files."""
+        check_certainty = 1.0
+        # TODO: If this check has already been run on this repo, return certainty.
+        if self.ci_info["bash_commands"]:
+            justification: list[str | dict[str, str]] = ["The CI workflow files for this CI service are parsed."]
+            self.check_results["ci_parsed"] = DeploySubcheckResults(
+                certainty=check_certainty, justification=justification
+            )
+            self.evidence.append("ci_parsed")
+            logger.info("Evidence found: ci_parsed -> %s", check_certainty)
+            return check_certainty
+        return self.failed_check
+
+    def deploy_command(self) -> float:
+        """Check for the use of a deploy command to deploy."""
+        check_certainty = 0.8
+
+        for bash_cmd in self.ci_info["bash_commands"]:
+            deploy_cmd = has_deploy_command(bash_cmd["commands"], self.build_tool)
+            if deploy_cmd:
+                # Get the permalink and HTML hyperlink tag of the CI file that triggered the bash command.
+                trigger_link = self.ci_service.api_client.get_file_link(
+                    self.ctx.repo_full_name,
+                    self.ctx.commit_sha,
+                    self.ci_service.api_client.get_relative_path_of_workflow(os.path.basename(bash_cmd["CI_path"])),
+                )
+                # Get the permalink of the source file of the bash command.
+                bash_source_link = self.ci_service.api_client.get_file_link(
+                    self.ctx.repo_full_name, self.ctx.commit_sha, bash_cmd["caller_path"]
+                )
+
+                html_url = self.ci_service.has_latest_run_passed(
+                    self.ctx.repo_full_name,
+                    self.ctx.branch_name,
+                    self.ctx.commit_sha,
+                    self.ctx.commit_date,
+                    os.path.basename(bash_cmd["CI_path"]),
+                )
+
+                workflow_file = os.path.basename(trigger_link)
+                workflow_info = bash_cmd["workflow_info"]
+
+                justification: list[str | dict[str, str]] = [
+                    {
+                        f"The target repository uses build tool {self.build_tool.name} to deploy": bash_source_link,
+                        "The build is triggered by": trigger_link,
+                    },
+                    f"Deploy command: {deploy_cmd}",
+                    {"The status of the build can be seen at": html_url}
+                    if html_url
+                    else "However, could not find a passing workflow run.",
+                ]
+                self.evidence.append("deploy_command")
+                logger.info("Evidence found: deploy_command -> %s", check_certainty)
+                self.check_results["deploy_command"] = DeploySubcheckResults(
+                    certainty=check_certainty,
+                    justification=justification,
+                    deploy_cmd=deploy_cmd,
+                    trigger_link=trigger_link,
+                    source_link=bash_source_link,
+                    html_url=html_url,
+                    workflow_file=workflow_file,
+                    workflow_info=workflow_info,
+                )
+
+                return check_certainty
+        return self.failed_check
+
+    def deploy_kws(self) -> float:
+        """Check for the use of deploy keywords to deploy."""
+        check_certainty = 0.4
+
+        # We currently don't parse these CI configuration files.
+        # We just look for a keyword for now.
+        for unparsed_ci in (Jenkins, Travis, CircleCI, GitLabCI):
+            if isinstance(self.ci_service, unparsed_ci):
+                if self.build_tool.ci_deploy_kws[self.ci_service.name]:
+                    deploy_kw, config_name = self.ci_service.has_kws_in_config(
+                        self.build_tool.ci_deploy_kws[self.ci_service.name], repo_path=self.ctx.repo_path
+                    )
+                    if not config_name:
+                        return self.failed_check
+
+                    justification: list[str | dict[str, str]] = [f"The target repository uses {deploy_kw} to deploy."]
+                    self.evidence.append("deploy_kws")
+
+                    self.check_results["deploy_kws"] = DeploySubcheckResults(
+                        certainty=check_certainty,
+                        justification=justification,
+                        deploy_cmd=deploy_kw,
+                        config_name=config_name,
+                    )
+                    logger.info("Evidence found: deploy_kws -> %s", check_certainty)
+                    return check_certainty
+
+        return self.failed_check
+
+    def tested_deploy_action(self, workflow_file: str = "", workflow_name: str = "") -> float:
+        """Check for the use of a test deploy to PyPI given a CI workflow."""
+        check_certainty = 0.9
+        logger.info("File name: %s", workflow_file)
+        for callee in self.ci_info["callgraph"].bfs():
+            # TODO: figure out a way to generalize this implementation for other external GHAs.
+            # Currently just checks for the pypa/gh-action-pypi-publish action.
+            if not workflow_name or callee.node_type not in [
+                GHWorkflowType.EXTERNAL,
+                GHWorkflowType.REUSABLE,
+            ]:
+                logger.debug("Workflow %s is not relevant. Skipping...", callee.name)
+                continue
+            callee_name = callee.name.split("@")[0]
+
+            if callee_name == workflow_name == "pypa/gh-action-pypi-publish":
+                workflow_info = callee.parsed_obj
+                inputs = workflow_info.get("Inputs", {})
+                repo_url = ""
+                if inputs:
+                    repo_url = inputs.get("repository_url", {}).get("Value", {}).get("Value", "")
+                # TODO: Use values that come from defaults.ini rather than hardcoded.
+                if repo_url == "https://test.pypi.org/legacy/":
+                    self.evidence.append("tested_deploy_action")
+                    logger.info("Evidence found: tested_deploy_action -> %s", check_certainty)
+                    return check_certainty
+        return self.failed_check
+
+    def deploy_action(self) -> float:
+        """Check for the use of a trusted GitHub Actions workflow to publish/deploy."""
+        check_certainty = 0.95
+
+        if isinstance(self.build_tool, (Pip, Poetry)):
+            trusted_deploy_actions = defaults.get_list("builder.pip.ci.deploy", "github_actions", fallback=[])
+
+            for callee in self.ci_info["callgraph"].bfs():
+                workflow_name = callee.name.split("@")[0]
+
+                if not workflow_name or callee.node_type not in [
+                    GHWorkflowType.EXTERNAL,
+                    GHWorkflowType.REUSABLE,
+                ]:
+                    logger.debug("Workflow %s is not relevant. Skipping...", callee.name)
+                    continue
+
+                if workflow_name in trusted_deploy_actions:
+                    workflow_info = callee.parsed_obj
+                    inputs = workflow_info.get("Inputs", {})
+
+                    # Deployment is to PyPI if there isn't a repository url.
+                    # https://packaging.python.org/en/latest/guides/
+                    # publishing-package-distribution-releases-using-github-actions-ci-cd-workflows/
+                    if inputs and inputs.get("repository_url"):
+                        logger.debug(
+                            "Workflow %s has a repository url, indicating a non-legit publish to PyPI. Skipping...",
+                            callee.name,
+                        )
+                        continue
+
+                    # TODO: all of this logic could be generalized in the build_as_code body.
+                    trigger_link = self.ci_service.api_client.get_file_link(
+                        self.ctx.repo_full_name,
+                        self.ctx.commit_sha,
+                        self.ci_service.api_client.get_relative_path_of_workflow(os.path.basename(callee.caller_path)),
+                    )
+                    deploy_action_source_link = self.ci_service.api_client.get_file_link(
+                        self.ctx.repo_full_name, self.ctx.commit_sha, callee.caller_path
+                    )
+
+                    html_url = self.ci_service.has_latest_run_passed(
+                        self.ctx.repo_full_name,
+                        self.ctx.branch_name,
+                        self.ctx.commit_sha,
+                        self.ctx.commit_date,
+                        os.path.basename(callee.caller_path),
+                    )
+
+                    workflow_file = os.path.basename(trigger_link)
+
+                    # TODO: include in the justification multiple cases of external action usage
+                    justification: list[str | dict[str, str]] = [
+                        {
+                            "To deploy": deploy_action_source_link,
+                            "The build is triggered by": trigger_link,
+                        },
+                        f"Deploy action: {workflow_name}",
+                        {"The status of the build can be seen at": html_url}
+                        if html_url
+                        else "However, could not find a passing workflow run.",
+                    ]
+
+                    self.evidence.append("deploy_action")
+                    logger.info("Evidence found: deploy_action -> %s", check_certainty)
+
+                    self.check_results["deploy_action"] = DeploySubcheckResults(
+                        certainty=check_certainty,
+                        justification=justification,
+                        deploy_cmd=workflow_name,
+                        trigger_link=trigger_link,
+                        source_link=deploy_action_source_link,
+                        html_url=html_url,
+                        workflow_name=workflow_name,
+                        workflow_file=workflow_file,
+                        workflow_info=workflow_info,
+                    )
+
+                    return check_certainty
+
+        return self.failed_check
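Stripped of the link and provenance bookkeeping, the core of `deploy_action` is matching call-graph nodes against the trusted publisher list configured under `[builder.pip.ci.deploy]` in `defaults.ini`. A sketch of just that matching step (the callee names are hypothetical):

```python
# Trusted publishing actions, as configured in defaults.ini.
trusted_deploy_actions = ["pypa/gh-action-pypi-publish"]

# Hypothetical callee names as they might appear in a GitHub Actions call graph.
callees = ["actions/checkout@v3", "pypa/gh-action-pypi-publish@release/v1"]

for callee in callees:
    workflow_name = callee.split("@")[0]  # Strip the version ref.
    if workflow_name in trusted_deploy_actions:
        print(f"Trusted deploy action found: {workflow_name}")
```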
+
+    # TODO: workflow_name isn't used as a file in some places!
+
+    def release_workflow_trigger(self, workflow_file: str = "") -> float:
+        """Check that the workflow is triggered by a valid event."""
+        check_certainty = 0.9
+        check_certainty_lowered = 0.75
+
+        if not workflow_file:
+            return self.failed_check
+
+        valid_trigger_events = ["workflow_dispatch", "push", "release", "create"]
+        invalid_trigger_events = ["pull_request"]
+        valid_trigger: list[str] = []
+        invalid_trigger = ""
+
+        # TODO: Consider activity types for release, i.e. prereleased.
+        for callee in self.ci_info["callgraph"].bfs():
+            # Find the workflow file that the deployment method was used in and
+            # extract the trigger event types.
+            if callee.name == workflow_file:
+                trigger_events = callee.parsed_obj.get("On", {})
+                for event in trigger_events:
+                    hook = event.get("Hook", {})
+                    trigger_type = str(hook.get("Value", ""))
+                    # Check that the identified event trigger type is a valid release event.
+                    if trigger_type in valid_trigger_events:
+                        valid_trigger.append(trigger_type)
+                    if trigger_type in invalid_trigger_events:
+                        invalid_trigger = trigger_type
+
+        if valid_trigger:
+            logger.info(
+                "Valid trigger event '%s' found for the workflow file: %s.", valid_trigger[0], workflow_file
+            )
+            self.evidence.append("release_workflow_trigger")
+            justification: list[str | dict[str, str]] = [
+                f"Valid trigger event type '{valid_trigger[0]}' used in workflow file: {workflow_file}"
+            ]
+            self.check_results["release_workflow_trigger"] = DeploySubcheckResults(justification=justification)
+            if invalid_trigger:
+                logger.info("Evidence found: release_workflow_trigger -> %s", check_certainty_lowered)
+                return check_certainty_lowered
+
+            logger.info("Evidence found: release_workflow_trigger -> %s", check_certainty)
+            return check_certainty
+        return self.failed_check
+
+    def pypi_publishing_workflow_timestamp(self) -> float:
+        """Compare the PyPI release timestamp with GHA publishing workflow timestamps."""
+        check_certainty = 0.9
+        project_name = self.build_tool.project_name
+        pypi_timestamp = ""
+        # Query the PyPI API for the timestamp of the latest release.
+        if project_name:
+            api_client = PyPIAPIClient()
+            response = api_client.get_all_project_data(project_name=project_name)
+            latest = response.get("urls", [""])[0]
+            if latest:
+                pypi_timestamp = latest.get("upload_time")
+        if not pypi_timestamp:
+            return self.failed_check
+
+        # TODO: Collect 5 of the most recent successful workflow runs.
+        workflow_data: dict = {}
+        workflow_name = ""
+
+        workflow_created_timestamp = workflow_data.get("created_at", "")
+        workflow_updated_timestamp = workflow_data.get("updated_at", "")
+
+        # Compare the timestamp of the most recent PyPI release with several GHA workflow runs.
+        if workflow_created_timestamp and workflow_updated_timestamp:
+            # TODO: convert into datetime objects to compare.
+            if workflow_created_timestamp <= pypi_timestamp <= workflow_updated_timestamp:
+                self.evidence.append("publish_timestamp")
+                justification: list[str | dict[str, str]] = [
+                    f"The timestamp of workflow {workflow_name} matches with the PyPI package release time."
+                ]
+                self.check_results["publish_timestamp"] = DeploySubcheckResults(justification=justification)
+                logger.info("Evidence found: publishing_workflow_timestamp -> %s", check_certainty)
+                return check_certainty
+
+        return self.failed_check
+
+    def step_uses_secrets(self, step_info: dict) -> float:
+        """Identify whether a workflow step uses secrets."""
+        check_certainty = 0.9
+
+        # inputs = step_info.get("Inputs", {})
+        logger.info("inputs: %s", step_info)
+        if self._step_uses_secrets(step_info):
+            self.evidence.append("deploy_step_uses_secrets")
+            logger.info("Evidence found: step_secrets -> %s", check_certainty)
+            justification: list[str | dict[str, str]] = [
+                "The workflow step that contains the deployment method uses secrets."
+            ]
+            self.check_results["step_secrets"] = DeploySubcheckResults(justification=justification)
+            return check_certainty
+        return self.failed_check
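The secrets detection hinges on one regular expression over GitHub Actions expression syntax. A standalone sketch of that match against a hypothetical step input:

```python
import re

# Match the `${{ ... }}` expression syntax used by GitHub Actions.
pattern = re.compile(r"\$\{\{([^}]*)\}\}", re.IGNORECASE)

inputs = {"password": "${{ secrets.PYPI_API_TOKEN }}", "verbose": "true"}
for value in inputs.values():
    match = pattern.search(value)
    if match:
        contents = match.group(1).strip().split(".")
        # Only `${{ secrets.<NAME> }}` counts, and GITHUB_TOKEN is excluded.
        if len(contents) == 2 and contents[0] == "secrets" and contents[1] != "GITHUB_TOKEN":
            print("Step uses a non-default secret:", contents[1])
```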
+
+    def _step_uses_secrets(self, inputs: dict) -> bool:
+        """Recurse through the GitHub Actions syntax tree to find the use of secrets."""
+        for value in inputs.values():
+            if isinstance(value, str):
+                # Match the pattern '${{ content }}'.
+                pattern = re.compile(r"\$\{\{([^}]*)\}\}", re.IGNORECASE)
+                match = pattern.search(value)
+                if match is not None:
+                    content = match.group(1).strip()
+                    contents = content.split(".")
+                    # Note that we only support the case: ${{ secrets.TOKEN }} for now.
+                    # Exclude 'secrets.GITHUB_TOKEN'.
+                    if len(contents) == 2 and (contents[0] == "secrets") and (contents[1] != "GITHUB_TOKEN"):
+                        return True
+            elif isinstance(value, dict):
+                if self._step_uses_secrets(value):
+                    return True
+        return False
+
+    def get_subcheck_results(self, subcheck_name: str) -> DeploySubcheckResults:
+        """Return the results for a particular subcheck."""
+        return self.check_results[subcheck_name]
+
+
+build_as_code_subcheck_results: BuildAsCodeSubchecks = None  # type: ignore # pylint: disable=invalid-name
diff --git a/src/macaron/slsa_analyzer/checks/check_result.py b/src/macaron/slsa_analyzer/checks/check_result.py
index ab5531e01..3397dedb3 100644
--- a/src/macaron/slsa_analyzer/checks/check_result.py
+++ b/src/macaron/slsa_analyzer/checks/check_result.py
@@ -40,6 +40,7 @@ class CheckResult(TypedDict):
     result_tables: list[DeclarativeBase | Table]
     # recommendation: str
     result_type: CheckResultType
+    confidence_score: float
 
 
 class SkippedInfo(TypedDict):
diff --git a/src/macaron/slsa_analyzer/checks/problog_predicates.py b/src/macaron/slsa_analyzer/checks/problog_predicates.py
new file mode 100644
index 000000000..0cbbdff51
--- /dev/null
+++ b/src/macaron/slsa_analyzer/checks/problog_predicates.py
@@ -0,0 +1,201 @@
+# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
+
+"""Contains ProbLog predicates that return the results stored in the BuildAsCodeSubchecks dataclass."""
+import logging
+
+from problog.extern import problog_export
+
+from macaron.slsa_analyzer.checks.build_as_code_subchecks import build_as_code_subcheck_results
+
+FAILED_CHECK = 0.0
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+# TODO: check that a result doesn't already exist before running the check.
+
+
+@problog_export("-int")  # type: ignore
+def ci_parsed_check() -> float:
+    """Get the value of the subcheck.
+
+    Returns
+    -------
+    Certainty
+        The certainty of the check.
+    """
+    check = build_as_code_subcheck_results.check_results.get("ci_parsed")
+    if check:
+        return check.certainty
+    return build_as_code_subcheck_results.ci_parsed()
+
+
+@problog_export("-int")  # type: ignore
+def deploy_action_check() -> float:
+    """Get the value of the subcheck.
+
+    Returns
+    -------
+    Certainty
+        The certainty of the check.
+    """
+    depends_on = [ci_parsed_check() > 0]
+    # Verify dependencies and that this check hasn't already been run.
+    if not all(depends_on):
+        return FAILED_CHECK
+    check = build_as_code_subcheck_results.check_results.get("deploy_action")
+    if check:
+        return check.certainty
+    return build_as_code_subcheck_results.deploy_action()
+
+
+@problog_export("-int")  # type: ignore
+def deploy_command_check() -> float:
+    """Get the value of the subcheck.
+
+    Returns
+    -------
+    Certainty
+        The certainty of the check.
+ """ + depends_on = [ci_parsed_check() > 0.0] + # Verify dependencies and that this check hasn't already been run. + check = build_as_code_subcheck_results.check_results.get("deploy_command") + if not all(depends_on): + return FAILED_CHECK + if check: + return check.certainty + return build_as_code_subcheck_results.deploy_command() + + +@problog_export("-int") # type: ignore +def deploy_kws_check() -> float: + """Get the value of the subcheck. + + Returns + ------- + Certainty + The certainty of the check. + """ + depends_on = [ci_parsed_check() == 0.0] + if not all(depends_on): + return FAILED_CHECK + return build_as_code_subcheck_results.deploy_kws() + + +@problog_export("-int") # type: ignore +def release_workflow_trigger_deploy_command_check() -> float: + """Get the value of the subcheck. + + Returns + ------- + Certainty + The certainty of the check. + """ + depends_on = [deploy_command_check() > 0.0] + if not all(depends_on): + return FAILED_CHECK + workflow_file = build_as_code_subcheck_results.check_results["deploy_command"].workflow_file + return build_as_code_subcheck_results.release_workflow_trigger(workflow_file=workflow_file) + + +@problog_export("-int") # type: ignore +def release_workflow_trigger_deploy_action_check() -> float: + """Get the value of the subcheck. + + Returns + ------- + Certainty + The certainty of the check. + """ + depends_on = [deploy_action_check() > 0.0] + if not all(depends_on): + return FAILED_CHECK + workflow_file = build_as_code_subcheck_results.check_results["deploy_action"].workflow_file + return build_as_code_subcheck_results.release_workflow_trigger(workflow_file=workflow_file) + + +@problog_export("-int") # type: ignore +def tested_deploy_action_check() -> float: + """Get the value of the subcheck. + + Returns + ------- + Certainty + The certainty of the check. + """ + depends_on = [deploy_action_check() > 0.0] + if not all(depends_on): + return FAILED_CHECK + workflow_name = build_as_code_subcheck_results.check_results["deploy_action"].workflow_name + return build_as_code_subcheck_results.tested_deploy_action(workflow_name=workflow_name) + + +@problog_export("-int") # type: ignore +def publishing_workflow_deploy_action_check() -> float: + """Get the value of the subcheck. + + Returns + ------- + Certainty + The certainty of the check. + """ + depends_on = [release_workflow_trigger_deploy_action_check()] + if not all(depends_on): + return FAILED_CHECK + # workflow_name = build_as_code_subcheck_results.check_results["deploy_action"] + return build_as_code_subcheck_results.pypi_publishing_workflow_timestamp() + + +@problog_export("-int") # type: ignore +def publishing_workflow_deploy_command_check() -> float: + """Get the value of the subcheck. + + Returns + ------- + Certainty + The certainty of the check. + """ + depends_on = [release_workflow_trigger_deploy_command_check() > 0.0] + if not all(depends_on): + return FAILED_CHECK + # workflow_name = build_as_code_subcheck_results.check_results["deploy_action"] + return build_as_code_subcheck_results.pypi_publishing_workflow_timestamp() + + +@problog_export("-int") # type: ignore +def step_uses_secrets_deploy_command_check() -> float: + """Get the value of the subcheck. + + Returns + ------- + Certainty + The certainty of the check. + """ + # TODO: currently we don't store the GHA object during deploy_command_check so + # can't perform this sub-task (no workflow_info available). 
+
+
+@problog_export("-int")  # type: ignore
+def step_uses_secrets_deploy_command_check() -> float:
+    """Get the value of the subcheck.
+
+    Returns
+    -------
+    Certainty
+        The certainty of the check.
+    """
+    # TODO: currently we don't store the GHA object during deploy_command_check, so
+    # we can't perform this sub-task (no workflow_info available).
+    depends_on = [deploy_command_check() > 0.0]
+    if not all(depends_on):
+        return FAILED_CHECK
+    step_info = build_as_code_subcheck_results.check_results["deploy_command"].workflow_info
+    if step_info:
+        return build_as_code_subcheck_results.step_uses_secrets(step_info=step_info)
+    return FAILED_CHECK
+
+
+@problog_export("-int")  # type: ignore
+def step_uses_secrets_deploy_action_check() -> float:
+    """Get the value of the subcheck.
+
+    Returns
+    -------
+    Certainty
+        The certainty of the check.
+    """
+    depends_on = [deploy_action_check() > 0.0]
+    if not all(depends_on):
+        return FAILED_CHECK
+    step_info = build_as_code_subcheck_results.check_results["deploy_action"].workflow_info
+    if step_info:
+        return build_as_code_subcheck_results.step_uses_secrets(step_info=step_info)
+    return FAILED_CHECK
diff --git a/src/macaron/slsa_analyzer/ci_service/github_actions.py b/src/macaron/slsa_analyzer/ci_service/github_actions.py
index 7ae85dee9..836b4d0f8 100644
--- a/src/macaron/slsa_analyzer/ci_service/github_actions.py
+++ b/src/macaron/slsa_analyzer/ci_service/github_actions.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022 - 2022, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2022 - 2023, Oracle and/or its affiliates. All rights reserved.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
 
 """This module analyzes GitHub Actions CI."""
@@ -170,7 +170,7 @@ def build_call_graph_from_node(self, node: GitHubNode) -> None:
                         name=step["Exec"]["Uses"]["Value"],
                         node_type=GHWorkflowType.EXTERNAL,
                         source_path="",
-                        parsed_obj={},
+                        parsed_obj=step["Exec"],
                         caller_path=node.source_path,
                     )
                 )
@@ -278,6 +278,7 @@ def extract_all_bash(self, callgraph: CallGraph, macaron_path: str = "") -> Iter
                         step["Exec"]["Run"]["Value"],
                         ci_file=self.api_client.get_relative_path_of_workflow(callee.name),
                         ci_type="github_actions",
+                        workflow_info=step,
                         recursive=True,
                         repo_path=callgraph.repo_path,
                         working_dir=step["Exec"]["WorkingDirectory"] or "",
diff --git a/src/macaron/slsa_analyzer/registry.py b/src/macaron/slsa_analyzer/registry.py
index 9fd3b487e..b98de6136 100644
--- a/src/macaron/slsa_analyzer/registry.py
+++ b/src/macaron/slsa_analyzer/registry.py
@@ -375,6 +375,7 @@ def scan(self, target: AnalyzeContext, skipped_checks: list[SkippedInfo]) -> dic
                     justification=[message],
                     result_type=CheckResultType.UNKNOWN,
                     result_tables=[],
+                    confidence_score=0,
                 )
                 graph.done(check_id)
             else:
diff --git a/src/macaron/slsa_analyzer/registry_service/api_client.py b/src/macaron/slsa_analyzer/registry_service/api_client.py
new file mode 100644
index 000000000..bb610766d
--- /dev/null
+++ b/src/macaron/slsa_analyzer/registry_service/api_client.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
+
+"""This module provides API clients for registry services, such as PyPI."""
+
+import logging
+
+from macaron.util import send_get_http
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+# TODO: Create BaseAPIClient
+
+
+class PyPIAPIClient:
+    """This class acts as a client to use the PyPI API.
+
+    See https://warehouse.pypa.io/api-reference/ for the PyPI API documentation.
+    """
+
+    _PYPI_API_URL = "https://pypi.org/pypi"
+
+    def get_all_project_data(self, project_name: str) -> dict:
+        """Query the PyPI JSON API for the information about an individual project at the latest version.
+
+        The url would be in the following form:
+        ``https://pypi.org/pypi/{project_name}/json``
+
+        Parameters
+        ----------
+        project_name : str
+            The full name of the project (case insensitive).
+
+        Returns
+        -------
+        dict
+            The json query result or an empty dict if failed.
+
+        Examples
+        --------
+        The following call to this method will perform a query to ``https://pypi.org/pypi/flask/json``
+
+        >>> pypi_client.get_all_project_data(
+            project_name="flask"
+        )
+        """
+        logger.debug("Query for project %s's data", project_name)
+        url = f"{PyPIAPIClient._PYPI_API_URL}/{project_name}/json"
+        response_data = send_get_http(url, {})
+        return response_data
+
+    def get_release_data(self, project_name: str, version: str) -> dict:
+        """Query the PyPI JSON API for the information about an individual release at a specific version.
+
+        The url would be in the following form:
+        ``https://pypi.org/pypi/{project_name}/{version}/json``
+
+        Parameters
+        ----------
+        project_name : str
+            The full name of the project (case insensitive).
+        version : str
+            The version of the project in the form ``*.*.*``.
+
+        Returns
+        -------
+        dict
+            The json query result or an empty dict if failed.
+
+        Examples
+        --------
+        The following call to this method will perform a query to ``https://pypi.org/pypi/flask/1.0.0/json``
+
+        >>> pypi_client.get_release_data(
+            project_name="flask",
+            version="1.0.0"
+        )
+        """
+        logger.debug("Query for project %s's data at version %s", project_name, version)
+        url = f"{PyPIAPIClient._PYPI_API_URL}/{project_name}/{version}/json"
+        response_data = send_get_http(url, {})
+        return response_data
diff --git a/src/macaron/slsa_analyzer/registry_service/pypi.py b/src/macaron/slsa_analyzer/registry_service/pypi.py
new file mode 100644
index 000000000..8e822d07b
--- /dev/null
+++ b/src/macaron/slsa_analyzer/registry_service/pypi.py
@@ -0,0 +1,25 @@
+# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved.
+# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
+
+"""This module contains the spec for the PyPI service."""
+
+from macaron.slsa_analyzer.registry_service.api_client import PyPIAPIClient
+
+
+class PyPI:
+    """This class contains the spec of the PyPI service."""
+
+    def __init__(self) -> None:
+        """Initialize instance."""
+        self._api_client: PyPIAPIClient = None  # type: ignore
+
+    @property
+    def api_client(self) -> PyPIAPIClient:
+        """Return the API client used for querying the PyPI API.
+
+        This API is used to query package and release data from PyPI.
+        """
+        if not self._api_client:
+            self._api_client = PyPIAPIClient()
+
+        return self._api_client
diff --git a/tests/slsa_analyzer/checks/test_build_as_code_check.py b/tests/slsa_analyzer/checks/test_build_as_code_check.py
index b7a07b0f9..8f3252844 100644
--- a/tests/slsa_analyzer/checks/test_build_as_code_check.py
+++ b/tests/slsa_analyzer/checks/test_build_as_code_check.py
@@ -180,9 +180,10 @@ def test_gha_workflow_deployment(
     """Test the use of verified GitHub Actions to deploy."""
     check = BuildAsCodeCheck()
     check_result = CheckResult(justification=[])  # type: ignore
+    bash_commands = BashCommands(
+        caller_path="source_file", CI_path="ci_file", CI_type="github_actions", commands=[[]], workflow_info={}
+    )
     ci_info = CIInfo(
         service=github_actions_service,
-        bash_commands=[],
+        bash_commands=[bash_commands],
         callgraph=CallGraph(BaseNode(), ""),
         provenance_assets=[],
         latest_release={},