Skip to content

Commit

Permalink
feat: install grype from path (#111)
Browse files Browse the repository at this point in the history
Enable a path to a local build of grype to be specified, rather than only being
able to install from pushed commits or released versions.

Signed-off-by: Will Murphy <[email protected]>
  • Loading branch information
willmurphyscode authored Aug 14, 2023
1 parent 09fbfda commit 34d2530
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 25 deletions.
127 changes: 102 additions & 25 deletions src/yardstick/tool/grype.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import atexit
import hashlib
import json
import logging
import os
Expand Down Expand Up @@ -144,7 +145,7 @@ def _install_from_git(
abspath = os.path.abspath(path)
if not tool_exists:
cls._run_go_build(
abspath=abspath,
abs_install_dir=abspath,
repo_path=repo_path,
description=description,
binpath=path,
Expand All @@ -154,25 +155,81 @@ def _install_from_git(

return Grype(path=path, version_detail=description, **kwargs)

@classmethod
def _local_build_version_suffix(cls, src_path: str) -> str:
src_path = os.path.abspath(os.path.expanduser(src_path))
git_desc = ""
diff_digest = "clean"
try:
repo = git.Repo(src_path)
except:
logging.error(f"failed to open existing grype repo at {src_path!r}")
raise
git_desc = repo.git.describe("--tags", "--always", "--long", "--dirty")
if repo.is_dirty():
hash_obj = hashlib.sha1()
for untracked in repo.untracked_files:
hash_obj.update(cls._hash_file(os.path.join(repo.working_dir, untracked)).encode())
hash_obj.update(repo.git.diff("HEAD").encode())
diff_digest = hash_obj.hexdigest()[:8]
return f"{git_desc}-{diff_digest}"

@classmethod
def _hash_file(cls, path: str) -> str:
hash_obj = hashlib.sha1()
with open(path, "rb") as f:
while True:
data = f.read(4096)
if not data:
break
hash_obj.update(data)
return hash_obj.hexdigest()

@classmethod
def _install_from_path(
cls,
path: Optional[str],
src_path: str,
**kwargs,
) -> "Grype":
# get the description and head ref from the repo
src_repo_path = os.path.abspath(os.path.expanduser(src_path))
build_version = cls._local_build_version_suffix(src_repo_path)
logging.debug(f"installing grype from path={src_repo_path!r}")
logging.debug(f"installing grype to path={path!r}")
if not path:
path = tempfile.mkdtemp()
atexit.register(shutil.rmtree, path)
dest_path = os.path.join(path.replace("path:", ""), build_version, "local_install")
os.makedirs(dest_path, exist_ok=True)
cls._run_go_build(
abs_install_dir=os.path.abspath(dest_path),
description=f"{path}:{build_version}",
repo_path=src_repo_path,
binpath=dest_path,
)

return Grype(path=dest_path, **kwargs)

@staticmethod
def _run_go_build(
abspath: str,
abs_install_dir: str,
repo_path: str,
description: str,
binpath: str,
version_ref: str = "github.com/anchore/grype/internal/version.version",
):
logging.debug(f"installing grype via build to {abspath!r}")
logging.debug(f"installing grype via build to {abs_install_dir!r}")

main_pkg_path = "./cmd/grype"
if not os.path.exists(os.path.join(repo_path, "cmd", "grype", "main.go")):
# support legacy installations, when the main.go was in the root of the repo
main_pkg_path = "."

c = f"go build -ldflags \"-w -s -extldflags '-static' -X {version_ref}={description}\" -o {abspath} {main_pkg_path}"
c = f"go build -ldflags \"-w -s -extldflags '-static' -X {version_ref}={description}\" -o {abs_install_dir} {main_pkg_path}"
logging.debug(f"running {c!r}")

e = {"GOBIN": abspath, "CGO_ENABLED": "0"}
e = {"GOBIN": abs_install_dir, "CGO_ENABLED": "0"}
e.update(os.environ)

subprocess.check_call(
Expand All @@ -185,6 +242,24 @@ def _run_go_build(

os.chmod(f"{binpath}/grype", 0o755)

@classmethod
def _get_latest_version_from_github(cls) -> str:
headers = {}
if os.environ.get("GITHUB_TOKEN") is not None:
headers["Authorization"] = "Bearer " + os.environ.get("GITHUB_TOKEN")

response = requests.get(
"https://api.github.com/repos/anchore/grype/releases/latest",
headers=headers,
)

if response.status_code >= 400:
logging.error(f"error while fetching latest grype version: {response.status_code}: {response.reason} {response.text}")

response.raise_for_status()

return response.json()["name"]

# pylint: disable=too-many-arguments
@classmethod
def install(
Expand Down Expand Up @@ -217,25 +292,8 @@ def install(
version = cls._latest_version_from_github
logging.info(f"latest grype release found (cached) is {version}")
else:
headers = {}
if os.environ.get("GITHUB_TOKEN") is not None:
headers["Authorization"] = "Bearer " + os.environ.get("GITHUB_TOKEN")

response = requests.get(
"https://api.github.com/repos/anchore/grype/releases/latest",
headers=headers,
)

if response.status_code >= 400:
logging.error(
f"error while fetching latest grype version: {response.status_code}: {response.reason} {response.text}"
)

response.raise_for_status()

version = response.json()["name"]
version = cls._get_latest_version_from_github()
cls._latest_version_from_github = version

path = os.path.join(os.path.dirname(path), version)
logging.info(f"latest grype release found is {version}")

Expand All @@ -246,10 +304,29 @@ def install(
version,
):
tool_obj = cls._install_from_installer(
version=version, path=path, use_cache=use_cache, profile=grype_profile, **kwargs
version=version,
path=path,
use_cache=use_cache,
profile=grype_profile,
**kwargs,
)
elif version.startswith("path:"):
tool_obj = cls._install_from_path(
path=path,
src_path=version.removeprefix("path:"),
version=version.removeprefix("path:"),
use_cache=use_cache,
profile=grype_profile,
**kwargs,
)
else:
tool_obj = cls._install_from_git(version=version, path=path, use_cache=use_cache, profile=grype_profile, **kwargs)
tool_obj = cls._install_from_git(
version=version,
path=path,
use_cache=use_cache,
profile=grype_profile,
**kwargs,
)

# always update the DB, raise exception on failure
if db_import_path:
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/tool/test_grype.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,26 @@ def test_grype_no_profile():
tool = Grype(path="test-path")
tool.capture(image="test-image", tool_input=None)
assert check_output.call_args.args[0] == ["test-path/grype", "-o", "json", "test-image"]


def test_install_from_path():
with mock.patch("subprocess.check_call") as check_call, mock.patch("git.Repo") as repo, mock.patch(
"os.path.exists"
) as exists, mock.patch("os.makedirs") as makedirs, mock.patch("os.chmod") as chmod:
check_call.return_value = bytes("test-output", "utf-8")
exists.return_value = True
fake_repo = mock.Mock()
fake_repo.git = mock.Mock()
fake_repo.untracked_files = []
git_describe_val = "v0.65.1-1-g74a7a67-dirty"
hash_of_git_diff = "a29864cf5600b481056b6fa30a21cdbabc15287d"[:8]
fake_repo.git.describe.return_value = git_describe_val
fake_repo.git.diff.return_value = "test-diff" # hash is 'a29864cf5600b481056b6fa30a21cdbabc15287d'
repo.return_value = fake_repo
version_str = "path:/where/grype/is/cloned"
normalized_version_str = version_str.replace("/", "_").removeprefix("path:")
expected_grype_path = (
f".yardstick/tools/grype/{normalized_version_str}/{git_describe_val}-{hash_of_git_diff}/local_install"
)
tool = Grype.install(version=version_str, path=".yardstick/tools/grype/path:_where_grype_is_cloned", update_db=False)
assert tool.path == expected_grype_path

0 comments on commit 34d2530

Please sign in to comment.