Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add importer for vmware/photon/wiki/Security-Advisories vulnerabilities advisories #1683

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Empty file added .qdrant-initialized
Empty file.
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from vulnerabilities.pipelines import nvd_importer
from vulnerabilities.pipelines import pypa_importer
from vulnerabilities.pipelines import pysec_importer
from vulnerabilities.pipelines import vmwarephoton_importer

IMPORTERS_REGISTRY = [
alpine_linux.AlpineImporter,
Expand Down Expand Up @@ -78,6 +79,7 @@
github_importer.GitHubAPIImporterPipeline,
nvd_importer.NVDImporterPipeline,
pysec_importer.PyPIImporterPipeline,
vmwarephoton_importer.VMWAREPHOTONImporterPipeline,
]

IMPORTERS_REGISTRY = {
Expand Down
153 changes: 153 additions & 0 deletions vulnerabilities/pipelines/vmwarephoton_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import gzip
import json
import logging
from datetime import date
from traceback import format_exc as traceback_format_exc
from typing import Iterable

import attr
import requests
from dateutil import parser as dateparser
from packageurl import PackageURL
from univers.version_range import VersionRange
from univers.versions import GenericVersion # Import GenericVersion

from vulnerabilities import severity_systems
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_item


class VMWAREPHOTONImporterPipeline(VulnerableCodeBaseImporterPipeline):
"""Collect advisories from VMWARE_PHOTON."""

'''
EXAMPLE:
{
"cve_id": "CVE-2020-11979",
"pkg": "apache-ant",
"cve_score": 7.5,
"aff_ver": "all versions before 1.10.8-2.ph1 are vulnerable",
"res_ver": "1.10.8-2.ph1"
}
'''

pipeline_id = "vmwarephoton_importer"
repo_url = "https://github.com/vmware/photon/wiki/Security-Advisories"
spdx_license_expression = "CC BY-SA 4.0"
license_url = "https://creativecommons.org/licenses/by-sa/4.0/"

importer_name = "PHOTON Importer"

urls = [
"https://packages.vmware.com/photon/photon_cve_metadata/cve_data_photon1.0.json",
"https://packages.vmware.com/photon/photon_cve_metadata/cve_data_photon2.0.json",
"https://packages.vmware.com/photon/photon_cve_metadata/cve_data_photon3.0.json",
"https://packages.vmware.com/photon/photon_cve_metadata/cve_data_photon4.0.json",
"https://packages.vmware.com/photon/photon_cve_metadata/cve_data_photon5.0.json",
]

def advisories_count(self):
advisory_count = 0
for url in self.urls:
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
advisory_count += len(data)
except requests.HTTPError as http_err:
self.log(
f"HTTP error occurred while fetching {url}: {http_err} \n {traceback_format_exc()}",
level=logging.ERROR,
)
except requests.RequestException as req_err:
self.log(
f"Request exception occurred while fetching {url}: {req_err} \n {traceback_format_exc()}",
level=logging.ERROR,
)
except Exception as e:
self.log(f"Unexpected error: {e} \n {traceback_format_exc()}", level=logging.ERROR)
return advisory_count

def collect_advisories(self) -> Iterable[AdvisoryData]:
advisories = []
for url in self.urls:
try:
response = requests.get(url)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why fetch these twice? both in advisories_count and here.

response.raise_for_status()
advisories_data = response.json() # Fetch the data from the API
advisories.extend(self.to_advisory(advisories_data)) # Collect advisories for each URL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the right approach, advisory should always be yielded. No need to compute all advisory at once. See the other importer pipelines like https://github.com/aboutcode-org/vulnerablecode/blob/main/vulnerabilities/pipelines/nginx_importer.py

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thankyou @keshav-space for the suggestions. I will look into them and make the changes. Thankyou.

except requests.HTTPError as http_err:
self.log(
f"HTTP error occurred while fetching {url}: {http_err} \n {traceback_format_exc()}",
level=logging.ERROR,
)
except requests.RequestException as req_err:
self.log(
f"Request exception occurred while fetching {url}: {req_err} \n {traceback_format_exc()}",
level=logging.ERROR,
)
except Exception as e:
self.log(f"Unexpected error: {e} \n {traceback_format_exc()}", level=logging.ERROR)
return advisories

def to_advisory(self, data) -> Iterable[AdvisoryData]:
advisories = []
for cve in data:
cve_id = cve.get("cve_id")
pkg_name = cve.get("pkg")
aff_ver = cve.get("aff_ver")
rev_ver = cve.get("res_ver")
url = cve.get("url", "https://github.com/vmware/photon/wiki/Security-Advisories") # Default URL

# Validate required fields and skip invalid entries
if not cve_id or not pkg_name or not aff_ver or not rev_ver:
logging.warning(f"Skipping advisory due to missing fields: {cve}")
continue

try:
# Create a PackageURL object
pkg = PackageURL(name=pkg_name, type="generic")

# Use GenericVersion to handle non-semver versions
try:
fixed_version = GenericVersion(rev_ver)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add support for PhotonVersion, or could we instead use RpmVersion?

except ValueError as e:
logging.warning(f"Skipping advisory {cve_id} due to invalid version: {rev_ver} - {e}")
continue

affected_version_range = None
if "all versions before" in aff_ver.lower():
affected_version_range = f"vers:generic/<{rev_ver}"

# Handle version range errors
try:
affected_packages = [
AffectedPackage(
package=pkg,
affected_version_range=VersionRange.from_string(affected_version_range)
if affected_version_range
else None,
fixed_version=fixed_version,
)
]
except ValueError as ve:
logging.warning(f"Skipping advisory {cve_id} due to invalid version range: {aff_ver} - {ve}")
continue

advisories.append(
AdvisoryData(
aliases=[cve_id],
affected_packages=affected_packages,
url=url, # Ensure a valid URL is provided
)
)
except Exception as e:
logging.error(f"Error processing advisory {cve_id}: {e}")
continue

return advisories