Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions backend/api_v2/notification.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,52 @@
import logging

from notification_v2.helper import NotificationHelper
from notification_v2.helper import dispatch_with_delivery_mode
from notification_v2.models import Notification
from pipeline_v2.dto import PipelineStatusPayload
from workflow_manager.workflow_v2.enums import ExecutionStatus
from workflow_manager.workflow_v2.models.execution import WorkflowExecution

from api_v2.models import APIDeployment

logger = logging.getLogger(__name__)


_FAILURE_STATUSES = {ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value}


class APINotification:
def __init__(self, api: APIDeployment, workflow_execution: WorkflowExecution) -> None:
self.notifications = Notification.objects.filter(api=api, is_active=True)
self.api = api
self.workflow_execution = workflow_execution

def send(self):
if not self.notifications.count():
logger.info(f"No notifications found for api {self.api}")
def send(self) -> None:
# Failure if the run hit a non-success terminal state OR any file errored.
# Partial-success runs land as status=COMPLETED with failed_files>0, so the
# status check alone misses them — see callback aggregation rules.
failed_files = self.workflow_execution.failed_files or 0
is_failure = (
self.workflow_execution.status in _FAILURE_STATUSES or failed_files > 0
)
if not is_failure:
# Success path: skip rows that opted into failure-only alerts.
self.notifications = self.notifications.filter(notify_on_failures=False)

if not self.notifications.exists():
logger.info(
"No notifications to dispatch for api %s (status=%s, failed_files=%s)",
self.api,
self.workflow_execution.status,
failed_files,
)
return
logger.info(f"Sending api status notification for api {self.api}")
logger.info(
"Sending api status notification for api %s (status=%s, successful=%s, failed=%s)",
self.api,
self.workflow_execution.status,
self.workflow_execution.successful_files or 0,
failed_files,
)

payload_dto = PipelineStatusPayload(
type="API",
Expand All @@ -29,8 +55,13 @@ def send(self):
status=self.workflow_execution.status,
execution_id=self.workflow_execution.id,
error_message=self.workflow_execution.error_message,
total_files=self.workflow_execution.total_files,
successful_files=self.workflow_execution.successful_files,
failed_files=failed_files,
)

NotificationHelper.send_notification(
notifications=self.notifications, payload=payload_dto.to_dict()
dispatch_with_delivery_mode(
list(self.notifications),
payload_dto.to_dict(),
error_context=f"api={self.api.id}",
)
9 changes: 9 additions & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str |

INDEXING_FLAG_TTL = int(get_required_setting("INDEXING_FLAG_TTL"))
NOTIFICATION_TIMEOUT = int(get_required_setting("NOTIFICATION_TIMEOUT", "5"))
# Window for clubbing BATCHED notifications — also the flush cadence (seconds).
# Default 1800 (30 min). Per-notification buffer rows precompute flush_after at
# enqueue time, so changing this only affects rows enqueued after the restart.
NOTIFICATION_CLUB_INTERVAL = int(os.environ.get("NOTIFICATION_CLUB_INTERVAL", "1800"))
# Retention for terminal NotificationBuffer rows (DISPATCHED / DEAD_LETTER).
# PENDING rows are never GC'd regardless of age.
NOTIFICATION_BUFFER_RETENTION_DAYS = int(
os.environ.get("NOTIFICATION_BUFFER_RETENTION_DAYS", "7")
)
ATOMIC_REQUESTS = CommonUtils.str_to_bool(
os.environ.get("DJANGO_ATOMIC_REQUESTS", "False")
)
Expand Down
6 changes: 6 additions & 0 deletions backend/configuration/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class ConfigKey(Enum):
max_value=settings.MAX_PARALLEL_FILE_BATCHES_MAX_VALUE,
)

NOTIFICATION_CLUB_INTERVAL = ConfigSpec(
default=settings.NOTIFICATION_CLUB_INTERVAL,
value_type=ConfigType.INT,
help_text="Window (seconds) for clubbing BATCHED notifications.",
)

def cast_value(self, raw_value: Any):
converters = {
ConfigType.INT: int,
Expand Down
138 changes: 138 additions & 0 deletions backend/notification_v2/clubbed_renderer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Clubbed notification renderer.

Builds one canonical JSON envelope from a group of buffered execution events
and emits the platform-appropriate dispatch payload. Stays separate from the
single-event SlackWebhook / APIWebhook providers so immediate-dispatch behavior
stays untouched.

Envelope shape (always the same — single-event groups use this too so consumers
never need to branch on "is this batched?"):

{
"kind": "batch",
"summary": {
"pipeline": "<name>",
"interval_minutes": 30,
"total": N, "succeeded": S, "failed": F
},
"events": [{"execution_id": ..., "status": ..., "error": ...?}, ...]
}
"""

from __future__ import annotations

import logging
from typing import Any

from notification_v2.enums import PlatformType

logger = logging.getLogger(__name__)

# Hard cap on events per dispatch — extras roll over to the next flush tick.
# Bounds memory + payload size and prevents a runaway backlog from creating an
# unbounded HTTP body.
MAX_BATCH_SIZE = 500
# How many events Slack renders inline before collapsing the rest under a
# "… and K more" footer. Slack tolerates much larger payloads, but readability
# tanks past ~25 lines.
SLACK_MAX_DISPLAY_EVENTS = 25

_SUCCESS_STATUSES = {"COMPLETED", "SUCCESS"}


def _is_success(status: str | None) -> bool:
if not status:
return False
return status.upper() in _SUCCESS_STATUSES


def _event_from_payload(payload: dict[str, Any]) -> dict[str, Any]:
event: dict[str, Any] = {
"execution_id": payload.get("execution_id"),
"status": payload.get("status"),
}
error_message = payload.get("error_message")
if error_message:
event["error"] = error_message
return event


def build_envelope(
payloads: list[dict[str, Any]], interval_seconds: int
) -> dict[str, Any]:
"""Build the canonical batch envelope.

Caps the events list at MAX_BATCH_SIZE; oldest-first ordering is the
caller's responsibility (the flush job sorts by created_at).
"""
capped = payloads[:MAX_BATCH_SIZE]
succeeded = sum(1 for p in capped if _is_success(p.get("status")))
failed = len(capped) - succeeded
# Multiple pipelines can share an (org, url, auth_sig) group; we surface
# the first one's name as a representative. Mixed-pipeline batches are
# rare in practice and a v2 enhancement would aggregate distinct names.
pipeline_name = capped[0].get("pipeline_name") if capped else None
return {
"kind": "batch",
"summary": {
"pipeline": pipeline_name,
"interval_minutes": max(1, interval_seconds // 60),
"total": len(capped),
"succeeded": succeeded,
"failed": failed,
},
"events": [_event_from_payload(p) for p in capped],
}


def _slack_event_line(event: dict[str, Any]) -> str:
parts = [f"— {event.get('execution_id') or 'unknown'}: {event.get('status')}"]
if event.get("error"):
parts.append(f"({event['error']})")
return " ".join(parts)


def render_for_slack(envelope: dict[str, Any]) -> dict[str, Any]:
"""Format the envelope as a Slack-compatible payload dict.

Returns the body shape Slack incoming webhooks expect (`text` field with
mrkdwn). Truncates inline events at SLACK_MAX_DISPLAY_EVENTS.
"""
summary = envelope["summary"]
events: list[dict[str, Any]] = envelope["events"]
pipeline = summary.get("pipeline") or "pipeline"

header = f"*[Unstract] {summary['total']} executions for `{pipeline}`*"
counts = f"✅ {summary['succeeded']} succeeded ❌ {summary['failed']} failed"

visible = events[:SLACK_MAX_DISPLAY_EVENTS]
lines = [_slack_event_line(e) for e in visible]
overflow = len(events) - len(visible)
if overflow > 0:
lines.append(f"… and {overflow} more executions")

body = "\n".join([header, counts, *lines])
return {"text": body}


def render_clubbed_message(
    payloads: list[dict[str, Any]], platform: str, interval_seconds: int
) -> dict[str, Any]:
    """Top-level entry point — returns the dispatch body for ``platform``.

    Slack receives the rendered text payload; raw API webhooks receive the
    canonical envelope unchanged so downstream consumers can parse it
    programmatically.
    """
    envelope = build_envelope(payloads, interval_seconds)
    if platform == PlatformType.SLACK.value:
        return render_for_slack(envelope)
    if platform != PlatformType.API.value:
        # Unknown platform — fall back to the raw envelope and warn so
        # misrouted rows don't drop silently.
        logger.warning(
            "Unknown platform %s for clubbed dispatch; returning raw envelope",
            platform,
        )
    return envelope
34 changes: 34 additions & 0 deletions backend/notification_v2/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,37 @@ class PlatformType(Enum):
@classmethod
def choices(cls):
return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls]


class DeliveryMode(Enum):
    """How a notification is dispatched.

    IMMEDIATE fires on every workflow completion (pre-existing behavior).
    BATCHED buffers events into NotificationBuffer and flushes them as one
    clubbed message per (org, webhook_url, auth_sig) every
    NOTIFICATION_CLUB_INTERVAL seconds.
    """

    IMMEDIATE = "IMMEDIATE"
    BATCHED = "BATCHED"

    @classmethod
    def choices(cls):
        """Return Django-style (value, label) pairs, one per member."""
        options = []
        for member in cls:
            label = member.name.replace("_", " ").capitalize()
            options.append((member.value, label))
        return options


class BufferStatus(Enum):
    """Lifecycle of a NotificationBuffer row.

    PENDING — waiting for the next flush tick.
    DISPATCHED — successfully sent as part of a clubbed message.
    DEAD_LETTER — Celery exhausted retries; terminal, never re-picked.
    """

    PENDING = "PENDING"
    DISPATCHED = "DISPATCHED"
    DEAD_LETTER = "DEAD_LETTER"

    @classmethod
    def choices(cls):
        """Return (value, human-readable label) tuples for form/model use."""
        labels = (
            (member.value, member.name.replace("_", " ").capitalize())
            for member in cls
        )
        return list(labels)
Loading