From 94e5a2db4e7d765172636fd2fbaad02a1e23a36b Mon Sep 17 00:00:00 2001 From: Meet Soni <92802561+inosmeet@users.noreply.github.com> Date: Wed, 12 Jun 2024 00:11:00 +0530 Subject: [PATCH] feat: add purl2cpe as a data source (#4179) * feat: added purl2cpe as a data source * feat: Separated data source integration from previous PR Signed-off-by: Meet Soni --- cve_bin_tool/cli.py | 5 ++ cve_bin_tool/cvedb.py | 2 + cve_bin_tool/data_sources/purl2cpe_source.py | 76 ++++++++++++++++++++ cve_bin_tool/parsers/python.py | 8 +-- 4 files changed, 86 insertions(+), 5 deletions(-) create mode 100644 cve_bin_tool/data_sources/purl2cpe_source.py diff --git a/cve_bin_tool/cli.py b/cve_bin_tool/cli.py index 999033b0ca..ddedd7714b 100644 --- a/cve_bin_tool/cli.py +++ b/cve_bin_tool/cli.py @@ -48,6 +48,7 @@ gad_source, nvd_source, osv_source, + purl2cpe_source, redhat_source, ) from cve_bin_tool.error_handler import ( @@ -722,6 +723,10 @@ def main(argv=None): source_curl = curl_source.Curl_Source() enabled_sources.append(source_curl) + if "PURL2CPE" not in disabled_sources: + source_purl2cpe = purl2cpe_source.PURL2CPE_Source() + enabled_sources.append(source_purl2cpe) + if "NVD" not in disabled_sources: source_nvd = nvd_source.NVD_Source( nvd_type=nvd_type, diff --git a/cve_bin_tool/cvedb.py b/cve_bin_tool/cvedb.py index 8d21eca46a..1451eaa996 100644 --- a/cve_bin_tool/cvedb.py +++ b/cve_bin_tool/cvedb.py @@ -28,6 +28,7 @@ gad_source, nvd_source, osv_source, + purl2cpe_source, ) from cve_bin_tool.error_handler import ERROR_CODES, CVEDBError, ErrorMode, SigningError from cve_bin_tool.fetch_json_db import Fetch_JSON_DB @@ -57,6 +58,7 @@ class CVEDB: curl_source.Curl_Source, osv_source.OSV_Source, gad_source.GAD_Source, + purl2cpe_source.PURL2CPE_Source, nvd_source.NVD_Source, # last to avoid data overwrites ] diff --git a/cve_bin_tool/data_sources/purl2cpe_source.py b/cve_bin_tool/data_sources/purl2cpe_source.py new file mode 100644 index 0000000000..6fa17be6b5 --- /dev/null +++ 
from __future__ import annotations

import zipfile
from io import BytesIO
from pathlib import Path

import aiohttp

from cve_bin_tool.data_sources import DISK_LOCATION_DEFAULT, Data_Source
from cve_bin_tool.error_handler import ErrorMode
from cve_bin_tool.log import LOGGER
from cve_bin_tool.version import HTTP_HEADERS


class PURL2CPE_Source(Data_Source):
    """Class to retrieve purl-cpe mapping database (PURL2CPE).

    The source downloads a zip archive from ``PURL2CPE_URL`` and unpacks it
    into a ``purl2cpe`` directory below ``CACHEDIR``.
    """

    SOURCE = "PURL2CPE"
    CACHEDIR = DISK_LOCATION_DEFAULT
    LOGGER = LOGGER.getChild("CVEDB")
    PURL2CPE_URL = "https://github.com/scanoss/purl2cpe/raw/main/purl2cpe.db.zip"

    def __init__(
        self,
        error_mode: ErrorMode = ErrorMode.TruncTrace,
        incremental_update: bool = False,
    ):
        """Store the configuration; no network or disk access happens here.

        Args:
            error_mode: Stored on the instance as ``error_mode``.
            incremental_update: Stored on the instance as
                ``incremental_update``.
        """
        self.cachedir = self.CACHEDIR
        self.purl2cpe_path = str(Path(self.cachedir) / "purl2cpe")
        self.source_name = self.SOURCE
        self.error_mode = error_mode
        self.incremental_update = incremental_update
        self.purl2cpe_url = self.PURL2CPE_URL
        # Created lazily by fetch_cves() and reset to None once closed.
        self.session = None

    async def fetch_cves(self) -> None:
        """Fetches PURL2CPE database and places it in purl2cpe_path.

        Download and extraction are best-effort: a non-200 status or any
        exception raised while downloading/unpacking is logged at debug level
        and not re-raised. The HTTP session is always closed and reset to
        ``None`` before returning.

        Raises:
            OSError: If the target directory cannot be created.
        """
        LOGGER.info("Getting PURL2CPE data...")

        # parents/exist_ok: works on a fresh cache root and avoids the
        # check-then-create race of a separate exists() test.
        Path(self.purl2cpe_path).mkdir(parents=True, exist_ok=True)

        if not self.session:
            connector = aiohttp.TCPConnector(limit_per_host=10)
            self.session = aiohttp.ClientSession(
                connector=connector, headers=HTTP_HEADERS, trust_env=True
            )

        try:
            # The context manager releases the response (and its connection)
            # on every path, including the non-200 branch.
            async with self.session.get(self.purl2cpe_url) as response:
                if response.status == 200:
                    data = await response.read()
                    with zipfile.ZipFile(BytesIO(data), "r") as zip_ref:
                        zip_ref.extractall(self.purl2cpe_path)
                else:
                    LOGGER.debug(
                        f"Failed to download file. Status code: {response.status}"
                    )
        except Exception as e:
            # Deliberate best-effort: a failed refresh must not abort the run.
            LOGGER.debug(f"Error fetching PURL2CPE data: {e}")
        finally:
            # Runs on cancellation too, so the session cannot leak.
            await self.session.close()
            self.session = None

    async def get_cve_data(self) -> tuple[tuple[list, list], str]:
        """Fetches PURL2CPE Database.

        Returns:
            A pair of two empty lists together with the source name. This
            source only refreshes files on disk and returns no records.
        """
        # skip if connection fails
        try:
            await self.fetch_cves()
        except Exception as e:
            LOGGER.debug(f"Error while fetching PURL2CPE Data: {e}")
            LOGGER.error("Unable to fetch PURL2CPE Data, skipping PURL2CPE.")
        finally:
            # fetch_cves() normally closes the session itself; this covers a
            # failure before it got that far. Resetting to None prevents a
            # later call from reusing a closed session.
            if self.session is not None:
                await self.session.close()
                self.session = None

        return ([], []), self.source_name
product).lower() - vendor = "UNKNOWN" if not product: return None @@ -154,7 +153,6 @@ def run_checker(self, filename): yield ScanInfo( ProductInfo(vendor, product, version, location), file_path ) - # There are packages with a METADATA file in them containing different data from what the tool expects except AttributeError: self.logger.debug(f"{filename} is an invalid METADATA/PKG-INFO")