Skip to content
20 changes: 20 additions & 0 deletions dojo/api_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3102,6 +3102,26 @@ def validate(self, data):
return data


class CeleryStatusSerializer(serializers.Serializer):
    """Read-only snapshot of Celery worker/broker health and task timing settings."""

    # True when a Celery worker answered (liveness probe result).
    worker_status = serializers.BooleanField(read_only=True)
    # True when the queue length could be read from the broker at all.
    broker_status = serializers.BooleanField(read_only=True)
    # Number of pending messages in the queue; null when the broker is unreachable.
    queue_length = serializers.IntegerField(allow_null=True, read_only=True)
    # Mirror CELERY_TASK_TIME_LIMIT / CELERY_TASK_SOFT_TIME_LIMIT /
    # CELERY_TASK_DEFAULT_EXPIRES; null when the setting is not configured.
    task_time_limit = serializers.IntegerField(allow_null=True, read_only=True)
    task_soft_time_limit = serializers.IntegerField(allow_null=True, read_only=True)
    task_default_expires = serializers.IntegerField(allow_null=True, read_only=True)


class CeleryQueueTaskDetailSerializer(serializers.Serializer):
    """Per-task-name aggregate of messages currently sitting in the Celery queue."""

    task_name = serializers.CharField(read_only=True)
    # How many queued messages share this task name.
    count = serializers.IntegerField(read_only=True)
    # Queue positions of the oldest/newest matching message
    # (ordering semantics come from get_celery_queue_details — TODO confirm).
    oldest_position = serializers.IntegerField(read_only=True)
    newest_position = serializers.IntegerField(read_only=True)
    # ETA / expiry values serialized as strings; null when the message carries none.
    oldest_eta = serializers.CharField(allow_null=True, read_only=True)
    newest_eta = serializers.CharField(allow_null=True, read_only=True)
    earliest_expires = serializers.CharField(allow_null=True, read_only=True)
    latest_expires = serializers.CharField(allow_null=True, read_only=True)


class FindingNoteSerializer(serializers.Serializer):
note_id = serializers.IntegerField()

Expand Down
78 changes: 78 additions & 0 deletions dojo/api_v2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,14 @@
from dojo.utils import (
async_delete,
generate_file_response,
get_celery_queue_details,
get_celery_queue_length,
get_celery_worker_status,
get_setting,
get_system_setting,
process_tag_notifications,
purge_celery_queue,
purge_celery_queue_by_task_name,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -3260,6 +3265,79 @@ def get_queryset(self):
return System_Settings.objects.all().order_by("id")


class CeleryViewSet(viewsets.ViewSet):
    """Superuser-only API endpoints for monitoring and purging the Celery queue."""

    permission_classes = (permissions.IsSuperUser, DjangoModelPermissions)
    # DjangoModelPermissions needs a queryset to derive a model; an empty
    # queryset satisfies it without exposing any actual records.
    queryset = System_Settings.objects.none()

    @extend_schema(
        responses=serializers.CeleryStatusSerializer,
        summary="Get Celery worker and queue status",
        description=(
            "Returns Celery worker liveness, pending queue length, and the active task "
            "timeout/expiry configuration. Uses the Celery control channel (pidbox) for "
            "worker status so it works correctly even when the task queue is clogged."
        ),
    )
    @action(detail=False, methods=["get"], url_path="status")
    def status(self, request):
        """Report worker liveness, broker reachability, queue length and task timeouts."""
        queue_length = get_celery_queue_length()
        data = {
            "worker_status": get_celery_worker_status(),
            # A readable queue length implies the broker answered; None means it did not.
            "broker_status": queue_length is not None,
            "queue_length": queue_length,
            # Settings may be absent entirely when the limits are disabled.
            "task_time_limit": getattr(settings, "CELERY_TASK_TIME_LIMIT", None),
            "task_soft_time_limit": getattr(settings, "CELERY_TASK_SOFT_TIME_LIMIT", None),
            "task_default_expires": getattr(settings, "CELERY_TASK_DEFAULT_EXPIRES", None),
        }
        return Response(serializers.CeleryStatusSerializer(data).data)

    @extend_schema(
        request=None,
        responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}},
        summary="Purge all pending Celery tasks from the queue",
        description=(
            "Removes all pending tasks from the default Celery queue. Tasks already being "
            "executed by workers are not affected. Note: if deduplication tasks were queued, "
            "you may need to re-run deduplication manually via `python manage.py dedupe`."
        ),
    )
    @action(detail=False, methods=["post"], url_path="queue/purge")
    def queue_purge(self, request):
        """Drop every pending message from the default queue; running tasks are untouched."""
        purged = purge_celery_queue()
        return Response({"purged": purged})

    @extend_schema(
        responses=serializers.CeleryQueueTaskDetailSerializer(many=True),
        summary="Get per-task breakdown of the Celery queue",
        description=(
            "Scans every message in the queue (O(N)) and returns task name, count, and "
            "oldest/newest queue positions. May be slow for large queues."
        ),
    )
    @action(detail=False, methods=["get"], url_path="queue/details")
    def queue_details(self, request):
        """Return a per-task-name aggregate of queued messages, or 503 if unreadable."""
        details = get_celery_queue_details()
        if details is None:
            return Response({"error": "Unable to read queue details."}, status=503)
        return Response(serializers.CeleryQueueTaskDetailSerializer(details, many=True).data)

    @extend_schema(
        request={"application/json": {"type": "object", "properties": {"task_name": {"type": "string"}}, "required": ["task_name"]}},
        responses={200: {"type": "object", "properties": {"purged": {"type": "integer"}}}},
        summary="Purge all queued tasks with a given task name",
        description="Removes all pending tasks matching the given task name from the default Celery queue.",
    )
    @action(detail=False, methods=["post"], url_path="queue/task/purge")
    def queue_task_purge(self, request):
        """Purge queued messages whose task name matches the posted ``task_name``."""
        raw_task_name = request.data.get("task_name", "")
        # A JSON payload may carry a non-string value (int, list, dict); calling
        # .strip() on it would raise AttributeError and surface as HTTP 500.
        # Treat anything that is not a non-empty string as missing -> HTTP 400.
        task_name = raw_task_name.strip() if isinstance(raw_task_name, str) else ""
        if not task_name:
            return Response({"error": "task_name is required."}, status=400)
        purged = purge_celery_queue_by_task_name(task_name)
        if purged is None:
            return Response({"error": "Unable to purge tasks."}, status=503)
        return Response({"purged": purged})


# Authorization: superuser
@extend_schema_view(**schema_with_prefetch())
class NotificationsViewSet(
Expand Down
24 changes: 24 additions & 0 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,17 @@
DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")),
DD_CELERY_TASK_SERIALIZER=(str, "pickle"),
DD_CELERY_LOG_LEVEL=(str, "INFO"),
# Hard ceiling on task runtime. When reached, the worker process is sent SIGKILL — no cleanup
# code runs. Always set higher than DD_CELERY_TASK_SOFT_TIME_LIMIT. (0 = disabled, no limit)
DD_CELERY_TASK_TIME_LIMIT=(int, 43200), # default: 12 hours
# Raises SoftTimeLimitExceeded inside the task, giving it a chance to clean up before the hard
# kill. Set a few seconds below DD_CELERY_TASK_TIME_LIMIT so cleanup has time to finish.
# (0 = disabled, no limit)
DD_CELERY_TASK_SOFT_TIME_LIMIT=(int, 0),
# If a task sits in the broker queue for longer than this without being picked up by a worker,
# Celery silently discards it — it is never executed and no exception is raised. Does not
# affect tasks that are already running. (0 = disabled, no limit)
DD_CELERY_TASK_DEFAULT_EXPIRES=(int, 43200), # default: 12 hours
DD_TAG_BULK_ADD_BATCH_SIZE=(int, 1000),
# Tagulous slug truncate unique setting. Set to -1 to use tagulous internal default (5)
DD_TAGULOUS_SLUG_TRUNCATE_UNIQUE=(int, -1),
Expand Down Expand Up @@ -288,6 +299,10 @@
DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE=(int, 1000),
# Batch size for import/reimport deduplication processing
DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=(int, 1000),
# Batch size for Redis pipeline when purging the Celery queue by task name
DD_CELERY_QUEUE_PURGE_BATCH_SIZE=(int, 1000),
# Maximum number of tasks to purge in a single per-task purge action
DD_CELERY_QUEUE_PURGE_MAX_TASKS=(int, 10000),
# Delete Auditlogs older than x month; -1 to keep all logs
DD_AUDITLOG_FLUSH_RETENTION_PERIOD=(int, -1),
# Batch size for flushing audit logs per task run
Expand Down Expand Up @@ -1249,6 +1264,13 @@ def saml2_attrib_map_format(din):
CELERY_TASK_SERIALIZER = env("DD_CELERY_TASK_SERIALIZER")
CELERY_LOG_LEVEL = env("DD_CELERY_LOG_LEVEL")

# A value of 0 (or below) leaves the corresponding CELERY_* setting unset,
# i.e. the limit stays disabled and Celery's own defaults apply.
if env("DD_CELERY_TASK_TIME_LIMIT") > 0:
    CELERY_TASK_TIME_LIMIT = env("DD_CELERY_TASK_TIME_LIMIT")
if env("DD_CELERY_TASK_SOFT_TIME_LIMIT") > 0:
    CELERY_TASK_SOFT_TIME_LIMIT = env("DD_CELERY_TASK_SOFT_TIME_LIMIT")
if env("DD_CELERY_TASK_DEFAULT_EXPIRES") > 0:
    CELERY_TASK_DEFAULT_EXPIRES = env("DD_CELERY_TASK_DEFAULT_EXPIRES")

if len(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS")) > 0:
CELERY_BROKER_TRANSPORT_OPTIONS = json.loads(env("DD_CELERY_BROKER_TRANSPORT_OPTIONS"))

Expand Down Expand Up @@ -1779,6 +1801,8 @@ def saml2_attrib_map_format(din):
TRACK_IMPORT_HISTORY = env("DD_TRACK_IMPORT_HISTORY")
IMPORT_REIMPORT_MATCH_BATCH_SIZE = env("DD_IMPORT_REIMPORT_MATCH_BATCH_SIZE")
IMPORT_REIMPORT_DEDUPE_BATCH_SIZE = env("DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE")
CELERY_QUEUE_PURGE_BATCH_SIZE = env("DD_CELERY_QUEUE_PURGE_BATCH_SIZE")
CELERY_QUEUE_PURGE_MAX_TASKS = env("DD_CELERY_QUEUE_PURGE_MAX_TASKS")

# ------------------------------------------------------------------------------
# JIRA
Expand Down
5 changes: 5 additions & 0 deletions dojo/system_settings/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@
views.SystemSettingsView.as_view(),
name="system_settings",
),
re_path(
r"^celery_status$",
views.CeleryStatusView.as_view(),
name="celery_status",
),
]
51 changes: 13 additions & 38 deletions dojo/system_settings/views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging

from django.conf import settings
from django.contrib import messages
from django.core.exceptions import PermissionDenied
from django.http import HttpRequest, HttpResponse
Expand All @@ -9,7 +8,7 @@

from dojo.forms import SystemSettingsForm
from dojo.models import System_Settings
from dojo.utils import add_breadcrumb, get_celery_queue_length, get_celery_worker_status
from dojo.utils import add_breadcrumb

logger = logging.getLogger(__name__)

Expand All @@ -30,15 +29,10 @@ def get_context(
request: HttpRequest,
) -> dict:
system_settings_obj = self.get_settings_object()
# Set the initial context
context = {
return {
"system_settings_obj": system_settings_obj,
"form": self.get_form(request, system_settings_obj),
}
# Check the status of celery
self.get_celery_status(context)

return context

def get_form(
self,
Expand Down Expand Up @@ -95,35 +89,6 @@ def validate_form(
return request, True
return request, False

def get_celery_status(
    self,
    context: dict,
) -> None:
    """Populate *context* in place with Celery worker/queue status for the template.

    Sets ``celery_bool``, ``celery_msg``, ``celery_status`` and (when the
    result backend is configured) ``celery_q_len``.
    """
    # Celery needs to be set with the setting: CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite'
    if hasattr(settings, "CELERY_RESULT_BACKEND"):
        # Check the status of Celery by calling a celery task
        context["celery_bool"] = get_celery_worker_status()

        if context["celery_bool"]:
            context["celery_msg"] = "Celery is processing tasks."
            context["celery_status"] = "Running"
        else:
            context["celery_msg"] = "Celery does not appear to be up and running. Please ensure celery is running."
            context["celery_status"] = "Not Running"

        q_len = get_celery_queue_length()
        if q_len is None:
            context["celery_q_len"] = " It is not possible to identify number of waiting tasks."
        elif q_len:
            # Fixed user-facing typo: "proccessed" -> "processed".
            context["celery_q_len"] = f"{q_len} tasks are waiting to be processed."
        else:
            context["celery_q_len"] = "No task is waiting to be processed."

    else:
        context["celery_bool"] = False
        context["celery_msg"] = "Celery needs to have the setting CELERY_RESULT_BACKEND = 'db+sqlite:///dojo.celeryresults.sqlite' set in settings.py."
        context["celery_status"] = "Unknown"

def get_template(self) -> str:
return "dojo/system_settings.html"

Expand All @@ -148,9 +113,19 @@ def post(
self.permission_check(request)
# Set up the initial context
context = self.get_context(request)
# Check the status of celery
request, _ = self.validate_form(request, context)
# Add some breadcrumbs
add_breadcrumb(title="System settings", top_level=False, request=request)
# Render the page
return render(request, self.get_template(), context)


class CeleryStatusView(View):
    """Render the Celery status page; restricted to superusers."""

    def get(
        self,
        request: HttpRequest,
    ) -> HttpResponse:
        # Superusers get the page; everyone else gets a 403.
        if request.user.is_superuser:
            add_breadcrumb(title="Celery status", top_level=False, request=request)
            return render(request, "dojo/celery_status.html")
        raise PermissionDenied
5 changes: 5 additions & 0 deletions dojo/templates/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@
{% trans "System Settings" %}
</a>
</li>
<li>
<a href="{% url 'celery_status' %}">
{% trans "Celery Status" %}
</a>
</li>
{% endif %}
{% if "dojo.view_tool_configuration"|has_configuration_permission:request %}
<li>
Expand Down
Loading
Loading