Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions dojo/tools/aws_inspector2/parser.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import json
from datetime import UTC, datetime

Expand Down Expand Up @@ -114,6 +115,7 @@ def get_cvss_details(self, finding: Finding, raw_finding: dict) -> Finding:

def get_package_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding:
vulnerability_details = raw_finding.get("packageVulnerabilityDetails", {})
vulnerable_packages = vulnerability_details.get("vulnerablePackages", [])
vulnerability_packages_descriptions = "\n".join(
[
(
Expand All @@ -123,14 +125,32 @@ def get_package_vulnerability(self, finding: Finding, raw_finding: dict) -> Find
f"\tfixed version: {vulnerability_package.get('fixedInVersion', 'N/A')}\n"
f"\tremediation: {vulnerability_package.get('remediation', 'N/A')}\n"
)
for vulnerability_package in vulnerability_details.get("vulnerablePackages", [])
for vulnerability_package in vulnerable_packages
],
)
if (vulnerability_id := vulnerability_details.get("vulnerabilityId", None)) is not None:
finding.unsaved_vulnerability_ids = [vulnerability_id]
vulnerability_source = vulnerability_details.get("source")
vulnerability_source_url = vulnerability_details.get("sourceUrl")
# populate fields
# component name/version/file_path from the first vulnerable package
if vulnerable_packages:
finding.component_name = vulnerable_packages[0].get("name")
finding.component_version = vulnerable_packages[0].get("version")
finding.file_path = vulnerable_packages[0].get("filePath")
# reference URLs from the advisory
reference_urls = vulnerability_details.get("referenceUrls", [])
if reference_urls:
finding.references = "\n".join(reference_urls)
# publish date from when the vendor first created the advisory
if vendor_created_at := vulnerability_details.get("vendorCreatedAt"):
with contextlib.suppress(ValueError):
finding.publish_date = date_parser.parse(vendor_created_at).date()
# CVSS v3 base score from the vendor-supplied CVSS entries
for cvss_entry in vulnerability_details.get("cvss", []):
if str(cvss_entry.get("version", "")).startswith("3") and cvss_entry.get("baseScore") is not None:
finding.cvssv3_score = float(cvss_entry["baseScore"])
break
# populate description fields
if vulnerability_source is not None and vulnerability_source_url is not None:
finding.url = vulnerability_source_url
finding.description += (
Expand All @@ -149,8 +169,8 @@ def get_code_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding
file_path_info = raw_finding.get("filePath", {})
file_name = file_path_info.get("fileName", "N/A")
file_path = file_path_info.get("filePath", "N/A")
start_line = file_path_info.get("startLine", "N/A")
end_line = file_path_info.get("endLine", "N/A")
start_line = file_path_info.get("startLine", None)
end_line = file_path_info.get("endLine", None)
detector_tags = ", ".join(raw_finding.get("detectorTags", []))
reference_urls = ", ".join(raw_finding.get("referenceUrls", []))
rule_id = raw_finding.get("ruleId", "N/A")
Expand All @@ -162,6 +182,10 @@ def get_code_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding
finding.sast_source_file_path = f"{file_path}{file_name}"
finding.line = start_line
finding.sast_source_line = start_line
if start_line is None:
start_line = "N/A"
if end_line is None:
end_line = "N/A"
finding.description += (
"\n**Additional info**\n"
f"CWEs: {string_cwes}\n"
Expand Down
Loading
Loading