Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-testing.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ tomli;python_version<"3.11" # Only needed for pytest on Python < 3.11
pytest-cov
pytest-forked
pytest-localserver
pytest-timeout
pytest-watch
jsonschema
executing
Expand Down
39 changes: 31 additions & 8 deletions sentry_sdk/_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

Expand Down Expand Up @@ -70,23 +71,40 @@ def _ensure_thread(self) -> bool:
return True

def _flush_loop(self) -> None:
    """Background flush loop: repeatedly wait, then flush, while
    ``self._running`` is true."""
    # Mark the flush-loop thread as active for its entire lifetime so
    # that any re-entrant add() triggered by GC warnings during wait(),
    # flush(), or Event operations is silently dropped instead of
    # deadlocking on internal locks.
    self._active.flag = True
    while self._running:
        # Base wait time plus random jitter; an early wake-up happens
        # whenever add() sets the flush event.
        self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
        self._flush_event.clear()
        self._flush()

def add(self, item: "T") -> None:
    """Queue *item* for batched sending.

    Drops the item when the buffer is at ``MAX_BEFORE_DROP`` (recording
    it as lost) and signals the flush thread once ``MAX_BEFORE_FLUSH``
    items are buffered. Re-entrant calls are silently ignored.
    """
    # Bail out if the current thread is already executing batcher code.
    # This prevents deadlocks when code running inside the batcher (e.g.
    # _add_to_envelope during flush, or _flush_event.wait/set) triggers
    # a GC-emitted warning that routes back through the logging
    # integration into add().
    if getattr(self._active, "flag", False):
        return None

    self._active.flag = True
    try:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            if len(self._buffer) >= self.MAX_BEFORE_DROP:
                self._record_lost(item)
                return None

            self._buffer.append(item)
            if len(self._buffer) >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()
    finally:
        # Always clear the guard so later (non-nested) calls proceed.
        self._active.flag = False

def kill(self) -> None:
if self._flusher is None:
Expand All @@ -97,7 +115,12 @@ def kill(self) -> None:
self._flusher = None

def flush(self) -> None:
    """Synchronously flush the buffer, suppressing re-entrant add() calls
    made while the flush is in progress."""
    # Save and restore the flag (instead of unconditionally clearing it)
    # so a flush() nested inside other batcher code leaves the guard in
    # the state it found it.
    was_active = getattr(self._active, "flag", False)
    self._active.flag = True
    try:
        self._flush()
    finally:
        self._active.flag = was_active

def _add_to_envelope(self, envelope: "Envelope") -> None:
envelope.add_item(
Expand Down
52 changes: 33 additions & 19 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,50 @@ def __init__(
self._record_lost_func = record_lost_func
self._running = True
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

def add(self, span: "StreamedSpan") -> None:
    """Queue *span* for batched sending.

    Per-trace buffers are capped at ``MAX_BEFORE_DROP`` spans (overflow
    is reported via ``self._record_lost_func``); the flush thread is
    signalled when a trace reaches ``MAX_BEFORE_FLUSH`` spans or
    ``MAX_BYTES_BEFORE_FLUSH`` estimated bytes. Re-entrant calls are
    silently ignored.
    """
    # Bail out if the current thread is already executing batcher code.
    # This prevents deadlocks when code running inside the batcher (e.g.
    # _add_to_envelope during flush, or _flush_event.wait/set) triggers
    # a GC-emitted warning that routes back through the logging
    # integration into add().
    if getattr(self._active, "flag", False):
        return None

    self._active.flag = True

    try:
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = len(self._span_buffer[span.trace_id])
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            self._running_size[span.trace_id] += self._estimate_size(span)

            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()
                return

            if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
                self._flush_event.set()
                return
    finally:
        # Always clear the guard so later (non-nested) calls proceed.
        self._active.flag = False

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
Expand Down
36 changes: 36 additions & 0 deletions tests/test_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -783,3 +783,39 @@ def before_send_log(log, _):
)

get_client().flush()


@minimum_python_37
# NOTE (from PR review): the timeout makes this test fail after 5s if the
# batcher deadlocks, instead of hanging the whole suite; verified to
# trigger on master without the re-entrancy guard.
@pytest.mark.timeout(5)
def test_reentrant_add_does_not_deadlock(sentry_init, capture_envelopes):
    """Adding to the batcher from within a flush must not deadlock.

    This covers the scenario where GC emits a ResourceWarning during
    _add_to_envelope (or _flush_event.wait/set), and the warning is
    routed through the logging integration back into batcher.add().
    See https://github.com/getsentry/sentry-python/issues/5681
    """
    sentry_init(enable_logs=True)
    capture_envelopes()

    client = sentry_sdk.get_client()
    batcher = client.log_batcher

    reentrant_add_called = False
    original_add_to_envelope = batcher._add_to_envelope

    def add_to_envelope_with_reentrant_add(envelope):
        nonlocal reentrant_add_called
        # Simulate a GC warning routing back into add() during flush
        batcher.add({"fake": "log"})
        reentrant_add_called = True
        original_add_to_envelope(envelope)

    batcher._add_to_envelope = add_to_envelope_with_reentrant_add

    sentry_sdk.logger.warning("test log")
    client.flush()

    assert reentrant_add_called
    # If the re-entrancy guard didn't work, this test would hang and it'd
    # eventually be timed out by pytest-timeout
Loading