Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions component_catalog/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3064,37 +3064,6 @@ def test_vulnerablecode_get_vulnerable_purls(self):
vulnerable_purls = vulnerablecode.get_vulnerable_purls(packages=[self.package1])
self.assertEqual(["pkg:pypi/django@2.1"], vulnerable_purls)

def test_vulnerablecode_get_vulnerable_cpes(self):
vulnerablecode = VulnerableCode(self.dataspace)
vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=[])
self.assertEqual([], vulnerable_cpes)

components = [self.component1, self.component2]
vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=components)
self.assertEqual([], vulnerable_cpes)

self.component1.cpe = "cpe:2.3:a:djangoproject:django:0.95:*:*:*:*:*:*:*"
self.component1.save()

with mock.patch(
"dejacode_toolkit.vulnerablecode.VulnerableCode.bulk_search_by_cpes"
) as bulk_search:
bulk_search.return_value = [
{
"vulnerability_id": "VCID-188m-1bke-aaae",
"summary": "The administrative interface in django.contrib.admin ",
"references": [
{"reference_id": ""},
],
}
]
vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=components)
self.assertEqual([], vulnerable_cpes)

bulk_search.return_value[0]["references"] = [{"reference_id": self.component1.cpe}]
vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=components)
self.assertEqual([self.component1.cpe], vulnerable_cpes)

@mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.request_get")
def test_vulnerablecode_get_vulnerabilities_cache(self, mock_request_get):
vulnerablecode = VulnerableCode(self.dataspace)
Expand Down
6 changes: 5 additions & 1 deletion dejacode_toolkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_settings(var_name, default=None):
def is_service_available(label, session, url, raise_exceptions):
"""Check if a configured integration service is available."""
try:
response = session.head(url, timeout=REQUESTS_TIMEOUT)
response = session.head(url, allow_redirects=True, timeout=REQUESTS_TIMEOUT)
response.raise_for_status()
except requests.exceptions.RequestException as request_exception:
logger.debug(f"{label} is_available() error: {request_exception}")
Expand All @@ -43,6 +43,7 @@ class BaseService:
settings_prefix = None
url_field_name = None
api_key_field_name = None
api_version = None
default_timeout = REQUESTS_TIMEOUT

def __init__(self, dataspace):
Expand Down Expand Up @@ -71,6 +72,9 @@ def __init__(self, dataspace):

self.api_url = f"{self.service_url.rstrip('/')}/api/"

if self.api_version:
self.api_url = f"{self.api_url}{self.api_version.rstrip('/')}/"

def get_session(self):
session = requests.Session()

Expand Down
69 changes: 9 additions & 60 deletions dejacode_toolkit/vulnerablecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class VulnerableCode(BaseService):
settings_prefix = "VULNERABLECODE"
url_field_name = "vulnerablecode_url"
api_key_field_name = "vulnerablecode_api_key"
api_version = "v3"

def get_vulnerabilities(
self,
Expand Down Expand Up @@ -47,59 +48,30 @@ def get_vulnerabilities_by_purl(
):
"""Get list of vulnerabilities providing a package `purl`."""
return self.get_vulnerabilities(
url=f"{self.api_url}packages/",
url=f"{self.api_url}packages",
field_name="purl",
field_value=get_plain_purl(purl),
timeout=timeout,
)

def get_vulnerabilities_by_cpe(
self,
cpe,
timeout=None,
):
"""Get list of vulnerabilities providing a package or component `cpe`."""
return self.get_vulnerabilities(
url=f"{self.api_url}cpes/",
field_name="cpe",
field_value=cpe,
timeout=timeout,
)

def bulk_search_by_purl(
self,
purls,
purl_only,
details=True,
timeout=None,
):
"""Bulk search of vulnerabilities using the provided list of `purls`."""
url = f"{self.api_url}packages/bulk_search"
url = f"{self.api_url}packages"

data = {
"purls": purls,
"purl_only": purl_only,
"plain_purl": True,
"details": details,
}

logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}")
return self.request_post(url=url, json=data, timeout=timeout)

def bulk_search_by_cpes(
self,
cpes,
timeout=None,
):
"""Bulk search of vulnerabilities using the provided list of `cpes`."""
url = f"{self.api_url}cpes/bulk_search"

data = {
"cpes": cpes,
}

logger.debug(f"VulnerableCode: url={url} cpes_count={len(cpes)}")
return self.request_post(url, json=data, timeout=timeout)

def get_vulnerable_purls(self, packages, purl_only=True, timeout=10):
def get_vulnerable_purls(self, packages, details=False, timeout=10):
"""
Return a list of PURLs for which at least one `affected_by_vulnerabilities`
was found in the VulnerableCodeDB for the given list of `packages`.
Expand All @@ -110,34 +82,11 @@ def get_vulnerable_purls(self, packages, purl_only=True, timeout=10):
return []

vulnerable_purls = self.bulk_search_by_purl(
plain_purls,
purl_only=purl_only,
purls=plain_purls,
details=details,
timeout=timeout,
)
return vulnerable_purls or []

def get_vulnerable_cpes(self, components):
"""
Return a list of vulnerable CPEs found in the VulnerableCodeDB for the given
list of `components`.
"""
cpes = [component.cpe for component in components if component.cpe]

if not cpes:
return []

search_results = self.bulk_search_by_cpes(cpes)
if not search_results:
return []

vulnerable_cpes = [
reference.get("reference_id")
for entry in search_results
for reference in entry.get("references")
if reference.get("reference_id").startswith("cpe")
]

return list(set(vulnerable_cpes))
return vulnerable_purls.get("results") or []

def get_package_url_available_types(self):
# Replace by fetching the endpoint once available.
Expand Down
18 changes: 12 additions & 6 deletions vulnerabilities/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from django.urls import reverse
from django.utils import timezone

from packageurl import PackageURL

from component_catalog.models import PACKAGE_URL_FIELDS
from component_catalog.models import Package
from dejacode_toolkit.vulnerablecode import VulnerableCode
Expand Down Expand Up @@ -84,20 +86,21 @@ def fetch_for_packages(
log_func(f"Progress: {intcomma(progress_count)}/{intcomma(object_count)}")

batch_affected_packages = []
vc_entries = vulnerablecode.get_vulnerable_purls(batch, purl_only=False, timeout=timeout)
vc_entries = vulnerablecode.get_vulnerable_purls(batch, details=True, timeout=timeout)
for vc_entry in vc_entries:
affected_by_vulnerabilities = vc_entry.get("affected_by_vulnerabilities")
if not affected_by_vulnerabilities:
continue

purl = PackageURL.from_string(vc_entry.get("purl"))
affected_packages = queryset.filter(
type=vc_entry.get("type"),
namespace=vc_entry.get("namespace") or "",
name=vc_entry.get("name"),
version=vc_entry.get("version") or "",
type=purl.type,
namespace=purl.namespace,
name=purl.name,
version=purl.version,
)
if not affected_packages:
raise CommandError("Could not find package!")
raise CommandError("Could not find packages!")

# Store all packages of that batch to then trigger the update_weighted_risk_score
batch_affected_packages.extend(affected_packages)
Expand All @@ -119,6 +122,9 @@ def fetch_for_packages(
def create_or_update_vulnerability(
vulnerability_data, dataspace, affected_packages, update, results
):
# Adapt to the previous `vulnerability_id` based system.
vulnerability_data["vulnerability_id"] = vulnerability_data["advisory_id"]

vulnerability_id = vulnerability_data["vulnerability_id"]
vulnerability_qs = Vulnerability.objects.scope(dataspace)
vulnerability = vulnerability_qs.get_or_none(vulnerability_id=vulnerability_id)
Expand Down
32 changes: 21 additions & 11 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,24 @@ class Vulnerability(HistoryDateFieldsMixin, DataspacedModel):
"For example, 'VCID-2024-0001'."
),
)
# avid = models.CharField(
# max_length=500,
# blank=False,
# null=False,
# help_text=_(
# "Unique ID for the datasource used for this advisory ."
# "e.g.: pysec_importer_v2/PYSEC-2020-2233"
# ),
# )
# advisory_id = models.CharField(
# max_length=500,
# blank=False,
# null=False,
# help_text=_(
# "An advisory is a unique vulnerability identifier in some database, "
# "such as PYSEC-2020-2233"
# ),
# )
resource_url = models.URLField(
_("Resource URL"),
max_length=1024,
Expand Down Expand Up @@ -191,6 +209,9 @@ class Meta:
unique_together = (("dataspace", "vulnerability_id"), ("dataspace", "uuid"))
indexes = [
models.Index(fields=["vulnerability_id"]),
# models.Index(fields=["exploitability"]),
# models.Index(fields=["weighted_severity"]),
# models.Index(fields=["risk_score"]),
models.Index(fields=["risk_level"]),
]

Expand Down Expand Up @@ -488,16 +509,7 @@ def get_entry_for_package(self, vulnerablecode):
affected_by_vulnerabilities = vulnerable_packages[0].get("affected_by_vulnerabilities")
return affected_by_vulnerabilities

def get_entry_for_component(self, vulnerablecode):
if not self.cpe:
return

# Support for Component is paused as the CPES endpoint do not work properly.
# https://github.com/aboutcode-org/vulnerablecode/issues/1557
# vulnerabilities = vulnerablecode.get_vulnerabilities_by_cpe(self.cpe, timeout=10)

def get_entry_from_vulnerablecode(self):
from component_catalog.models import Component
from component_catalog.models import Package
from dejacode_toolkit.vulnerablecode import VulnerableCode

Expand All @@ -513,8 +525,6 @@ def get_entry_from_vulnerablecode(self):
if not is_vulnerablecode_enabled:
return

if isinstance(self, Component):
return self.get_entry_for_component(vulnerablecode)
elif isinstance(self, Package):
return self.get_entry_for_package(vulnerablecode)

Expand Down
Loading