Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e9ec2cf
remove dojo_model_to/from_id decorator
valentijnscholten Dec 26, 2025
4bda3d3
remove dojo_model_from/to_id
valentijnscholten Dec 26, 2025
7a8579c
remove dojo_model_from/to_id
valentijnscholten Dec 26, 2025
a74981e
remove dojo_model_from/to_id
valentijnscholten Dec 26, 2025
b396e16
remove dojo_model_from/to_id
valentijnscholten Dec 26, 2025
e08cdff
fix tests
valentijnscholten Dec 26, 2025
597ba2f
remove leftover signature methods
valentijnscholten Dec 26, 2025
d735462
fix test counts
valentijnscholten Dec 26, 2025
de166e7
fix test counts
valentijnscholten Dec 26, 2025
9e74205
fix test counts
valentijnscholten Dec 26, 2025
8cf2810
Update dojo/settings/settings.dist.py
valentijnscholten Dec 29, 2025
8b90d52
remove dojo_model_from/to_id
valentijnscholten Dec 26, 2025
dd5a95f
initial base task
valentijnscholten Dec 26, 2025
2ed1533
replace dojo_async_task decorator with class+helper
valentijnscholten Dec 26, 2025
2869813
fix notifications
valentijnscholten Dec 27, 2025
917aa72
fix test
valentijnscholten Dec 27, 2025
a33fb20
fix test
valentijnscholten Jan 5, 2026
6c56dc6
Merge remote-tracking branch 'upstream/dev' into remove-dojo-async-ta…
valentijnscholten Jan 7, 2026
e43ef40
pghistory inherits from dojoasynctask
valentijnscholten Jan 7, 2026
548ce72
Merge remote-tracking branch 'upstream/dev' into remove-dojo-async-ta…
valentijnscholten Jan 7, 2026
6d73654
fix system settings celery_status
valentijnscholten Jan 7, 2026
057d538
Merge upstream/dev into remove-dojo-async-task-base-task
valentijnscholten Jan 12, 2026
d8a98f7
Merge remote-tracking branch 'upstream/dev' into remove-dojo-async-ta…
valentijnscholten Jan 17, 2026
ce84517
Enforce readonly name field for existing Test_Type instances in form
Maffooch Jan 14, 2026
764d7cc
Add TestTypeCreateSerializer and enforce readonly name field in TestT…
Maffooch Jan 14, 2026
8a40cd1
Add dynamic serializer selection in TestTypesViewSet for create action
Maffooch Jan 14, 2026
396bd08
Update test payload to set 'active' field instead of 'name'
Maffooch Jan 14, 2026
8eb4ee1
Update TestTypeTest payload to use 'name' and modify update_fields to…
Maffooch Jan 14, 2026
4d23535
fix async delete, add tests
valentijnscholten Jan 17, 2026
945e359
Add additional fields to AssetSerializer for business criticality, pl…
Maffooch Jan 17, 2026
70f381e
Correct some filters too
Maffooch Jan 17, 2026
4e60765
Update AssetSerializer fields to allow null values and set defaults
Maffooch Jan 12, 2026
cf5c84c
Refactor authorization functions to use type hints for better clarity…
Maffooch Jan 12, 2026
214186d
Enhance permission checks to support multiple primary key attributes …
Maffooch Jan 12, 2026
08d488b
Refactor check_post_permission to use list type for post_pk parameter
Maffooch Jan 12, 2026
34359c9
Refactor Organization serializers to handle default values for critic…
Maffooch Jan 12, 2026
7ca4e7c
Refactor API tests to include asset and organization endpoints, enhan…
Maffooch Jan 12, 2026
40808f0
Refactor permission classes to use asset and organization-specific pe…
Maffooch Jan 12, 2026
e4da98a
Add blank line before UserHasOrganizationGroupPermission class for im…
Maffooch Jan 12, 2026
fd84c2b
Merge branch 'dev' into remove-dojo-async-task-base-task
valentijnscholten Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions dojo/api_v2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from dojo.api_v2.prefetch.prefetcher import _Prefetcher
from dojo.authorization.roles_permissions import Permissions
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.cred.queries import get_authorized_cred_mappings
from dojo.endpoint.queries import (
get_authorized_endpoint_status,
Expand Down Expand Up @@ -678,13 +679,13 @@ def update_jira_epic(self, request, pk=None):
try:

if engagement.has_jira_issue:
jira_helper.update_epic(engagement.id, **request.data)
dojo_dispatch_task(jira_helper.update_epic, engagement.id, **request.data)
response = Response(
{"info": "Jira Epic update query sent"},
status=status.HTTP_200_OK,
)
else:
jira_helper.add_epic(engagement.id, **request.data)
dojo_dispatch_task(jira_helper.add_epic, engagement.id, **request.data)
response = Response(
{"info": "Jira Epic create query sent"},
status=status.HTTP_200_OK,
Expand Down
52 changes: 46 additions & 6 deletions dojo/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,56 @@
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dojo.settings.settings")


class PgHistoryTask(Task):
class DojoAsyncTask(Task):

"""
Base task class that provides dojo_async_task functionality without using a decorator.

This class:
- Injects user context into task kwargs
- Tracks task calls for performance testing
- Supports all Celery features (signatures, chords, groups, chains)
"""

def apply_async(self, args=None, kwargs=None, **options):
"""Override apply_async to inject user context and track tasks."""
from dojo.decorators import dojo_async_task_counter # noqa: PLC0415 circular import
from dojo.utils import get_current_user # noqa: PLC0415 circular import

if kwargs is None:
kwargs = {}

# Inject user context if not already present
if "async_user" not in kwargs:
kwargs["async_user"] = get_current_user()

# Control flag used for sync/async decision; never pass into the task itself
kwargs.pop("sync", None)

# Track dispatch
dojo_async_task_counter.incr(
self.name,
args=args,
kwargs=kwargs,
)

# Call parent to execute async
return super().apply_async(args=args, kwargs=kwargs, **options)


class PgHistoryTask(DojoAsyncTask):

"""
Custom Celery base task that automatically applies pghistory context.

When a task is dispatched via dojo_async_task, the current pghistory
context is captured and passed in kwargs as "_pgh_context". This base
class extracts that context and applies it before running the task,
ensuring all database events share the same context as the original
request.
This class inherits from DojoAsyncTask to provide:
- User context injection and task tracking (from DojoAsyncTask)
- Automatic pghistory context application (from this class)

When a task is dispatched via dojo_dispatch_task or dojo_async_task, the current
pghistory context is captured and passed in kwargs as "_pgh_context". This base
class extracts that context and applies it before running the task, ensuring all
database events share the same context as the original request.
"""

def __call__(self, *args, **kwargs):
Expand Down
90 changes: 90 additions & 0 deletions dojo/celery_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Protocol, cast

from celery.canvas import Signature

if TYPE_CHECKING:
from collections.abc import Mapping


class _SupportsSi(Protocol):
def si(self, *args: Any, **kwargs: Any) -> Signature: ...


class _SupportsApplyAsync(Protocol):
def apply_async(self, args: Any | None = None, kwargs: Any | None = None, **options: Any) -> Any: ...


def _inject_async_user(kwargs: Mapping[str, Any] | None) -> dict[str, Any]:
result: dict[str, Any] = dict(kwargs or {})
if "async_user" not in result:
from dojo.utils import get_current_user # noqa: PLC0415 circular import

result["async_user"] = get_current_user()
return result


def _inject_pghistory_context(kwargs: Mapping[str, Any] | None) -> dict[str, Any]:
"""Capture and inject pghistory context if available."""
result: dict[str, Any] = dict(kwargs or {})
if "_pgh_context" not in result:
from dojo.pghistory_utils import get_serializable_pghistory_context # noqa: PLC0415 circular import

if pgh_context := get_serializable_pghistory_context():
result["_pgh_context"] = pgh_context
return result


def dojo_create_signature(task_or_sig: _SupportsSi | Signature, *args: Any, **kwargs: Any) -> Signature:
"""
Build a Celery signature with DefectDojo user context and pghistory context injected.

- If passed a task, returns `task_or_sig.si(*args, **kwargs)`.
- If passed an existing signature, returns a cloned signature with merged kwargs.
"""
injected = _inject_async_user(kwargs)
injected = _inject_pghistory_context(injected)
injected.pop("countdown", None)

if isinstance(task_or_sig, Signature):
merged_kwargs = {**(task_or_sig.kwargs or {}), **injected}
return task_or_sig.clone(kwargs=merged_kwargs)

return task_or_sig.si(*args, **injected)


def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signature, *args: Any, **kwargs: Any) -> Any:
"""
Dispatch a task/signature using DefectDojo semantics.

- Inject `async_user` if missing.
- Capture and inject pghistory context if available.
- Respect `sync=True` (foreground execution) and user `block_execution`.
- Support `countdown=<seconds>` for async dispatch.

Returns:
- async: AsyncResult-like return from Celery
- sync: underlying return value of the task

"""
from dojo.decorators import dojo_async_task_counter, we_want_async # noqa: PLC0415 circular import

countdown = cast("int", kwargs.pop("countdown", 0))
injected = _inject_async_user(kwargs)
injected = _inject_pghistory_context(injected)

sig = dojo_create_signature(task_or_sig if isinstance(task_or_sig, Signature) else cast("_SupportsSi", task_or_sig), *args, **injected)
sig_kwargs = dict(sig.kwargs or {})

if we_want_async(*sig.args, func=getattr(sig, "type", None), **sig_kwargs):
# DojoAsyncTask.apply_async tracks async dispatch. Avoid double-counting here.
return sig.apply_async(countdown=countdown)

# Track foreground execution as a "created task" as well (matches historical dojo_async_task behavior)
dojo_async_task_counter.incr(str(sig.task), args=sig.args, kwargs=sig_kwargs)

sig_kwargs.pop("sync", None)
sig = sig.clone(kwargs=sig_kwargs)
eager = sig.apply()
return eager.get(propagate=True)
3 changes: 2 additions & 1 deletion dojo/endpoint/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from dojo.authorization.authorization import user_has_permission_or_403
from dojo.authorization.authorization_decorators import user_is_authorized
from dojo.authorization.roles_permissions import Permissions
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.endpoint.queries import get_authorized_endpoints
from dojo.endpoint.utils import clean_hosts_run, endpoint_meta_import
from dojo.filters import EndpointFilter, EndpointFilterWithoutObjectLookups
Expand Down Expand Up @@ -373,7 +374,7 @@ def endpoint_bulk_update_all(request, pid=None):
product_calc = list(Product.objects.filter(endpoint__id__in=endpoints_to_update).distinct())
endpoints.delete()
for prod in product_calc:
calculate_grade(prod.id)
dojo_dispatch_task(calculate_grade, prod.id)

if skipped_endpoint_count > 0:
add_error_message_to_response(f"Skipped deletion of {skipped_endpoint_count} endpoints because you are not authorized.")
Expand Down
3 changes: 2 additions & 1 deletion dojo/engagement/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.dispatch import receiver

import dojo.jira_link.helper as jira_helper
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.models import Engagement

logger = logging.getLogger(__name__)
Expand All @@ -16,7 +17,7 @@ def close_engagement(eng):
eng.save()

if jira_helper.get_jira_project(eng):
jira_helper.close_epic(eng.id, push_to_jira=True)
dojo_dispatch_task(jira_helper.close_epic, eng.id, push_to_jira=True)


def reopen_engagement(eng):
Expand Down
3 changes: 2 additions & 1 deletion dojo/engagement/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from dojo.authorization.authorization import user_has_permission_or_403
from dojo.authorization.authorization_decorators import user_is_authorized
from dojo.authorization.roles_permissions import Permissions
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.endpoint.utils import save_endpoints_to_add
from dojo.engagement.queries import get_authorized_engagements
from dojo.engagement.services import close_engagement, reopen_engagement
Expand Down Expand Up @@ -390,7 +391,7 @@ def copy_engagement(request, eid):
form = DoneForm(request.POST)
if form.is_valid():
engagement_copy = engagement.copy()
calculate_grade(product.id)
dojo_dispatch_task(calculate_grade, product.id)
messages.add_message(
request,
messages.SUCCESS,
Expand Down
3 changes: 0 additions & 3 deletions dojo/finding/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from django.db.models.query_utils import Q

from dojo.celery import app
from dojo.decorators import dojo_async_task
from dojo.models import Finding, System_Settings

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -45,13 +44,11 @@ def get_finding_models_for_deduplication(finding_ids):
)


@dojo_async_task
@app.task
def do_dedupe_finding_task(new_finding_id, *args, **kwargs):
return do_dedupe_finding_task_internal(Finding.objects.get(id=new_finding_id), *args, **kwargs)


@dojo_async_task
@app.task
def do_dedupe_batch_task(finding_ids, *args, **kwargs):
"""
Expand Down
11 changes: 6 additions & 5 deletions dojo/finding/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import dojo.jira_link.helper as jira_helper
import dojo.risk_acceptance.helper as ra_helper
from dojo.celery import app
from dojo.decorators import dojo_async_task
from dojo.endpoint.utils import endpoint_get_or_create, save_endpoints_to_add
from dojo.file_uploads.helper import delete_related_files
from dojo.finding.deduplication import (
Expand Down Expand Up @@ -391,7 +390,6 @@ def add_findings_to_auto_group(name, findings, group_by, *, create_finding_group
finding_group.findings.add(*findings)


@dojo_async_task
@app.task
def post_process_finding_save(finding_id, dedupe_option=True, rules_option=True, product_grading_option=True, # noqa: FBT002
issue_updater_option=True, push_to_jira=False, user=None, *args, **kwargs): # noqa: FBT002 - this is bit hard to fix nice have this universally fixed
Expand Down Expand Up @@ -436,7 +434,9 @@ def post_process_finding_save_internal(finding, dedupe_option=True, rules_option

if product_grading_option:
if system_settings.enable_product_grade:
calculate_grade(finding.test.engagement.product.id)
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import

dojo_dispatch_task(calculate_grade, finding.test.engagement.product.id)
else:
deduplicationLogger.debug("skipping product grading because it's disabled in system settings")

Expand All @@ -453,7 +453,6 @@ def post_process_finding_save_internal(finding, dedupe_option=True, rules_option
jira_helper.push_to_jira(finding.finding_group)


@dojo_async_task
@app.task
def post_process_findings_batch(finding_ids, *args, dedupe_option=True, rules_option=True, product_grading_option=True,
issue_updater_option=True, push_to_jira=False, user=None, **kwargs):
Expand Down Expand Up @@ -496,7 +495,9 @@ def post_process_findings_batch(finding_ids, *args, dedupe_option=True, rules_op
tool_issue_updater.async_tool_issue_update(finding)

if product_grading_option and system_settings.enable_product_grade:
calculate_grade(findings[0].test.engagement.product.id)
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import

dojo_dispatch_task(calculate_grade, findings[0].test.engagement.product.id)

if push_to_jira:
for finding in findings:
Expand Down
5 changes: 3 additions & 2 deletions dojo/finding/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
user_is_authorized,
)
from dojo.authorization.roles_permissions import Permissions
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.filters import (
AcceptedFindingFilter,
AcceptedFindingFilterWithoutObjectLookups,
Expand Down Expand Up @@ -1082,7 +1083,7 @@ def process_form(self, request: HttpRequest, finding: Finding, context: dict):
product = finding.test.engagement.product
finding.delete()
# Update the grade of the product async
calculate_grade(product.id)
dojo_dispatch_task(calculate_grade, product.id)
# Add a message to the request that the finding was successfully deleted
messages.add_message(
request,
Expand Down Expand Up @@ -1353,7 +1354,7 @@ def copy_finding(request, fid):
test = form.cleaned_data.get("test")
product = finding.test.engagement.product
finding_copy = finding.copy(test=test)
calculate_grade(product.id)
dojo_dispatch_task(calculate_grade, product.id)
messages.add_message(
request,
messages.SUCCESS,
Expand Down
5 changes: 3 additions & 2 deletions dojo/finding_group/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dojo.authorization.authorization import user_has_permission_or_403
from dojo.authorization.authorization_decorators import user_is_authorized
from dojo.authorization.roles_permissions import Permissions
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.filters import (
FindingFilter,
FindingFilterWithoutObjectLookups,
Expand Down Expand Up @@ -100,7 +101,7 @@ def view_finding_group(request, fgid):
elif not finding_group.has_jira_issue:
jira_helper.finding_group_link_jira(request, finding_group, jira_issue)
elif push_to_jira:
jira_helper.push_to_jira(finding_group, sync=True)
dojo_dispatch_task(jira_helper.push_to_jira, finding_group, sync=True)

finding_group.save()
return HttpResponseRedirect(reverse("view_test", args=(finding_group.test.id,)))
Expand Down Expand Up @@ -200,7 +201,7 @@ def push_to_jira(request, fgid):

# it may look like success here, but the push_to_jira are swallowing exceptions
# but cant't change too much now without having a test suite, so leave as is for now with the addition warning message to check alerts for background errors.
if jira_helper.push_to_jira(group, sync=True):
if dojo_dispatch_task(jira_helper.push_to_jira, group, sync=True):
messages.add_message(
request,
messages.SUCCESS,
Expand Down
Loading
Loading