Skip to content

Commit 4bf5dac

Browse files
committed
fix(batcher): Reset lock and flusher in child after fork
Applies to the batchers the same fix introduced in #6148, which prevents deadlocks in the monitor when os.fork() is called while another thread holds the monitor's lock.

If os.fork() runs while another thread holds Batcher._lock, the child inherits the lock in the locked state, but the thread holding it does not exist in the child, so the lock can never be released and _ensure_thread deadlocks forever.

Register an after-fork hook via os.register_at_fork that replaces _lock with a fresh lock and resets _flusher / _flusher_pid in the child. Use a weakref to the batcher so the hook does not keep the instance alive.

Move shared init out of SpanBatcher into the base Batcher.__init__ so all batchers (log, metrics, span) get the fork-safety hook from a single place.

Fixes PY-2391
Fixes #6149
1 parent 9c1d475 commit 4bf5dac

5 files changed

Lines changed: 141 additions & 12 deletions

File tree

sentry_sdk/_batcher.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import threading
44
from datetime import datetime, timezone
55
from typing import TYPE_CHECKING, TypeVar, Generic
6+
import weakref
67

78
from sentry_sdk.utils import format_timestamp
89
from sentry_sdk.envelope import Envelope, Item, PayloadRef
@@ -38,6 +39,24 @@ def __init__(
3839
self._flusher: "Optional[threading.Thread]" = None
3940
self._flusher_pid: "Optional[int]" = None
4041

42+
self_ref = weakref.ref(self)
43+
44+
def _reset_thread_state() -> None:
45+
batcher = self_ref()
46+
47+
if batcher is not None:
48+
batcher._flusher = None
49+
batcher._lock = threading.Lock()
50+
batcher._flusher_pid = None
51+
52+
# Same as https://github.com/getsentry/sentry-python/issues/6148.
53+
# If os.fork() runs while another thread holds self._lock,
54+
# the child inherits the lock locked but the holding thread does
55+
# not exist in the child, so the lock can never be released and
56+
# _ensure_thread deadlocks forever.
57+
if hasattr(os, "register_at_fork"):
58+
os.register_at_fork(after_in_child=_reset_thread_state)
59+
4160
def _ensure_thread(self) -> bool:
4261
"""For forking processes we might need to restart this thread.
4362
This ensures that our process actually has that thread running.

sentry_sdk/_span_batcher.py

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import threading
21
from collections import defaultdict
32
from datetime import datetime, timezone
43
from typing import TYPE_CHECKING
@@ -8,7 +7,7 @@
87
from sentry_sdk.utils import format_timestamp, serialize_attribute
98

109
if TYPE_CHECKING:
11-
from typing import Any, Callable, Optional
10+
from typing import Any, Callable
1211
from sentry_sdk.traces import StreamedSpan
1312

1413

@@ -32,23 +31,14 @@ def __init__(
3231
capture_func: "Callable[[Envelope], None]",
3332
record_lost_func: "Callable[..., None]",
3433
) -> None:
34+
super().__init__(capture_func, record_lost_func)
3535
# Spans from different traces cannot be emitted in the same envelope
3636
# since the envelope contains a shared trace header. That's why we bucket
3737
# by trace_id, so that we can then send the buckets each in its own
3838
# envelope.
3939
# trace_id -> span buffer
4040
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
4141
self._running_size: dict[str, int] = defaultdict(lambda: 0)
42-
self._capture_func = capture_func
43-
self._record_lost_func = record_lost_func
44-
self._running = True
45-
self._lock = threading.Lock()
46-
self._active: "threading.local" = threading.local()
47-
48-
self._flush_event: "threading.Event" = threading.Event()
49-
50-
self._flusher: "Optional[threading.Thread]" = None
51-
self._flusher_pid: "Optional[int]" = None
5242

5343
def add(self, span: "StreamedSpan") -> None:
5444
# Bail out if the current thread is already executing batcher code.

tests/test_logs.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import logging
3+
import os
34
import sys
45
import time
56
from typing import List, Any, Mapping, Union
@@ -819,3 +820,40 @@ def add_to_envelope_with_reentrant_add(envelope):
819820
assert reentrant_add_called
820821
# If the re-entrancy guard didn't work, this test would hang and it'd
821822
# eventually be timed out by pytest-timeout
823+
824+
825+
@pytest.mark.skipif(
    sys.platform == "win32"
    or not hasattr(os, "fork")
    or not hasattr(os, "register_at_fork"),
    reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_log_batcher_lock_reset_in_child_after_fork(sentry_init):
    """Regression test for the LogBatcher fork-deadlock fix.

    If os.fork() runs while another thread holds LogBatcher._lock, the
    child inherits the lock locked. The holding thread does not exist in
    the child, so the lock can never be released and _ensure_thread
    deadlocks forever. The after-fork hook must replace the lock with a
    fresh one in the child and reset _flusher / _flusher_pid.

    The child reports its verdict through its exit status (0 = all checks
    passed, 1 = a check failed or raised); the parent asserts on it.
    """
    sentry_init(enable_logs=True)
    batcher = sentry_sdk.get_client().log_batcher
    assert batcher is not None

    original_lock = batcher._lock
    # Hold the lock across the fork to simulate another thread owning it.
    original_lock.acquire()
    try:
        pid = os.fork()
        if pid == 0:
            # Child. Default to failure so that an unexpected exception
            # still ends the child with a non-zero status instead of
            # unwinding into a forked copy of the test runner.
            exit_code = 1
            try:
                # Was the lock object replaced and is the new one not
                # held? Without the fix, _lock is `original_lock`
                # inherited locked, so `replaced` is False.
                # blocking=False guarantees the child can't hang on a
                # regression.
                replaced = batcher._lock is not original_lock
                unheld = batcher._lock.acquire(blocking=False)
                flusher_reset = (
                    batcher._flusher is None and batcher._flusher_pid is None
                )
                if replaced and unheld and flusher_reset:
                    exit_code = 0
            finally:
                # os._exit never returns, so the child never reaches the
                # parent-side cleanup below.
                os._exit(exit_code)
    finally:
        # Parent (or a failed fork): always give the lock back so a
        # failure here cannot wedge the batcher for later tests.
        original_lock.release()

    _, status = os.waitpid(pid, 0)
    assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

tests/test_metrics.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import os
2+
import sys
13
from typing import List
24
from unittest import mock
35

6+
import pytest
47

58
import sentry_sdk
69
from sentry_sdk import get_client
@@ -512,3 +515,41 @@ def before_send_metric(metric, _):
512515
)
513516

514517
get_client().flush()
518+
519+
520+
@pytest.mark.skipif(
    sys.platform == "win32"
    or not hasattr(os, "fork")
    or not hasattr(os, "register_at_fork"),
    reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init):
    """Regression test for the MetricsBatcher fork-deadlock fix.

    If os.fork() runs while another thread holds MetricsBatcher._lock,
    the child inherits the lock locked. The holding thread does not
    exist in the child, so the lock can never be released and
    _ensure_thread deadlocks forever. The after-fork hook must replace
    the lock with a fresh one in the child and reset
    _flusher / _flusher_pid.

    The child reports its verdict through its exit status (0 = all checks
    passed, 1 = a check failed or raised); the parent asserts on it.
    """
    sentry_init()
    batcher = sentry_sdk.get_client().metrics_batcher
    assert batcher is not None

    original_lock = batcher._lock
    # Hold the lock across the fork to simulate another thread owning it.
    original_lock.acquire()
    try:
        pid = os.fork()
        if pid == 0:
            # Child. Default to failure so that an unexpected exception
            # still ends the child with a non-zero status instead of
            # unwinding into a forked copy of the test runner.
            exit_code = 1
            try:
                # Was the lock object replaced and is the new one not
                # held? Without the fix, _lock is `original_lock`
                # inherited locked, so `replaced` is False.
                # blocking=False guarantees the child can't hang on a
                # regression.
                replaced = batcher._lock is not original_lock
                unheld = batcher._lock.acquire(blocking=False)
                flusher_reset = (
                    batcher._flusher is None and batcher._flusher_pid is None
                )
                if replaced and unheld and flusher_reset:
                    exit_code = 0
            finally:
                # os._exit never returns, so the child never reaches the
                # parent-side cleanup below.
                os._exit(exit_code)
    finally:
        # Parent (or a failed fork): always give the lock back so a
        # failure here cannot wedge the batcher for later tests.
        original_lock.release()

    _, status = os.waitpid(pid, 0)
    assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

tests/tracing/test_span_streaming.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import os
23
import re
34
import sys
45
import time
@@ -1593,3 +1594,43 @@ def test_transport_format(sentry_init, capture_envelopes):
15931594
assert "value" in value
15941595
assert "type" in value
15951596
assert value["type"] in ("string", "boolean", "integer", "double", "array")
1597+
1598+
1599+
@pytest.mark.skipif(
    sys.platform == "win32"
    or not hasattr(os, "fork")
    or not hasattr(os, "register_at_fork"),
    reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_span_batcher_lock_reset_in_child_after_fork(sentry_init):
    """Regression test for the SpanBatcher fork-deadlock fix.

    If os.fork() runs while another thread holds SpanBatcher._lock, the
    child inherits the lock locked. The holding thread does not exist in
    the child, so the lock can never be released and _ensure_thread
    deadlocks forever. The after-fork hook must replace the lock with a
    fresh one in the child and reset _flusher / _flusher_pid.

    The child reports its verdict through its exit status (0 = all checks
    passed, 1 = a check failed or raised); the parent asserts on it.
    """
    sentry_init(
        traces_sample_rate=1.0,
        _experiments={"trace_lifecycle": "stream"},
    )
    batcher = sentry_sdk.get_client().span_batcher
    assert batcher is not None

    original_lock = batcher._lock
    # Hold the lock across the fork to simulate another thread owning it.
    original_lock.acquire()
    try:
        pid = os.fork()
        if pid == 0:
            # Child. Default to failure so that an unexpected exception
            # still ends the child with a non-zero status instead of
            # unwinding into a forked copy of the test runner.
            exit_code = 1
            try:
                # Was the lock object replaced and is the new one not
                # held? Without the fix, _lock is `original_lock`
                # inherited locked, so `replaced` is False.
                # blocking=False guarantees the child can't hang on a
                # regression.
                replaced = batcher._lock is not original_lock
                unheld = batcher._lock.acquire(blocking=False)
                flusher_reset = (
                    batcher._flusher is None and batcher._flusher_pid is None
                )
                if replaced and unheld and flusher_reset:
                    exit_code = 0
            finally:
                # os._exit never returns, so the child never reaches the
                # parent-side cleanup below.
                os._exit(exit_code)
    finally:
        # Parent (or a failed fork): always give the lock back so a
        # failure here cannot wedge the batcher for later tests.
        original_lock.release()

    _, status = os.waitpid(pid, 0)
    assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

0 commit comments

Comments
 (0)