Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dje/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,14 @@ def test_dataspace_admin_changelist_missing_in_filter_dataspace_value_validation
self.assertEqual(302, response.status_code)
self.assertIn("?e=1", response["Location"])

data = {MissingInFilter.parameter_name: 99}
data = {MissingInFilter.parameter_name: 999999}
response = self.client.get(url, data=data)
self.assertEqual(302, response.status_code)
self.assertIn("?e=1", response["Location"])

data = {
MissingInFilter.parameter_name: self.other_dataspace.id,
DataspaceFilter.parameter_name: 99,
DataspaceFilter.parameter_name: 999999,
}
response = self.client.get(url, data=data)
self.assertEqual(200, response.status_code)
Expand Down
33 changes: 30 additions & 3 deletions product_portfolio/importers.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,9 +699,33 @@ def import_dependencies(self):
for dependency_data in self.dependencies:
self.import_dependency(dependency_data)

@staticmethod
def import_vulnerability(vulnerability_data, product_package):
from vulnerabilities.models import VulnerabilityAnalysis

package = product_package.package
vulnerabilities = package.create_vulnerabilities(vulnerabilities_data=[vulnerability_data])
if not vulnerabilities:
return

if cdx_vulnerability := vulnerability_data.get("cdx_vulnerability_data"):
if analysis_data := cdx_vulnerability.get("analysis"):
# CycloneDX model uses "response" while the local model uses "response"
if response_value := analysis_data.pop("response", None):
analysis_data["responses"] = response_value

VulnerabilityAnalysis.create_from_data(
user=product_package.dataspace,
data={
"product_package": product_package,
"vulnerability": vulnerabilities[0],
**analysis_data,
},
)

def import_package(self, package_data):
# Vulnerabilities are fetched post import.
package_data.pop("affected_by_vulnerabilities", None)
# Vulnerabilities are assigned after the package creation.
affected_by_vulnerabilities = package_data.pop("affected_by_vulnerabilities", [])

# Check if the package already exists to prevent duplication.
package = self.look_for_existing_package(package_data)
Expand Down Expand Up @@ -730,7 +754,7 @@ def import_package(self, package_data):
return
self.created["package"].append(str(package))

ProductPackage.objects.get_or_create(
product_package, _ = ProductPackage.objects.get_or_create(
product=self.product,
package=package,
dataspace=self.product.dataspace,
Expand All @@ -743,6 +767,9 @@ def import_package(self, package_data):
package_uid = package_data.get("package_uid") or package.uuid
self.package_uid_mapping[package_uid] = package

for vulnerability_data in affected_by_vulnerabilities:
self.import_vulnerability(vulnerability_data, product_package)

def import_dependency(self, dependency_data):
dependency_uid = dependency_data.get("dependency_uid")

Expand Down
51 changes: 51 additions & 0 deletions product_portfolio/tests/test_importers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1255,3 +1255,54 @@ def test_product_portfolio_import_packages_from_scio_importer_duplicate_dependen
self.assertEqual({}, errors)
self.assertEqual(2, self.product1.packages.count())
self.assertEqual(1, self.product1.dependencies.count())

@mock.patch("dejacode_toolkit.scancodeio.ScanCodeIO.fetch_project_dependencies")
@mock.patch("dejacode_toolkit.scancodeio.ScanCodeIO.fetch_project_packages")
def test_product_portfolio_import_packages_from_scio_importer_vex(
self, mock_fetch_packages, mock_fetch_dependencies
):
vulnerability_data = {
"id": "ID-0001", # In CycloneDX the field name is "id"
"summary": "complexity bugs may lead to a denial of service",
"cdx_vulnerability_data": {
"affects": [{"ref": "pkg:maven/abc/abc@1.0"}],
"bom-ref": "BomRef.1",
"description": "complexity bugs may lead to a denial of service",
"analysis": {
"detail": "AAAA",
"justification": "code_not_present",
"response": ["can_not_fix", "update"],
"state": "resolved",
},
},
}
mock_fetch_packages.return_value = [
{
"purl": "pkg:maven/abc/abc@1.0",
"type": "maven",
"namespace": "abc",
"name": "abc",
"version": "1.0",
"affected_by_vulnerabilities": [vulnerability_data],
}
]

importer = ImportPackageFromScanCodeIO(
user=self.super_user,
project_uuid=uuid.uuid4(),
product=self.product1,
)
created, existing, errors = importer.save()
created_package = self.product1.packages.get()
vulnerability = created_package.affected_by_vulnerabilities.get()
self.assertEqual(vulnerability_data["id"], vulnerability.vulnerability_id)
self.assertEqual(vulnerability_data["summary"], vulnerability.summary)

analysis = vulnerability.vulnerability_analyses.get()
self.assertEqual(vulnerability, analysis.vulnerability)
self.assertEqual(self.product1, analysis.product)
self.assertEqual(created_package, analysis.package)
self.assertEqual("resolved", analysis.state)
self.assertEqual("code_not_present", analysis.justification)
self.assertEqual("AAAA", analysis.detail)
self.assertEqual(["can_not_fix", "update"], analysis.responses)
45 changes: 34 additions & 11 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,36 @@ def add_affected(self, instances):

@classmethod
def create_from_data(cls, dataspace, data, validate=False, affecting=None):
"""Create a Vulnerability from provided ``data``."""
instance = super().create_from_data(user=dataspace, data=data, validate=False)

if affecting:
instance.add_affected(affecting)

return instance

@classmethod
def get_or_create_from_data(cls, dataspace, data, validate=False):
"""Get or create a Vulnerability from provided ``data``."""
vulnerability_qs = Vulnerability.objects.scope(dataspace)

# Support for CycloneDX data structure
data = data.copy()
vulnerability_id = data.get("vulnerability_id") or data.pop("id", None)
if not vulnerability_id:
return
data["vulnerability_id"] = vulnerability_id

vulnerability = vulnerability_qs.get_or_none(vulnerability_id=vulnerability_id)
if not vulnerability:
vulnerability = cls.create_from_data(
dataspace=dataspace,
data=data,
validate=validate,
)

return vulnerability

def as_cyclonedx(self, affected_instances, analysis=None):
affects = [
cdx_vulnerability.BomTarget(ref=instance.cyclonedx_bom_ref)
Expand Down Expand Up @@ -465,28 +488,28 @@ def fetch_vulnerabilities(self):
self.create_vulnerabilities(vulnerabilities_data=affected_by_vulnerabilities)

def create_vulnerabilities(self, vulnerabilities_data):
"""Create and assign Vulnerabilities to this instance from provided vulnerabilities_data."""
from component_catalog.models import Package

vulnerabilities = []
vulnerability_qs = Vulnerability.objects.scope(self.dataspace)

for vulnerability_data in vulnerabilities_data:
vulnerability_id = vulnerability_data["vulnerability_id"]
vulnerability = vulnerability_qs.get_or_none(vulnerability_id=vulnerability_id)
if not vulnerability:
vulnerability = Vulnerability.create_from_data(
dataspace=self.dataspace,
data=vulnerability_data,
)
vulnerability = Vulnerability.get_or_create_from_data(
dataspace=self.dataspace,
data=vulnerability_data,
)
vulnerabilities.append(vulnerability)

through_defaults = {"dataspace_id": self.dataspace_id}
self.affected_by_vulnerabilities.add(*vulnerabilities, through_defaults=through_defaults)
self.affected_by_vulnerabilities.add(
*vulnerabilities,
through_defaults={"dataspace_id": self.dataspace_id},
)

self.update_risk_score()
if isinstance(self, Package):
self.productpackages.update_weighted_risk_score()

return vulnerabilities


class VulnerabilityAnalysis(
VulnerabilityAnalysisMixin,
Expand Down
20 changes: 20 additions & 0 deletions vulnerabilities/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,26 @@ def test_vulnerability_model_create_from_data(self):
self.assertEqual(vulnerability_data["resource_url"], vulnerability1.resource_url)
self.assertQuerySetEqual(vulnerability1.affected_packages.all(), [package1])

def test_vulnerability_model_get_or_create_from_data(self):
vulnerability_data = {
"id": "VCID-q4q6-yfng-aaag",
"summary": "In Django 3.2 before 3.2.25, 4.2 before 4.2.11, and 5.0.",
}

vulnerability1 = Vulnerability.get_or_create_from_data(
dataspace=self.dataspace,
data=vulnerability_data,
)
self.assertEqual(vulnerability_data["id"], vulnerability1.vulnerability_id)
self.assertEqual(vulnerability_data["summary"], vulnerability1.summary)

vulnerability_data["vulnerability_id"] = vulnerability_data["id"]
vulnerability2 = Vulnerability.get_or_create_from_data(
dataspace=self.dataspace,
data=vulnerability_data,
)
self.assertEqual(vulnerability1.id, vulnerability2.id)

def test_vulnerability_model_queryset_count_methods(self):
package1 = make_package(self.dataspace)
package2 = make_package(self.dataspace)
Expand Down