diff --git a/vulnerabilities/api.py b/vulnerabilities/api.py
index e622d069a..9e66177fc 100644
--- a/vulnerabilities/api.py
+++ b/vulnerabilities/api.py
@@ -642,17 +642,13 @@ def filter_cpe(self, queryset, name, value):
return self.queryset.filter(vulnerabilityreference__reference_id__startswith=cpe).distinct()
-class CPEViewSet(viewsets.ReadOnlyModelViewSet):
- """
- Lookup for vulnerabilities by CPE (https://nvd.nist.gov/products/cpe)
- """
+class CPEViewSet(VulnerabilityViewSet):
+ """Lookup for vulnerabilities by CPE (https://nvd.nist.gov/products/cpe)"""
queryset = Vulnerability.objects.filter(
vulnerabilityreference__reference_id__startswith="cpe"
).distinct()
- serializer_class = VulnerabilitySerializer
- filter_backends = (filters.DjangoFilterBackend,)
- throttle_classes = [StaffUserRateThrottle, AnonRateThrottle]
+
filterset_class = CPEFilterSet
@action(detail=False, methods=["post"])
diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py
index 512e7a39c..3394dd989 100644
--- a/vulnerabilities/importers/__init__.py
+++ b/vulnerabilities/importers/__init__.py
@@ -26,7 +26,6 @@
from vulnerabilities.importers import oss_fuzz
from vulnerabilities.importers import postgresql
from vulnerabilities.importers import project_kb_msr2019
-from vulnerabilities.importers import pysec
from vulnerabilities.importers import redhat
from vulnerabilities.importers import retiredotnet
from vulnerabilities.importers import ruby
@@ -42,9 +41,9 @@
from vulnerabilities.pipelines import npm_importer
from vulnerabilities.pipelines import nvd_importer
from vulnerabilities.pipelines import pypa_importer
+from vulnerabilities.pipelines import pysec_importer
IMPORTERS_REGISTRY = [
- pysec.PyPIImporter,
alpine_linux.AlpineImporter,
openssl.OpensslImporter,
redhat.RedhatImporter,
@@ -78,6 +77,7 @@
gitlab_importer.GitLabImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
nvd_importer.NVDImporterPipeline,
+ pysec_importer.PyPIImporterPipeline,
]
IMPORTERS_REGISTRY = {
diff --git a/vulnerabilities/importers/pysec.py b/vulnerabilities/importers/pysec.py
deleted file mode 100644
index 84a052f35..000000000
--- a/vulnerabilities/importers/pysec.py
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Copyright (c) nexB Inc. and others. All rights reserved.
-# VulnerableCode is a trademark of nexB Inc.
-# SPDX-License-Identifier: Apache-2.0
-# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
-# See https://github.com/aboutcode-org/vulnerablecode for support or download.
-# See https://aboutcode.org for more information about nexB OSS projects.
-#
-import json
-import logging
-from io import BytesIO
-from typing import Iterable
-from zipfile import ZipFile
-
-import requests
-
-from vulnerabilities.importer import AdvisoryData
-from vulnerabilities.importer import Importer
-from vulnerabilities.importers.osv import parse_advisory_data
-
-logger = logging.getLogger(__name__)
-
-
-class PyPIImporter(Importer):
- license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"
- spdx_license_expression = "CC-BY-4.0"
- importer_name = "PyPI Importer"
-
- def advisory_data(self) -> Iterable[AdvisoryData]:
- """
- Yield AdvisoryData using a zipped data dump of OSV data
- """
- url = "https://osv-vulnerabilities.storage.googleapis.com/PyPI/all.zip"
- response = requests.get(url).content
- with ZipFile(BytesIO(response)) as zip_file:
- for file_name in zip_file.namelist():
- if not file_name.startswith("PYSEC-"):
- logger.error(f"Unsupported PyPI advisory data file: {file_name}")
- continue
- with zip_file.open(file_name) as f:
- vul_info = json.load(f)
- yield parse_advisory_data(
- raw_data=vul_info, supported_ecosystems=["pypi"], advisory_url=url
- )
diff --git a/vulnerabilities/migrations/0074_update_pysec_advisory_created_by.py b/vulnerabilities/migrations/0074_update_pysec_advisory_created_by.py
new file mode 100644
index 000000000..d0e73181a
--- /dev/null
+++ b/vulnerabilities/migrations/0074_update_pysec_advisory_created_by.py
@@ -0,0 +1,37 @@
+# Generated by Django 4.2.16 on 2024-10-24 13:51
+
+from django.db import migrations
+
+"""
+Update the created_by field on Advisory from the old qualified_name
+to the new pipeline_id.
+"""
+
+
+def update_created_by(apps, schema_editor):
+ from vulnerabilities.pipelines.pysec_importer import PyPIImporterPipeline
+
+ Advisory = apps.get_model("vulnerabilities", "Advisory")
+ Advisory.objects.filter(created_by="vulnerabilities.importers.pysec.PyPIImporter").update(
+ created_by=PyPIImporterPipeline.pipeline_id
+ )
+
+
+def reverse_update_created_by(apps, schema_editor):
+ from vulnerabilities.pipelines.pysec_importer import PyPIImporterPipeline
+
+ Advisory = apps.get_model("vulnerabilities", "Advisory")
+ Advisory.objects.filter(created_by=PyPIImporterPipeline.pipeline_id).update(
+ created_by="vulnerabilities.importers.pysec.PyPIImporter"
+ )
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("vulnerabilities", "0073_delete_packagerelatedvulnerability"),
+ ]
+
+ operations = [
+ migrations.RunPython(update_created_by, reverse_code=reverse_update_created_by),
+ ]
diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py
index 2fea8e200..47d57c37f 100644
--- a/vulnerabilities/models.py
+++ b/vulnerabilities/models.py
@@ -406,7 +406,7 @@ def __str__(self):
@property
def is_cpe(self):
"""
- Return Trueis this is a CPE reference.
+ Return True if this is a CPE reference.
"""
return self.reference_id.startswith("cpe")
@@ -557,7 +557,7 @@ def for_cve(self, cve):
def with_is_vulnerable(self):
"""
- Annotate Package with ``with_is_vulnerable`` boolean attribute.
+ Annotate Package with ``is_vulnerable`` boolean attribute.
"""
return self.annotate(
is_vulnerable=Exists(
diff --git a/vulnerabilities/pipelines/pypa_importer.py b/vulnerabilities/pipelines/pypa_importer.py
index 68d4615b9..bdda50c94 100644
--- a/vulnerabilities/pipelines/pypa_importer.py
+++ b/vulnerabilities/pipelines/pypa_importer.py
@@ -6,7 +6,7 @@
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
-import logging
+
from pathlib import Path
from typing import Iterable
diff --git a/vulnerabilities/pipelines/pysec_importer.py b/vulnerabilities/pipelines/pysec_importer.py
new file mode 100644
index 000000000..32a9fd896
--- /dev/null
+++ b/vulnerabilities/pipelines/pysec_importer.py
@@ -0,0 +1,66 @@
+#
+# Copyright (c) nexB Inc. and others. All rights reserved.
+# VulnerableCode is a trademark of nexB Inc.
+# SPDX-License-Identifier: Apache-2.0
+# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
+# See https://github.com/aboutcode-org/vulnerablecode for support or download.
+# See https://aboutcode.org for more information about nexB OSS projects.
+#
+import json
+import logging
+from io import BytesIO
+from typing import Iterable
+from zipfile import ZipFile
+
+import requests
+
+from vulnerabilities.importer import AdvisoryData
+from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
+
+
+class PyPIImporterPipeline(VulnerableCodeBaseImporterPipeline):
+ """Collect advisories from PyPI."""
+
+ pipeline_id = "pysec_importer"
+
+ license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"
+ url = "https://osv-vulnerabilities.storage.googleapis.com/PyPI/all.zip"
+ spdx_license_expression = "CC-BY-4.0"
+ importer_name = "PyPI Importer"
+
+ @classmethod
+ def steps(cls):
+ return (
+ cls.fetch_zip,
+ cls.collect_and_store_advisories,
+ cls.import_new_advisories,
+ )
+
+ def fetch_zip(self):
+ self.log(f"Fetching `{self.url}`")
+ self.advisory_zip = requests.get(self.url).content
+
+ def advisories_count(self) -> int:
+ with ZipFile(BytesIO(self.advisory_zip)) as zip:
+ advisory_count = sum(1 for file in zip.namelist() if file.startswith("PYSEC-"))
+ return advisory_count
+
+ def collect_advisories(self) -> Iterable[AdvisoryData]:
+ """Yield AdvisoryData using a zipped data dump of OSV data"""
+ from vulnerabilities.importers.osv import parse_advisory_data
+
+ with ZipFile(BytesIO(self.advisory_zip)) as zip_file:
+ for file_name in zip_file.namelist():
+ if not file_name.startswith("PYSEC-"):
+ self.log(
+ f"Unsupported PyPI advisory data file: {file_name}",
+ level=logging.ERROR,
+ )
+ continue
+ with zip_file.open(file_name) as f:
+ vul_info = json.load(f)
+ yield parse_advisory_data(
+ raw_data=vul_info,
+ supported_ecosystems=["pypi"],
+ advisory_url=self.url,
+ )
diff --git a/vulnerabilities/templates/includes/pagination.html b/vulnerabilities/templates/includes/pagination.html
index b57e83850..a2b084d0a 100644
--- a/vulnerabilities/templates/includes/pagination.html
+++ b/vulnerabilities/templates/includes/pagination.html
@@ -1,6 +1,7 @@
{% if is_paginated %}
diff --git a/vulnerabilities/templates/packages.html b/vulnerabilities/templates/packages.html
index 9e3e23eb2..e5e40d37c 100644
--- a/vulnerabilities/templates/packages.html
+++ b/vulnerabilities/templates/packages.html
@@ -21,7 +21,16 @@
+<<<<<<< HEAD
{{ pagination_form.page_size }}
+=======
+
+>>>>>>> 939aba34f0d5961044c4a244f02e183a7be7b2ca
{% if is_paginated %}
@@ -85,6 +94,18 @@
{% endif %}
{% endif %}
+<<<<<<< HEAD
{% endblock %}
+=======
+
+{% endblock %}
+>>>>>>> 939aba34f0d5961044c4a244f02e183a7be7b2ca
diff --git a/vulnerabilities/templates/vulnerabilities.html b/vulnerabilities/templates/vulnerabilities.html
index d7d2484aa..9663fd20e 100644
--- a/vulnerabilities/templates/vulnerabilities.html
+++ b/vulnerabilities/templates/vulnerabilities.html
@@ -21,12 +21,26 @@
+<<<<<<< HEAD
{{ pagination_form.page_size }}
{% if is_paginated %}
{% include 'includes/pagination.html' with page_obj=page_obj %}
{% endif %}
+=======
+
+
+
+ {% if is_paginated %}
+ {% include 'includes/pagination.html' with page_obj=page_obj %}
+ {% endif %}
+>>>>>>> 939aba34f0d5961044c4a244f02e183a7be7b2ca
@@ -80,6 +94,19 @@
{% endif %}
{% endif %}
+<<<<<<< HEAD
{% endblock %}
+=======
+
+
+{% endblock %}
+>>>>>>> 939aba34f0d5961044c4a244f02e183a7be7b2ca
diff --git a/vulnerabilities/tests/test_pysec.py b/vulnerabilities/tests/pipelines/test_pysec_importer_pipeline.py
similarity index 72%
rename from vulnerabilities/tests/test_pysec.py
rename to vulnerabilities/tests/pipelines/test_pysec_importer_pipeline.py
index d8e6e1c5b..b596c8f1d 100644
--- a/vulnerabilities/tests/test_pysec.py
+++ b/vulnerabilities/tests/pipelines/test_pysec_importer_pipeline.py
@@ -7,24 +7,23 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
-import os
+from pathlib import Path
from unittest import TestCase
from vulnerabilities.importers.osv import parse_advisory_data
from vulnerabilities.tests.util_tests import VULNERABLECODE_REGEN_TEST_FIXTURES as REGEN
from vulnerabilities.tests.util_tests import check_results_against_json
-BASE_DIR = os.path.dirname(os.path.abspath(__file__))
-TEST_DATA = os.path.join(BASE_DIR, "test_data/pysec")
+TEST_DATA = Path(__file__).parent.parent / "test_data" / "pysec"
class TestPyPIImporter(TestCase):
def test_to_advisories_with_summary(self):
- with open(os.path.join(TEST_DATA, "pysec-advisories_with_summary.json")) as f:
+ with open(TEST_DATA / "pysec-advisories_with_summary.json") as f:
mock_response = json.load(f)
results = parse_advisory_data(mock_response, ["pypi"], "https://test.com").to_dict()
- expected_file = os.path.join(TEST_DATA, "pysec-advisories_with_summary-expected.json")
+ expected_file = TEST_DATA / "pysec-advisories_with_summary-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
@@ -32,12 +31,12 @@ def test_to_advisories_with_summary(self):
)
def test_to_advisories_without_summary(self):
- with open(os.path.join(TEST_DATA, "pysec-advisories_without_summary.json")) as f:
+ with open(TEST_DATA / "pysec-advisories_without_summary.json") as f:
mock_response = json.load(f)
results = parse_advisory_data(mock_response, ["pypi"], "https://test.com").to_dict()
- expected_file = os.path.join(TEST_DATA, "pysec-advisories_without_summary-expected.json")
+ expected_file = TEST_DATA / "pysec-advisories_without_summary-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
@@ -45,14 +44,14 @@ def test_to_advisories_without_summary(self):
)
def test_to_advisories_with_cwe(self):
- with open(os.path.join(TEST_DATA, "pysec-advisory_with_cwe.json")) as f:
+ with open(TEST_DATA / "pysec-advisory_with_cwe.json") as f:
mock_response = json.load(f)
results = parse_advisory_data(
raw_data=mock_response, supported_ecosystems=["pypi"], advisory_url="https://tes.com"
).to_dict()
- expected_file = os.path.join(TEST_DATA, "pysec-advisories_with_cwe-expected.json")
+ expected_file = TEST_DATA / "pysec-advisories_with_cwe-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
diff --git a/vulnerabilities/tests/test_api.py b/vulnerabilities/tests/test_api.py
index af05e8be5..7147e849e 100644
--- a/vulnerabilities/tests/test_api.py
+++ b/vulnerabilities/tests/test_api.py
@@ -705,6 +705,46 @@ def test_api_response(self):
self.assertEqual(response["count"], 1)
+class TestCPEApiWithPackageVulnerabilityRelation(TestCase):
+ def setUp(self):
+ self.user = ApiUser.objects.create_api_user(username="e@mail.com")
+ self.auth = f"Token {self.user.auth_token.key}"
+ self.csrf_client = APIClient(enforce_csrf_checks=True)
+ self.csrf_client.credentials(HTTP_AUTHORIZATION=self.auth)
+ self.vulnerability = Vulnerability.objects.create(summary="test")
+ self.affected_package, _ = Package.objects.get_or_create_from_purl(
+ purl="pkg:nginx/nginx@v3.4"
+ )
+ self.fixed_package, _ = Package.objects.get_or_create_from_purl(purl="pkg:nginx/nginx@v4.0")
+ AffectedByPackageRelatedVulnerability.objects.create(
+ vulnerability=self.vulnerability,
+ created_by="test",
+ package=self.affected_package,
+ confidence=100,
+ )
+ FixingPackageRelatedVulnerability.objects.create(
+ vulnerability=self.vulnerability,
+ created_by="test",
+ package=self.fixed_package,
+ confidence=100,
+ )
+ for i in range(0, 10):
+ ref, _ = VulnerabilityReference.objects.get_or_create(
+ reference_id=f"cpe:/a:nginx:{i}",
+ url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query=cpe:/a:nginx:{i}",
+ )
+ VulnerabilityRelatedReference.objects.create(
+ reference=ref, vulnerability=self.vulnerability
+ )
+
+ def test_cpe_api(self):
+ response = self.csrf_client.get("/api/cpes/", format="json")
+ self.assertEqual(status.HTTP_200_OK, response.status_code)
+
+ response_data = response.json()
+ self.assertEqual(1, response_data["count"])
+
+
class AliasApi(TestCase):
def setUp(self):
self.user = ApiUser.objects.create_api_user(username="e@mail.com")
diff --git a/vulnerabilities/tests/test_data_migrations.py b/vulnerabilities/tests/test_data_migrations.py
index df51b12bb..046c86ce5 100644
--- a/vulnerabilities/tests/test_data_migrations.py
+++ b/vulnerabilities/tests/test_data_migrations.py
@@ -672,7 +672,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)
- def test_removal_of_duped_purls(self):
+ def test_update_npm_pypa_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()
@@ -714,7 +714,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)
- def test_removal_of_duped_purls(self):
+ def test_update_nginx_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()
@@ -753,7 +753,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)
- def test_removal_of_duped_purls(self):
+ def test_update_gitlab_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()
@@ -794,7 +794,7 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)
- def test_removal_of_duped_purls(self):
+ def test_update_github_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()
@@ -835,9 +835,48 @@ def setUpBeforeMigration(self, apps):
date_collected=timezone.now(),
)
- def test_removal_of_duped_purls(self):
+ def test_update_nvd_created_by_field(self):
Advisory = apps.get_model("vulnerabilities", "Advisory")
adv = Advisory.objects.all()
assert adv.filter(created_by="vulnerabilities.importers.nvd.NVDImporter").count() == 0
assert adv.filter(created_by="nvd_importer").count() == 1
+
+
+class TestUpdatePysecAdvisoryCreatedByField(TestMigrations):
+ app_name = "vulnerabilities"
+ migrate_from = "0073_delete_packagerelatedvulnerability"
+ migrate_to = "0074_update_pysec_advisory_created_by"
+
+ advisory_data1 = AdvisoryData(
+ aliases=["CVE-2020-13371337"],
+ summary="vulnerability description here",
+ affected_packages=[
+ AffectedPackage(
+ package=PackageURL(type="pypi", name="foobar"),
+ affected_version_range=VersionRange.from_string("vers:pypi/>=1.0.0|<=2.0.0"),
+ )
+ ],
+ references=[Reference(url="https://example.com/with/more/info/CVE-2020-13371337")],
+ date_published=timezone.now(),
+ url="https://test.com",
+ )
+
+ def setUpBeforeMigration(self, apps):
+ Advisory = apps.get_model("vulnerabilities", "Advisory")
+ adv1 = Advisory.objects.create(
+ aliases=self.advisory_data1.aliases,
+ summary=self.advisory_data1.summary,
+ affected_packages=[pkg.to_dict() for pkg in self.advisory_data1.affected_packages],
+ references=[ref.to_dict() for ref in self.advisory_data1.references],
+ url=self.advisory_data1.url,
+ created_by="vulnerabilities.importers.pysec.PyPIImporter",
+ date_collected=timezone.now(),
+ )
+
+ def test_update_pysec_created_by_field(self):
+ Advisory = apps.get_model("vulnerabilities", "Advisory")
+ adv = Advisory.objects.all()
+
+ assert adv.filter(created_by="vulnerabilities.importers.pysec.PyPIImporter").count() == 0
+ assert adv.filter(created_by="pysec_importer").count() == 1
diff --git a/vulnerablecode/settings.py b/vulnerablecode/settings.py
index 1ae664d82..0e545e0f2 100644
--- a/vulnerablecode/settings.py
+++ b/vulnerablecode/settings.py
@@ -349,3 +349,9 @@
},
},
}
+
+if DEBUG:
+ LOGGING["django"] = {
+ "handlers": ["console"],
+ "level": "ERROR",
+ }