Skip to content

Commit 0511425

Browse files
committed
wip
1 parent af6e58f commit 0511425

File tree

1 file changed

+67
-71
lines changed

1 file changed

+67
-71
lines changed

backend/application/vex/services/cyclonedx_parser.py

Lines changed: 67 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -36,41 +36,35 @@ def parse_cyclonedx_data(data: dict) -> None:
3636

3737

3838
def _create_cyclonedx_document(data: dict) -> VEX_Document:
39-
# Use serial number as document ID, fallback to a generated one
4039
document_id = data.get("serialNumber")
4140
if not document_id:
42-
bom_format = data.get("bomFormat", "CycloneDX")
43-
spec_version = data.get("specVersion", "1.0")
44-
document_id = f"{bom_format}-{spec_version}-{hash(str(data))}"
41+
raise ValidationError("serialNumber is missing")
4542

46-
version = str(data.get("version", 1))
43+
version_value = data.get("version")
44+
if version_value is None:
45+
raise ValidationError("version is missing")
46+
version = str(version_value)
4747

48-
# Extract metadata for author and timestamps
4948
metadata = data.get("metadata", {})
5049

51-
# Get timestamp from metadata or use current time
5250
timestamp = metadata.get("timestamp")
5351
if not timestamp:
54-
# If no timestamp, use a default ISO format string
55-
from datetime import datetime
56-
timestamp = datetime.now().isoformat() + "Z"
57-
58-
# Extract author from tools or component
59-
author = "Unknown"
60-
tools = metadata.get("tools", [])
61-
if tools:
62-
if isinstance(tools, list) and tools:
63-
author = tools[0].get("name", "Unknown")
64-
elif isinstance(tools, dict):
65-
components = tools.get("components", [])
66-
if components:
67-
author = components[0].get("name", "Unknown")
68-
69-
# If no author from tools, try to get from component
70-
if author == "Unknown":
71-
component = metadata.get("component", {})
72-
if component:
73-
author = component.get("name", "Unknown")
52+
raise ValidationError("metadata/timestamp is missing")
53+
54+
author = None
55+
# Prefer authors list if available
56+
authors = metadata.get("authors")
57+
if authors and isinstance(authors, list) and len(authors) > 0:
58+
# Find the first author with a name set
59+
author = next((item.get("name") for item in authors
60+
if isinstance(item, dict) and item.get("name")), None)
61+
62+
# Fall back to manufacturer or supplier if no authors
63+
if not author:
64+
author = metadata.get("manufacturer", {}).get("name") or metadata.get("supplier", {}).get("name")
65+
66+
if not author:
67+
raise ValidationError("author is missing")
7468

7569
try:
7670
cyclonedx_document = VEX_Document.objects.get(document_id=document_id, author=author)
@@ -98,9 +92,13 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
9892
if not isinstance(vulnerabilities, list):
9993
raise ValidationError("vulnerabilities is not a list")
10094

101-
# Build components mapping
10295
components_map = _build_components_map(data)
10396

97+
product_purl = data.get("metadata", {}).get("component", {}).get("purl", "")
98+
if not product_purl:
99+
raise ValidationError("metadata/component/purl is missing")
100+
validate_purl(product_purl)
101+
104102
product_purls: set[str] = set()
105103
vex_statements: set[VEX_Statement] = set()
106104

@@ -115,13 +113,12 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
115113

116114
analysis = vulnerability.get("analysis", {})
117115
if not analysis:
118-
# Skip vulnerabilities without analysis (not VEX statements)
116+
# Skip vulnerabilities without analysis
119117
vulnerability_counter += 1
120118
continue
121119

122120
cyclonedx_analysis = _parse_analysis(analysis, vulnerability_counter)
123121

124-
# Map CycloneDX state to VEX status
125122
vex_status = _map_cyclonedx_state_to_vex_status(cyclonedx_analysis.state)
126123
if not vex_status:
127124
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is not valid: {cyclonedx_analysis.state}")
@@ -131,26 +128,26 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
131128
if detail:
132129
description += f"\n\n{detail}"
133130

134-
# Build justification from CycloneDX justification
135-
justification = _map_cyclonedx_justification_to_vex_justification(cyclonedx_analysis.justification)
136-
137-
# Build remediation from response and recommendation
138131
remediation = _build_remediation_text(cyclonedx_analysis.response, vulnerability.get("recommendation", ""))
139132

140-
# Process affected components
141133
affects = vulnerability.get("affects", [])
142134
if not affects:
143-
# If no affects, this is a general statement - we'll need a product PURL
144-
# Try to extract from metadata component
145-
component = data.get("metadata", {}).get("component", {})
146-
product_purl = component.get("purl", "")
147-
if product_purl:
148-
validate_purl(product_purl)
149-
_create_vex_statement(
150-
cyclonedx_document, vulnerability_id, description, vex_status,
151-
justification, cyclonedx_analysis.detail, remediation,
152-
product_purl, "", product_purls, vex_statements
153-
)
135+
# General statement for the product
136+
_create_vex_statement(
137+
cyclonedx_document,
138+
vulnerability_id,
139+
description,
140+
vex_status,
141+
cyclonedx_analysis.justification,
142+
cyclonedx_analysis.detail,
143+
remediation,
144+
product_purl,
145+
"",
146+
product_purls,
147+
vex_statements,
148+
)
149+
elif not isinstance(affects, list):
150+
raise ValidationError(f"affects[{vulnerability_counter}] is not a list")
154151
else:
155152
_process_affected_components(
156153
cyclonedx_document=cyclonedx_document,
@@ -160,11 +157,12 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
160157
vulnerability_id=vulnerability_id,
161158
description=description,
162159
vex_status=vex_status,
163-
justification=justification,
160+
justification=cyclonedx_analysis.justification,
164161
impact=cyclonedx_analysis.detail,
165162
remediation=remediation,
166163
affects=affects,
167164
components_map=components_map,
165+
product_purl=product_purl,
168166
)
169167

170168
vulnerability_counter += 1
@@ -173,7 +171,6 @@ def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tup
173171

174172

175173
def _build_components_map(data: dict) -> dict[str, dict]:
176-
"""Build a mapping of bom-ref to component data for quick lookup."""
177174
components_map = {}
178175

179176
# Add root component from metadata
@@ -214,7 +211,6 @@ def _parse_analysis(analysis: dict, vulnerability_counter: int) -> CycloneDX_Ana
214211

215212

216213
def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
217-
"""Map CycloneDX analysis state to VEX status."""
218214
mapping = {
219215
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED: VEX_Status.VEX_STATUS_FIXED,
220216
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED_WITH_PEDIGREE: VEX_Status.VEX_STATUS_FIXED,
@@ -226,15 +222,7 @@ def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
226222
return mapping.get(state)
227223

228224

229-
def _map_cyclonedx_justification_to_vex_justification(justification: str) -> str:
230-
"""Map CycloneDX justification to VEX justification if possible."""
231-
# CycloneDX doesn't have standardized justification values like OpenVEX
232-
# We'll pass through the justification as is, or return empty string
233-
return justification if justification else ""
234-
235-
236225
def _build_remediation_text(response: list[str], recommendation: str) -> str:
237-
"""Build remediation text from response actions and recommendation."""
238226
remediation_parts = []
239227

240228
if response:
@@ -261,6 +249,7 @@ def _process_affected_components(
261249
remediation: str,
262250
affects: list,
263251
components_map: dict,
252+
product_purl: str,
264253
) -> None:
265254
affected_counter = 0
266255
for affected in affects:
@@ -271,24 +260,32 @@ def _process_affected_components(
271260
if not ref:
272261
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}]/ref is missing")
273262

274-
# Look up component by bom-ref
275263
component = components_map.get(ref)
276264
if not component:
277-
# Skip if we can't find the component
278-
affected_counter += 1
279-
continue
265+
raise ValidationError(
266+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' not found in components"
267+
)
280268

281-
# Extract PURL from component
282269
component_purl = component.get("purl", "")
283-
if component_purl:
284-
validate_purl(component_purl)
285-
286-
# For affected components, we'll use the component PURL as both product and component
287-
_create_vex_statement(
288-
cyclonedx_document, vulnerability_id, description, vex_status,
289-
justification, impact, remediation,
290-
component_purl, component_purl, product_purls, vex_statements
270+
if not component_purl:
271+
raise ValidationError(
272+
f"affects[{vulnerability_counter}][{affected_counter}]/ref '{ref}' component is missing purl"
291273
)
274+
validate_purl(component_purl)
275+
276+
_create_vex_statement(
277+
cyclonedx_document,
278+
vulnerability_id,
279+
description,
280+
vex_status,
281+
justification,
282+
impact,
283+
remediation,
284+
product_purl,
285+
component_purl,
286+
product_purls,
287+
vex_statements,
288+
)
292289

293290
affected_counter += 1
294291

@@ -306,7 +303,6 @@ def _create_vex_statement(
306303
product_purls: set,
307304
vex_statements: set,
308305
) -> None:
309-
"""Create and save a VEX statement."""
310306
vex_statement = VEX_Statement(
311307
document=cyclonedx_document,
312308
vulnerability_id=vulnerability_id,

0 commit comments

Comments
 (0)