diff --git a/.github/workflows/validations.yaml b/.github/workflows/validations.yaml
index 518336f..e246d46 100644
--- a/.github/workflows/validations.yaml
+++ b/.github/workflows/validations.yaml
@@ -11,7 +11,7 @@ permissions:
 
 env:
   PYTHON_VERSION: "3.11"
-  POETRY_VERSION: "1.3.2"
+  POETRY_VERSION: "1.8.3"
 
 
 jobs:
diff --git a/src/yardstick/__init__.py b/src/yardstick/__init__.py
index 4cf2c19..822e623 100644
--- a/src/yardstick/__init__.py
+++ b/src/yardstick/__init__.py
@@ -1,7 +1,18 @@
 import logging
 from typing import Callable, Optional
 
-from . import arrange, artifact, capture, cli, comparison, label, store, tool, utils
+from . import (
+    arrange,
+    artifact,
+    capture,
+    cli,
+    comparison,
+    label,
+    store,
+    tool,
+    validate,
+    utils,
+)
 
 __all__ = [
     "arrange",
@@ -12,6 +23,7 @@
     "label",
     "store",
     "tool",
+    "validate",
     "utils",
 ]
diff --git a/src/yardstick/artifact.py b/src/yardstick/artifact.py
index bf16b4c..0fa3d4a 100644
--- a/src/yardstick/artifact.py
+++ b/src/yardstick/artifact.py
@@ -118,6 +118,7 @@ class ScanConfiguration:
     image_digest: str
     tool_name: str
     tool_version: str
+    tool_label: str | None = None
     image_tag: str = ""
     timestamp: datetime.datetime | None = field(
         default=None,
@@ -205,6 +206,7 @@ def new(
             tool_name=tool_obj.id,
             tool_version=tool_obj.version,
             timestamp=timestamp,
+            tool_label=label,
         )
diff --git a/src/yardstick/cli/cli.py b/src/yardstick/cli/cli.py
index fcc212f..69744a8 100644
--- a/src/yardstick/cli/cli.py
+++ b/src/yardstick/cli/cli.py
@@ -8,7 +8,7 @@
 import yaml
 
 from yardstick import store
-from yardstick.cli import config, label, result
+from yardstick.cli import config, label, result, validate
 
 
 @click.option("--verbose", "-v", default=False, help="show logs", is_flag=True)
@@ -126,5 +126,6 @@ def version(_: config.Application):
         print(f"{d.name} {d.version} ({d.locate_file(d.name).parent})")
 
 
+cli.add_command(validate.validate)
 cli.add_command(result.group)
 cli.add_command(label.group)
diff --git a/src/yardstick/cli/config.py b/src/yardstick/cli/config.py
index 4881d2d..3f5f6f3 100644
--- a/src/yardstick/cli/config.py
+++ b/src/yardstick/cli/config.py
@@ -9,7 +9,7 @@
 import yaml
 from dataclass_wizard import asdict, fromdict  # type: ignore[import]
 
-from yardstick import artifact
+from yardstick import artifact, validate
 from yardstick.store import config as store_config
 
 DEFAULT_CONFIGS = (
@@ -115,11 +115,17 @@ def parse_oci_reference(image: str) -> tuple[str, str, str, str, str]:
     return host, path, repository, tag, digest
 
 
+@dataclass()
+class Validation(validate.GateConfig):
+    name: str = "default"
+
+
 @dataclass()
 class ResultSet:
     description: str = ""
     declared: list[artifact.ScanRequest] = field(default_factory=list)
     matrix: ScanMatrix = field(default_factory=ScanMatrix)
+    validations: list[Validation] = field(default_factory=list)
 
     def images(self) -> list[str]:
         return self.matrix.images + [req.image for req in self.declared]
@@ -151,6 +157,34 @@ class Application:
     default_max_year: int | None = None
     derive_year_from_cve_only: bool = False
 
+    def max_year_for_any_result_set(self, result_sets: list[str]) -> int | None:
+        years = []
+        for result_set in result_sets:
+            m = self.max_year_for_result_set(result_set)
+            if m is not None:
+                years.append(m)
+
+        if not years:
+            return None
+
+        return max(years)
+
+    def max_year_for_result_set(self, result_set: str) -> int | None:
+        """return the max year needed by any validation on the result set, or default_max_year"""
+        rs = self.result_sets.get(result_set, None)
+        years = []
+        if rs is not None:
+            for gate in rs.validations:
+                if gate.max_year is not None:
+                    years.append(gate.max_year)
+                elif self.default_max_year is not None:
+                    years.append(self.default_max_year)
+
+        if years:
+            return max(years)
+
+        return self.default_max_year
+
 
 def clean_dict_keys(d):
     new = {}
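The `max_year` plumbing above resolves in two steps: a validation's own `max_year` wins, then the application-wide `default_max_year`. A minimal sketch of that precedence (illustrative only; the result set name `pr_vs_latest` is hypothetical, and this assumes the remaining `Application` fields all have defaults):

```python
from yardstick.cli.config import Application, ResultSet, Validation

# hypothetical configuration; assumes other Application fields are defaulted
cfg = Application(
    result_sets={
        "pr_vs_latest": ResultSet(
            description="compare a PR build against the latest release",
            validations=[Validation(max_year=2021)],
        )
    },
    default_max_year=2020,
)

# the validation's explicit max_year takes precedence...
assert cfg.max_year_for_result_set("pr_vs_latest") == 2021
# ...while unknown result sets fall back to the application-wide default
assert cfg.max_year_for_result_set("unknown") == 2020
assert cfg.max_year_for_any_result_set(["pr_vs_latest", "unknown"]) == 2021
```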
diff --git a/src/yardstick/cli/validate.py b/src/yardstick/cli/validate.py
new file mode 100644
index 0000000..e20d05b
--- /dev/null
+++ b/src/yardstick/cli/validate.py
@@ -0,0 +1,277 @@
+import re
+import sys
+
+import click
+from tabulate import tabulate
+
+import yardstick
+from yardstick import store
+from yardstick import validate as val
+from yardstick.cli import config, display
+from yardstick.validate import Gate, GateInputDescription
+
+
+class bcolors:
+    HEADER = "\033[95m"
+    OKBLUE = "\033[94m"
+    OKCYAN = "\033[96m"
+    OKGREEN = "\033[92m"
+    WARNING = "\033[93m"
+    FAIL = "\033[91m"
+    BOLD = "\033[1m"
+    UNDERLINE = "\033[4m"
+    RESET = "\033[0m"
+
+
+if not sys.stdout.isatty():
+    bcolors.HEADER = ""
+    bcolors.OKBLUE = ""
+    bcolors.OKCYAN = ""
+    bcolors.OKGREEN = ""
+    bcolors.WARNING = ""
+    bcolors.FAIL = ""
+    bcolors.BOLD = ""
+    bcolors.UNDERLINE = ""
+    bcolors.RESET = ""
+
+
+@click.command()
+@click.pass_obj
+@click.option(
+    "--image",
+    "-i",
+    "images",
+    multiple=True,
+    help="filter down to one or more images to validate with (don't use the full result set)",
+)
+@click.option(
+    "--label-comparison",
+    "-l",
+    "always_run_label_comparison",
+    is_flag=True,
+    help="run label comparison regardless of relative comparison results",
+)
+@click.option(
+    "--breakdown-by-ecosystem",
+    "-e",
+    is_flag=True,
+    help="show label comparison results broken down by ecosystem",
+)
+@click.option(
+    "--verbose", "-v", "verbosity", count=True, help="show details of all comparisons"
+)
+@click.option(
+    "--result-set",
+    "-r",
+    "result_sets",
+    multiple=True,
+    default=[],
+    help="the result set to use for the quality gate",
+)
+@click.option(
+    "--all",
+    "all_result_sets",
+    is_flag=True,
+    default=False,
+    help="validate all known result sets",
+)
+def validate(
+    cfg: config.Application,
+    images: list[str],
+    always_run_label_comparison: bool,
+    breakdown_by_ecosystem: bool,
+    verbosity: int,
+    result_sets: list[str],
+    all_result_sets: bool,
+):
+    # TODO: don't artificially inflate logging; figure out what to print
+    setup_logging(verbosity + 3)
+    if all_result_sets and result_sets:  # default result set will be present anyway
+        raise ValueError(
+            f"cannot pass --all and -r / --result-set: {all_result_sets} {result_sets}"
+        )
+
+    if all_result_sets:
+        result_sets = list(cfg.result_sets.keys())
+
+    if not result_sets:
+        raise ValueError(
+            "must pass --result-set / -r at least once or --all to validate all result sets"
+        )
+
+    # let's not load any more labels than we need to, based on the images we're validating
+    if not images:
+        unique_images = set()
+        for r in result_sets:
+            result_set_obj = store.result_set.load(name=r)
+            for state in result_set_obj.state:
+                if state and state.config and state.config.image:
+                    unique_images.add(state.config.image)
+        images = sorted(unique_images)
+
+    click.echo("Loading label entries...", nl=False)
+    label_entries = store.labels.load_for_image(
+        images, year_max_limit=cfg.max_year_for_any_result_set(result_sets)
+    )
+    click.echo(f"done! {len(label_entries)} entries loaded")
+
+    gates = []
+    for result_set in result_sets:
+        rs_config = cfg.result_sets[result_set]
+        for gate_config in rs_config.validations:
+            if gate_config.max_year is None:
+                gate_config.max_year = cfg.default_max_year
+
+            click.echo(
+                f"{bcolors.HEADER}{bcolors.BOLD}Validating with {result_set!r}{bcolors.RESET}"
+            )
+            new_gates = val.validate_result_set(
+                gate_config,
+                result_set,
+                images=images,
+                always_run_label_comparison=always_run_label_comparison,
+                verbosity=verbosity,
+                label_entries=label_entries,
+            )
+            for gate in new_gates:
+                show_results_used(gate.input_description)
+                show_delta_commentary(gate)
+
+            gates.extend(new_gates)
+            click.echo()
+
+        if breakdown_by_ecosystem:
+            click.echo(
+                f"{bcolors.HEADER}Breaking down label comparison by ecosystem performance...{bcolors.RESET}",
+            )
+            results_by_image, label_entries, stats = (
+                yardstick.compare_results_against_labels_by_ecosystem(
+                    result_set=result_set,
+                    year_max_limit=cfg.max_year_for_result_set(result_set),
+                    label_entries=label_entries,
+                )
+            )
+            display.labels_by_ecosystem_comparison(
+                results_by_image,
+                stats,
+                show_images_used=False,
+            )
+            click.echo()
+
+    failure = not all(gate.passed() for gate in gates)
+    if failure:
+        click.echo("Reasons for quality gate failure:")
+        for gate in gates:
+            for reason in gate.reasons:
+                click.echo(f"   - {reason}")
+
+    if failure:
+        click.echo()
+        click.echo(f"{bcolors.FAIL}{bcolors.BOLD}Quality gate FAILED{bcolors.RESET}")
+        sys.exit(1)
+    else:
+        click.echo(
+            f"{bcolors.OKGREEN}{bcolors.BOLD}Quality gate passed!{bcolors.RESET}"
+        )
+
+
+def setup_logging(verbosity: int):
+    # pylint: disable=redefined-outer-name, import-outside-toplevel
+    import logging.config
+
+    if verbosity in [0, 1, 2]:
+        log_level = "WARN"
+    elif verbosity == 3:
+        log_level = "INFO"
+    else:
+        log_level = "DEBUG"
+
+    logging.config.dictConfig(
+        {
+            "version": 1,
+            "formatters": {
+                "standard": {
+                    # [%(module)s.%(funcName)s]
+                    "format": "%(asctime)s [%(levelname)s] %(message)s",
+                    "datefmt": "",
+                },
+            },
+            "handlers": {
+                "default": {
+                    "level": log_level,
+                    "formatter": "standard",
+                    "class": "logging.StreamHandler",
+                    "stream": "ext://sys.stderr",
+                },
+            },
+            "loggers": {
+                "": {  # root logger
+                    "handlers": ["default"],
+                    "level": log_level,
+                },
+            },
+        }
+    )
+
+
+def show_delta_commentary(gate: Gate):
+    if not gate.deltas:
+        click.echo("No differences found between tooling (with labels)")
+        return
+
+    header_row = ["TOOL PARTITION", "PACKAGE", "VULNERABILITY", "LABEL", "COMMENTARY"]
+
+    all_rows = []
+    for delta in gate.deltas:
+        color = ""
+        if delta.is_improved:
+            color = bcolors.OKBLUE
+        elif delta.is_improved is not None and not delta.is_improved:
+            color = bcolors.FAIL
+        all_rows.append(
+            [
+                f"{color}{delta.tool} ONLY{bcolors.RESET}",
+                f"{color}{delta.package_name}@{delta.package_version}{bcolors.RESET}",
+                f"{color}{delta.vulnerability_id}{bcolors.RESET}",
+                f"{color}{delta.label}{bcolors.RESET}",
+                f"{delta.commentary}",
+            ]
+        )
+
+    def escape_ansi(line):
+        ansi_escape = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")
+        return ansi_escape.sub("", line)
+
+    # sort but don't consider ansi escape codes
+    all_rows = sorted(
+        all_rows, key=lambda x: escape_ansi(str(x[0] + x[1] + x[2] + x[3]))
+    )
+    click.echo("Match differences between tooling (with labels):")
+    indent = "   "
+    click.echo(
+        indent
+        + tabulate(
+            [header_row] + all_rows,
+            tablefmt="plain",
+        ).replace("\n", "\n" + indent)
+        + "\n"
+    )
+
+
+def show_results_used(input_description: GateInputDescription):
+    if not input_description:
+        return
+    click.echo(f"   Results used for image {input_description.image}:")
+    for idx, description in enumerate(input_description.configs):
+        branch = "├──"
+        if idx == len(input_description.configs) - 1:
+            branch = "└──"
+        label = " "
+        if description.tool_label:
+            label = f" ({description.tool_label}) "
+        click.echo(
+            f"    {branch} {description.id} : {description.tool}{label} against {input_description.image}"
+        )
+    click.echo()
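The new subcommand above is invoked as `yardstick validate -r <result-set>` (or `--all`). One detail worth noting: `show_delta_commentary` sorts its table rows after stripping ANSI color codes, so row ordering is stable whether or not output goes to a TTY. A quick illustration of that stripping, using the same regex as `escape_ansi` above:

```python
import re

# the same pattern used by escape_ansi in show_delta_commentary
ansi_escape = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]")

colored = "\033[94mgrype@latest ONLY\033[0m"
assert ansi_escape.sub("", colored) == "grype@latest ONLY"
```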
diff --git a/src/yardstick/comparison.py b/src/yardstick/comparison.py
index 1226bd1..dee6164 100644
--- a/src/yardstick/comparison.py
+++ b/src/yardstick/comparison.py
@@ -813,7 +813,6 @@ def of_results_against_label(
             comparisons_by_result_id[result.ID] = comp
             comparisons.append(comp)
 
-    # { image : {tool@version : F1 score or "I" if impractical } }
     stats_by_image_tool_pair = ImageToolLabelStats.new(comparisons)
 
     return comparisons_by_result_id, stats_by_image_tool_pair
diff --git a/src/yardstick/validate/__init__.py b/src/yardstick/validate/__init__.py
new file mode 100644
index 0000000..59fef25
--- /dev/null
+++ b/src/yardstick/validate/__init__.py
@@ -0,0 +1,14 @@
+from .delta import DeltaType, Delta
+from .gate import Gate, GateConfig, GateInputResultConfig, GateInputDescription
+from .validate import validate_image, validate_result_set
+
+__all__ = [
+    "GateConfig",
+    "GateInputResultConfig",
+    "GateInputDescription",
+    "DeltaType",
+    "Delta",
+    "Gate",
+    "validate_image",
+    "validate_result_set",
+]
diff --git a/src/yardstick/validate/delta.py b/src/yardstick/validate/delta.py
new file mode 100644
index 0000000..e0ca733
--- /dev/null
+++ b/src/yardstick/validate/delta.py
@@ -0,0 +1,99 @@
+import enum
+from dataclasses import dataclass
+
+from yardstick import artifact, comparison
+
+
+class DeltaType(enum.Enum):
+    Unknown = "Unknown"
+    FixedFalseNegative = "FixedFalseNegative"
+    FixedFalsePositive = "FixedFalsePositive"
+    NewFalseNegative = "NewFalseNegative"
+    NewFalsePositive = "NewFalsePositive"
+
+
+@dataclass
+class Delta:
+    tool: str
+    package_name: str
+    package_version: str
+    vulnerability_id: str
+    added: bool
+    label: str | None = None
+
+    @property
+    def is_improved(self) -> bool | None:
+        if self.outcome in {DeltaType.FixedFalseNegative, DeltaType.FixedFalsePositive}:
+            return True
+        if self.outcome in {DeltaType.NewFalseNegative, DeltaType.NewFalsePositive}:
+            return False
+        return None
+
+    @property
+    def commentary(self) -> str:
+        commentary = ""
+        # if self.is_improved and self.label == artifact.Label.TruePositive.name:
+        if self.outcome == DeltaType.FixedFalseNegative:
+            commentary = "(this is a new TP 🙌)"
+        elif self.outcome == DeltaType.FixedFalsePositive:
+            commentary = "(got rid of a former FP 🙌)"
+        elif self.outcome == DeltaType.NewFalsePositive:
+            commentary = "(this is a new FP 😱)"
+        elif self.outcome == DeltaType.NewFalseNegative:
+            commentary = "(this is a new FN 😱)"
+
+        return commentary
+
+    @property
+    def outcome(self) -> DeltaType:
+        # TODO: this would be better computed in __post_init__ and stored as a field
+        if not self.label:
+            return DeltaType.Unknown
+
+        if not self.added:
+            # the tool which found the unique result is the reference tool...
+            if self.label == artifact.Label.TruePositive.name:
+                # drats! we missed a case (this is a new FN)
+                return DeltaType.NewFalseNegative
+            elif artifact.Label.FalsePositive.name in self.label:
+                # we got rid of a FP! ["hip!", "hip!"]
+                return DeltaType.FixedFalsePositive
+        else:
+            # the tool which found the unique result is the current tool...
+            if self.label == artifact.Label.TruePositive.name:
+                # highest of fives! we found a new TP that the previous tool release missed!
+                return DeltaType.FixedFalseNegative
+            elif artifact.Label.FalsePositive.name in self.label:
+                # welp, our changes resulted in a new FP... not great, maybe not terrible?
+                return DeltaType.NewFalsePositive
+
+        return DeltaType.Unknown
+
+
+def compute_deltas(
+    comparisons_by_result_id: dict[str, comparison.AgainstLabels],
+    reference_tool: str,
+    relative_comparison: comparison.ByPreservedMatch,
+):
+    deltas = []
+    for result in relative_comparison.results:
+        label_comparison = comparisons_by_result_id[result.ID]
+        for unique_match in relative_comparison.unique[result.ID]:
+            labels = label_comparison.labels_by_match[unique_match.ID]
+            if not labels:
+                label = "(unknown)"
+            elif len(set(labels)) > 1:
+                label = ", ".join([la.name for la in labels])
+            else:
+                label = labels[0].name
+
+            delta = Delta(
+                tool=result.config.tool,
+                package_name=unique_match.package.name,
+                package_version=unique_match.package.version,
+                vulnerability_id=unique_match.vulnerability.id,
+                added=result.config.tool != reference_tool,
+                label=label,
+            )
+            deltas.append(delta)
+    return deltas
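`Delta.outcome` reads as a small truth table: which tool found the unique match (`added`) crossed with what the label says. A short sketch of that behavior, assuming `yardstick` is importable and using label names from `artifact.Label`:

```python
from yardstick.validate import Delta, DeltaType

common = dict(
    tool="grype@candidate",
    package_name="libc",
    package_version="2.29",
    vulnerability_id="CVE-2023-1234",
)

# found only by the candidate tool (added=True) and labeled real:
# the candidate fixed a false negative
d = Delta(added=True, label="TruePositive", **common)
assert d.outcome == DeltaType.FixedFalseNegative and d.is_improved

# found only by the reference tool (added=False) and labeled real:
# the candidate now misses it -> a new false negative
d = Delta(added=False, label="TruePositive", **common)
assert d.outcome == DeltaType.NewFalseNegative and d.is_improved is False

# no label information at all -> the outcome is unknown
assert Delta(added=True, label=None, **common).outcome == DeltaType.Unknown
```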
["hip!", "hip!"] + return DeltaType.FixedFalsePositive + else: + # the tool which found the unique result is the current tool... + if self.label == artifact.Label.TruePositive.name: + # highest of fives! we found a new TP that the previous tool release missed! + return DeltaType.FixedFalseNegative + elif artifact.Label.FalsePositive.name in self.label: + # welp, our changes resulted in a new FP... not great, maybe not terrible? + return DeltaType.NewFalsePositive + + return DeltaType.Unknown + + +def compute_deltas( + comparisons_by_result_id: dict[str, comparison.AgainstLabels], + reference_tool: str, + relative_comparison: comparison.ByPreservedMatch, +): + deltas = [] + for result in relative_comparison.results: + label_comparison = comparisons_by_result_id[result.ID] + for unique_match in relative_comparison.unique[result.ID]: + labels = label_comparison.labels_by_match[unique_match.ID] + if not labels: + label = "(unknown)" + elif len(set(labels)) > 1: + label = ", ".join([la.name for la in labels]) + else: + label = labels[0].name + + delta = Delta( + tool=result.config.tool, + package_name=unique_match.package.name, + package_version=unique_match.package.version, + vulnerability_id=unique_match.vulnerability.id, + added=result.config.tool != reference_tool, + label=label, + ) + deltas.append(delta) + return deltas diff --git a/src/yardstick/validate/gate.py b/src/yardstick/validate/gate.py new file mode 100644 index 0000000..bc12ce8 --- /dev/null +++ b/src/yardstick/validate/gate.py @@ -0,0 +1,105 @@ +from dataclasses import dataclass, field, InitVar +from typing import Optional + +from yardstick import comparison +from yardstick.validate.delta import Delta + + +@dataclass +class GateConfig: + max_f1_regression: float = 0.0 + max_new_false_negatives: int = 0 + max_unlabeled_percent: int = 0 + max_year: int | None = None + reference_tool_label: str = "reference" + candidate_tool_label: str = "candidate" + # only consider matches from these namespaces when judging results + allowed_namespaces: list[str] = field(default_factory=list) + # fail this gate unless all of these namespaces are present + required_namespaces: list[str] = field(default_factory=list) + fail_on_empty_match_set: bool = True + + +@dataclass +class GateInputResultConfig: + id: str + tool: str + tool_label: str + + +@dataclass +class GateInputDescription: + image: str + configs: list[GateInputResultConfig] = field(default_factory=list) + + +@dataclass +class Gate: + reference_comparison: InitVar[Optional[comparison.LabelComparisonSummary]] + candidate_comparison: InitVar[Optional[comparison.LabelComparisonSummary]] + + config: GateConfig + + input_description: GateInputDescription + reasons: list[str] = field(default_factory=list) + deltas: list[Delta] = field(default_factory=list) + + def __post_init__( + self, + reference_comparison: Optional[comparison.LabelComparisonSummary], + candidate_comparison: Optional[comparison.LabelComparisonSummary], + ): + if not reference_comparison or not candidate_comparison: + return + + reasons = [] + + reference_f1_score = reference_comparison.f1_score + current_f1_score = candidate_comparison.f1_score + if current_f1_score < reference_f1_score - self.config.max_f1_regression: + reasons.append( + f"current F1 score is lower than the latest release F1 score: candidate_score={current_f1_score:0.2f} reference_score={reference_f1_score:0.2f} image={self.input_description.image}" + ) + + if ( + candidate_comparison.indeterminate_percent + > self.config.max_unlabeled_percent + ): + 
diff --git a/src/yardstick/validate/validate.py b/src/yardstick/validate/validate.py
new file mode 100644
index 0000000..0cac180
--- /dev/null
+++ b/src/yardstick/validate/validate.py
@@ -0,0 +1,308 @@
+import logging
+import sys
+from typing import Sequence, Optional, Callable
+
+import yardstick
+from yardstick import artifact, store, utils
+from yardstick.cli import display
+from yardstick.validate.delta import compute_deltas
+from yardstick.validate.gate import (
+    GateInputDescription,
+    GateInputResultConfig,
+    GateConfig,
+    Gate,
+)
+
+
+def guess_tool_orientation(tools: list[str]):
+    """
+    Given a pair of tools, guess which is the latest version and which is the one
+    being compared against it. This should only be used as a fallback.
+    Instead, specify reference tool label and candidate tool label in validations.
+
+    Returns (latest_tool, current_tool)
+    """
+    if len(tools) != 2:
+        raise RuntimeError(f"expected 2 tools, got {tools}")
+    tool_a, tool_b = sorted(tools)
+    if tool_a == tool_b:
+        raise ValueError("latest release tool and current tool are the same")
+    if tool_a.endswith("latest"):
+        return tool_a, tool_b
+    elif tool_b.endswith("latest"):
+        return tool_b, tool_a
+
+    if "@path:" in tool_a and "@path:" not in tool_b:
+        # tool_a is a local build, so compare it against tool_b
+        return tool_b, tool_a
+
+    if "@path:" in tool_b and "@path:" not in tool_a:
+        # tool_b is a local build, so compare it against tool_a
+        return tool_a, tool_b
+
+    return tool_a, tool_b
+
+
+class bcolors:
+    HEADER = "\033[95m"
+    OKBLUE = "\033[94m"
+    OKCYAN = "\033[96m"
+    OKGREEN = "\033[92m"
+    WARNING = "\033[93m"
+    FAIL = "\033[91m"
+    BOLD = "\033[1m"
+    UNDERLINE = "\033[4m"
+    RESET = "\033[0m"
+
+
+if not sys.stdout.isatty():
+    bcolors.HEADER = ""
+    bcolors.OKBLUE = ""
+    bcolors.OKCYAN = ""
+    bcolors.OKGREEN = ""
+    bcolors.WARNING = ""
+    bcolors.FAIL = ""
+    bcolors.BOLD = ""
+    bcolors.UNDERLINE = ""
+    bcolors.RESET = ""
+
+
+def results_used(
+    image: str, results: Sequence[artifact.ScanResult]
+) -> GateInputDescription:
+    return GateInputDescription(
+        image=image,
+        configs=[
+            GateInputResultConfig(
+                id=result.ID,
+                tool=result.config.tool,
+                tool_label=result.config.tool_label,
+            )
+            for result in results
+        ],
+    )
+
+
+def validate_result_set(
+    gate_config: GateConfig,
+    result_set: str,
+    images: list[str],
+    always_run_label_comparison: bool,
+    verbosity: int,
+    label_entries: Optional[list[artifact.LabelEntry]] = None,
+) -> list[Gate]:
+    result_set_obj = store.result_set.load(name=result_set)
+
+    if gate_config.allowed_namespaces:
+        m_filter = namespace_filter(gate_config.allowed_namespaces)
+        logging.info(
+            f"only considering matches from allowed namespaces: {' '.join(gate_config.allowed_namespaces)}"
+        )
+    else:
+        m_filter = None
+
+    ret = []
+    for image, result_states in result_set_obj.result_state_by_image.items():
+        if images and image not in images:
+            logging.info(
+                f"Skipping image {image!r} because --image is passed but does not include it"
+            )
+            continue
+        tools = ", ".join([s.request.tool for s in result_states])
+        logging.info(f"Testing image: {image!r} with {tools!r}")
+
+        gate = validate_image(
+            image=image,
+            gate_config=gate_config,
+            descriptions=[s.config.path for s in result_states if s.config is not None],
+            always_run_label_comparison=always_run_label_comparison,
+            verbosity=verbosity,
+            label_entries=label_entries,
+            match_filter=m_filter,
+        )
+        ret.append(gate)
+
+    return ret
+
+
+def namespace_filter(
+    namespaces: list[str],
+) -> Callable[[list[artifact.Match]], list[artifact.Match]]:
+    include = set(namespaces)
+
+    def match_filter(matches: list[artifact.Match]) -> list[artifact.Match]:
+        result = []
+        for match in matches:
+            if utils.dig(match.fullentry, "vulnerability", "namespace") in include:
+                result.append(match)
+        return result
+
+    return match_filter
+
+
+def validate_image(
+    image: str,
+    gate_config: GateConfig,
+    descriptions: list[str],
+    always_run_label_comparison: bool,
+    verbosity: int,
+    label_entries: Optional[list[artifact.LabelEntry]] = None,
+    match_filter: Callable[[list[artifact.Match]], list[artifact.Match]] | None = None,
+) -> Gate:
+    """
+    Compare the results of two different vulnerability scanner configurations with each other,
+    and if necessary with label information. Returns a pass-fail Gate based on
+    the comparison, which fails if the candidate tool results are worse than the reference
+    tool results, as specified by the `gate_config`.
+
+    Parameters
+    ----------
+    image : str
+        The identifier or name of the image being analyzed.
+    gate_config : GateConfig
+        The configuration object that specifies comparison thresholds, tool labels,
+        and allowed/required namespaces.
+    descriptions : list[str]
+        A list of descriptions or metadata associated with the image results for the comparison.
+    always_run_label_comparison : bool
+        If True, run comparison against labels even if no differences are found between the
+        two tools.
+    verbosity : int
+        Level of verbosity for displaying comparison details. A higher value means more detailed output.
+    label_entries : Optional[list[artifact.LabelEntry]], optional
+        To save time, pass label entries; if present, they will be used instead of loading from disk.
+    match_filter : Callable[[list[artifact.Match]], list[artifact.Match]] | None, optional
+        An optional filter function to refine the set of matches used in the comparison, by default None.
+        Useful for filtering by namespace, for example.
+
+    Returns
+    -------
+    Gate
+        A `Gate` object that represents the pass/fail status based on the comparison. If the candidate
+        tool results are worse than the reference tool according to the `gate_config`, the gate will fail.
+        Otherwise, the gate will pass.
+
+    Raises
+    ------
+    RuntimeError
+        If an unexpected number of results (other than 2) are found during the label comparison.
+    """
+    # Load the relative comparison between the reference and candidate tool runs, without label info.
+    # This optimizes performance by allowing early exit if there are no matches or identical results.
+    relative_comparison = yardstick.compare_results(
+        descriptions=descriptions,
+        year_max_limit=gate_config.max_year,
+        matches_filter=match_filter,
+    )
+
+    # show the relative comparison results
+    if verbosity > 0:
+        details = verbosity > 1
+        display.preserved_matches(
+            relative_comparison, details=details, summary=True, common=False
+        )
+
+    if gate_config.fail_on_empty_match_set:
+        if not sum(
+            len(res.matches) if res.matches else 0
+            for res in relative_comparison.results
+        ):
+            return Gate.failing(
+                reasons=[
+                    "gate configured to fail on empty matches, and no matches found",
+                ],
+                input_description=results_used(image, relative_comparison.results),
+            )
+
+    if not always_run_label_comparison and not sum(
+        [
+            len(relative_comparison.unique[result.ID])
+            for result in relative_comparison.results
+        ]
+    ):
+        return Gate.passing(
+            input_description=results_used(image, relative_comparison.results),
+        )
+
+    logging.info("Running comparison against labels...")
+    # Compare against labels. Because the reference tool configuration and the
+    # candidate tool configuration both found matches, and did not find the same
+    # set of matches, we need to compare to known-correct label data and do a bit
+    # of statistics to determine whether the candidate tool is better than or the
+    # same as the reference tool.
+    results, label_entries, comparisons_by_result_id, stats_by_image_tool_pair = (
+        yardstick.compare_results_against_labels(
+            descriptions=descriptions,
+            year_max_limit=gate_config.max_year,
+            label_entries=label_entries,
+            matches_filter=match_filter,
+        )
+    )
+
+    if verbosity > 0:
+        show_fns = verbosity > 1
+        display.label_comparison(
+            results,
+            comparisons_by_result_id,
+            stats_by_image_tool_pair,
+            show_fns=show_fns,
+            show_summaries=True,
+        )
+
+    if len(results) != 2:
+        raise RuntimeError(
+            f"validate_image compares results of exactly 2 runs, but found {len(results)}"
+        )
+
+    candidate_tool, reference_tool = tool_designations(
+        gate_config.candidate_tool_label, [r.config for r in results]
+    )
+
+    # keep a list of differences between tools to summarize in the UI
+    # note that this is different from the statistical comparison;
+    # deltas are basically a UI/logging concern; the stats are a pass/fail concern.
+    deltas = compute_deltas(
+        comparisons_by_result_id, reference_tool, relative_comparison
+    )
+
+    reference_comparisons_by_images = {
+        comp.config.image: comp
+        for comp in comparisons_by_result_id.values()
+        if comp.config.tool == reference_tool
+    }
+    reference_comparison = reference_comparisons_by_images[image]
+    candidate_comparisons_by_images = {
+        comp.config.image: comp
+        for comp in comparisons_by_result_id.values()
+        if comp.config.tool == candidate_tool
+    }
+    candidate_comparison = candidate_comparisons_by_images[image]
+    return Gate(
+        reference_comparison=reference_comparison.summary,
+        candidate_comparison=candidate_comparison.summary,
+        config=gate_config,
+        input_description=results_used(image, relative_comparison.results),
+        deltas=deltas,
+    )
+
+
+def tool_designations(
+    candidate_tool_label: str, scan_configs: list[artifact.ScanConfiguration]
+) -> tuple[str, str]:
+    reference_tool, candidate_tool = None, None
+    if not candidate_tool_label:
+        reference_tool, candidate_tool = guess_tool_orientation(
+            [config.tool for config in scan_configs],
+        )
+        logging.warning(
+            f"guessed tool orientation reference:{reference_tool} candidate:{candidate_tool}"
+        )
+        logging.warning(
+            "to avoid guessing, specify reference_tool_label and candidate_tool_label in validation config and re-capture result set"
+        )
+    if scan_configs[0].tool_label == candidate_tool_label:
+        candidate_tool = scan_configs[0].tool
+        reference_tool = scan_configs[1].tool
+    elif scan_configs[1].tool_label == candidate_tool_label:
+        candidate_tool = scan_configs[1].tool
+        reference_tool = scan_configs[0].tool
+    return candidate_tool, reference_tool
diff --git a/tests/unit/validate/__init__.py b/tests/unit/validate/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/validate/test_delta.py b/tests/unit/validate/test_delta.py
new file mode 100644
index 0000000..50c9b08
--- /dev/null
+++ b/tests/unit/validate/test_delta.py
@@ -0,0 +1,221 @@
+import typing
+
+from yardstick.artifact import Label, Package
+from yardstick.validate.delta import Delta, DeltaType, compute_deltas
+
+import pytest
+from unittest.mock import MagicMock
+from yardstick.comparison import AgainstLabels, ByPreservedMatch
+
+
+@pytest.mark.parametrize(
+    "tool, package_name, package_version, vulnerability_id, added, label, expected_outcome, expected_is_improved, expected_commentary",
+    [
+        (
+            "scanner1",
+            "libc",
+            "2.29",
+            "CVE-2023-1234",
+            True,
+            Label.TruePositive.name,
+            DeltaType.FixedFalseNegative,
+            True,
+            "(this is a new TP 🙌)",
+        ),
+        (
+            "scanner1",
+            "nginx",
+            "1.17",
+            "CVE-2023-0002",
+            False,
+            Label.FalsePositive.name,
+            DeltaType.FixedFalsePositive,
+            True,
+            "(got rid of a former FP 🙌)",
+        ),
+        (
+            "scanner2",
+            "bash",
+            "5.0",
+            "CVE-2023-5678",
+            False,
+            Label.TruePositive.name,
+            DeltaType.NewFalseNegative,
+            False,
+            "(this is a new FN 😱)",
+        ),
+        (
+            "scanner3",
+            "zlib",
+            "1.2.11",
+            "CVE-2023-8888",
+            True,
+            Label.FalsePositive.name,
+            DeltaType.NewFalsePositive,
+            False,
+            "(this is a new FP 😱)",
+        ),
+        (
+            "scanner4",
+            "openssl",
+            "1.1.1",
+            "CVE-2023-0001",
+            True,
+            None,
+            DeltaType.Unknown,
+            None,
+            "",
+        ),
+    ],
+)
+def test_delta_properties(
+    tool,
+    package_name,
+    package_version,
+    vulnerability_id,
+    added,
+    label,
+    expected_outcome,
+    expected_is_improved,
+    expected_commentary,
+):
+    """Test Delta properties is_improved, outcome, and commentary based on logical combinations."""
+
+    delta = Delta(
+        tool=tool,
+        package_name=package_name,
+        package_version=package_version,
+        vulnerability_id=vulnerability_id,
+        added=added,
+        label=label,
+    )
+
+    assert delta.outcome == expected_outcome
+    assert delta.is_improved == expected_is_improved
+    assert delta.commentary == expected_commentary
+
+
+@pytest.fixture
+def reference_result():
+    """Fixture for creating a mock reference result."""
+    return MagicMock(
+        name="reference_results", ID="reference", config=MagicMock(tool="reference")
+    )
+
+
+@pytest.fixture
+def candidate_result():
+    """Fixture for creating a mock candidate result."""
+    return MagicMock(
+        name="candidate_results", ID="candidate", config=MagicMock(tool="candidate")
+    )
+
+
+@pytest.fixture
+def comparisons_by_result_id():
+    """Fixture for setting up comparisons with expected label data (source of truth)."""
+    comparison = {
+        # skip post init calculations on against labels, since
+        # we're setting the comparison results directly below
+        "reference": typing.cast(AgainstLabels, object.__new__(AgainstLabels)),
+        "candidate": typing.cast(AgainstLabels, object.__new__(AgainstLabels)),
+    }
+    comparison["reference"].labels_by_match = {
+        "match1": [Label.TruePositive],
+        "match2": [Label.TruePositive],
+        "match3": [Label.FalsePositive],
+        "match4": [Label.FalsePositive],
+    }
+    comparison["candidate"].labels_by_match = {
+        "match1": [Label.TruePositive],
+        "match2": [Label.TruePositive],
+        "match3": [Label.FalsePositive],
+        "match4": [Label.FalsePositive],
+    }
+    return comparison
+
+
+@pytest.fixture
+def relative_comparison(reference_result, candidate_result):
+    """Fixture for creating a mock relative comparison of reference and candidate."""
+    match1 = MagicMock(
+        name="match1",
+        ID="match1",
+        package=Package(name="libc", version="2.29"),
+        vulnerability=MagicMock(id="CVE-2023-1234"),
+    )
+    match2 = MagicMock(
+        name="match2",
+        ID="match2",
+        package=Package(name="nginx", version="1.17"),
+        vulnerability=MagicMock(id="CVE-2023-0002"),
+    )
+    match3 = MagicMock(
+        name="match3",
+        ID="match3",
+        package=Package(name="openssl", version="1.1.1"),
+        vulnerability=MagicMock(id="CVE-2023-5678"),
+    )
+    match4 = MagicMock(
+        name="match4",
+        ID="match4",
+        package=Package(name="zlib", version="1.2.11"),
+        vulnerability=MagicMock(id="CVE-2023-8888"),
+    )
+
+    result = ByPreservedMatch(
+        results=[reference_result, candidate_result],
+    )
+    result.unique = {
+        "reference": [match2, match3],
+        "candidate": [match1, match4],
+    }
+    return result
+
+
+def test_compute_deltas(comparisons_by_result_id, relative_comparison):
+    """Test compute_deltas with realistic comparisons between reference and candidate results."""
+    deltas = compute_deltas(
+        comparisons_by_result_id=comparisons_by_result_id,
+        reference_tool="reference",
+        relative_comparison=relative_comparison,
+    )
+
+    expected_deltas = [
+        Delta(
+            tool="reference",
+            package_name="nginx",
+            package_version="1.17",
+            vulnerability_id="CVE-2023-0002",
+            added=False,
+            label="TruePositive",
+        ),
+        Delta(
+            tool="reference",
+            package_name="openssl",
+            package_version="1.1.1",
+            vulnerability_id="CVE-2023-5678",
+            added=False,
+            label="FalsePositive",
+        ),
+        Delta(
+            tool="candidate",
+            package_name="libc",
+            package_version="2.29",
+            vulnerability_id="CVE-2023-1234",
+            added=True,
+            label="TruePositive",
+        ),
+        Delta(
+            tool="candidate",
+            package_name="zlib",
+            package_version="1.2.11",
+            vulnerability_id="CVE-2023-8888",
+            added=True,
+            label="FalsePositive",
+        ),
+    ]
+
+    assert len(deltas) == len(expected_deltas)
+    for idx, actual in enumerate(deltas):
+        assert actual == expected_deltas[idx], f"unequal at {idx}"
diff --git a/tests/unit/validate/test_gate.py b/tests/unit/validate/test_gate.py
new file mode 100644
index 0000000..b60d400
--- /dev/null
+++ b/tests/unit/validate/test_gate.py
@@ -0,0 +1,97 @@
+from yardstick.validate import Gate, GateConfig, GateInputDescription
+
+import pytest
+from unittest.mock import MagicMock
+
+
+@pytest.fixture
+def mock_label_comparison():
+    """Fixture to create a mock LabelComparisonSummary with defaults."""
+    summary = MagicMock()
+    summary.f1_score = 0.9
+    summary.false_negatives = 5
+    summary.indeterminate_percent = 2.0
+    return summary
+
+
+@pytest.mark.parametrize(
+    "config, reference_summary, candidate_summary, expected_reasons",
+    [
+        # Case 1: Candidate has a lower F1 score beyond the allowed threshold -> gate fails
+        (
+            GateConfig(
+                max_f1_regression=0.1,
+                max_new_false_negatives=5,
+                max_unlabeled_percent=10,
+            ),
+            MagicMock(f1_score=0.9, false_negatives=5, indeterminate_percent=2.0),
+            MagicMock(f1_score=0.7, false_negatives=5, indeterminate_percent=2.0),
+            ["current F1 score is lower than the latest release F1 score"],
+        ),
+        # Case 2: Candidate has too many false negatives -> gate fails
+        (
+            GateConfig(
+                max_f1_regression=0.1,
+                max_new_false_negatives=1,
+                max_unlabeled_percent=10,
+            ),
+            MagicMock(f1_score=0.9, false_negatives=5, indeterminate_percent=2.0),
+            MagicMock(f1_score=0.85, false_negatives=7, indeterminate_percent=2.0),
+            [
+                "current false negatives is greater than the latest release false negatives"
+            ],
+        ),
+        # Case 3: Candidate has too high indeterminate percent -> gate fails
+        (
+            GateConfig(
+                max_f1_regression=0.1,
+                max_new_false_negatives=5,
+                max_unlabeled_percent=5,
+            ),
+            MagicMock(f1_score=0.9, false_negatives=5, indeterminate_percent=2.0),
+            MagicMock(f1_score=0.85, false_negatives=5, indeterminate_percent=6.0),
+            ["current indeterminate matches % is greater than"],
+        ),
+        # Case 4: Candidate passes all thresholds -> gate passes (no reasons)
+        (
+            GateConfig(
+                max_f1_regression=0.1,
+                max_new_false_negatives=5,
+                max_unlabeled_percent=10,
+            ),
+            MagicMock(f1_score=0.9, false_negatives=5, indeterminate_percent=2.0),
+            MagicMock(f1_score=0.85, false_negatives=5, indeterminate_percent=3.0),
+            [],
+        ),
+    ],
+)
+def test_gate(config, reference_summary, candidate_summary, expected_reasons):
+    """Parameterized test for the Gate class that checks different pass/fail conditions."""
+
+    # Create the Gate instance with the given parameters
+    gate = Gate(
+        reference_comparison=reference_summary,
+        candidate_comparison=candidate_summary,
+        config=config,
+        input_description=MagicMock(image="test_image"),
+    )
+
+    # Check that the reasons list matches the expected outcome
+    assert len(gate.reasons) == len(expected_reasons)
+    for reason, expected_reason in zip(gate.reasons, expected_reasons):
+        assert expected_reason in reason
+
+
+def test_gate_failing():
+    input_description = GateInputDescription(image="some-image", configs=[])
+    gate = Gate.failing(["sample failure reason"], input_description)
+    assert not gate.passed()
+    assert gate.reasons == ["sample failure reason"]
+
+
+def test_gate_passing():
+    input_description = GateInputDescription(image="some-image", configs=[])
+    gate = Gate.passing(input_description)
+    assert gate.passed()
diff --git a/tests/unit/validate/test_validate.py b/tests/unit/validate/test_validate.py
new file mode 100644
index 0000000..d719b4f
--- /dev/null
+++ b/tests/unit/validate/test_validate.py
@@ -0,0 +1,276 @@
+from unittest.mock import patch, MagicMock
+
+import pytest
+
+from yardstick import comparison
+from yardstick.artifact import (
+    ScanResult,
+    ScanConfiguration,
+    Package,
+    Vulnerability,
+    LabelEntry,
+    Label,
+    Match,
+)
+from yardstick.validate import validate_image, GateConfig, Delta
+
+
+@pytest.fixture()
+def compare_results_no_matches():
+    return MagicMock(results=[MagicMock(matches=[]), MagicMock(matches=[])])
+
+
+@pytest.fixture()
+def compare_results_identical_matches():
+    return MagicMock(
+        results=[
+            MagicMock(
+                matches=[MagicMock()],
+                unique={},
+            ),
+            MagicMock(
+                matches=[MagicMock()],
+                unique={},
+            ),
+        ]
+    )
+
+
+@patch("yardstick.compare_results")
+def test_validate_fail_on_empty_matches(
+    mock_compare_results, compare_results_no_matches
+):
+    mock_compare_results.return_value = compare_results_no_matches
+    gate = validate_image(
+        "some image",
+        GateConfig(fail_on_empty_match_set=True),
+        descriptions=["some-str", "another-str"],
+        always_run_label_comparison=False,
+        verbosity=0,
+    )
+    assert not gate.passed()
+    assert (
+        "gate configured to fail on empty matches, and no matches found" in gate.reasons
+    )
+    mock_compare_results.assert_called_once_with(
+        descriptions=["some-str", "another-str"],
+        year_max_limit=None,
+        matches_filter=None,
+    )
+
+
+@patch("yardstick.compare_results")
+def test_validate_dont_fail_on_empty_matches(
+    mock_compare_results, compare_results_no_matches
+):
+    mock_compare_results.return_value = compare_results_no_matches
+    gate = validate_image(
+        "some image",
+        GateConfig(fail_on_empty_match_set=False),
+        descriptions=["some-str", "another-str"],
+        always_run_label_comparison=False,
+        verbosity=0,
+    )
+    assert gate.passed()
+    mock_compare_results.assert_called_once_with(
+        descriptions=["some-str", "another-str"],
+        year_max_limit=None,
+        matches_filter=None,
+    )
+
+
+@patch("yardstick.compare_results")
+def test_validate_pass_early_identical_match_sets(
+    mock_compare_results, compare_results_identical_matches
+):
+    mock_compare_results.return_value = compare_results_identical_matches
+    gate = validate_image(
+        "some image",
+        GateConfig(fail_on_empty_match_set=False),
+        descriptions=["some-str", "another-str"],
+        always_run_label_comparison=False,
+        verbosity=0,
+    )
+    assert gate.passed()
+    mock_compare_results.assert_called_once_with(
+        descriptions=["some-str", "another-str"],
+        year_max_limit=None,
+        matches_filter=None,
+    )
+
+
+@pytest.fixture()
+def reference_config():
+    return ScanConfiguration(
+        image_repo="docker.io/anchore/test_images",
+        image_digest="f" * 64,
+        tool_name="grype",
+        tool_version="123",
+        tool_label="reference",
+        ID="reference-config-uuid",
+    )
+
+
+@pytest.fixture()
+def candidate_config():
+    return ScanConfiguration(
+        image_repo="docker.io/anchore/test_images",
+        image_digest="f" * 64,
+        tool_name="grype",
+        tool_version="1234",
+        tool_label="candidate",
+        ID="candidate-config-uuid",
+    )
+
+
+@pytest.fixture()
+def matches(packages, vulns):
+    libc, nginx, openssl, zlib = packages
+    vuln1, vuln2, vuln3, vuln4 = vulns
+    match1 = Match(
+        package=libc,
+        vulnerability=vuln1,
+    )
+    match2 = Match(
+        package=nginx,
+        vulnerability=vuln2,
+    )
+    match3 = Match(
+        package=openssl,
+        vulnerability=vuln3,
+    )
+    match4 = Match(
+        package=zlib,
+        vulnerability=vuln4,
+    )
+    return [match1, match2, match3, match4]
+
+
+@pytest.fixture()
+def reference_results(reference_config, packages, matches):
+    match1, match2, match3, match4 = matches
+    return ScanResult(
+        config=reference_config,
+        matches=[match1, match2, match3],
+        packages=packages,
+    )
+
+
+@pytest.fixture()
+def candidate_results(candidate_config, packages, matches):
+    match1, match2, match3, match4 = matches
+    return ScanResult(
+        config=candidate_config,
+        matches=[match1, match2, match3, match4],
+        packages=packages,
+    )
+
+
+@pytest.fixture()
+def non_identical_results(reference_results, candidate_results):
+    return comparison.ByPreservedMatch(results=[reference_results, candidate_results])
+
+
+@pytest.fixture()
+def vulns():
+    vuln1 = Vulnerability(id="CVE-2021-1234")
+    vuln2 = Vulnerability(id="CVE-2021-0002")
+    vuln3 = Vulnerability(id="CVE-2021-5678")
+    vuln4 = Vulnerability(id="CVE-2021-8888")
+    return vuln1, vuln2, vuln3, vuln4
+
+
+@pytest.fixture()
+def packages():
+    libc = Package(name="libc", version="2.29")
+    nginx = Package(name="nginx", version="1.17")
+    openssl = Package(name="openssl", version="1.1.1")
+    zlib = Package(name="zlib", version="1.2.11")
+    return [libc, nginx, openssl, zlib]
+
+
+@pytest.fixture()
+def deltas():
+    return [
+        MagicMock(spec=Delta),
+        MagicMock(spec=Delta),
+    ]
+
+
+@pytest.fixture()
+def label_entries(matches):
+    match1, match2, match3, match4 = matches
+    return [
+        LabelEntry(
+            Label.TruePositive,
+            vulnerability_id=match1.vulnerability.id,
+            package=match1.package,
+        ),
+        LabelEntry(
+            Label.FalsePositive,
+            vulnerability_id=match2.vulnerability.id,
+            package=match2.package,
+        ),
+        LabelEntry(
+            Label.TruePositive,
+            vulnerability_id=match3.vulnerability.id,
+            package=match3.package,
+        ),
+        LabelEntry(
+            Label.TruePositive,
+            vulnerability_id=match4.vulnerability.id,
+            package=match4.package,
+        ),
+    ]
+
+
+@pytest.fixture()
+def label_comparison_results(reference_results, candidate_results, label_entries):
+    compare_configuration = {
+        "year_max_limit": 2021,
+        "year_from_cve_only": True,
+    }
+    return (
+        [reference_results, candidate_results],
+        [],  # label_entries is not used
+        {
+            reference_results.ID: comparison.AgainstLabels(
+                result=reference_results,
+                label_entries=label_entries,
+                lineage=[],
+                compare_configuration=compare_configuration,
+            ),
+            candidate_results.ID: comparison.AgainstLabels(
+                result=candidate_results,
+                label_entries=label_entries,
+                lineage=[],
+                compare_configuration=compare_configuration,
+            ),
+        },
+        MagicMock(name="stats_by_image_tool_pair"),
+    )
+
+
+@patch("yardstick.compare_results")
+@patch("yardstick.compare_results_against_labels")
+@patch("yardstick.validate.validate.compute_deltas")
+def test_validate_non_identical_match_sets(
+    mock_compute_deltas,
+    mock_compare_against_labels,
+    mock_compare_results,
+    non_identical_results,
+    deltas,
+    label_comparison_results,
+):
+    mock_compare_results.return_value = non_identical_results
+    mock_compare_against_labels.return_value = label_comparison_results
+    mock_compute_deltas.return_value = deltas
+    gate = validate_image(
+        f"docker.io/anchore/test_images@{'f' * 64}",
+        GateConfig(fail_on_empty_match_set=False),
+        descriptions=["some-str", "another-str"],
+        always_run_label_comparison=False,
+        verbosity=0,
+    )
+    assert gate.passed()
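Finally, the same gate machinery is usable outside the CLI. A sketch of driving a validation programmatically (the result set name is hypothetical, and this assumes the set was captured with `reference`/`candidate` tool labels as in the fixtures above):

```python
from yardstick.validate import GateConfig, validate_result_set

gates = validate_result_set(
    GateConfig(
        max_f1_regression=0.1,
        candidate_tool_label="candidate",
        allowed_namespaces=["nvd:cpe"],  # optional: judge only these namespaces
    ),
    result_set="pr_vs_latest",  # hypothetical, previously captured result set
    images=[],  # empty means: validate every image in the result set
    always_run_label_comparison=False,
    verbosity=0,
)
assert all(gate.passed() for gate in gates)
```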