Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions docs/content/en/open_source/upgrading/2.55.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
title: 'Upgrading to DefectDojo Version 2.55.x'
toc_hide: true
weight: -20260105
description: No special instructions.
description: Authorization related optimizations
---
There are no special instructions for upgrading to 2.55.x. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.55.0) for the contents of the release.

## Authorization related optimizations

The queries related to authorizations have been optmized. For example retrieving the list of authorized findings for the logged in user.
Some of these are now also cached during that duration of a request. This should have no functional effects and only results in better performance.

Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.55.0) for the contents of the release.
4 changes: 2 additions & 2 deletions dojo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
from .celery import app as celery_app # noqa: F401

__version__ = "2.55.0-dev"
__url__ = "https://github.com/DefectDojo/django-DefectDojo"
__docs__ = "https://documentation.defectdojo.com"
__url__ = "https://github.com/DefectDojo/django-DefectDojo" # noqa: RUF067

Check failure on line 8 in dojo/__init__.py

View workflow job for this annotation

GitHub Actions / ruff-linting

Ruff (RUF102)

dojo/__init__.py:8:62: RUF102 Invalid rule code in `# noqa`: RUF067

Check failure on line 8 in dojo/__init__.py

View workflow job for this annotation

GitHub Actions / ruff-linting

Ruff (RUF100)

dojo/__init__.py:8:62: RUF100 Unused `noqa` directive (unknown: `RUF067`)
__docs__ = "https://documentation.defectdojo.com" # noqa: RUF067

Check failure on line 9 in dojo/__init__.py

View workflow job for this annotation

GitHub Actions / ruff-linting

Ruff (RUF100)

dojo/__init__.py:9:52: RUF100 Unused `noqa` directive (unknown: `RUF067`)

Check failure on line 9 in dojo/__init__.py

View workflow job for this annotation

GitHub Actions / ruff-linting

Ruff (RUF102)

dojo/__init__.py:9:52: RUF102 Invalid rule code in `# noqa`: RUF067
90 changes: 68 additions & 22 deletions dojo/cred/queries.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
from crum import get_current_user
from django.db.models import Exists, OuterRef, Q
from django.db.models import Q, Subquery

from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission
from dojo.models import Cred_Mapping, Product_Group, Product_Member, Product_Type_Group, Product_Type_Member
from dojo.request_cache import cache_for_request


def get_authorized_cred_mappings(permission, queryset=None):
# Cached: all parameters are hashable, no dynamic queryset filtering
@cache_for_request
def get_authorized_cred_mappings(permission):
"""Cached - returns all cred mappings the user is authorized to see."""
user = get_current_user()

if user is None:
return Cred_Mapping.objects.none()

cred_mappings = Cred_Mapping.objects.all().order_by("id") if queryset is None else queryset
cred_mappings = Cred_Mapping.objects.all().order_by("id")

if user.is_superuser:
return cred_mappings
Expand All @@ -20,27 +24,69 @@ def get_authorized_cred_mappings(permission, queryset=None):
return cred_mappings

roles = get_roles_for_permission(permission)

# Get authorized product/product_type IDs via subqueries
authorized_product_type_roles = Product_Type_Member.objects.filter(
product_type=OuterRef("product__prod_type_id"),
user=user,
role__in=roles)
user=user, role__in=roles,
).values("product_type_id")

authorized_product_roles = Product_Member.objects.filter(
product=OuterRef("product_id"),
user=user,
role__in=roles)
user=user, role__in=roles,
).values("product_id")

authorized_product_type_groups = Product_Type_Group.objects.filter(
product_type=OuterRef("product__prod_type_id"),
group__users=user,
role__in=roles)
group__users=user, role__in=roles,
).values("product_type_id")

authorized_product_groups = Product_Group.objects.filter(
product=OuterRef("product_id"),
group__users=user,
role__in=roles)
cred_mappings = cred_mappings.annotate(
product__prod_type__member=Exists(authorized_product_type_roles),
product__member=Exists(authorized_product_roles),
product__prod_type__authorized_group=Exists(authorized_product_type_groups),
product__authorized_group=Exists(authorized_product_groups))
group__users=user, role__in=roles,
).values("product_id")

# Filter using IN with Subquery - no annotations needed
return cred_mappings.filter(
Q(product__prod_type__member=True) | Q(product__member=True)
| Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True))
Q(product__prod_type_id__in=Subquery(authorized_product_type_roles))
| Q(product_id__in=Subquery(authorized_product_roles))
| Q(product__prod_type_id__in=Subquery(authorized_product_type_groups))
| Q(product_id__in=Subquery(authorized_product_groups)),
)


def get_authorized_cred_mappings_for_queryset(permission, queryset):
"""Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter."""
user = get_current_user()

if user is None:
return Cred_Mapping.objects.none()

if user.is_superuser:
return queryset

if user_has_global_permission(user, permission):
return queryset

roles = get_roles_for_permission(permission)

# Get authorized product/product_type IDs via subqueries
authorized_product_type_roles = Product_Type_Member.objects.filter(
user=user, role__in=roles,
).values("product_type_id")

authorized_product_roles = Product_Member.objects.filter(
user=user, role__in=roles,
).values("product_id")

authorized_product_type_groups = Product_Type_Group.objects.filter(
group__users=user, role__in=roles,
).values("product_type_id")

authorized_product_groups = Product_Group.objects.filter(
group__users=user, role__in=roles,
).values("product_id")

# Filter using IN with Subquery - no annotations needed
return queryset.filter(
Q(product__prod_type_id__in=Subquery(authorized_product_type_roles))
| Q(product_id__in=Subquery(authorized_product_roles))
| Q(product__prod_type_id__in=Subquery(authorized_product_type_groups))
| Q(product_id__in=Subquery(authorized_product_groups)),
)
4 changes: 2 additions & 2 deletions dojo/cred/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from dojo.authorization.authorization_decorators import user_is_authorized, user_is_configuration_authorized
from dojo.authorization.roles_permissions import Permissions
from dojo.cred.queries import get_authorized_cred_mappings
from dojo.cred.queries import get_authorized_cred_mappings_for_queryset
from dojo.forms import CredMappingForm, CredMappingFormProd, CredUserForm, NoteForm
from dojo.models import Cred_Mapping, Cred_User, Engagement, Finding, Product, Test
from dojo.utils import Product_Tab, add_breadcrumb, dojo_crypto_encrypt, prepare_for_view
Expand Down Expand Up @@ -85,7 +85,7 @@ def view_cred_details(request, ttid):
notes = cred.notes.all()
cred_products = Cred_Mapping.objects.select_related("product").filter(
product_id__isnull=False, cred_id=ttid).order_by("product__name")
cred_products = get_authorized_cred_mappings(Permissions.Product_View, cred_products)
cred_products = get_authorized_cred_mappings_for_queryset(Permissions.Product_View, cred_products)

if request.method == "POST":
form = NoteForm(request.POST)
Expand Down
179 changes: 135 additions & 44 deletions dojo/endpoint/queries.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from crum import get_current_user
from django.db.models import Exists, OuterRef, Q
from django.db.models import Q, Subquery

from dojo.authorization.authorization import get_roles_for_permission, user_has_global_permission
from dojo.models import (
Expand All @@ -10,17 +10,20 @@
Product_Type_Group,
Product_Type_Member,
)
from dojo.request_cache import cache_for_request


def get_authorized_endpoints(permission, queryset=None, user=None):

# Cached: all parameters are hashable, no dynamic queryset filtering
@cache_for_request
def get_authorized_endpoints(permission, user=None):
"""Cached - returns all endpoints the user is authorized to see."""
if user is None:
user = get_current_user()

if user is None:
return Endpoint.objects.none()

endpoints = Endpoint.objects.all().order_by("id") if queryset is None else queryset
endpoints = Endpoint.objects.all().order_by("id")

if user.is_superuser:
return endpoints
Expand All @@ -29,41 +32,86 @@ def get_authorized_endpoints(permission, queryset=None, user=None):
return endpoints

roles = get_roles_for_permission(permission)

# Get authorized product/product_type IDs via subqueries
authorized_product_type_roles = Product_Type_Member.objects.filter(
product_type=OuterRef("product__prod_type_id"),
user=user,
role__in=roles)
user=user, role__in=roles,
).values("product_type_id")

authorized_product_roles = Product_Member.objects.filter(
product=OuterRef("product_id"),
user=user,
role__in=roles)
user=user, role__in=roles,
).values("product_id")

authorized_product_type_groups = Product_Type_Group.objects.filter(
product_type=OuterRef("product__prod_type_id"),
group__users=user,
role__in=roles)
group__users=user, role__in=roles,
).values("product_type_id")

authorized_product_groups = Product_Group.objects.filter(
product=OuterRef("product_id"),
group__users=user,
role__in=roles)
endpoints = endpoints.annotate(
product__prod_type__member=Exists(authorized_product_type_roles),
product__member=Exists(authorized_product_roles),
product__prod_type__authorized_group=Exists(authorized_product_type_groups),
product__authorized_group=Exists(authorized_product_groups))
group__users=user, role__in=roles,
).values("product_id")

# Filter using IN with Subquery - no annotations needed
return endpoints.filter(
Q(product__prod_type__member=True) | Q(product__member=True)
| Q(product__prod_type__authorized_group=True) | Q(product__authorized_group=True))
Q(product__prod_type_id__in=Subquery(authorized_product_type_roles))
| Q(product_id__in=Subquery(authorized_product_roles))
| Q(product__prod_type_id__in=Subquery(authorized_product_type_groups))
| Q(product_id__in=Subquery(authorized_product_groups)),
)


def get_authorized_endpoint_status(permission, queryset=None, user=None):
def get_authorized_endpoints_for_queryset(permission, queryset, user=None):
"""Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter."""
if user is None:
user = get_current_user()

if user is None:
return Endpoint.objects.none()

if user.is_superuser:
return queryset

if user_has_global_permission(user, permission):
return queryset

roles = get_roles_for_permission(permission)

# Get authorized product/product_type IDs via subqueries
authorized_product_type_roles = Product_Type_Member.objects.filter(
user=user, role__in=roles,
).values("product_type_id")

authorized_product_roles = Product_Member.objects.filter(
user=user, role__in=roles,
).values("product_id")

authorized_product_type_groups = Product_Type_Group.objects.filter(
group__users=user, role__in=roles,
).values("product_type_id")

authorized_product_groups = Product_Group.objects.filter(
group__users=user, role__in=roles,
).values("product_id")

# Filter using IN with Subquery - no annotations needed
return queryset.filter(
Q(product__prod_type_id__in=Subquery(authorized_product_type_roles))
| Q(product_id__in=Subquery(authorized_product_roles))
| Q(product__prod_type_id__in=Subquery(authorized_product_type_groups))
| Q(product_id__in=Subquery(authorized_product_groups)),
)


# Cached: all parameters are hashable, no dynamic queryset filtering
@cache_for_request
def get_authorized_endpoint_status(permission, user=None):
"""Cached - returns all endpoint statuses the user is authorized to see."""
if user is None:
user = get_current_user()

if user is None:
return Endpoint_Status.objects.none()

endpoint_status = Endpoint_Status.objects.all().order_by("id") if queryset is None else queryset
endpoint_status = Endpoint_Status.objects.all().order_by("id")

if user.is_superuser:
return endpoint_status
Expand All @@ -72,27 +120,70 @@ def get_authorized_endpoint_status(permission, queryset=None, user=None):
return endpoint_status

roles = get_roles_for_permission(permission)

# Get authorized product/product_type IDs via subqueries
authorized_product_type_roles = Product_Type_Member.objects.filter(
product_type=OuterRef("endpoint__product__prod_type_id"),
user=user,
role__in=roles)
user=user, role__in=roles,
).values("product_type_id")

authorized_product_roles = Product_Member.objects.filter(
product=OuterRef("endpoint__product_id"),
user=user,
role__in=roles)
user=user, role__in=roles,
).values("product_id")

authorized_product_type_groups = Product_Type_Group.objects.filter(
product_type=OuterRef("endpoint__product__prod_type_id"),
group__users=user,
role__in=roles)
group__users=user, role__in=roles,
).values("product_type_id")

authorized_product_groups = Product_Group.objects.filter(
product=OuterRef("endpoint__product_id"),
group__users=user,
role__in=roles)
endpoint_status = endpoint_status.annotate(
endpoint__product__prod_type__member=Exists(authorized_product_type_roles),
endpoint__product__member=Exists(authorized_product_roles),
endpoint__product__prod_type__authorized_group=Exists(authorized_product_type_groups),
endpoint__product__authorized_group=Exists(authorized_product_groups))
group__users=user, role__in=roles,
).values("product_id")

# Filter using IN with Subquery - no annotations needed
return endpoint_status.filter(
Q(endpoint__product__prod_type__member=True) | Q(endpoint__product__member=True)
| Q(endpoint__product__prod_type__authorized_group=True) | Q(endpoint__product__authorized_group=True))
Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_roles))
| Q(endpoint__product_id__in=Subquery(authorized_product_roles))
| Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_groups))
| Q(endpoint__product_id__in=Subquery(authorized_product_groups)),
)


def get_authorized_endpoint_status_for_queryset(permission, queryset, user=None):
"""Filters a provided queryset for authorization. Not cached due to dynamic queryset parameter."""
if user is None:
user = get_current_user()

if user is None:
return Endpoint_Status.objects.none()

if user.is_superuser:
return queryset

if user_has_global_permission(user, permission):
return queryset

roles = get_roles_for_permission(permission)

# Get authorized product/product_type IDs via subqueries
authorized_product_type_roles = Product_Type_Member.objects.filter(
user=user, role__in=roles,
).values("product_type_id")

authorized_product_roles = Product_Member.objects.filter(
user=user, role__in=roles,
).values("product_id")

authorized_product_type_groups = Product_Type_Group.objects.filter(
group__users=user, role__in=roles,
).values("product_type_id")

authorized_product_groups = Product_Group.objects.filter(
group__users=user, role__in=roles,
).values("product_id")

# Filter using IN with Subquery - no annotations needed
return queryset.filter(
Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_roles))
| Q(endpoint__product_id__in=Subquery(authorized_product_roles))
| Q(endpoint__product__prod_type_id__in=Subquery(authorized_product_type_groups))
| Q(endpoint__product_id__in=Subquery(authorized_product_groups)),
)
Loading
Loading