Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions dojo/db_migrations/0265_finding_component_purl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("dojo", "0264_alter_url_identity_hash_alter_urlevent_identity_hash"),
]

operations = [
migrations.AddField(
model_name="finding",
name="component_purl",
field=models.CharField(
blank=True,
help_text="Package URL (PURL) of the affected component (e.g. pkg:pypi/requests@2.25.1).",
max_length=500,
null=True,
verbose_name="Component PURL",
),
),
]
5 changes: 5 additions & 0 deletions dojo/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2704,6 +2704,11 @@ class Finding(BaseModel):
max_length=100,
verbose_name=_("Component version"),
help_text=_("Version of the affected component."))
component_purl = models.CharField(null=True,
blank=True,
max_length=500,
verbose_name=_("Component PURL"),
help_text=_("Package URL (PURL) of the affected component (e.g. pkg:pypi/requests@2.25.1)."))
found_by = models.ManyToManyField(Test_Type,
editable=False,
verbose_name=_("Found by"),
Expand Down
2 changes: 2 additions & 0 deletions dojo/tools/cyclonedx/json_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def _get_findings_json(self, file, test):
)
if not description:
description = "Description was not provided."
component_purl = components.get(reference, {}).get("purl")
finding = Finding(
title=f"{component_name}:{component_version} | {vulnerability.get('id')}",
test=test,
Expand All @@ -82,6 +83,7 @@ def _get_findings_json(self, file, test):
mitigation=vulnerability.get("recommendation", ""),
component_name=component_name,
component_version=component_version,
component_purl=component_purl,
references=references,
static_finding=True,
dynamic_finding=False,
Expand Down
1 change: 1 addition & 0 deletions dojo/tools/cyclonedx/xml_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def manage_vulnerability_legacy(
references=references,
component_name=component_name,
component_version=component_version,
component_purl=component_purl,
vuln_id_from_tool=vuln_id,
nb_occurences=1,
)
Expand Down
4 changes: 4 additions & 0 deletions dojo/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from rest_framework.authtoken import views as tokenviews
from rest_framework.routers import DefaultRouter

from dojo.vex.api.views import VexCycloneDxEngagementView, VexCycloneDxProductView

from dojo import views
from dojo.announcement.urls import urlpatterns as announcement_urls
from dojo.api_v2.views import (
Expand Down Expand Up @@ -242,6 +244,8 @@
# Django Rest Framework API v2
re_path(r"^{}api/v2/".format(get_system_setting("url_prefix")), include(v2_api.urls)),
re_path(r"^{}api/v2/user_profile/".format(get_system_setting("url_prefix")), UserProfileView.as_view(), name="user_profile"),
re_path(r"^{}api/v2/vex/cyclonedx/product/(?P<pk>\d+)/".format(get_system_setting("url_prefix")), VexCycloneDxProductView.as_view(), name="vex-cyclonedx-product"),
re_path(r"^{}api/v2/vex/cyclonedx/engagement/(?P<pk>\d+)/".format(get_system_setting("url_prefix")), VexCycloneDxEngagementView.as_view(), name="vex-cyclonedx-engagement"),
]

if hasattr(settings, "API_TOKENS_ENABLED") and hasattr(settings, "API_TOKEN_AUTH_ENDPOINT_ENABLED"):
Expand Down
Empty file added dojo/vex/__init__.py
Empty file.
Empty file added dojo/vex/api/__init__.py
Empty file.
144 changes: 144 additions & 0 deletions dojo/vex/api/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""CycloneDX VEX export — serializes Dojo triage decisions as a VEX document."""
import datetime
import json
import uuid

from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

from dojo.authorization.authorization_decorators import user_is_authorized
from dojo.models import Engagement, Finding, Product
from dojo.authorization.roles_permissions import Permissions


CYCLONEDX_SPEC_VERSION = "1.4"
DOJO_TOOL_NAME = "DefectDojo"


def _finding_to_vex_state(finding: Finding) -> tuple[str, list[str]]:
"""Return (analysisState, responses[]) for a finding.

Priority: false_p > risk_accepted > mitigated > active+verified > in_triage
"""
if finding.false_p:
return "false_positive", []
if finding.risk_accepted:
return "exploitable", ["will_not_fix"]
if finding.is_mitigated and not finding.active:
return "resolved", []
if finding.active and finding.verified:
return "exploitable", []
return "in_triage", []


def _finding_to_vex_entry(finding: Finding) -> dict | None:
"""Serialize one finding as a CycloneDX vulnerability entry."""
vuln_id = finding.vuln_id_from_tool
if not vuln_id:
ids = list(finding.vulnerability_id_set.values_list("vulnerability_id", flat=True))
if not ids:
return None
vuln_id = ids[0]

purl = finding.component_purl
if not purl:
name = (finding.component_name or "").lower()
version = finding.component_version or ""
purl = f"pkg:generic/{name}@{version}" if name else None
if not purl:
return None

state, responses = _finding_to_vex_state(finding)
analysis: dict = {"state": state}
if responses:
analysis["response"] = responses

latest_note = finding.notes.order_by("-date").first()
if latest_note:
analysis["detail"] = latest_note.entry

entry: dict = {
"id": vuln_id,
"affects": [{"ref": purl}],
"analysis": analysis,
}
return entry


def _build_vex_document(findings, product_name: str) -> dict:
entries = []
for f in findings:
entry = _finding_to_vex_entry(f)
if entry is not None:
entries.append(entry)

return {
"bomFormat": "CycloneDX",
"specVersion": CYCLONEDX_SPEC_VERSION,
"serialNumber": f"urn:uuid:{uuid.uuid4()}",
"version": 1,
"metadata": {
"timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"component": {"name": product_name, "type": "application"},
"tools": [{"name": DOJO_TOOL_NAME}],
},
"vulnerabilities": entries,
}


class VexCycloneDxProductView(APIView):
permission_classes = [IsAuthenticated]

@extend_schema(
parameters=[OpenApiParameter("pk", int, OpenApiParameter.PATH, description="Product ID")],
responses={(200, "application/json"): {}},
summary="Export CycloneDX VEX for a product",
description=(
"Returns a CycloneDX VEX document containing triage decisions for all "
"non-duplicate findings in the product. Excludes findings with no vuln ID or PURL."
),
)
def get(self, request: Request, pk: int) -> Response:
product = Product.objects.get(pk=pk)
user_is_authorized(request.user, Permissions.Product_View, product)

findings = (
Finding.objects.filter(
test__engagement__product=product,
duplicate=False,
)
.prefetch_related("notes", "vulnerability_id_set")
.order_by("id")
)
vex = _build_vex_document(findings, product.name)
return Response(vex, content_type="application/json")


class VexCycloneDxEngagementView(APIView):
permission_classes = [IsAuthenticated]

@extend_schema(
parameters=[OpenApiParameter("pk", int, OpenApiParameter.PATH, description="Engagement ID")],
responses={(200, "application/json"): {}},
summary="Export CycloneDX VEX for an engagement",
description=(
"Returns a CycloneDX VEX document scoped to a single engagement."
),
)
def get(self, request: Request, pk: int) -> Response:
engagement = Engagement.objects.get(pk=pk)
user_is_authorized(request.user, Permissions.Engagement_View, engagement)

findings = (
Finding.objects.filter(
test__engagement=engagement,
duplicate=False,
)
.prefetch_related("notes", "vulnerability_id_set")
.order_by("id")
)
vex = _build_vex_document(findings, engagement.name)
return Response(vex, content_type="application/json")