Skip to content

Commit

Permalink
Merge branch 'main' into fix-migration
Browse files Browse the repository at this point in the history
  • Loading branch information
keshav-space authored Oct 28, 2024
2 parents 868810d + 590c91a commit 8f1f4a9
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 70 deletions.
10 changes: 3 additions & 7 deletions vulnerabilities/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,17 +642,13 @@ def filter_cpe(self, queryset, name, value):
return self.queryset.filter(vulnerabilityreference__reference_id__startswith=cpe).distinct()


class CPEViewSet(viewsets.ReadOnlyModelViewSet):
"""
Lookup for vulnerabilities by CPE (https://nvd.nist.gov/products/cpe)
"""
class CPEViewSet(VulnerabilityViewSet):
"""Lookup for vulnerabilities by CPE (https://nvd.nist.gov/products/cpe)"""

queryset = Vulnerability.objects.filter(
vulnerabilityreference__reference_id__startswith="cpe"
).distinct()
serializer_class = VulnerabilitySerializer
filter_backends = (filters.DjangoFilterBackend,)
throttle_classes = [StaffUserRateThrottle, AnonRateThrottle]

filterset_class = CPEFilterSet

@action(detail=False, methods=["post"])
Expand Down
4 changes: 2 additions & 2 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from vulnerabilities.importers import oss_fuzz
from vulnerabilities.importers import postgresql
from vulnerabilities.importers import project_kb_msr2019
from vulnerabilities.importers import pysec
from vulnerabilities.importers import redhat
from vulnerabilities.importers import retiredotnet
from vulnerabilities.importers import ruby
Expand All @@ -42,9 +41,9 @@
from vulnerabilities.pipelines import npm_importer
from vulnerabilities.pipelines import nvd_importer
from vulnerabilities.pipelines import pypa_importer
from vulnerabilities.pipelines import pysec_importer

IMPORTERS_REGISTRY = [
pysec.PyPIImporter,
alpine_linux.AlpineImporter,
openssl.OpensslImporter,
redhat.RedhatImporter,
Expand Down Expand Up @@ -78,6 +77,7 @@
gitlab_importer.GitLabImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
nvd_importer.NVDImporterPipeline,
pysec_importer.PyPIImporterPipeline,
]

IMPORTERS_REGISTRY = {
Expand Down
44 changes: 0 additions & 44 deletions vulnerabilities/importers/pysec.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generated by Django 4.2.16 on 2024-10-24 13:51

from django.db import migrations

"""
Update the created_by field on Advisory from the old qualified_name
to the new pipeline_id.
"""


def update_created_by(apps, schema_editor):
from vulnerabilities.pipelines.pysec_importer import PyPIImporterPipeline

Advisory = apps.get_model("vulnerabilities", "Advisory")
Advisory.objects.filter(created_by="vulnerabilities.importers.pysec.PyPIImporter").update(
created_by=PyPIImporterPipeline.pipeline_id
)


def reverse_update_created_by(apps, schema_editor):
from vulnerabilities.pipelines.pysec_importer import PyPIImporterPipeline

Advisory = apps.get_model("vulnerabilities", "Advisory")
Advisory.objects.filter(created_by=PyPIImporterPipeline.pipeline_id).update(
created_by="vulnerabilities.importers.pysec.PyPIImporter"
)


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0073_delete_packagerelatedvulnerability"),
]

operations = [
migrations.RunPython(update_created_by, reverse_code=reverse_update_created_by),
]
4 changes: 2 additions & 2 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def __str__(self):
@property
def is_cpe(self):
"""
Return Trueis this is a CPE reference.
Return True if this is a CPE reference.
"""
return self.reference_id.startswith("cpe")

Expand Down Expand Up @@ -557,7 +557,7 @@ def for_cve(self, cve):

def with_is_vulnerable(self):
"""
Annotate Package with ``with_is_vulnerable`` boolean attribute.
Annotate Package with ``is_vulnerable`` boolean attribute.
"""
return self.annotate(
is_vulnerable=Exists(
Expand Down
2 changes: 1 addition & 1 deletion vulnerabilities/pipelines/pypa_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import logging

from pathlib import Path
from typing import Iterable

Expand Down
66 changes: 66 additions & 0 deletions vulnerabilities/pipelines/pysec_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import logging
from io import BytesIO
from typing import Iterable
from zipfile import ZipFile

import requests

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline


class PyPIImporterPipeline(VulnerableCodeBaseImporterPipeline):
"""Collect advisories from PyPI."""

pipeline_id = "pysec_importer"

license_url = "https://github.com/pypa/advisory-database/blob/main/LICENSE"
url = "https://osv-vulnerabilities.storage.googleapis.com/PyPI/all.zip"
spdx_license_expression = "CC-BY-4.0"
importer_name = "PyPI Importer"

@classmethod
def steps(cls):
return (
cls.fetch_zip,
cls.collect_and_store_advisories,
cls.import_new_advisories,
)

def fetch_zip(self):
self.log(f"Fetching `{self.url}`")
self.advisory_zip = requests.get(self.url).content

def advisories_count(self) -> int:
with ZipFile(BytesIO(self.advisory_zip)) as zip:
advisory_count = sum(1 for file in zip.namelist() if file.startswith("PYSEC-"))
return advisory_count

def collect_advisories(self) -> Iterable[AdvisoryData]:
"""Yield AdvisoryData using a zipped data dump of OSV data"""
from vulnerabilities.importers.osv import parse_advisory_data

with ZipFile(BytesIO(self.advisory_zip)) as zip_file:
for file_name in zip_file.namelist():
if not file_name.startswith("PYSEC-"):
self.log(
f"Unsupported PyPI advisory data file: {file_name}",
level=logging.ERROR,
)
continue
with zip_file.open(file_name) as f:
vul_info = json.load(f)
yield parse_advisory_data(
raw_data=vul_info,
supported_ecosystems=["pypi"],
advisory_url=self.url,
)
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,51 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import os
from pathlib import Path
from unittest import TestCase

from vulnerabilities.importers.osv import parse_advisory_data
from vulnerabilities.tests.util_tests import VULNERABLECODE_REGEN_TEST_FIXTURES as REGEN
from vulnerabilities.tests.util_tests import check_results_against_json

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/pysec")
TEST_DATA = Path(__file__).parent.parent / "test_data" / "pysec"


class TestPyPIImporter(TestCase):
def test_to_advisories_with_summary(self):
with open(os.path.join(TEST_DATA, "pysec-advisories_with_summary.json")) as f:
with open(TEST_DATA / "pysec-advisories_with_summary.json") as f:
mock_response = json.load(f)
results = parse_advisory_data(mock_response, ["pypi"], "https://test.com").to_dict()

expected_file = os.path.join(TEST_DATA, "pysec-advisories_with_summary-expected.json")
expected_file = TEST_DATA / "pysec-advisories_with_summary-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
regen=REGEN,
)

def test_to_advisories_without_summary(self):
with open(os.path.join(TEST_DATA, "pysec-advisories_without_summary.json")) as f:
with open(TEST_DATA / "pysec-advisories_without_summary.json") as f:
mock_response = json.load(f)

results = parse_advisory_data(mock_response, ["pypi"], "https://test.com").to_dict()

expected_file = os.path.join(TEST_DATA, "pysec-advisories_without_summary-expected.json")
expected_file = TEST_DATA / "pysec-advisories_without_summary-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
regen=REGEN,
)

def test_to_advisories_with_cwe(self):
with open(os.path.join(TEST_DATA, "pysec-advisory_with_cwe.json")) as f:
with open(TEST_DATA / "pysec-advisory_with_cwe.json") as f:
mock_response = json.load(f)

results = parse_advisory_data(
raw_data=mock_response, supported_ecosystems=["pypi"], advisory_url="https://tes.com"
).to_dict()

expected_file = os.path.join(TEST_DATA, "pysec-advisories_with_cwe-expected.json")
expected_file = TEST_DATA / "pysec-advisories_with_cwe-expected.json"
check_results_against_json(
results=results,
expected_file=expected_file,
Expand Down
40 changes: 40 additions & 0 deletions vulnerabilities/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,46 @@ def test_api_response(self):
self.assertEqual(response["count"], 1)


class TestCPEApiWithPackageVulnerabilityRelation(TestCase):
def setUp(self):
self.user = ApiUser.objects.create_api_user(username="[email protected]")
self.auth = f"Token {self.user.auth_token.key}"
self.csrf_client = APIClient(enforce_csrf_checks=True)
self.csrf_client.credentials(HTTP_AUTHORIZATION=self.auth)
self.vulnerability = Vulnerability.objects.create(summary="test")
self.affected_package, _ = Package.objects.get_or_create_from_purl(
purl="pkg:nginx/[email protected]"
)
self.fixed_package, _ = Package.objects.get_or_create_from_purl(purl="pkg:nginx/[email protected]")
AffectedByPackageRelatedVulnerability.objects.create(
vulnerability=self.vulnerability,
created_by="test",
package=self.affected_package,
confidence=100,
)
FixingPackageRelatedVulnerability.objects.create(
vulnerability=self.vulnerability,
created_by="test",
package=self.fixed_package,
confidence=100,
)
for i in range(0, 10):
ref, _ = VulnerabilityReference.objects.get_or_create(
reference_id=f"cpe:/a:nginx:{i}",
url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query=cpe:/a:nginx:{i}",
)
VulnerabilityRelatedReference.objects.create(
reference=ref, vulnerability=self.vulnerability
)

def test_cpe_api(self):
response = self.csrf_client.get("/api/cpes/", format="json")
self.assertEqual(status.HTTP_200_OK, response.status_code)

response_data = response.json()
self.assertEqual(1, response_data["count"])


class AliasApi(TestCase):
def setUp(self):
self.user = ApiUser.objects.create_api_user(username="[email protected]")
Expand Down
Loading

0 comments on commit 8f1f4a9

Please sign in to comment.