-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
ref(llm-detection): Convert detect_llm_issues_for_project to async batch processing #106839
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,6 @@ | |
| import sentry_sdk | ||
| from django.conf import settings | ||
| from pydantic import BaseModel, Field | ||
| from urllib3.exceptions import TimeoutError | ||
|
|
||
| from sentry import features, options | ||
| from sentry.constants import VALID_PLATFORMS | ||
|
|
@@ -30,10 +29,10 @@ | |
| logger = logging.getLogger("sentry.tasks.llm_issue_detection") | ||
|
|
||
| SEER_ANALYZE_ISSUE_ENDPOINT_PATH = "/v1/automation/issue-detection/analyze" | ||
| SEER_TIMEOUT_S = 180 | ||
| SEER_TIMEOUT_S = 10 | ||
| START_TIME_DELTA_MINUTES = 60 | ||
| TRANSACTION_BATCH_SIZE = 50 | ||
| NUM_TRANSACTIONS_TO_PROCESS = 10 | ||
| TRACES_TO_SEND_TO_SEER = 20 | ||
| TRACE_PROCESSING_TTL_SECONDS = 7200 | ||
| # Character limit for LLM-generated fields to protect against abuse. | ||
| # Word limits are enforced by Seer's prompt (see seer/automation/issue_detection/analyze.py). | ||
|
|
@@ -49,15 +48,32 @@ | |
| ) | ||
|
|
||
|
|
||
| def mark_trace_as_processed(trace_id: str) -> bool: | ||
| def _get_unprocessed_traces(trace_ids: list[str]) -> set[str]: | ||
| """Return set of trace_ids that have NOT been processed""" | ||
| if not trace_ids: | ||
| return set() | ||
| cluster = redis_clusters.get("default") | ||
| keys = [f"llm_detection:processed:{tid}" for tid in trace_ids] | ||
| results = cluster.mget(keys) | ||
| return {tid for tid, val in zip(trace_ids, results) if val is None} | ||
|
|
||
|
|
||
| def mark_traces_as_processed(trace_ids: list[str]) -> None: | ||
| """ | ||
| Mark trace as processed for LLM issue detection to prevent duplicate analysis. | ||
| Returns True if successfully marked, False if already marked. | ||
| Mark traces as processed for LLM issue detection to prevent duplicate analysis. | ||
|
|
||
| Will overwrite existing keys to refresh the TTL, since reaching this point | ||
| means we have confirmation that the trace is being processed. | ||
| """ | ||
| if not trace_ids: | ||
| return | ||
|
|
||
| cluster = redis_clusters.get("default") | ||
| key = f"llm_detection:processed:{trace_id}" | ||
| result = cluster.set(key, "1", nx=True, ex=TRACE_PROCESSING_TTL_SECONDS) | ||
| return bool(result) | ||
| with cluster.pipeline() as pipeline: | ||
| for trace_id in trace_ids: | ||
| key = f"llm_detection:processed:{trace_id}" | ||
| pipeline.set(key, "1", ex=TRACE_PROCESSING_TTL_SECONDS) | ||
| pipeline.execute() | ||
|
|
||
|
|
||
| class DetectedIssue(BaseModel): | ||
|
|
@@ -76,11 +92,6 @@ class DetectedIssue(BaseModel): | |
| transaction_name: str | ||
|
|
||
|
|
||
| class IssueDetectionResponse(BaseModel): | ||
| issues: list[DetectedIssue] | ||
| traces_analyzed: int | ||
|
|
||
|
|
||
| class IssueDetectionRequest(BaseModel): | ||
| traces: list[TraceMetadata] | ||
| organization_id: int | ||
|
|
@@ -113,17 +124,11 @@ def get_base_platform(platform: str | None) -> str | None: | |
|
|
||
| def create_issue_occurrence_from_detection( | ||
| detected_issue: DetectedIssue, | ||
| project_id: int | None = None, | ||
| project: Project | None = None, | ||
| project: Project, | ||
| ) -> None: | ||
| """ | ||
| Create and produce an IssueOccurrence from an LLM-detected issue. | ||
| """ | ||
| if project is None: | ||
| if project_id is None: | ||
| raise ValueError("Either project or project_id must be provided") | ||
| project = Project.objects.get_from_cache(id=project_id) | ||
|
|
||
| event_id = uuid4().hex | ||
| occurrence_id = uuid4().hex | ||
| detection_time = datetime.now(UTC) | ||
|
|
@@ -200,7 +205,7 @@ def get_enabled_project_ids() -> list[int]: | |
| @instrumented_task( | ||
| name="sentry.tasks.llm_issue_detection.run_llm_issue_detection", | ||
| namespace=issues_tasks, | ||
| processing_deadline_duration=120, | ||
| processing_deadline_duration=120, # 2 minutes | ||
| ) | ||
| def run_llm_issue_detection() -> None: | ||
| """ | ||
|
|
@@ -225,7 +230,7 @@ def run_llm_issue_detection() -> None: | |
| @instrumented_task( | ||
| name="sentry.tasks.llm_issue_detection.detect_llm_issues_for_project", | ||
| namespace=issues_tasks, | ||
| processing_deadline_duration=10 * 60, | ||
| processing_deadline_duration=180, # 3 minutes | ||
| ) | ||
| def detect_llm_issues_for_project(project_id: int) -> None: | ||
| """ | ||
|
|
@@ -252,70 +257,65 @@ def detect_llm_issues_for_project(project_id: int) -> None: | |
| if not evidence_traces: | ||
| return | ||
|
|
||
| # Shuffle to randomize order | ||
| # Shuffle to randomize selection | ||
| random.shuffle(evidence_traces) | ||
| processed_traces = 0 | ||
|
|
||
| for trace in evidence_traces: | ||
| if processed_traces >= NUM_TRANSACTIONS_TO_PROCESS: | ||
| break | ||
|
|
||
| if not mark_trace_as_processed(trace.trace_id): | ||
| sentry_sdk.metrics.count( | ||
| "llm_issue_detection.trace.skipped", | ||
| 1, | ||
| ) | ||
| continue | ||
|
|
||
| trace_id = trace.trace_id | ||
| sentry_sdk.metrics.count( | ||
| "llm_issue_detection.seer_request", 1, attributes={"trace_id": trace_id} | ||
| ) | ||
| seer_request = IssueDetectionRequest( | ||
| traces=[trace], | ||
| organization_id=organization_id, | ||
| project_id=project_id, | ||
|
|
||
| # Bulk check which traces are already processed | ||
| all_trace_ids = [t.trace_id for t in evidence_traces] | ||
| unprocessed_ids = _get_unprocessed_traces(all_trace_ids) | ||
| skipped = len(all_trace_ids) - len(unprocessed_ids) | ||
| if skipped: | ||
| sentry_sdk.metrics.count("llm_issue_detection.trace.skipped", skipped) | ||
|
|
||
| # Take up to TRACES_TO_SEND_TO_SEER unprocessed traces | ||
| traces_to_send: list[TraceMetadata] = [ | ||
| t for t in evidence_traces if t.trace_id in unprocessed_ids | ||
| ][:TRACES_TO_SEND_TO_SEER] | ||
|
|
||
| if not traces_to_send: | ||
| return | ||
|
|
||
| sentry_sdk.metrics.count( | ||
| "llm_issue_detection.seer_request", | ||
| 1, | ||
| attributes={"trace_count": len(traces_to_send)}, | ||
| ) | ||
|
|
||
| seer_request = IssueDetectionRequest( | ||
| traces=traces_to_send, | ||
| organization_id=organization_id, | ||
| project_id=project_id, | ||
| ) | ||
|
|
||
| response = make_signed_seer_api_request( | ||
| connection_pool=seer_issue_detection_connection_pool, | ||
| path=SEER_ANALYZE_ISSUE_ENDPOINT_PATH, | ||
| body=json.dumps(seer_request.dict()).encode("utf-8"), | ||
| timeout=SEER_TIMEOUT_S, | ||
| retries=0, | ||
| ) | ||
|
Comment on lines
+290
to
+296
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: The call to … — Suggested Fix: Wrap the … — Prompt for AI Agent — Did we get this right? 👍 / 👎 to inform future reviews.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now we send all traces in a single batch request. There's no loop to continue, so catching errors serves no purpose.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed timeout exception handling for Seer requestMedium Severity The |
||
|
|
||
| if response.status == 202: | ||
| mark_traces_as_processed([trace.trace_id for trace in traces_to_send]) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Task crash after Seer accepts work causes duplicate processing — Medium Severity. Traces are marked as processed AFTER Seer returns 202, rather than before submission. If the task crashes or |
||
|
|
||
| logger.info( | ||
| "llm_issue_detection.request_accepted", | ||
| extra={ | ||
| "project_id": project_id, | ||
| "organization_id": organization_id, | ||
| "trace_count": len(traces_to_send), | ||
| }, | ||
| ) | ||
| return | ||
|
|
||
| try: | ||
| response = make_signed_seer_api_request( | ||
| connection_pool=seer_issue_detection_connection_pool, | ||
| path=SEER_ANALYZE_ISSUE_ENDPOINT_PATH, | ||
| body=json.dumps(seer_request.dict()).encode("utf-8"), | ||
| timeout=SEER_TIMEOUT_S, | ||
| retries=0, | ||
| ) | ||
| except TimeoutError: | ||
| logger.exception("LLM Issue Detection Seer timeout", extra={"trace_id": trace_id}) | ||
| continue | ||
|
|
||
| if response.status < 200 or response.status >= 300: | ||
| logger.error( | ||
| "LLM issue Detection Seer response failure", | ||
| extra={ | ||
| "status_code": response.status, | ||
| "response_data": response.data, | ||
| "trace_id": trace_id, | ||
| }, | ||
| ) | ||
| continue | ||
|
|
||
| raw_response_data = response.json() | ||
| response_data = IssueDetectionResponse.parse_obj(raw_response_data) | ||
| processed_traces += response_data.traces_analyzed | ||
|
|
||
| for detected_issue in response_data.issues: | ||
| logger.info( | ||
| "LLM Issue Detection Result", | ||
| extra={ | ||
| "title": detected_issue.title, | ||
| "trace_id": trace_id, | ||
| "project_id": project_id, | ||
| "category": detected_issue.category, | ||
| "subcategory": detected_issue.subcategory, | ||
| }, | ||
| ) | ||
| create_issue_occurrence_from_detection( | ||
| detected_issue=detected_issue, | ||
| project_id=project_id, | ||
| ) | ||
| # Log (+ send to sentry) unexpected responses | ||
| logger.error( | ||
| "llm_issue_detection.unexpected_response", | ||
| extra={ | ||
| "status_code": response.status, | ||
| "response_data": response.data, | ||
| "project_id": project_id, | ||
| "organization_id": organization_id, | ||
| "trace_count": len(traces_to_send), | ||
| }, | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race condition in trace deduplication logic
Medium Severity
The refactored trace deduplication logic splits the atomic check-and-set operation into separate check and set steps. The old code used
`nx=True` for atomic deduplication, but the new code checks unprocessed traces via `_get_unprocessed_traces`, then later marks them after receiving a 202 response. If two task instances run concurrently for the same project, both can identify the same traces as unprocessed, send duplicate batches to Seer, and both receive 202 responses, causing duplicate trace analysis. Additional Locations (1)
src/sentry/tasks/llm_issue_detection/detection.py#L297-L299