|
1 | 1 | import logging |
2 | 2 | from collections import defaultdict |
3 | | -from concurrent.futures import ThreadPoolExecutor |
4 | 3 | from datetime import datetime |
| 4 | +from multiprocessing.context import TimeoutError |
| 5 | +from multiprocessing.pool import ThreadPool |
5 | 6 | from typing import Any, NotRequired, TypedDict |
6 | 7 |
|
| 8 | +from sentry import options |
7 | 9 | from sentry.uptime.subscriptions.regions import get_region_config |
8 | 10 |
|
9 | 11 | logger = logging.getLogger(__name__) |
@@ -465,32 +467,53 @@ def query_trace_data( |
465 | 467 |
|
466 | 468 | # 1 worker each for spans, errors, performance issues, and optionally uptime |
467 | 469 | max_workers = 4 if include_uptime else 3 |
468 | | - query_thread_pool = ThreadPoolExecutor(thread_name_prefix=__name__, max_workers=max_workers) |
| 470 | + query_thread_pool = ThreadPool(processes=max_workers) |
469 | 471 | with query_thread_pool: |
470 | | - spans_future = query_thread_pool.submit( |
471 | | - Spans.run_trace_query, |
472 | | - trace_id=trace_id, |
473 | | - params=snuba_params, |
474 | | - referrer=referrer.value, |
475 | | - config=SearchResolverConfig(), |
476 | | - additional_attributes=additional_attributes, |
477 | | - ) |
478 | | - errors_future = query_thread_pool.submit( |
479 | | - _run_errors_query, |
480 | | - errors_query, |
481 | | - ) |
482 | | - occurrence_future = query_thread_pool.submit( |
483 | | - _run_perf_issues_query, |
484 | | - occurrence_query, |
485 | | - ) |
486 | | - uptime_future = None |
| 472 | + queries = { |
| 473 | + query_thread_pool.apply_async( |
| 474 | + Spans.run_trace_query, |
| 475 | + kwds={ |
| 476 | + "trace_id": trace_id, |
| 477 | + "params": snuba_params, |
| 478 | + "referrer": referrer.value, |
| 479 | + "config": SearchResolverConfig(), |
| 480 | + "additional_attributes": additional_attributes, |
| 481 | + }, |
| 482 | + ): "spans", |
| 483 | + query_thread_pool.apply_async( |
| 484 | + _run_errors_query, |
| 485 | + args=(errors_query,), |
| 486 | + ): "errors", |
| 487 | + query_thread_pool.apply_async( |
| 488 | + _run_perf_issues_query, |
| 489 | + args=(occurrence_query,), |
| 490 | + ): "occurrences", |
| 491 | + } |
487 | 492 | if include_uptime and uptime_query: |
488 | | - uptime_future = query_thread_pool.submit(_run_uptime_results_query, uptime_query) |
| 493 | + queries[ |
| 494 | + query_thread_pool.apply_async(_run_uptime_results_query, args=(uptime_query,)) |
| 495 | + ] = "uptime" |
| 496 | + |
| 497 | + TRACE_QUERY_TIMEOUT = options.get("performance.traces.endpoint.query-timeout") |
| 498 | + results = {} |
| 499 | + for future in queries: |
| 500 | + label = queries[future] |
| 501 | + try: |
| 502 | + results[label] = future.get(timeout=TRACE_QUERY_TIMEOUT) |
| 503 | + except TimeoutError: |
| 504 | + logger.warning( |
| 505 | + "Query timed out", |
| 506 | + extra={ |
| 507 | + "label": label, |
| 508 | + "timeout": TRACE_QUERY_TIMEOUT, |
| 509 | + }, |
| 510 | + ) |
| 511 | + |
| 512 | + spans_data = results.get("spans", []) |
| 513 | + errors_data = results.get("errors", []) |
| 514 | + occurrence_data = results.get("occurrences", []) |
| 515 | + uptime_data = results.get("uptime", []) |
489 | 516 |
|
490 | | - spans_data = spans_future.result() |
491 | | - errors_data = errors_future.result() |
492 | | - occurrence_data = occurrence_future.result() |
493 | | - uptime_data = uptime_future.result() if uptime_future else [] |
494 | 517 | result: list[dict[str, Any]] = [] |
495 | 518 | root_span: dict[str, Any] | None = None |
496 | 519 |
|
|
0 commit comments