Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize vulnerabilities view #1728

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
import hashlib
import json
import logging
import typing
from contextlib import suppress
from functools import cached_property
from typing import Optional
from itertools import groupby
from operator import attrgetter
from typing import Union

from cvss.exceptions import CVSS2MalformedError
from cvss.exceptions import CVSS3MalformedError
from cvss.exceptions import CVSS4MalformedError
from cwe2.database import Database
from django.contrib.auth import get_user_model
from django.contrib.auth.models import UserManager
Expand Down Expand Up @@ -45,6 +48,7 @@

from aboutcode import hashid
from vulnerabilities import utils
from vulnerabilities.severity_systems import EPSS
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.utils import normalize_purl
from vulnerabilities.utils import purl_to_dict
Expand Down Expand Up @@ -371,6 +375,95 @@ def get_related_purls(self):
"""
return [p.package_url for p in self.packages.distinct().all()]

def aggregate_fixed_and_affected_packages(self):
from vulnerabilities.views import get_purl_version_class

sorted_fixed_by_packages = self.fixed_by_packages.filter(is_ghost=False).order_by(
"type", "namespace", "name", "qualifiers", "subpath"
)

sorted_affected_packages = self.affected_packages.all()

grouped_fixed_by_packages = {
key: list(group)
for key, group in groupby(
sorted_fixed_by_packages,
key=attrgetter("type", "namespace", "name", "qualifiers", "subpath"),
)
}

all_affected_fixed_by_matches = []

for sorted_affected_package in sorted_affected_packages:
affected_fixed_by_matches = {
"affected_package": sorted_affected_package,
"matched_fixed_by_packages": [],
}

# Build the key to find matching group
key = (
sorted_affected_package.type,
sorted_affected_package.namespace,
sorted_affected_package.name,
sorted_affected_package.qualifiers,
sorted_affected_package.subpath,
)

# Get matching group from pre-grouped fixed_by_packages
matching_fixed_packages = grouped_fixed_by_packages.get(key, [])

# Get version classes for comparison
affected_version_class = get_purl_version_class(sorted_affected_package)
affected_version = affected_version_class(sorted_affected_package.version)

# Compare versions and filter valid matches
matched_fixed_by_packages = [
fixed_by_package.purl
for fixed_by_package in matching_fixed_packages
if get_purl_version_class(fixed_by_package)(fixed_by_package.version)
> affected_version
]

affected_fixed_by_matches["matched_fixed_by_packages"] = matched_fixed_by_packages
all_affected_fixed_by_matches.append(affected_fixed_by_matches)
return sorted_fixed_by_packages, sorted_affected_packages, all_affected_fixed_by_matches

def get_severity_vectors_and_values(self):
"""
Collect severity vectors and values, excluding EPSS scoring systems and handling errors gracefully.
"""
severity_vectors = []
severity_values = set()

# Exclude EPSS scoring system
base_severities = self.severities.exclude(scoring_system=EPSS.identifier)

# QuerySet for severities with valid scoring_elements and scoring_system in SCORING_SYSTEMS
valid_scoring_severities = base_severities.filter(
scoring_elements__isnull=False, scoring_system__in=SCORING_SYSTEMS.keys()
)

for severity in valid_scoring_severities:
try:
vector_values = SCORING_SYSTEMS[severity.scoring_system].get(
severity.scoring_elements
)
if vector_values:
severity_vectors.append(vector_values)
except (
CVSS2MalformedError,
CVSS3MalformedError,
CVSS4MalformedError,
NotImplementedError,
) as e:
logging.error(f"CVSSMalformedError for {severity.scoring_elements}: {e}")

valid_value_severities = base_severities.filter(value__isnull=False).exclude(value="")

severity_values.update(valid_value_severities.values_list("value", flat=True))

return severity_vectors, severity_values


class Weakness(models.Model):
"""
Expand Down
Loading
Loading