Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 87 additions & 87 deletions src/sentry/tasks/llm_issue_detection/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import sentry_sdk
from django.conf import settings
from pydantic import BaseModel, Field
from urllib3.exceptions import TimeoutError

from sentry import features, options
from sentry.constants import VALID_PLATFORMS
Expand All @@ -30,10 +29,10 @@
logger = logging.getLogger("sentry.tasks.llm_issue_detection")

SEER_ANALYZE_ISSUE_ENDPOINT_PATH = "/v1/automation/issue-detection/analyze"
SEER_TIMEOUT_S = 180
SEER_TIMEOUT_S = 10
START_TIME_DELTA_MINUTES = 60
TRANSACTION_BATCH_SIZE = 50
NUM_TRANSACTIONS_TO_PROCESS = 10
TRACES_TO_SEND_TO_SEER = 20
TRACE_PROCESSING_TTL_SECONDS = 7200
# Character limit for LLM-generated fields to protect against abuse.
# Word limits are enforced by Seer's prompt (see seer/automation/issue_detection/analyze.py).
Expand All @@ -49,15 +48,32 @@
)


def mark_trace_as_processed(trace_id: str) -> bool:
def _get_unprocessed_traces(trace_ids: list[str]) -> set[str]:
    """Return the subset of *trace_ids* that have no processed-marker in Redis."""
    if not trace_ids:
        return set()

    cluster = redis_clusters.get("default")
    # One MGET round-trip for the whole batch; a missing key comes back as None.
    markers = cluster.mget([f"llm_detection:processed:{tid}" for tid in trace_ids])

    unprocessed: set[str] = set()
    for tid, marker in zip(trace_ids, markers):
        if marker is None:
            unprocessed.add(tid)
    return unprocessed


def mark_traces_as_processed(trace_ids: list[str]) -> None:
    """
    Mark traces as processed for LLM issue detection to prevent duplicate analysis.

    Will overwrite existing keys to refresh the TTL, since reaching this point
    means we have confirmation that the trace is being processed.

    :param trace_ids: trace IDs to mark; an empty list is a no-op.
    """
    if not trace_ids:
        return

    cluster = redis_clusters.get("default")
    # Pipeline the SETs so marking N traces costs a single round-trip instead of N.
    # NOTE(review): unlike the previous nx=True single-trace version, this
    # intentionally overwrites existing keys to refresh their TTL.
    with cluster.pipeline() as pipeline:
        for trace_id in trace_ids:
            key = f"llm_detection:processed:{trace_id}"
            pipeline.set(key, "1", ex=TRACE_PROCESSING_TTL_SECONDS)
        pipeline.execute()


class DetectedIssue(BaseModel):
Expand All @@ -76,11 +92,6 @@ class DetectedIssue(BaseModel):
transaction_name: str


class IssueDetectionResponse(BaseModel):
issues: list[DetectedIssue]
traces_analyzed: int


class IssueDetectionRequest(BaseModel):
traces: list[TraceMetadata]
organization_id: int
Expand Down Expand Up @@ -113,17 +124,11 @@ def get_base_platform(platform: str | None) -> str | None:

def create_issue_occurrence_from_detection(
detected_issue: DetectedIssue,
project_id: int | None = None,
project: Project | None = None,
project: Project,
) -> None:
"""
Create and produce an IssueOccurrence from an LLM-detected issue.
"""
if project is None:
if project_id is None:
raise ValueError("Either project or project_id must be provided")
project = Project.objects.get_from_cache(id=project_id)

event_id = uuid4().hex
occurrence_id = uuid4().hex
detection_time = datetime.now(UTC)
Expand Down Expand Up @@ -200,7 +205,7 @@ def get_enabled_project_ids() -> list[int]:
@instrumented_task(
name="sentry.tasks.llm_issue_detection.run_llm_issue_detection",
namespace=issues_tasks,
processing_deadline_duration=120,
processing_deadline_duration=120, # 2 minutes
)
def run_llm_issue_detection() -> None:
"""
Expand All @@ -225,7 +230,7 @@ def run_llm_issue_detection() -> None:
@instrumented_task(
name="sentry.tasks.llm_issue_detection.detect_llm_issues_for_project",
namespace=issues_tasks,
processing_deadline_duration=10 * 60,
processing_deadline_duration=180, # 3 minutes
)
def detect_llm_issues_for_project(project_id: int) -> None:
"""
Expand All @@ -252,70 +257,65 @@ def detect_llm_issues_for_project(project_id: int) -> None:
if not evidence_traces:
return

# Shuffle to randomize order
# Shuffle to randomize selection
random.shuffle(evidence_traces)
processed_traces = 0

for trace in evidence_traces:
if processed_traces >= NUM_TRANSACTIONS_TO_PROCESS:
break

if not mark_trace_as_processed(trace.trace_id):
sentry_sdk.metrics.count(
"llm_issue_detection.trace.skipped",
1,
)
continue

trace_id = trace.trace_id
sentry_sdk.metrics.count(
"llm_issue_detection.seer_request", 1, attributes={"trace_id": trace_id}
)
seer_request = IssueDetectionRequest(
traces=[trace],
organization_id=organization_id,
project_id=project_id,

# Bulk check which traces are already processed
all_trace_ids = [t.trace_id for t in evidence_traces]
unprocessed_ids = _get_unprocessed_traces(all_trace_ids)
skipped = len(all_trace_ids) - len(unprocessed_ids)
if skipped:
sentry_sdk.metrics.count("llm_issue_detection.trace.skipped", skipped)

# Take up to TRACES_TO_SEND_TO_SEER unprocessed traces
traces_to_send: list[TraceMetadata] = [
t for t in evidence_traces if t.trace_id in unprocessed_ids
][:TRACES_TO_SEND_TO_SEER]

if not traces_to_send:
return
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition in trace deduplication logic

Medium Severity

The refactored trace deduplication logic splits the atomic check-and-set operation into separate check and set steps. The old code used nx=True for atomic deduplication, but the new code checks unprocessed traces via _get_unprocessed_traces, then later marks them after receiving a 202 response. If two task instances run concurrently for the same project, both can identify the same traces as unprocessed, send duplicate batches to Seer, and both receive 202 responses, causing duplicate trace analysis.

Additional Locations (1)

Fix in Cursor Fix in Web


sentry_sdk.metrics.count(
"llm_issue_detection.seer_request",
1,
attributes={"trace_count": len(traces_to_send)},
)

seer_request = IssueDetectionRequest(
traces=traces_to_send,
organization_id=organization_id,
project_id=project_id,
)

response = make_signed_seer_api_request(
connection_pool=seer_issue_detection_connection_pool,
path=SEER_ANALYZE_ISSUE_ENDPOINT_PATH,
body=json.dumps(seer_request.dict()).encode("utf-8"),
timeout=SEER_TIMEOUT_S,
retries=0,
)
Comment on lines +290 to +296
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The call to make_signed_seer_api_request is not wrapped in a try-except block, which can cause the task to crash on network errors and lead to repeated processing failures.
Severity: MEDIUM

Suggested Fix

Wrap the make_signed_seer_api_request call in a try-except Exception block. In the except block, log the exception for visibility and allow the task to exit gracefully without crashing. This ensures that a transient network failure does not stop the entire task and allows the unprocessed traces to be retried on the next scheduled run.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: src/sentry/tasks/llm_issue_detection/detection.py#L290-L296

Potential issue: The call to `make_signed_seer_api_request` is not wrapped in a
`try-except` block. While the previous synchronous timeout handling was removed, other
network-related exceptions like `ConnectionError` or DNS failures can still occur. An
unhandled exception will cause the Celery task to crash. Because the crash happens
before `mark_traces_as_processed` is called, the same batch of traces will be picked up
and retried on the next scheduled run, potentially causing them to be stuck in a retry
loop until their Redis TTL expires. Other parts of the codebase making similar Seer API
calls include robust exception handling.

Did we get this right? 👍 / 👎 to inform future reviews.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we send all traces in a single batch request. There's no loop to continue, so catching errors serves no purpose.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed timeout exception handling for Seer request

Medium Severity

The TimeoutError exception handling was removed when converting to batch processing. If make_signed_seer_api_request times out due to network issues, the entire task crashes instead of handling the error gracefully. The old code caught TimeoutError from urllib3 and logged it while continuing to process other traces. The new code has no exception handling, causing the task to fail completely on timeout.

Fix in Cursor Fix in Web


if response.status == 202:
mark_traces_as_processed([trace.trace_id for trace in traces_to_send])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task crash after Seer accepts work causes duplicate processing

Medium Severity

Traces are marked as processed AFTER Seer returns 202, rather than before submission. If the task crashes or mark_traces_as_processed fails after Seer accepts the work but before Redis marks complete, those traces won't be marked and will be resubmitted on the next run. This causes Seer to process the same traces multiple times, potentially creating duplicate issues. The old code atomically marked traces before sending to Seer, preventing this failure mode.

Fix in Cursor Fix in Web


logger.info(
"llm_issue_detection.request_accepted",
extra={
"project_id": project_id,
"organization_id": organization_id,
"trace_count": len(traces_to_send),
},
)
return

try:
response = make_signed_seer_api_request(
connection_pool=seer_issue_detection_connection_pool,
path=SEER_ANALYZE_ISSUE_ENDPOINT_PATH,
body=json.dumps(seer_request.dict()).encode("utf-8"),
timeout=SEER_TIMEOUT_S,
retries=0,
)
except TimeoutError:
logger.exception("LLM Issue Detection Seer timeout", extra={"trace_id": trace_id})
continue

if response.status < 200 or response.status >= 300:
logger.error(
"LLM issue Detection Seer response failure",
extra={
"status_code": response.status,
"response_data": response.data,
"trace_id": trace_id,
},
)
continue

raw_response_data = response.json()
response_data = IssueDetectionResponse.parse_obj(raw_response_data)
processed_traces += response_data.traces_analyzed

for detected_issue in response_data.issues:
logger.info(
"LLM Issue Detection Result",
extra={
"title": detected_issue.title,
"trace_id": trace_id,
"project_id": project_id,
"category": detected_issue.category,
"subcategory": detected_issue.subcategory,
},
)
create_issue_occurrence_from_detection(
detected_issue=detected_issue,
project_id=project_id,
)
# Log (+ send to sentry) unexpected responses
logger.error(
"llm_issue_detection.unexpected_response",
extra={
"status_code": response.status,
"response_data": response.data,
"project_id": project_id,
"organization_id": organization_id,
"trace_count": len(traces_to_send),
},
)
Loading
Loading