diff --git a/src/yardstick/tool/grype.py b/src/yardstick/tool/grype.py index c1349fe..615bb2a 100644 --- a/src/yardstick/tool/grype.py +++ b/src/yardstick/tool/grype.py @@ -1,4 +1,5 @@ import atexit +import hashlib import json import logging import os @@ -144,7 +145,7 @@ def _install_from_git( abspath = os.path.abspath(path) if not tool_exists: cls._run_go_build( - abspath=abspath, + abs_install_dir=abspath, repo_path=repo_path, description=description, binpath=path, @@ -154,25 +155,81 @@ def _install_from_git( return Grype(path=path, version_detail=description, **kwargs) + @classmethod + def _local_build_version_suffix(cls, src_path: str) -> str: + src_path = os.path.abspath(os.path.expanduser(src_path)) + git_desc = "" + diff_digest = "clean" + try: + repo = git.Repo(src_path) + except: + logging.error(f"failed to open existing grype repo at {src_path!r}") + raise + git_desc = repo.git.describe("--tags", "--always", "--long", "--dirty") + if repo.is_dirty(): + hash_obj = hashlib.sha1() + for untracked in repo.untracked_files: + hash_obj.update(cls._hash_file(os.path.join(repo.working_dir, untracked)).encode()) + hash_obj.update(repo.git.diff("HEAD").encode()) + diff_digest = hash_obj.hexdigest()[:8] + return f"{git_desc}-{diff_digest}" + + @classmethod + def _hash_file(cls, path: str) -> str: + hash_obj = hashlib.sha1() + with open(path, "rb") as f: + while True: + data = f.read(4096) + if not data: + break + hash_obj.update(data) + return hash_obj.hexdigest() + + @classmethod + def _install_from_path( + cls, + path: Optional[str], + src_path: str, + **kwargs, + ) -> "Grype": + # get the description and head ref from the repo + src_repo_path = os.path.abspath(os.path.expanduser(src_path)) + build_version = cls._local_build_version_suffix(src_repo_path) + logging.debug(f"installing grype from path={src_repo_path!r}") + logging.debug(f"installing grype to path={path!r}") + if not path: + path = tempfile.mkdtemp() + atexit.register(shutil.rmtree, path) + dest_path = os.path.join(path.replace("path:", ""), build_version, "local_install") + os.makedirs(dest_path, exist_ok=True) + cls._run_go_build( + abs_install_dir=os.path.abspath(dest_path), + description=f"{path}:{build_version}", + repo_path=src_repo_path, + binpath=dest_path, + ) + + return Grype(path=dest_path, **kwargs) + @staticmethod def _run_go_build( - abspath: str, + abs_install_dir: str, repo_path: str, description: str, binpath: str, version_ref: str = "github.com/anchore/grype/internal/version.version", ): - logging.debug(f"installing grype via build to {abspath!r}") + logging.debug(f"installing grype via build to {abs_install_dir!r}") main_pkg_path = "./cmd/grype" if not os.path.exists(os.path.join(repo_path, "cmd", "grype", "main.go")): # support legacy installations, when the main.go was in the root of the repo main_pkg_path = "." - c = f"go build -ldflags \"-w -s -extldflags '-static' -X {version_ref}={description}\" -o {abspath} {main_pkg_path}" + c = f"go build -ldflags \"-w -s -extldflags '-static' -X {version_ref}={description}\" -o {abs_install_dir} {main_pkg_path}" logging.debug(f"running {c!r}") - e = {"GOBIN": abspath, "CGO_ENABLED": "0"} + e = {"GOBIN": abs_install_dir, "CGO_ENABLED": "0"} e.update(os.environ) subprocess.check_call( @@ -185,6 +242,24 @@ def _run_go_build( os.chmod(f"{binpath}/grype", 0o755) + @classmethod + def _get_latest_version_from_github(cls) -> str: + headers = {} + if os.environ.get("GITHUB_TOKEN") is not None: + headers["Authorization"] = "Bearer " + os.environ.get("GITHUB_TOKEN") + + response = requests.get( + "https://api.github.com/repos/anchore/grype/releases/latest", + headers=headers, + ) + + if response.status_code >= 400: + logging.error(f"error while fetching latest grype version: {response.status_code}: {response.reason} {response.text}") + + response.raise_for_status() + + return response.json()["name"] + # pylint: disable=too-many-arguments @classmethod def install( @@ -217,25 +292,8 @@ def install( version = cls._latest_version_from_github logging.info(f"latest grype release found (cached) is {version}") else: - headers = {} - if os.environ.get("GITHUB_TOKEN") is not None: - headers["Authorization"] = "Bearer " + os.environ.get("GITHUB_TOKEN") - - response = requests.get( - "https://api.github.com/repos/anchore/grype/releases/latest", - headers=headers, - ) - - if response.status_code >= 400: - logging.error( - f"error while fetching latest grype version: {response.status_code}: {response.reason} {response.text}" - ) - - response.raise_for_status() - - version = response.json()["name"] + version = cls._get_latest_version_from_github() cls._latest_version_from_github = version - path = os.path.join(os.path.dirname(path), version) logging.info(f"latest grype release found is {version}") @@ -246,10 +304,29 @@ def install( version, ): tool_obj = cls._install_from_installer( - version=version, path=path, use_cache=use_cache, profile=grype_profile, **kwargs + version=version, + path=path, + use_cache=use_cache, + profile=grype_profile, + **kwargs, + ) + elif version.startswith("path:"): + tool_obj = cls._install_from_path( + path=path, + src_path=version.removeprefix("path:"), + version=version.removeprefix("path:"), + use_cache=use_cache, + profile=grype_profile, + **kwargs, ) else: - tool_obj = cls._install_from_git(version=version, path=path, use_cache=use_cache, profile=grype_profile, **kwargs) + tool_obj = cls._install_from_git( + version=version, + path=path, + use_cache=use_cache, + profile=grype_profile, + **kwargs, + ) # always update the DB, raise exception on failure if db_import_path: diff --git a/tests/unit/tool/test_grype.py b/tests/unit/tool/test_grype.py index c8a5134..dfca621 100644 --- a/tests/unit/tool/test_grype.py +++ b/tests/unit/tool/test_grype.py @@ -21,3 +21,26 @@ def test_grype_no_profile(): tool = Grype(path="test-path") tool.capture(image="test-image", tool_input=None) assert check_output.call_args.args[0] == ["test-path/grype", "-o", "json", "test-image"] + + +def test_install_from_path(): + with mock.patch("subprocess.check_call") as check_call, mock.patch("git.Repo") as repo, mock.patch( + "os.path.exists" + ) as exists, mock.patch("os.makedirs") as makedirs, mock.patch("os.chmod") as chmod: + check_call.return_value = bytes("test-output", "utf-8") + exists.return_value = True + fake_repo = mock.Mock() + fake_repo.git = mock.Mock() + fake_repo.untracked_files = [] + git_describe_val = "v0.65.1-1-g74a7a67-dirty" + hash_of_git_diff = "a29864cf5600b481056b6fa30a21cdbabc15287d"[:8] + fake_repo.git.describe.return_value = git_describe_val + fake_repo.git.diff.return_value = "test-diff" # hash is 'a29864cf5600b481056b6fa30a21cdbabc15287d' + repo.return_value = fake_repo + version_str = "path:/where/grype/is/cloned" + normalized_version_str = version_str.replace("/", "_").removeprefix("path:") + expected_grype_path = ( + f".yardstick/tools/grype/{normalized_version_str}/{git_describe_val}-{hash_of_git_diff}/local_install" + ) + tool = Grype.install(version=version_str, path=".yardstick/tools/grype/path:_where_grype_is_cloned", update_db=False) + assert tool.path == expected_grype_path