diff --git a/README.md b/README.md index 915613b6ce..ffc8c8efae 100644 --- a/README.md +++ b/README.md @@ -455,6 +455,8 @@ Output: specify multiple output formats by using comma (',') as a separator note: don't use spaces between comma (',') and the output formats. -c CVSS, --cvss CVSS minimum CVSS score (as integer in range 0 to 10) to report (default: 0) + --metrics + check for metrics (e.g., EPSS) from found cves --epss-percentile minimum EPSS percentile of CVE range between 0 to 100 to report (input value can also be floating point) (default: 0) --epss-probability diff --git a/cve_bin_tool/cli.py b/cve_bin_tool/cli.py index 43f532aef8..b1a619d28f 100644 --- a/cve_bin_tool/cli.py +++ b/cve_bin_tool/cli.py @@ -269,6 +269,12 @@ def main(argv=None): help="minimum CVE severity to report (default: low)", default="low", ) + output_group.add_argument( + "--metrics", + action="store_true", + help="check for metrics (e.g., EPSS) from found cves", + default=False, + ) output_group.add_argument( "--epss-percentile", action="store", @@ -893,6 +899,7 @@ def main(argv=None): with CVEScanner( score=score, + check_metrics=args["metrics"], epss_percentile=epss_percentile, epss_probability=epss_probability, check_exploits=args["exploits"], @@ -1017,6 +1024,7 @@ def main(argv=None): merge_report=merged_reports, affected_versions=args["affected_versions"], exploits=args["exploits"], + metrics=args["metrics"], detailed=args["detailed"], vex_filename=args["vex"], sbom_filename=args["sbom_output"], diff --git a/cve_bin_tool/cve_scanner.py b/cve_bin_tool/cve_scanner.py index 7b359ed53f..a941e8f253 100644 --- a/cve_bin_tool/cve_scanner.py +++ b/cve_bin_tool/cve_scanner.py @@ -38,6 +38,7 @@ class CVEScanner: def __init__( self, score: int = 0, + check_metrics: bool = False, epss_percentile: float = 0.0, epss_probability: float = 0.0, logger: Logger = None, @@ -49,6 +50,7 @@ def __init__( self.logger = logger or LOGGER.getChild(self.__class__.__name__) self.error_mode = error_mode self.score = score + self.check_metrics = check_metrics self.epss_percentile = epss_percentile self.epss_probability = epss_probability self.products_with_cve = 0 @@ -243,29 +245,34 @@ def get_cves(self, product_info: ProductInfo, triage_data: TriageData): row_dict["cvss_version"] or row["cvss_version"] ) # executing query to get metric for CVE - metric_result = self.metric( - (row["cve_number"],), - self.epss_percentile, - self.epss_probability, - ) - # row_dict doesnt have metric as key. As it based on result from query on cve_severity table - # declaring row_dict[metric] - row_dict["metric"] = {} - # looping for result of query for metrics. - for key, value in metric_result.items(): - row_dict["metric"][key] = [ - value[0], - value[1], - ] - # checking if epss percentile filter is applied - if self.epss_percentile > 0.0 or self.epss_probability > 0.0: - # if epss filter is applied and condition is failed to satisfy row_dict["metric"] will be empty - if not row_dict["metric"]: - # continue to not include that particular cve - continue - self.logger.debug( - f'metrics found in CVE {row_dict["cve_number"]} is {row_dict["metric"]}' - ) + if self.check_metrics: + metric_result = self.metric( + (row["cve_number"],), + self.epss_percentile, + self.epss_probability, + ) + # row_dict doesnt have metric as key. As it based on result from query on + # cve_severity table declaring row_dict[metric] + row_dict["metric"] = {} + # looping for result of query for metrics. + for key, value in metric_result.items(): + row_dict["metric"][key] = [ + value[0], + value[1], + ] + # checking if epss percentile filter is applied + if ( + self.epss_percentile > 0.0 + or self.epss_probability > 0.0 + ): + # if epss filter is applied and condition is failed to satisfy + # row_dict["metric"] will be empty + if not row_dict["metric"]: + # continue to not include that particular cve + continue + self.logger.debug( + f'metrics found in CVE {row_dict["cve_number"]} is {row_dict["metric"]}' + ) cve = CVE(**row_dict) cves.append(cve) diff --git a/cve_bin_tool/output_engine/__init__.py b/cve_bin_tool/output_engine/__init__.py index 1913ea43d3..f588da58b8 100644 --- a/cve_bin_tool/output_engine/__init__.py +++ b/cve_bin_tool/output_engine/__init__.py @@ -43,10 +43,11 @@ def output_json( outfile: IO, detailed: bool = False, affected_versions: int = 0, + metrics: bool = False, ): """Output a JSON of CVEs""" formatted_output = format_output( - all_cve_data, all_cve_version_info, detailed, affected_versions + all_cve_data, all_cve_version_info, detailed, affected_versions, metrics ) json.dump(formatted_output, outfile, indent=" ") @@ -80,10 +81,15 @@ def output_csv( outfile, detailed: bool = False, affected_versions: int = 0, + metrics: bool = False, ): """Output a CSV of CVEs""" formatted_output = format_output( - all_cve_data, all_cve_version_info, detailed, affected_versions + all_cve_data, + all_cve_version_info, + detailed, + affected_versions, + metrics, ) # Trim any leading -, =, +, @, tab or CR to avoid excel macros @@ -101,12 +107,13 @@ def output_csv( "source", "cvss_version", "cvss_vector", - "epss_probability", - "epss_percentile", "paths", "remarks", "comments", ] + if metrics: + fieldnames.append("epss_probability") + fieldnames.append("epss_percentile") if detailed: fieldnames.append("description") if affected_versions != 0: @@ -131,6 +138,7 @@ def output_pdf( merge_report, affected_versions: int = 0, exploits: bool = False, + metrics: bool = False, all_product_data=None, ): """Output a PDF of CVEs""" @@ -468,73 +476,76 @@ def output_pdf( "Applicationlist", widths=[3 * cm, 3 * cm, 2 * cm, 4 * cm, 3 * cm] ) - pdfdoc.heading(1, "List of Vulnerabilities with different metric") - pdfdoc.paragraph( - "The table given below gives CVE found with there score on different metrics." - ) - cve_by_metrics: defaultdict[Remarks, list[dict[str, str]]] = defaultdict( - list - ) - col_headings = [ - "CVE Number", - "CVSS_version", - "CVSS_score", - "EPSS_probability", - "EPSS_percentile", - ] - # group cve_data by its remarks and separately by paths - for product_info, cve_data in all_cve_data.items(): - for cve in cve_data["cves"]: - probability = "-" - percentile = "-" - for metric, field in cve.metric.items(): - if metric == "EPSS": - probability = round(field[0], 5) - percentile = field[1] + if metrics: + pdfdoc.heading(1, "List of Vulnerabilities with different metric") + pdfdoc.paragraph( + "The table given below gives CVE found with there score on different metrics." + ) + cve_by_metrics: defaultdict[ + Remarks, list[dict[str, str]] + ] = defaultdict(list) + col_headings = [ + "CVE Number", + "CVSS_version", + "CVSS_score", + "EPSS_probability", + "EPSS_percentile", + ] + # group cve_data by its remarks and separately by paths + for product_info, cve_data in all_cve_data.items(): + for cve in cve_data["cves"]: + probability = "-" + percentile = "-" + for metric, field in cve.metric.items(): + if metric == "EPSS": + probability = round(field[0], 5) + percentile = field[1] - cve_by_metrics[cve.remarks].append( - { - "cve_number": cve.cve_number, - "cvss_version": str(cve.cvss_version), - "cvss_score": str(cve.score), - "epss_probability": str(probability), - "epss_percentile": str(percentile), - "severity": cve.severity, - } - ) + cve_by_metrics[cve.remarks].append( + { + "cve_number": cve.cve_number, + "cvss_version": str(cve.cvss_version), + "cvss_score": str(cve.score), + "epss_probability": str(probability), + "epss_percentile": str(percentile), + "severity": cve.severity, + } + ) - for remarks in sorted(cve_by_metrics): - pdfdoc.createtable( - "cvemetric", - col_headings, - pdfdoc.tblStyle, - ) - row = 1 - for cve in cve_by_metrics[remarks]: - entry = [ - cve["cve_number"], - cve["cvss_version"], - str(cve["cvss_score"]), - str(cve["epss_probability"]), - str(cve["epss_percentile"]), - ] - pdfdoc.addrow( + for remarks in sorted(cve_by_metrics): + pdfdoc.createtable( "cvemetric", - entry, - [ - ( - "TEXTCOLOR", - (0, row), - (4, row), - severity_colour[cve["severity"].split("-")[0].upper()], - ), - ("FONT", (0, row), (4, row), "Helvetica-Bold"), - ], + col_headings, + pdfdoc.tblStyle, ) - row += 1 - pdfdoc.showtable( - "cvemetric", widths=[4 * cm, 4 * cm, 3 * cm, 4 * cm, 4 * cm] - ) + row = 1 + for cve in cve_by_metrics[remarks]: + entry = [ + cve["cve_number"], + cve["cvss_version"], + str(cve["cvss_score"]), + str(cve["epss_probability"]), + str(cve["epss_percentile"]), + ] + pdfdoc.addrow( + "cvemetric", + entry, + [ + ( + "TEXTCOLOR", + (0, row), + (4, row), + severity_colour[ + cve["severity"].split("-")[0].upper() + ], + ), + ("FONT", (0, row), (4, row), "Helvetica-Bold"), + ], + ) + row += 1 + pdfdoc.showtable( + "cvemetric", widths=[4 * cm, 4 * cm, 3 * cm, 4 * cm, 4 * cm] + ) # List of scanned products with no identified vulnerabilities if all_product_data is not None: @@ -622,6 +633,7 @@ class OutputEngine: detailed (bool) vex_filename (str) exploits (bool) + metrics (bool) all_product_data sbom_filename (str) sbom_type (str) @@ -660,6 +672,7 @@ def __init__( detailed: bool = False, vex_filename: str = "", exploits: bool = False, + metrics: bool = False, all_product_data=None, sbom_filename: str = "", sbom_type: str = "spdx", @@ -687,6 +700,7 @@ def __init__( self.detailed = detailed self.vex_filename = vex_filename self.exploits = exploits + self.metrics = metrics self.all_product_data = all_product_data self.sbom_filename = sbom_filename self.sbom_type = sbom_type @@ -706,6 +720,7 @@ def output_cves(self, outfile, output_type="console"): outfile, self.detailed, self.affected_versions, + self.metrics, ) elif output_type == "csv": output_csv( @@ -714,6 +729,7 @@ def output_cves(self, outfile, output_type="console"): outfile, self.detailed, self.affected_versions, + self.metrics, ) elif output_type == "pdf": output_pdf( @@ -725,6 +741,7 @@ def output_cves(self, outfile, output_type="console"): self.merge_report, self.affected_versions, self.exploits, + self.metrics, ) elif output_type == "html": output_html( @@ -748,6 +765,7 @@ def output_cves(self, outfile, output_type="console"): self.time_of_last_update, self.affected_versions, self.exploits, + self.metrics, self.all_product_data, self.offline, outfile, diff --git a/cve_bin_tool/output_engine/console.py b/cve_bin_tool/output_engine/console.py index fc31632ace..5bf364d7fd 100644 --- a/cve_bin_tool/output_engine/console.py +++ b/cve_bin_tool/output_engine/console.py @@ -50,6 +50,7 @@ def _output_console_nowrap( time_of_last_update: datetime, affected_versions: int, exploits: bool = False, + metrics: bool = False, all_product_data=None, offline: bool = False, width: int = None, @@ -218,8 +219,9 @@ def _output_console_nowrap( table.add_column("Source") table.add_column("Severity") table.add_column("Score (CVSS Version)") - table.add_column("EPSS probability") - table.add_column("EPSS percentile") + if metrics: + table.add_column("EPSS probability") + table.add_column("EPSS percentile") if affected_versions != 0: table.add_column("Affected Versions") @@ -239,9 +241,10 @@ def _output_console_nowrap( Text.styled(cve_data["source"], color), Text.styled(cve_data["severity"], color), Text.styled(cvss_text, color), - Text.styled(cve_data["epss_probability"], color), - Text.styled(cve_data["epss_percentile"], color), ] + if metrics: + cells.append(Text.styled(cve_data["epss_probability"], color)) + cells.append(Text.styled(cve_data["epss_percentile"], color)) if affected_versions != 0: cells.append(Text.styled(cve_data["affected_versions"], color)) table.add_row(*cells) @@ -338,46 +341,47 @@ def validate_cell_length(cell_name, cell_type): # Print the table to the console console.print(table) - table = Table() - # Add Head Columns to the Table - table.add_column("CVE") - table.add_column("CVSS_version") - table.add_column("CVSS_score") - table.add_column("EPSS_probability") - table.add_column("EPSS_percentile") - color = "green" - - cve_by_metrics: defaultdict[Remarks, list[dict[str, str]]] = defaultdict(list) - # group cve_data by its remarks and separately by paths - for product_info, cve_data in all_cve_data.items(): - for cve in cve_data["cves"]: - probability = "-" - percentile = "-" - for metric, field in cve.metric.items(): - if metric == "EPSS": - probability = round(field[0], 5) - percentile = field[1] - cve_by_metrics[cve.remarks].append( - { - "cve_number": cve.cve_number, - "cvss_version": str(cve.cvss_version), - "cvss_score": str(cve.score), - "epss_probability": str(probability), - "epss_percentile": str(percentile), - "severity": cve.severity, - } - ) + if metrics: + table = Table() + # Add Head Columns to the Table + table.add_column("CVE") + table.add_column("CVSS_version") + table.add_column("CVSS_score") + table.add_column("EPSS_probability") + table.add_column("EPSS_percentile") + color = "green" - for remarks in sorted(cve_by_remarks): - color = remarks_colors[remarks] - for cve in cve_by_metrics[remarks]: - color = cve["severity"].split("-")[0].lower() - cells = [ - Text.styled(cve["cve_number"], color), - Text.styled(cve["cvss_version"], color), - Text.styled(str(cve["cvss_score"]), color), - Text.styled(cve["epss_probability"], color), - Text.styled(cve["epss_percentile"], color), - ] - table.add_row(*cells) - console.print(table) + cve_by_metrics: defaultdict[Remarks, list[dict[str, str]]] = defaultdict(list) + # group cve_data by its remarks and separately by paths + for product_info, cve_data in all_cve_data.items(): + for cve in cve_data["cves"]: + probability = "-" + percentile = "-" + for metric, field in cve.metric.items(): + if metric == "EPSS": + probability = round(field[0], 5) + percentile = field[1] + cve_by_metrics[cve.remarks].append( + { + "cve_number": cve.cve_number, + "cvss_version": str(cve.cvss_version), + "cvss_score": str(cve.score), + "epss_probability": str(probability), + "epss_percentile": str(percentile), + "severity": cve.severity, + } + ) + + for remarks in sorted(cve_by_remarks): + color = remarks_colors[remarks] + for cve in cve_by_metrics[remarks]: + color = cve["severity"].split("-")[0].lower() + cells = [ + Text.styled(cve["cve_number"], color), + Text.styled(cve["cvss_version"], color), + Text.styled(str(cve["cvss_score"]), color), + Text.styled(cve["epss_probability"], color), + Text.styled(cve["epss_percentile"], color), + ] + table.add_row(*cells) + console.print(table) diff --git a/cve_bin_tool/output_engine/util.py b/cve_bin_tool/output_engine/util.py index e3118a1261..aa491d2726 100644 --- a/cve_bin_tool/output_engine/util.py +++ b/cve_bin_tool/output_engine/util.py @@ -153,6 +153,7 @@ def format_output( all_cve_version_info: dict[str, VersionInfo] | None = None, detailed: bool = False, affected_versions: int = 0, + metrics: bool = False, ) -> list[dict[str, str]]: """ summary: format output in the list of dictionary format. @@ -199,12 +200,13 @@ def format_output( "source": cve.data_source, "cvss_version": str(cve.cvss_version), "cvss_vector": cve.cvss_vector, - "epss_probability": str(probability), - "epss_percentile": str(percentile), "paths": ", ".join(cve_data["paths"]), "remarks": cve.remarks.name, "comments": cve.comments, } + if metrics: + details["epss_probability"] = str(probability) + details["epss_percentile"] = str(percentile) if detailed: details["description"] = cve.description if affected_versions != 0: diff --git a/test/test_output_engine.py b/test/test_output_engine.py index dde479a4bc..fa4dc414c7 100644 --- a/test/test_output_engine.py +++ b/test/test_output_engine.py @@ -853,24 +853,26 @@ def tearDown(self) -> None: def test_formatted_output(self): """Test reformatting products""" - self.assertEqual(format_output(self.MOCK_OUTPUT, None), self.FORMATTED_OUTPUT) + self.assertEqual( + format_output(self.MOCK_OUTPUT, None, metrics=True), self.FORMATTED_OUTPUT + ) def test_formatted_detailed_output(self): """Test detailed flag output""" self.assertEqual( - format_output(self.MOCK_DETAILED_OUTPUT, None, detailed=True), + format_output(self.MOCK_DETAILED_OUTPUT, None, detailed=True, metrics=True), self.FORMATTED_DETAILED_OUTPUT, ) def test_output_json(self): """Test formatting output as JSON""" - output_json(self.MOCK_OUTPUT, None, self.mock_file) + output_json(self.MOCK_OUTPUT, None, self.mock_file, metrics=True) self.mock_file.seek(0) # reset file position self.assertEqual(json.load(self.mock_file), self.FORMATTED_OUTPUT) def test_output_csv(self): """Test formatting output as CSV""" - output_csv(self.MOCK_OUTPUT, None, self.mock_file) + output_csv(self.MOCK_OUTPUT, None, self.mock_file, metrics=True) self.mock_file.seek(0) # reset file position reader = csv.DictReader(self.mock_file) expected_value = [dict(x) for x in reader] @@ -930,6 +932,7 @@ def test_output_console(self): time_of_last_update = datetime.today() affected_versions = 0 exploits = False + metrics = True console = Console(file=self.mock_file) outfile = None all_product_data = None @@ -940,6 +943,7 @@ def test_output_console(self): time_of_last_update, affected_versions, exploits, + metrics, all_product_data, True, 120, @@ -973,6 +977,7 @@ def test_output_console_affected_versions(self): time_of_last_update = datetime.today() affected_versions = 1 exploits = False + metrics = True console = Console(file=self.mock_file) outfile = None all_product_data = None @@ -983,6 +988,7 @@ def test_output_console_affected_versions(self): time_of_last_update, affected_versions, exploits, + metrics, all_product_data, True, 120, @@ -1022,6 +1028,7 @@ def test_output_console_outfile(self): time_of_last_update = datetime.today() affected_versions = 0 exploits = False + metrics = True outfile = tmpf.name all_product_data = None @@ -1031,6 +1038,7 @@ def test_output_console_outfile(self): time_of_last_update, affected_versions, exploits, + metrics, all_product_data, True, 120, @@ -1227,7 +1235,7 @@ def test_csv_macros(self): }, ] - output_csv(bad_input, None, self.mock_file) + output_csv(bad_input, None, self.mock_file, metrics=True) self.mock_file.seek(0) # reset file position reader = csv.DictReader(self.mock_file) actual_output = [dict(x) for x in reader]