Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions sentry_sdk/_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
from datetime import datetime, timezone
from typing import TYPE_CHECKING, TypeVar, Generic
import weakref

from sentry_sdk.utils import format_timestamp
from sentry_sdk.envelope import Envelope, Item, PayloadRef
Expand Down Expand Up @@ -37,6 +38,23 @@ def __init__(

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None
self._reset_thread_state()

# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
if hasattr(os, "register_at_fork"):
weak_reset = weakref.WeakMethod(self._reset_thread_state)

def _reset_in_child() -> None:
method = weak_reset()
if method is not None:
method()

os.register_at_fork(after_in_child=_reset_in_child)

def _reset_thread_state(self) -> None:
self._flusher = None
self._lock = threading.Lock()
self._flusher_pid = None
Comment thread
ericapisani marked this conversation as resolved.
Comment thread
sentry[bot] marked this conversation as resolved.

def _ensure_thread(self) -> bool:
"""For forking processes we might need to restart this thread.
Expand Down
26 changes: 23 additions & 3 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import threading
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING
import os
import threading
from typing import TYPE_CHECKING, Optional
import weakref

from sentry_sdk._batcher import Batcher
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute

if TYPE_CHECKING:
from typing import Any, Callable, Optional
from typing import Any, Callable
from sentry_sdk.traces import StreamedSpan


Expand Down Expand Up @@ -50,6 +52,24 @@ def __init__(
self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

self._reset_thread_state()

# See https://github.com/getsentry/sentry-python/blob/051cc01640a29bfd64b1f1e2e3414c02f027dd1b/sentry_sdk/monitor.py#L41-L50
if hasattr(os, "register_at_fork"):
weak_reset = weakref.WeakMethod(self._reset_thread_state)

def _reset_in_child() -> None:
method = weak_reset()
if method is not None:
method()

os.register_at_fork(after_in_child=_reset_in_child)
Comment thread
alexander-alderman-webb marked this conversation as resolved.

def _reset_thread_state(self) -> None:
self._flusher = None
self._lock = threading.Lock()
self._flusher_pid = None
Comment thread
ericapisani marked this conversation as resolved.

def add(self, span: "StreamedSpan") -> None:
# Bail out if the current thread is already executing batcher code.
# This prevents deadlocks when code running inside the batcher (e.g.
Expand Down
38 changes: 38 additions & 0 deletions tests/test_logs.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import os
import sys
import time
from typing import List, Any, Mapping, Union
Expand Down Expand Up @@ -819,3 +820,40 @@ def add_to_envelope_with_reentrant_add(envelope):
assert reentrant_add_called
# If the re-entrancy guard didn't work, this test would hang and it'd
# eventually be timed out by pytest-timeout


@pytest.mark.skipif(
sys.platform == "win32"
or not hasattr(os, "fork")
or not hasattr(os, "register_at_fork"),
reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_log_batcher_lock_reset_in_child_after_fork(sentry_init):
"""Regression test for the LogBatcher fork-deadlock fix.

If os.fork() runs while another thread holds LogBatcher._lock, the
child inherits the lock locked. The holding thread does not exist in
the child, so the lock can never be released and _ensure_thread
deadlocks forever. The after-fork hook must replace the lock with a
fresh one in the child and reset _flusher / _flusher_pid.
"""
sentry_init(enable_logs=True)
batcher = sentry_sdk.get_client().log_batcher
assert batcher is not None

original_lock = batcher._lock
original_lock.acquire()
pid = os.fork()
if pid == 0:
# Child: was the lock object replaced and is the new one not
# held? Without the fix, _lock is `original_lock` inherited
# locked, so `replaced` is False. blocking=False guarantees the
# child can't hang on a regression.
replaced = batcher._lock is not original_lock
unheld = batcher._lock.acquire(blocking=False)
flusher_reset = batcher._flusher is None and batcher._flusher_pid is None
os._exit(0 if replaced and unheld and flusher_reset else 1)

original_lock.release()
_, status = os.waitpid(pid, 0)
assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
41 changes: 41 additions & 0 deletions tests/test_metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import sys
from typing import List
from unittest import mock

import pytest

import sentry_sdk
from sentry_sdk import get_client
Expand Down Expand Up @@ -512,3 +515,41 @@ def before_send_metric(metric, _):
)

get_client().flush()


@pytest.mark.skipif(
sys.platform == "win32"
or not hasattr(os, "fork")
or not hasattr(os, "register_at_fork"),
reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_metrics_batcher_lock_reset_in_child_after_fork(sentry_init):
"""Regression test for the MetricsBatcher fork-deadlock fix.

If os.fork() runs while another thread holds MetricsBatcher._lock,
the child inherits the lock locked. The holding thread does not
exist in the child, so the lock can never be released and
_ensure_thread deadlocks forever. The after-fork hook must replace
the lock with a fresh one in the child and reset
_flusher / _flusher_pid.
"""
sentry_init()
batcher = sentry_sdk.get_client().metrics_batcher
assert batcher is not None

original_lock = batcher._lock
original_lock.acquire()
pid = os.fork()
if pid == 0:
# Child: was the lock object replaced and is the new one not
# held? Without the fix, _lock is `original_lock` inherited
# locked, so `replaced` is False. blocking=False guarantees the
# child can't hang on a regression.
replaced = batcher._lock is not original_lock
unheld = batcher._lock.acquire(blocking=False)
flusher_reset = batcher._flusher is None and batcher._flusher_pid is None
os._exit(0 if replaced and unheld and flusher_reset else 1)

original_lock.release()
_, status = os.waitpid(pid, 0)
assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
41 changes: 41 additions & 0 deletions tests/tracing/test_span_streaming.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import os
import re
import sys
import time
Expand Down Expand Up @@ -1554,3 +1555,43 @@ def test_transport_format(sentry_init, capture_envelopes):
}
]
}


@pytest.mark.skipif(
sys.platform == "win32"
or not hasattr(os, "fork")
or not hasattr(os, "register_at_fork"),
reason="requires POSIX fork and os.register_at_fork (Python 3.7+)",
)
def test_span_batcher_lock_reset_in_child_after_fork(sentry_init):
"""Regression test for the SpanBatcher fork-deadlock fix.

If os.fork() runs while another thread holds SpanBatcher._lock, the
child inherits the lock locked. The holding thread does not exist in
the child, so the lock can never be released and _ensure_thread
deadlocks forever. The after-fork hook must replace the lock with a
fresh one in the child and reset _flusher / _flusher_pid.
"""
sentry_init(
traces_sample_rate=1.0,
_experiments={"trace_lifecycle": "stream"},
)
batcher = sentry_sdk.get_client().span_batcher
assert batcher is not None

original_lock = batcher._lock
original_lock.acquire()
pid = os.fork()
if pid == 0:
# Child: was the lock object replaced and is the new one not
# held? Without the fix, _lock is `original_lock` inherited
# locked, so `replaced` is False. blocking=False guarantees the
# child can't hang on a regression.
replaced = batcher._lock is not original_lock
unheld = batcher._lock.acquire(blocking=False)
flusher_reset = batcher._flusher is None and batcher._flusher_pid is None
os._exit(0 if replaced and unheld and flusher_reset else 1)

original_lock.release()
_, status = os.waitpid(pid, 0)
assert os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
Loading