Skip to content

Commit

Permalink
Add pipeline to add CVSSv3.1 score for CVEs
Browse files Browse the repository at this point in the history
Signed-off-by: Tushar Goel <[email protected]>
  • Loading branch information
TG1999 committed Jan 13, 2025
1 parent fff12d6 commit c80c7a3
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 0 deletions.
2 changes: 2 additions & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from vulnerabilities.pipelines import enhance_with_kev
from vulnerabilities.pipelines import enhance_with_metasploit
from vulnerabilities.pipelines import flag_ghost_packages
from vulnerabilities.pipelines import add_cvss31_to_CVEs

IMPROVERS_REGISTRY = [
valid_versions.GitHubBasicImprover,
Expand All @@ -43,6 +44,7 @@
compute_package_risk.ComputePackageRiskPipeline,
compute_package_version_rank.ComputeVersionRankPipeline,
collect_commits.CollectFixCommitsPipeline,
add_cvss31_to_CVEs.CVEAdvisoryMappingPipeline,
]

IMPROVERS_REGISTRY = {
Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ class VulnerabilitySeverity(models.Model):
blank=True, null=True, help_text="UTC Date of publication of the vulnerability severity"
)

objects = BaseQuerySet.as_manager()

class Meta:
ordering = ["url", "scoring_system", "value"]

Expand Down
105 changes: 105 additions & 0 deletions vulnerabilities/pipelines/add_cvss31_to_CVEs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import re

from aboutcode.pipeline import LoopProgress
from django.db import transaction

from vulnerabilities import severity_systems
from vulnerabilities.models import Advisory
from vulnerabilities.models import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodePipeline


class CVEAdvisoryMappingPipeline(VulnerableCodePipeline):
"""
Pipeline to map CVEs from VulnerabilitySeverity to corresponding Advisories with CVSS3.1 scores.
"""

pipeline_id = "add_cvssv3.1_to_CVEs"

@classmethod
def steps(cls):
return (cls.process_cve_advisory_mapping,)

def process_cve_advisory_mapping(self):
nvd_severities = (
VulnerabilitySeverity.objects.filter(
url__startswith="https://nvd.nist.gov/vuln/detail/CVE-", scoring_system="cvssv3"
)
.prefetch_related("vulnerabilities")
.distinct()
)

self.log(f"Processing {nvd_severities.count():,d} CVE severity records")

progress = LoopProgress(
total_iterations=nvd_severities.count(),
logger=self.log,
progress_step=5,
)

batch_size = 1000
results = []

for severity in progress.iter(nvd_severities.paginated(per_page=batch_size)):
print(severity.url)
cve_pattern = re.compile(r"(CVE-\d{4}-\d{4,7})").search
cve_match = cve_pattern(severity.url)
if cve_match:
cve_id = cve_match.group()
else:
self.log(f"Could not find CVE ID in URL: {severity.url}")
continue

matching_advisories = Advisory.objects.filter(
aliases=[cve_id],
created_by="nvd_importer",
)

for advisory in matching_advisories:
for reference in advisory.references:
for sev in reference.get("severities", []):
if sev.get("system") == "cvssv3.1":
results.append(
{
"cve_id": cve_id,
"cvss31_score": sev.get("value"),
"cvss31_vector": sev.get("scoring_elements"),
"vulnerabilities": severity.vulnerabilities.all(),
}
)

if results:
print(results)
self._process_batch(results)

self.log(f"Completed processing CVE to Advisory mappings")

def _process_batch(self, results):
"""
Process a batch of results. Transactions are used to ensure data consistency.
"""
self.log(f"Processing batch of {len(results)} mappings")

with transaction.atomic():
for result in results:
self.log(
f"CVE: {result['cve_id']}, "
f"CVSS3.1: {result['cvss31_score']}, "
f"Vector: {result['cvss31_vector']}"
)

for vulnerability in result["vulnerabilities"]:
vuln_severity, _ = VulnerabilitySeverity.objects.update_or_create(
scoring_system=severity_systems.CVSSV31.identifier,
url=f"https://nvd.nist.gov/vuln/detail/{result['cve_id']}",
value=result["cvss31_score"],
scoring_elements=result["cvss31_vector"],
)
vulnerability.severities.add(vuln_severity)
56 changes: 56 additions & 0 deletions vulnerabilities/tests/test_add_cvsssv31.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import unittest
from unittest.mock import Mock
from unittest.mock import patch

from django.test import TestCase

from vulnerabilities.models import Advisory
from vulnerabilities.models import Alias
from vulnerabilities.models import Vulnerability
from vulnerabilities.models import VulnerabilitySeverity
from vulnerabilities.pipelines.add_cvss31_to_CVEs import CVEAdvisoryMappingPipeline
from vulnerabilities.severity_systems import CVSSV3
from vulnerabilities.severity_systems import CVSSV31


class TestCVEAdvisoryMappingPipeline(TestCase):
def setUp(self):
self.pipeline = CVEAdvisoryMappingPipeline()
Advisory.objects.create(
created_by="nvd_importer",
aliases=["CVE-2024-1234"],
references=[
{
"severities": [
{
"system": "cvssv3.1",
"value": "7.5",
"scoring_elements": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
}
],
"url": "https://nvd.nist.gov/vuln/detail/CVE-2024-1234",
}
],
date_collected="2024-09-27T19:38:00Z",
)
vuln = Vulnerability.objects.create(vulnerability_id="CVE-2024-1234")
sev = VulnerabilitySeverity.objects.create(
scoring_system=CVSSV3.identifier,
url="https://nvd.nist.gov/vuln/detail/CVE-2024-1234",
value="7.5",
scoring_elements="CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
)
vuln.severities.add(sev)

def test_process_cve_advisory_mapping_single_record(self):
self.pipeline.process_cve_advisory_mapping()
self.assertEqual(VulnerabilitySeverity.objects.count(), 2)
# check if severity with cvssv3.1 is created
sev = VulnerabilitySeverity.objects.get(scoring_system=CVSSV31.identifier)
self.assertEqual(sev.url, "https://nvd.nist.gov/vuln/detail/CVE-2024-1234")
self.assertEqual(sev.value, "7.5")
self.assertEqual(sev.scoring_elements, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N")
# check if severity is added to existing vulnerability
vuln = Vulnerability.objects.get(vulnerability_id="CVE-2024-1234")
self.assertEqual(vuln.severities.count(), 2)
self.assertIn(sev, vuln.severities.all())

0 comments on commit c80c7a3

Please sign in to comment.