diff --git a/cve_bin_tool/cli.py b/cve_bin_tool/cli.py index ce3841b81f..b898c7c7ae 100644 --- a/cve_bin_tool/cli.py +++ b/cve_bin_tool/cli.py @@ -70,8 +70,8 @@ from cve_bin_tool.merge import MergeReports from cve_bin_tool.output_engine import OutputEngine from cve_bin_tool.package_list_parser import PackageListParser -from cve_bin_tool.sbom_detection import sbom_detection -from cve_bin_tool.sbom_manager import SBOMManager +from cve_bin_tool.sbom_manager.parse import SBOMParse +from cve_bin_tool.sbom_manager.sbom_detection import sbom_detection from cve_bin_tool.util import ProductInfo from cve_bin_tool.version import VERSION from cve_bin_tool.version_scanner import VersionScanner @@ -1048,13 +1048,13 @@ def main(argv=None): if args["sbom_file"]: sbom_root = args["sbom_file"] # Process SBOM file - sbom_list = SBOMManager( + sbom_list = SBOMParse( args["sbom_file"], sbom_type=args["sbom"], logger=LOGGER, validate=not args["disable_validation_check"], ) - parsed_data = sbom_list.scan_file() + parsed_data = sbom_list.parse_sbom() LOGGER.info( f"The number of products to process from SBOM - {len(parsed_data)}" ) diff --git a/cve_bin_tool/output_engine/__init__.py b/cve_bin_tool/output_engine/__init__.py index 0298f14e81..b47af491ce 100644 --- a/cve_bin_tool/output_engine/__init__.py +++ b/cve_bin_tool/output_engine/__init__.py @@ -13,20 +13,13 @@ from pathlib import Path from typing import IO, Any -from lib4sbom.data.package import SBOMPackage -from lib4sbom.data.relationship import SBOMRelationship -from lib4sbom.generator import SBOMGenerator -from lib4sbom.sbom import SBOM - -from ..cve_scanner import CVEData -from ..cvedb import CVEDB -from ..error_handler import ErrorHandler, ErrorMode -from ..log import LOGGER -from ..util import ProductInfo, Remarks, VersionInfo -from ..version import VERSION -from .console import output_console -from .html import output_html -from .util import ( +from cve_bin_tool.cve_scanner import CVEData +from cve_bin_tool.cvedb import CVEDB 
+from cve_bin_tool.error_handler import ErrorHandler, ErrorMode +from cve_bin_tool.log import LOGGER +from cve_bin_tool.output_engine.console import output_console +from cve_bin_tool.output_engine.html import output_html +from cve_bin_tool.output_engine.util import ( add_extension_if_not, format_output, format_path, @@ -35,6 +28,9 @@ get_cve_summary, intermediate_output, ) +from cve_bin_tool.sbom_manager.generate import SBOMGenerate +from cve_bin_tool.util import ProductInfo, Remarks, VersionInfo +from cve_bin_tool.version import VERSION def output_json( @@ -794,13 +790,15 @@ def output_cves(self, outfile, output_type="console"): if self.vex_filename != "": self.generate_vex(self.all_cve_data, self.vex_filename) if self.sbom_filename != "": - self.generate_sbom( + sbomgen = SBOMGenerate( self.all_product_data, - filename=self.sbom_filename, - sbom_type=self.sbom_type, - sbom_format=self.sbom_format, - sbom_root=self.sbom_root, + self.sbom_filename, + self.sbom_type, + self.sbom_format, + self.sbom_root, + self.logger, ) + sbomgen.generate_sbom() def generate_vex(self, all_cve_data: dict[ProductInfo, CVEData], filename: str): """Generate a vex file and create vulnerability entry.""" @@ -906,78 +904,6 @@ def generate_vex(self, all_cve_data: dict[ProductInfo, CVEData], filename: str): with open(filename, "w") as outfile: json.dump(vex_output, outfile, indent=" ") - def generate_sbom( - self, - all_product_data, - filename="", - sbom_type="spdx", - sbom_format="tag", - sbom_root="CVE-SCAN", - ): - """Create SBOM package and generate SBOM file.""" - # Create SBOM - sbom_relationships = [] - my_package = SBOMPackage() - sbom_relationship = SBOMRelationship() - - # Create root package - my_package.initialise() - root_package = f'CVEBINTOOL-{Path(sbom_root).name.replace(".", "-")}' - parent = f"SBOM_{root_package}" - my_package.set_name(root_package) - my_package.set_type("application") - my_package.set_filesanalysis(False) - my_package.set_downloadlocation(sbom_root) - 
license = "NOASSERTION" - my_package.set_licensedeclared(license) - my_package.set_licenseconcluded(license) - my_package.set_supplier("UNKNOWN", "NOASSERTION") - - # Store package data - self.sbom_packages[(my_package.get_name(), my_package.get_value("version"))] = ( - my_package.get_package() - ) - sbom_relationship.initialise() - sbom_relationship.set_relationship(parent, "DESCRIBES", root_package) - sbom_relationships.append(sbom_relationship.get_relationship()) - - # Add dependent products - for product_data in all_product_data: - my_package.initialise() - my_package.set_name(product_data.product) - my_package.set_version(product_data.version) - if product_data.vendor.casefold() != "UNKNOWN".casefold(): - my_package.set_supplier("Organization", product_data.vendor) - my_package.set_licensedeclared(license) - my_package.set_licenseconcluded(license) - if not ( - (my_package.get_name(), my_package.get_value("version")) - in self.sbom_packages - and product_data.vendor == "unknown" - ): - location = product_data.location - my_package.set_evidence(location) # Set location directly - self.sbom_packages[ - (my_package.get_name(), my_package.get_value("version")) - ] = my_package.get_package() - sbom_relationship.initialise() - sbom_relationship.set_relationship( - root_package, "DEPENDS_ON", product_data.product - ) - sbom_relationships.append(sbom_relationship.get_relationship()) - - # Generate SBOM - my_sbom = SBOM() - my_sbom.add_packages(self.sbom_packages) - my_sbom.add_relationships(sbom_relationships) - my_generator = SBOMGenerator( - sbom_type=sbom_type, - format=sbom_format, - application="cve-bin-tool", - version=VERSION, - ) - my_generator.generate(parent, my_sbom.get_sbom(), filename=filename) - def output_file_wrapper(self, output_types=["console"]): """Call output_file method for all output types.""" for output_type in output_types: diff --git a/cve_bin_tool/sbom_manager/__init__.py b/cve_bin_tool/sbom_manager/__init__.py index 9d92107873..1fcaf47a55 
100644 --- a/cve_bin_tool/sbom_manager/__init__.py +++ b/cve_bin_tool/sbom_manager/__init__.py @@ -1,397 +1,2 @@ # Copyright (C) 2021 Anthony Harrison # SPDX-License-Identifier: GPL-3.0-or-later - -from __future__ import annotations - -import re -import sys -from collections import defaultdict -from logging import Logger -from pathlib import Path - -import defusedxml.ElementTree as ET -from lib4sbom.parser import SBOMParser -from packageurl import PackageURL - -from cve_bin_tool.cvedb import CVEDB -from cve_bin_tool.input_engine import TriageData -from cve_bin_tool.log import LOGGER -from cve_bin_tool.util import ( - ProductInfo, - Remarks, - find_product_location, - validate_location, -) -from cve_bin_tool.validator import validate_cyclonedx, validate_spdx - -from .swid_parser import SWIDParser - - -class SBOMManager: - """ - Class: SBOMManager - - This class is responsible for parsing various SBOM file formats (SPDX, CycloneDX, SWID) in the CVE Bin Tool. - - It provides methods for scanning SBOM files, parsing them, and retrieving vendor information. - - Attributes: - - sbom_data (DefaultDict[ProductInfo, TriageData]): Dictionary containing parsed SBOM data. - - """ - - SBOMtype = ["spdx", "cyclonedx", "swid"] - - sbom_data: defaultdict[ProductInfo, TriageData] - - def __init__( - self, - filename: str, - sbom_type: str = "spdx", - logger: Logger | None = None, - validate: bool = True, - ): - self.filename = filename - self.sbom_data = defaultdict(dict) - self.type = "unknown" - if sbom_type in self.SBOMtype: - self.type = sbom_type - self.logger = logger or LOGGER.getChild(self.__class__.__name__) - self.validate = validate - - # Connect to the database - self.cvedb = CVEDB(version_check=False) - - def common_prefix_split(self, product, version) -> list[ProductInfo]: - """If the product have '-' in name try splitting it and try common prefixes. 
- currently not being used, proposed to be used in future""" - parsed_data: list[ProductInfo] = [] - found_common_prefix = False - common_prefix = ( - "perl-", - "golang-", - "rubygem-", - "python-", - "py3-", - "python3-", - "python2-", - "rust-", - "nodejs-", - ) - for prefix in common_prefix: - if product.startswith(prefix): - common_prefix_product = product[len(prefix) :] - common_prefix_vendor = self.get_vendor(common_prefix_product) - if len(common_prefix_vendor) > 1 or ( - len(common_prefix_vendor) == 1 - and common_prefix_vendor[0] != "UNKNOWN" - ): - location = find_product_location(common_prefix_product) - if location is None: - location = "NotFound" - if validate_location(location) is False: - raise ValueError(f"Invalid location {location} for {product}") - found_common_prefix = True - for vendor in common_prefix_vendor: - parsed_data.append( - ProductInfo( - vendor, common_prefix_product, version, location - ) - ) - break - if not found_common_prefix: - # if vendor not found after removing common prefix try splitting it - LOGGER.debug( - f"No Vendor found for {product}, trying splitted product. " - "Some results may be inaccurate due to vendor identification limitations." - ) - splitted_product = product.split("-") - for sp in splitted_product: - temp = self.get_vendor(sp) - if len(temp) > 1 or (len(temp) == 1 and temp[0] != "UNKNOWN"): - for vendor in temp: - location = find_product_location(sp) - if location is None: - location = "NotFound" - if validate_location(location) is False: - raise ValueError( - f"Invalid location {location} for {product}" - ) - # if vendor is not None: - parsed_data.append(ProductInfo(vendor, sp, version, location)) - return parsed_data - - def scan_file(self) -> dict[ProductInfo, TriageData]: - """ - Parses the SBOM input file and returns the product information and - corresponding triage data. - - Returns: - - dict[ProductInfo, TriageData]: Parsed SBOM data. 
- - """ - self.logger.debug( - f"Processing SBOM {self.filename} of type {self.type.upper()}" - ) - modules = [] - try: - if Path(self.filename).exists(): - if self.type == "swid": - swid = SWIDParser(self.validate) - modules = swid.parse(self.filename) - else: - modules = self.parse_sbom() - except (KeyError, FileNotFoundError, ET.ParseError) as e: - LOGGER.debug(e, exc_info=True) - - LOGGER.debug( - f"The number of modules identified in SBOM - {len(modules)}\n{modules}" - ) - - # Now process list of modules to create [vendor, product, version] tuples - parsed_data: list[ProductInfo] = [] - for module_vendor, product, version in modules: - # Using lower to normalize product names across databases - product = product.lower() - - if module_vendor is None: - # Now add vendor to create product record.... - vendor_set = self.get_vendor(product) - for vendor in vendor_set: - # if vendor is not None: - location = find_product_location(product) - if location is None: - location = "NotFound" - if validate_location(location) is False: - raise ValueError(f"Invalid location {location} for {product}") - parsed_data.append(ProductInfo(vendor, product, version, location)) - else: - location = find_product_location(product) - if location is None: - location = "NotFound" - if validate_location(location) is False: - raise ValueError(f"Invalid location {location} for {product}") - parsed_data.append( - ProductInfo(module_vendor, product, version, location) - ) - - for row in parsed_data: - self.sbom_data[row]["default"] = { - "remarks": Remarks.NewFound, - "comments": "", - "severity": "", - } - self.sbom_data[row]["paths"] = set(map(lambda x: x.strip(), "".split(","))) - - LOGGER.debug(f"SBOM Data {self.sbom_data}") - return self.sbom_data - - def get_vendor(self, product: str) -> list: - """ - Get the list of vendors for the product name. - - There may be more than one vendor for a given product name and all - matches are returned. - - Args: - - product (str): Product name. 
- - Returns: - - list: The list of vendors for the product - - """ - vendorlist: list[str] = [] - vendor_package_pair = self.cvedb.get_vendor_product_pairs(product) - if vendor_package_pair: - # To handle multiple vendors, return all combinations of product/vendor mappings - for v in vendor_package_pair: - vendor = v["vendor"] - vendorlist.append(vendor) - else: - vendorlist.append("UNKNOWN") - return vendorlist - - def is_valid_string(self, string_type: str, ref_string: str) -> bool: - """ - Validate the PURL, CPE string is the correct form. - - Args: - - ref_string (str): PURL, CPE strings - - string_type (str): ref_string type. (purl, cpe22 or cpe23) - - Returns: - - bool: True if the ref_string parameter is a valid purl or cpe string, False otherwise. - - """ - string_pattern: str - if string_type == "purl": - string_pattern = r"^(?P.+):(?P.+)/(?P.+)/(?P.+)@(?P.+)\??(?P.*)#?(?P.*)$" - - elif string_type == "cpe23": - string_pattern = r"^cpe:2\.3:[aho\*\-](:(((\?*|\*?)([a-zA-Z0-9\-\._]|(\\[\\\*\?\!\"#\$%&'\(\)\+,\-\.\/:;<=>@\[\]\^`\{\|}~]))+(\?*|\*?))|[\*\-])){5}(:(([a-zA-Z]{2,3}(-([a-zA-Z]{2}|[0-9]{3}))?)|[\*\-]))(:(((\?*|\*?)([a-zA-Z0-9\-\._]|(\\[\\\*\?\!\"#\$%&'\(\)\+,\-\.\/:;<=>@\[\]\^`\{\|}~]))+(\?*|\*?))|[\*\-])){4}" - - elif string_type == "cpe22": - string_pattern = r"^[c][pP][eE]:/[AHOaho]?(:[A-Za-z0-9\._\-~%]*){0,6}" - - return re.match(string_pattern, ref_string) is not None - - def parse_sbom(self) -> [(str, str, str)]: - """ - Parse the SBOM to extract a list of modules, including vendor, product, and version information. - - The parsed product information can be retrieved from different components of the SBOM, with the following order of preference: - 1. CPE 2.3 Identifiers - 2. CPE 2.2 Identifiers - 3. Package URLs (purl) - 4. Name and Version from the SBOM (Vendor will be unspecified) - - Returns: - - List[(str, str, str)]: A list of tuples, each containing vendor, product, and version information for a module. 
- - """ - - # Set up SBOM parser - sbom_parser = SBOMParser(sbom_type=self.type) - # Load SBOM - sbom_parser.parse_file(self.filename) - modules = [] - if self.validate and self.filename.endswith(".xml"): - # Only for XML files - if sbom_parser.get_type() == "spdx": - valid_xml = validate_spdx(self.filename) - else: - valid_xml = validate_cyclonedx(self.filename) - if not valid_xml: - return modules - packages = [x for x in sbom_parser.get_sbom()["packages"].values()] - LOGGER.debug(f"Parsed SBOM {self.filename} {packages}") - for package in packages: - vendor = None - package_name = None - version = None - - # If Package URL or CPE record found, use this data in preference to package data - ext_ref = package.get("externalreference") - if ext_ref is not None: - vendor, package_name, version = self.parse_ext_ref(ext_ref=ext_ref) - - # For any data not found in CPE or the Package URL get from package data - if not vendor: - pass # Because no vendor was detected then all vendors with this named package - # will be included in the output. - - if not package_name: - package_name = package["name"] - - if (not version) and (package.get("version") is not None): - version = package["version"] - else: - LOGGER.debug(f"No version found in {package}") - - if version: - # Found at least package and version, save the results - modules.append([vendor, package_name, version]) - - LOGGER.debug(f"Parsed SBOM {self.filename} {modules}") - return modules - - def parse_ext_ref(self, ext_ref) -> (str | None, str | None, str | None): - """ - Parse external references in an SBOM to extract module information. - - Two passes are made through the external references, giving priority to CPE types, - which will always match the CVE database. - - Args: - - ext_ref (List[List[str]]): List of lists representing external references. - Each inner list contains [category, type, locator]. 
- - Returns: - - Optional[Tuple[str | None, str | None, str | None]]: A tuple containing the vendor, product, and version - information extracted from the external references, or None if not found. - - """ - decoded = {} - for ref in ext_ref: - ref_type = ref[1] - ref_string = ref[2] - if ref_type == "cpe23Type" and self.is_valid_string("cpe23", ref_string): - decoded["cpe23Type"] = self.decode_cpe23(ref_string) - - elif ref_type == "cpe22Type" and self.is_valid_string("cpe22", ref_string): - decoded["cpe22Type"] = self.decode_cpe22(ref_string) - - elif ref_type == "purl" and self.is_valid_string("purl", ref_string): - decoded["purl"] = self.decode_purl(ref_string) - - # No ext-ref matches, return none - return decoded.get( - "cpe23Type", - decoded.get("cpe22Type", decoded.get("purl", (None, None, None))), - ) - - def decode_cpe22(self, cpe22) -> (str | None, str | None, str | None): - """ - Decode a CPE 2.2 formatted string to extract vendor, product, and version information. - - Args: - - cpe22 (str): CPE 2.2 formatted string. - - Returns: - - Tuple[str | None, str | None, str | None]: A tuple containing the vendor, product, and version - information extracted from the CPE 2.2 string, or None if the information is incomplete. - - """ - - # split on `:` only if it's not escaped - cpe = re.split(r"(? (str | None, str | None, str | None): - """ - Decode a CPE 2.3 formatted string to extract vendor, product, and version information. - - Args: - - cpe23 (str): CPE 2.3 formatted string. - - Returns: - - Tuple[str | None, str | None, str | None]: A tuple containing the vendor, product, and version - information extracted from the CPE 2.3 string, or None if the information is incomplete. - - """ - - # split on `:` only if it's not escaped - cpe = re.split(r"(? (str | None, str | None, str | None): - """ - Decode a Package URL (purl) to extract version information. - - Args: - - purl (str): Package URL (purl) string. 
- - Returns: - - Tuple[str | None, str | None, str | None]: A tuple containing the vendor (which is always None for purl), - product, and version information extracted from the purl string, or None if the purl is invalid or incomplete. - - """ - vendor = None # Because the vendor and product identifiers in the purl don't always align - product = None # with the CVE DB, only the version is parsed. - version = None - # Process purl identifier - purl_info = PackageURL.from_string(purl).to_dict() - version = purl_info.get("version") - - return [vendor or None, product or None, version or None] - - -if __name__ == "__main__": - - file = sys.argv[1] - sbom = SBOMManager(file) - sbom.scan_file() diff --git a/cve_bin_tool/sbom_manager/generate.py b/cve_bin_tool/sbom_manager/generate.py new file mode 100644 index 0000000000..dd96dafb01 --- /dev/null +++ b/cve_bin_tool/sbom_manager/generate.py @@ -0,0 +1,105 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: GPL-3.0-or-later + +from logging import Logger +from pathlib import Path +from typing import Optional + +from lib4sbom.data.package import SBOMPackage +from lib4sbom.data.relationship import SBOMRelationship +from lib4sbom.generator import SBOMGenerator +from lib4sbom.sbom import SBOM + +from cve_bin_tool.log import LOGGER +from cve_bin_tool.version import VERSION + + +class SBOMGenerate: + """ + Class for generating SBOM (Software Bill of Materials) + + Methods: + generate_sbom: Create SBOM package and generate SBOM file. 
+ """ + + def __init__( + self, + all_product_data, + filename="", + sbom_type="spdx", + sbom_format="tag", + sbom_root="CVE-SCAN", + logger: Optional[Logger] = None, + ): + self.all_product_data = all_product_data + self.filename = filename + self.sbom_type = sbom_type + self.sbom_format = sbom_format + self.sbom_root = sbom_root + self.logger = logger or LOGGER.getChild(self.__class__.__name__) + self.sbom_packages = {} + + def generate_sbom(self) -> None: + """Create SBOM package and generate SBOM file.""" + # Create SBOM + sbom_relationships = [] + my_package = SBOMPackage() + sbom_relationship = SBOMRelationship() + + # Create root package + my_package.initialise() + root_package = f'CVEBINTOOL-{Path(self.sbom_root).name.replace(".", "-")}' + parent = f"SBOM_{root_package}" + my_package.set_name(root_package) + my_package.set_type("application") + my_package.set_filesanalysis(False) + my_package.set_downloadlocation(self.sbom_root) + license = "NOASSERTION" + my_package.set_licensedeclared(license) + my_package.set_licenseconcluded(license) + my_package.set_supplier("UNKNOWN", "NOASSERTION") + + # Store package data + self.sbom_packages[(my_package.get_name(), my_package.get_value("version"))] = ( + my_package.get_package() + ) + sbom_relationship.initialise() + sbom_relationship.set_relationship(parent, "DESCRIBES", root_package) + sbom_relationships.append(sbom_relationship.get_relationship()) + + # Add dependent products + for product_data in self.all_product_data: + my_package.initialise() + my_package.set_name(product_data.product) + my_package.set_version(product_data.version) + if product_data.vendor.casefold() != "UNKNOWN".casefold(): + my_package.set_supplier("Organization", product_data.vendor) + my_package.set_licensedeclared(license) + my_package.set_licenseconcluded(license) + if not ( + (my_package.get_name(), my_package.get_value("version")) + in self.sbom_packages + and product_data.vendor == "unknown" + ): + location = product_data.location + 
my_package.set_evidence(location) # Set location directly + self.sbom_packages[ + (my_package.get_name(), my_package.get_value("version")) + ] = my_package.get_package() + sbom_relationship.initialise() + sbom_relationship.set_relationship( + root_package, "DEPENDS_ON", product_data.product + ) + sbom_relationships.append(sbom_relationship.get_relationship()) + + # Generate SBOM + my_sbom = SBOM() + my_sbom.add_packages(self.sbom_packages) + my_sbom.add_relationships(sbom_relationships) + my_generator = SBOMGenerator( + sbom_type=self.sbom_type, + format=self.sbom_format, + application="cve-bin-tool", + version=VERSION, + ) + my_generator.generate(parent, my_sbom.get_sbom(), filename=self.filename) diff --git a/cve_bin_tool/sbom_manager/parse.py b/cve_bin_tool/sbom_manager/parse.py new file mode 100644 index 0000000000..dfe1f54306 --- /dev/null +++ b/cve_bin_tool/sbom_manager/parse.py @@ -0,0 +1,430 @@ +# Copyright (C) 2021 Anthony Harrison +# SPDX-License-Identifier: GPL-3.0-or-later + +from __future__ import annotations + +import re +import sys +from collections import defaultdict +from logging import Logger +from pathlib import Path + +import defusedxml.ElementTree as ET +from lib4sbom.parser import SBOMParser +from packageurl import PackageURL + +from cve_bin_tool.cvedb import CVEDB +from cve_bin_tool.input_engine import TriageData +from cve_bin_tool.log import LOGGER +from cve_bin_tool.util import ( + ProductInfo, + Remarks, + find_product_location, + validate_location, +) +from cve_bin_tool.validator import validate_cyclonedx, validate_spdx, validate_swid + + +class SBOMParse: + """ + Class: SBOMParse + + This class is responsible for parsing various SBOM file formats (SPDX, CycloneDX, SWID) in the CVE Bin Tool. + + It provides methods for scanning SBOM files, parsing them, and retrieving vendor information. + + Attributes: + - sbom_data (DefaultDict[ProductInfo, TriageData]): Dictionary containing parsed SBOM data. 
+ + """ + + SBOMtype = ["spdx", "cyclonedx", "swid"] + + sbom_data: defaultdict[ProductInfo, TriageData] + + def __init__( + self, + filename: str, + sbom_type: str = "spdx", + logger: Logger | None = None, + validate: bool = True, + ): + self.filename = filename + self.sbom_data = defaultdict(dict) + self.type = "unknown" + if sbom_type in self.SBOMtype: + self.type = sbom_type + self.logger = logger or LOGGER.getChild(self.__class__.__name__) + self.validate = validate + + # Connect to the database + self.cvedb = CVEDB(version_check=False) + + def parse_sbom(self) -> dict[ProductInfo, TriageData]: + """ + Parses the SBOM input file and returns the product information and + corresponding triage data. + + Returns: + - dict[ProductInfo, TriageData]: Parsed SBOM data. + + """ + self.logger.debug( + f"Processing SBOM {self.filename} of type {self.type.upper()}" + ) + modules = [] + try: + if Path(self.filename).exists(): + if self.type == "swid": + modules = self.parse_swid(self.filename) + else: + modules = self.parse_cyclonedx_spdx() + except (KeyError, FileNotFoundError, ET.ParseError) as e: + LOGGER.debug(e, exc_info=True) + + LOGGER.debug( + f"The number of modules identified in SBOM - {len(modules)}\n{modules}" + ) + + # Now process list of modules to create [vendor, product, version] tuples + parsed_data: list[ProductInfo] = [] + for module_vendor, product, version in modules: + # Using lower to normalize product names across databases + product = product.lower() + + if module_vendor is None: + # Now add vendor to create product record.... 
+ vendor_set = self.get_vendor(product) + for vendor in vendor_set: + # if vendor is not None: + location = find_product_location(product) + if location is None: + location = "NotFound" + if validate_location(location) is False: + raise ValueError(f"Invalid location {location} for {product}") + parsed_data.append(ProductInfo(vendor, product, version, location)) + else: + location = find_product_location(product) + if location is None: + location = "NotFound" + if validate_location(location) is False: + raise ValueError(f"Invalid location {location} for {product}") + parsed_data.append( + ProductInfo(module_vendor, product, version, location) + ) + + for row in parsed_data: + self.sbom_data[row]["default"] = { + "remarks": Remarks.NewFound, + "comments": "", + "severity": "", + } + self.sbom_data[row]["paths"] = set(map(lambda x: x.strip(), "".split(","))) + + LOGGER.debug(f"SBOM Data {self.sbom_data}") + return self.sbom_data + + def common_prefix_split(self, product, version) -> list[ProductInfo]: + """If the product have '-' in name try splitting it and try common prefixes. 
+ currently not being used, proposed to be used in future""" + parsed_data: list[ProductInfo] = [] + found_common_prefix = False + common_prefix = ( + "perl-", + "golang-", + "rubygem-", + "python-", + "py3-", + "python3-", + "python2-", + "rust-", + "nodejs-", + ) + for prefix in common_prefix: + if product.startswith(prefix): + common_prefix_product = product[len(prefix) :] + common_prefix_vendor = self.get_vendor(common_prefix_product) + if len(common_prefix_vendor) > 1 or ( + len(common_prefix_vendor) == 1 + and common_prefix_vendor[0] != "UNKNOWN" + ): + location = find_product_location(common_prefix_product) + if location is None: + location = "NotFound" + if validate_location(location) is False: + raise ValueError(f"Invalid location {location} for {product}") + found_common_prefix = True + for vendor in common_prefix_vendor: + parsed_data.append( + ProductInfo( + vendor, common_prefix_product, version, location + ) + ) + break + if not found_common_prefix: + # if vendor not found after removing common prefix try splitting it + LOGGER.debug( + f"No Vendor found for {product}, trying splitted product. " + "Some results may be inaccurate due to vendor identification limitations." + ) + splitted_product = product.split("-") + for sp in splitted_product: + temp = self.get_vendor(sp) + if len(temp) > 1 or (len(temp) == 1 and temp[0] != "UNKNOWN"): + for vendor in temp: + location = find_product_location(sp) + if location is None: + location = "NotFound" + if validate_location(location) is False: + raise ValueError( + f"Invalid location {location} for {product}" + ) + # if vendor is not None: + parsed_data.append(ProductInfo(vendor, sp, version, location)) + return parsed_data + + def get_vendor(self, product: str) -> list: + """ + Get the list of vendors for the product name. + + There may be more than one vendor for a given product name and all + matches are returned. + + Args: + - product (str): Product name. 
+ + Returns: + - list: The list of vendors for the product + + """ + vendorlist: list[str] = [] + vendor_package_pair = self.cvedb.get_vendor_product_pairs(product) + if vendor_package_pair: + # To handle multiple vendors, return all combinations of product/vendor mappings + for v in vendor_package_pair: + vendor = v["vendor"] + vendorlist.append(vendor) + else: + vendorlist.append("UNKNOWN") + return vendorlist + + def is_valid_string(self, string_type: str, ref_string: str) -> bool: + """ + Validate the PURL, CPE string is the correct form. + + Args: + - ref_string (str): PURL, CPE strings + - string_type (str): ref_string type. (purl, cpe22 or cpe23) + + Returns: + - bool: True if the ref_string parameter is a valid purl or cpe string, False otherwise. + + """ + string_pattern: str + if string_type == "purl": + string_pattern = r"^(?P.+):(?P.+)/(?P.+)/(?P.+)@(?P.+)\??(?P.*)#?(?P.*)$" + + elif string_type == "cpe23": + string_pattern = r"^cpe:2\.3:[aho\*\-](:(((\?*|\*?)([a-zA-Z0-9\-\._]|(\\[\\\*\?\!\"#\$%&'\(\)\+,\-\.\/:;<=>@\[\]\^`\{\|}~]))+(\?*|\*?))|[\*\-])){5}(:(([a-zA-Z]{2,3}(-([a-zA-Z]{2}|[0-9]{3}))?)|[\*\-]))(:(((\?*|\*?)([a-zA-Z0-9\-\._]|(\\[\\\*\?\!\"#\$%&'\(\)\+,\-\.\/:;<=>@\[\]\^`\{\|}~]))+(\?*|\*?))|[\*\-])){4}" + + elif string_type == "cpe22": + string_pattern = r"^[c][pP][eE]:/[AHOaho]?(:[A-Za-z0-9\._\-~%]*){0,6}" + + return re.match(string_pattern, ref_string) is not None + + def parse_cyclonedx_spdx(self) -> [(str, str, str)]: + """ + Parse the cyclonedx/spdx to extract a list of modules, including vendor, product, and version information. + + The parsed product information can be retrieved from different components of the SBOM, with the following order of preference: + 1. CPE 2.3 Identifiers + 2. CPE 2.2 Identifiers + 3. Package URLs (purl) + 4. Name and Version from the SBOM (Vendor will be unspecified) + + Returns: + - List[(str, str, str)]: A list of tuples, each containing vendor, product, and version information for a module. 
+ + """ + + # Set up SBOM parser + sbom_parser = SBOMParser(sbom_type=self.type) + # Load SBOM + sbom_parser.parse_file(self.filename) + modules = [] + if self.validate and self.filename.endswith(".xml"): + # Only for XML files + if sbom_parser.get_type() == "spdx": + valid_xml = validate_spdx(self.filename) + else: + valid_xml = validate_cyclonedx(self.filename) + if not valid_xml: + return modules + packages = [x for x in sbom_parser.get_sbom()["packages"].values()] + LOGGER.debug(f"Parsed SBOM {self.filename} {packages}") + for package in packages: + vendor = None + package_name = None + version = None + + # If Package URL or CPE record found, use this data in preference to package data + ext_ref = package.get("externalreference") + if ext_ref is not None: + vendor, package_name, version = self.parse_ext_ref(ext_ref=ext_ref) + + # For any data not found in CPE or the Package URL get from package data + if not vendor: + pass # Because no vendor was detected then all vendors with this named package + # will be included in the output. + + if not package_name: + package_name = package["name"] + + if (not version) and (package.get("version") is not None): + version = package["version"] + else: + LOGGER.debug(f"No version found in {package}") + + if version: + # Found at least package and version, save the results + modules.append([vendor, package_name, version]) + + LOGGER.debug(f"Parsed SBOM {self.filename} {modules}") + return modules + + def parse_swid(self, sbom_file: str) -> list[list[str]]: + """Parse SWID XML BOM file extracting package name and version""" + modules: list[list[str]] = [] + if self.validate and not validate_swid(sbom_file): + return modules + tree = ET.parse(sbom_file) + # Find root element + root = tree.getroot() + # Extract schema + schema = root.tag[: root.tag.find("}") + 1] + # schema = '{http://standards.iso.org/iso/19770/-2/2015/schema.xsd}' + for component in root.findall(schema + "Link"): + # Only if a component .... 
+ if component.get("rel") == "component": + swid = component.get("href") + if not swid: + raise KeyError(f"Could not find href in {component}") + swid = swid.replace("%20", " ") + modules.append(self.extract(swid)) + + return modules + + def extract(self, swid: str) -> list[str]: + """ + Extracts the vendor, product name and version from a SWID entry. + args: + swid: SWID entry + returns: + list containing vendor, product name and version + """ + # Return parsed swid entry as [vendor, product, version] list item + # Format of swid is "URI: <vendor>-<product>-<version>" + item = swid[swid.find(":") + 1 :].split("-") + # Version is upper-cased and every 'V' is removed (intended to drop a leading 'v') + return [item[0].strip(" "), item[1], item[2].upper().replace("V", "")] + + def parse_ext_ref(self, ext_ref) -> (str | None, str | None, str | None): + """ + Parse external references in an SBOM to extract module information. + + A single pass is made through the external references, giving priority to CPE types, + which will always match the CVE database. + + Args: + - ext_ref (List[List[str]]): List of lists representing external references. + Each inner list contains [category, type, locator]. + + Returns: + - Optional[Tuple[str | None, str | None, str | None]]: A tuple containing the vendor, product, and version + information extracted from the external references, or (None, None, None) if not found.
+ + """ + decoded = {} + for ref in ext_ref: + ref_type = ref[1] + ref_string = ref[2] + if ref_type == "cpe23Type" and self.is_valid_string("cpe23", ref_string): + decoded["cpe23Type"] = self.decode_cpe23(ref_string) + + elif ref_type == "cpe22Type" and self.is_valid_string("cpe22", ref_string): + decoded["cpe22Type"] = self.decode_cpe22(ref_string) + + elif ref_type == "purl" and self.is_valid_string("purl", ref_string): + decoded["purl"] = self.decode_purl(ref_string) + + # No ext-ref matches, return none + return decoded.get( + "cpe23Type", + decoded.get("cpe22Type", decoded.get("purl", (None, None, None))), + ) + + def decode_cpe22(self, cpe22) -> (str | None, str | None, str | None): + """ + Decode a CPE 2.2 formatted string to extract vendor, product, and version information. + + Args: + - cpe22 (str): CPE 2.2 formatted string. + + Returns: + - Tuple[str | None, str | None, str | None]: A tuple containing the vendor, product, and version + information extracted from the CPE 2.2 string, or None if the information is incomplete. + + """ + + # split on `:` only if it's not escaped + cpe = re.split(r"(? (str | None, str | None, str | None): + """ + Decode a CPE 2.3 formatted string to extract vendor, product, and version information. + + Args: + - cpe23 (str): CPE 2.3 formatted string. + + Returns: + - Tuple[str | None, str | None, str | None]: A tuple containing the vendor, product, and version + information extracted from the CPE 2.3 string, or None if the information is incomplete. + + """ + + # split on `:` only if it's not escaped + cpe = re.split(r"(? (str | None, str | None, str | None): + """ + Decode a Package URL (purl) to extract version information. + + Args: + - purl (str): Package URL (purl) string. + + Returns: + - Tuple[str | None, str | None, str | None]: A tuple containing the vendor (which is always None for purl), + product, and version information extracted from the purl string, or None if the purl is invalid or incomplete. 
+ + """ + vendor = None # Because the vendor and product identifiers in the purl don't always align + product = None # with the CVE DB, only the version is parsed. + version = None + # Process purl identifier + purl_info = PackageURL.from_string(purl).to_dict() + version = purl_info.get("version") + + return [vendor or None, product or None, version or None] + + +if __name__ == "__main__": + + file = sys.argv[1] + sbom = SBOMParse(file) + sbom.scan_file() diff --git a/cve_bin_tool/sbom_detection.py b/cve_bin_tool/sbom_manager/sbom_detection.py similarity index 100% rename from cve_bin_tool/sbom_detection.py rename to cve_bin_tool/sbom_manager/sbom_detection.py diff --git a/cve_bin_tool/sbom_manager/swid_parser.py b/cve_bin_tool/sbom_manager/swid_parser.py deleted file mode 100644 index 8881c1cf13..0000000000 --- a/cve_bin_tool/sbom_manager/swid_parser.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright (C) 2021 Anthony Harrison -# SPDX-License-Identifier: GPL-3.0-or-later - -from __future__ import annotations - -import defusedxml.ElementTree as ET - -from cve_bin_tool.validator import validate_swid - - -class SWIDParser: - """ - Class responsible for parsing SWID (Software Identification Tags) XML BOM (Bill of Materials) files. - """ - - def __init__(self, validate: bool = True): - self.validate = validate - - def parse(self, sbom_file: str) -> list[list[str]]: - """parses SWID XML BOM file extracting package name and version""" - modules: list[list[str]] = [] - if self.validate and not validate_swid(sbom_file): - return modules - tree = ET.parse(sbom_file) - # Find root element - root = tree.getroot() - # Extract schema - schema = root.tag[: root.tag.find("}") + 1] - # schema = '{http://standards.iso.org/iso/19770/-2/2015/schema.xsd}' - for component in root.findall(schema + "Link"): - # Only if a component .... 
- if component.get("rel") == "component": - swid = component.get("href") - if not swid: - raise KeyError(f"Could not find href in {component}") - swid = swid.replace("%20", " ") - modules.append(self.extract(swid)) - - return modules - - def extract(self, swid: str) -> list[str]: - """ - Extracts the product name and version from a SWID entry. - args: - swid: SWID entry - returns: - list containing product name and version - """ - # Return parsed swid entry as [product, version] list item - # Format of swid is "URI: --" - item = swid[swid.find(":") + 1 :].split("-") - # As some version numbers have leading 'v', it is removed - return [item[0].strip(" "), item[1], item[2].upper().replace("V", "")] - - -if __name__ == "__main__": - import sys - - parser = SWIDParser() - file = sys.argv[1] - print(parser.parse(file)) diff --git a/fuzz/fuzz_cyclonedx.py b/fuzz/fuzz_cyclonedx.py index 535e48419f..da4700cfcd 100644 --- a/fuzz/fuzz_cyclonedx.py +++ b/fuzz/fuzz_cyclonedx.py @@ -17,7 +17,7 @@ import fuzz.generated.cyclonedx_pb2 as cyclonedx_pb2 with atheris.instrument_imports(): - from cve_bin_tool.sbom_manager import SBOMManager + from cve_bin_tool.sbom_manager.parse import SBOMParse def TestParseData(data): @@ -50,8 +50,8 @@ def TestParseData(data): with open(file_path, "w") as f: json.dump(json_data, f) - sbom_engine = SBOMManager(file_path, sbom_type="cyclonedx") - sbom_engine.scan_file() + sbom_engine = SBOMParse(file_path, sbom_type="cyclonedx") + sbom_engine.parse_sbom() except SystemExit: return diff --git a/test/test_output_engine.py b/test/test_output_engine.py index a0292458d2..2962b71367 100644 --- a/test/test_output_engine.py +++ b/test/test_output_engine.py @@ -20,6 +20,7 @@ from cve_bin_tool.output_engine import OutputEngine, output_csv, output_json, output_pdf from cve_bin_tool.output_engine.console import output_console from cve_bin_tool.output_engine.util import format_output +from cve_bin_tool.sbom_manager.generate import SBOMGenerate from 
cve_bin_tool.util import ( CVE, CVEData, @@ -1150,19 +1151,23 @@ def setUp(self) -> None: self.mock_file = tempfile.NamedTemporaryFile("w+", encoding="utf-8") def test_generate_sbom(self): + """Test SBOM generation""" with patch( - "cve_bin_tool.output_engine.SBOMPackage" - ) as mock_sbom_package, patch("cve_bin_tool.output_engine.SBOMRelationship"): + "cve_bin_tool.sbom_manager.generate.SBOMPackage" + ) as mock_sbom_package, patch( + "cve_bin_tool.sbom_manager.generate.SBOMRelationship" + ): mock_package_instance = MagicMock() mock_sbom_package.return_value = mock_package_instance - self.output_engine.generate_sbom( + sbomgen = SBOMGenerate( all_product_data=self.all_product_data, filename="test.sbom", sbom_type="spdx", sbom_format="tag", sbom_root="CVE-SCAN", ) + sbomgen.generate_sbom() # Assertions mock_package_instance.set_name.assert_any_call("CVEBINTOOL-CVE-SCAN") @@ -1206,9 +1211,7 @@ def test_generate_sbom(self): mock_package_instance.get_package.return_value, mock_package_instance.get_package.return_value, } - actual_packages = [ - package for package in self.output_engine.sbom_packages.values() - ] + actual_packages = [package for package in sbomgen.sbom_packages.values()] self.assertEqual(actual_packages, list(expected_packages)) def tearDown(self) -> None: diff --git a/test/test_sbom.py b/test/test_sbom.py index 134eafd327..a33b0b5956 100644 --- a/test/test_sbom.py +++ b/test/test_sbom.py @@ -7,9 +7,9 @@ import pytest from cve_bin_tool.input_engine import TriageData -from cve_bin_tool.sbom_detection import sbom_detection -from cve_bin_tool.sbom_manager import Remarks, SBOMManager -from cve_bin_tool.util import ProductInfo +from cve_bin_tool.sbom_manager.parse import SBOMParse +from cve_bin_tool.sbom_manager.sbom_detection import sbom_detection +from cve_bin_tool.util import ProductInfo, Remarks class TestSBOM: @@ -117,8 +117,8 @@ class TestSBOM: (str(SBOM_PATH / "nonexistent.spdx.json"),), ) def test_nonexistent_file(self, filepath: str): - sbom_engine 
= SBOMManager(filepath) - assert sbom_engine.scan_file() == {} + sbom_engine = SBOMParse(filepath) + assert sbom_engine.parse_sbom() == {} @pytest.mark.parametrize( "filename, sbom_type", @@ -129,8 +129,8 @@ def test_nonexistent_file(self, filepath: str): ), ) def test_invalid_file(self, filename: str, sbom_type: str): - sbom_engine = SBOMManager(filename, sbom_type) - assert sbom_engine.scan_file() == {} + sbom_engine = SBOMParse(filename, sbom_type) + assert sbom_engine.parse_sbom() == {} @pytest.mark.parametrize( "filename, sbom_type", @@ -140,8 +140,8 @@ def test_invalid_file(self, filename: str, sbom_type: str): ), ) def test_invalid_type(self, filename: str, sbom_type: str): - sbom_engine = SBOMManager(filename, sbom_type) - assert sbom_engine.scan_file() == {} + sbom_engine = SBOMParse(filename, sbom_type) + assert sbom_engine.parse_sbom() == {} @pytest.mark.parametrize( "filename, spdx_parsed_data", @@ -158,8 +158,8 @@ def test_invalid_type(self, filename: str, sbom_type: str): def test_valid_spdx_file( self, filename: str, spdx_parsed_data: dict[ProductInfo, TriageData] ): - sbom_engine = SBOMManager(filename, sbom_type="spdx") - scan_result = sbom_engine.scan_file() + sbom_engine = SBOMParse(filename, sbom_type="spdx") + scan_result = sbom_engine.parse_sbom() for p in spdx_parsed_data: assert p in scan_result @@ -175,8 +175,8 @@ def test_valid_spdx_file( def test_valid_cyclonedx_file( self, filename: str, cyclonedx_parsed_data: dict[ProductInfo, TriageData] ): - sbom_engine = SBOMManager(filename, sbom_type="cyclonedx") - scan_result = sbom_engine.scan_file() + sbom_engine = SBOMParse(filename, sbom_type="cyclonedx") + scan_result = sbom_engine.parse_sbom() for p in cyclonedx_parsed_data: assert p in scan_result @@ -191,8 +191,8 @@ def test_valid_cyclonedx_file( def test_bad_ext_ref_cyclonedx_file( self, filename: str, cyclonedx_parsed_data: dict[ProductInfo, TriageData] ): - sbom_engine = SBOMManager(filename, sbom_type="cyclonedx") - scan_result = 
sbom_engine.scan_file() + sbom_engine = SBOMParse(filename, sbom_type="cyclonedx") + scan_result = sbom_engine.parse_sbom() for p in cyclonedx_parsed_data: assert p in scan_result.keys() @@ -208,8 +208,8 @@ def test_bad_ext_ref_cyclonedx_file( def test_ext_ref_priority_cyclonedx_file( self, filename: str, cyclonedx_parsed_data: dict[ProductInfo, TriageData] ): - sbom_engine = SBOMManager(filename, sbom_type="cyclonedx") - scan_result = sbom_engine.scan_file() + sbom_engine = SBOMParse(filename, sbom_type="cyclonedx") + scan_result = sbom_engine.parse_sbom() for p in cyclonedx_parsed_data: assert p in scan_result.keys() @@ -220,8 +220,8 @@ def test_ext_ref_priority_cyclonedx_file( def test_valid_swid_file( self, filename: str, swid_parsed_data: dict[ProductInfo, TriageData] ): - sbom_engine = SBOMManager(filename, sbom_type="swid") - scan_result = sbom_engine.scan_file() + sbom_engine = SBOMParse(filename, sbom_type="swid") + scan_result = sbom_engine.parse_sbom() for p in swid_parsed_data: assert p in scan_result @@ -235,8 +235,8 @@ def test_valid_swid_file( def test_common_prefix_split(self, product, version, productinfo, no_existent_file): """Unit Test for common_prefix_split that try to split on hyphen if no vendors are are found and the product has hyphen, here a no_existent_file is used - with sole purpose for creating a SBOMManager instance""" - sbom_engine = SBOMManager(no_existent_file) + with sole purpose for creating a SBOMParse instance""" + sbom_engine = SBOMParse(no_existent_file) scanned_list = sbom_engine.common_prefix_split(product, version) assert productinfo in scanned_list @@ -263,8 +263,8 @@ def test_invalid_xml(self, filename: str, sbom_type: str, validate: bool): if file does not match schema or if xml data is parsed against wrong type of sbom (indicated by validate being set to False) """ - sbom_engine = SBOMManager(filename, sbom_type, validate=validate) - assert sbom_engine.scan_file() == {} + sbom_engine = SBOMParse(filename, sbom_type, 
validate=validate) + assert sbom_engine.parse_sbom() == {} @pytest.mark.parametrize( "filename, expected_sbom_type",