Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion dojo/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,36 @@ class DojoAsyncTask(Task):
Base task class that provides dojo_async_task functionality without using a decorator.

This class:
- Injects user context into task kwargs
- Injects user context into task kwargs (on dispatch via apply_async)
- Restores user context in the worker (on execution via __call__)
- Tracks task calls for performance testing
- Supports all Celery features (signatures, chords, groups, chains)
"""

def __call__(self, *args, **kwargs):
"""
Restore user context in the celery worker via crum.impersonate.

The apply_async method injects ``async_user`` into kwargs when a task
is dispatched. Here we pop it and set it as the current user in
thread-local storage so that all downstream code — including nested
dojo_dispatch_task calls — sees the correct user via
get_current_user().

When a task is called directly (not via apply_async), async_user is
not in kwargs. In that case we leave the existing crum context
intact so that callers who already set a user (e.g. via
crum.impersonate in tests or request middleware) are not disrupted.
"""
if "async_user" not in kwargs:
return super().__call__(*args, **kwargs)

import crum # noqa: PLC0415

user = kwargs.pop("async_user")
with crum.impersonate(user):
return super().__call__(*args, **kwargs)

def apply_async(self, args=None, kwargs=None, **options):
"""Override apply_async to inject user context and track tasks."""
from dojo.decorators import dojo_async_task_counter # noqa: PLC0415 circular import
Expand Down
2 changes: 0 additions & 2 deletions dojo/celery_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur
# Track foreground execution as a "created task" as well (matches historical dojo_async_task behavior)
dojo_async_task_counter.incr(str(sig.task), args=sig.args, kwargs=sig_kwargs)

sig_kwargs.pop("sync", None)
sig = sig.clone(kwargs=sig_kwargs)
eager = sig.apply()
try:
return eager.get(propagate=True)
Expand Down
2 changes: 1 addition & 1 deletion dojo/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def we_want_async(*args, func=None, **kwargs):
logger.debug("dojo_async_task %s: running task in the foreground as sync=True has been found as kwarg", func)
return False

user = kwargs.get("async_user", get_current_user())
user = get_current_user()
logger.debug("async user: %s", user)

if not user:
Expand Down
5 changes: 2 additions & 3 deletions dojo/finding/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ def group_findings_by(finds, finding_group_by_option):
def add_findings_to_auto_group(name, findings, group_by, *, create_finding_groups_for_all_findings=True, **kwargs):
if name is not None and findings is not None and len(findings) > 0:
creator = get_current_user()
if not creator:
creator = kwargs.get("async_user")
test = findings[0].test

if create_finding_groups_for_all_findings or len(findings) > 1:
Expand Down Expand Up @@ -470,6 +468,7 @@ def post_process_findings_batch(
push_to_jira=False,
jira_instance_id=None,
user=None,
sync=False,
**kwargs,
):

Expand Down Expand Up @@ -513,7 +512,7 @@ def post_process_findings_batch(
if product_grading_option and system_settings.enable_product_grade:
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import

dojo_dispatch_task(calculate_grade, findings[0].test.engagement.product.id)
dojo_dispatch_task(calculate_grade, findings[0].test.engagement.product.id, sync=sync)

# If we received the ID of a jira instance, then we need to determine the keep in sync behavior
jira_instance = None
Expand Down
13 changes: 4 additions & 9 deletions dojo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,6 @@ def do_false_positive_history(finding, *args, **kwargs):
existing_non_fp_findings = existing_findings.filter(active=True).exclude(false_p=True)
to_mark_as_fp.update(set(existing_non_fp_findings))

# Remove the async user kwarg because save() really does not like it
# Would rather not add anything to Finding.save()
if "async_user" in kwargs:
kwargs.pop("async_user")

for find in to_mark_as_fp:
deduplicationLogger.debug(
"FALSE_POSITIVE_HISTORY: Marking Finding %i:%s from %s as false positive",
Expand Down Expand Up @@ -2065,7 +2060,7 @@ def async_delete_chunk_task(objects, **kwargs):
"""
Module-level Celery task to delete a chunk of objects.

Accepts **kwargs for async_user and _pgh_context injected by dojo_dispatch_task.
Accepts **kwargs for _pgh_context injected by dojo_dispatch_task.
Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail.
"""
max_retries = 3
Expand Down Expand Up @@ -2119,7 +2114,7 @@ def async_delete_crawl_task(obj, model_list, **kwargs):
"""
Module-level Celery task to crawl and delete related objects.

Accepts **kwargs for async_user and _pgh_context injected by dojo_dispatch_task.
Accepts **kwargs for _pgh_context injected by dojo_dispatch_task.
Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail.
"""
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import
Expand Down Expand Up @@ -2158,7 +2153,7 @@ def async_delete_task(obj, **kwargs):
"""
Module-level Celery task to delete an object and its related objects.

Accepts **kwargs for async_user and _pgh_context injected by dojo_dispatch_task.
Accepts **kwargs for _pgh_context injected by dojo_dispatch_task.
Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail.
"""
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import
Expand Down Expand Up @@ -2196,7 +2191,7 @@ def delete(self, obj, **kwargs):
Entry point to delete an object asynchronously.

Dispatches to async_delete_task via dojo_dispatch_task to ensure proper
handling of async_user and _pgh_context.
handling of user context and _pgh_context.
"""
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import

Expand Down
8 changes: 4 additions & 4 deletions unittests/test_async_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
Unit tests for async_delete functionality.
These tests verify that the async_delete class works correctly with dojo_dispatch_task,
which injects async_user and _pgh_context kwargs into task calls.
which injects user context and _pgh_context kwargs into task calls.
The original bug was that @app.task decorated instance methods didn't properly handle
the injected kwargs, causing TypeError: unexpected keyword argument 'async_user'.
the injected kwargs, causing TypeError for unexpected keyword arguments.
"""
import logging

Expand Down Expand Up @@ -120,8 +120,8 @@ def test_async_delete_simple_object(self):

# Use impersonate to set current user context (required for block_execution to work)
with impersonate(self.testuser):
# This would raise TypeError before the fix:
# TypeError: delete() got an unexpected keyword argument 'async_user'
# This would raise TypeError before the fix when injected kwargs
# were not handled properly by task functions
async_del = async_delete()
async_del.delete(finding)

Expand Down