Skip to content

Commit 12da4cd

Browse files
merge master
2 parents fe93599 + bb1451b commit 12da4cd

7 files changed

Lines changed: 99 additions & 89 deletions

File tree

sentry_sdk/consts.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,8 @@ class OP:
11801180
COHERE_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.cohere"
11811181
COHERE_EMBEDDINGS_CREATE = "ai.embeddings.create.cohere"
11821182
DB = "db"
1183+
DB_CURSOR_ITERATOR = "db.cursor.iter"
1184+
DB_CURSOR_FETCH = "db.cursor.fetch"
11831185
DB_REDIS = "db.redis"
11841186
EVENT_DJANGO = "event.django"
11851187
FUNCTION = "function"

sentry_sdk/integrations/asyncpg.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@
2020
)
2121

2222
try:
23-
import asyncpg # type: ignore[import-not-found]
24-
from asyncpg.cursor import BaseCursor # type: ignore
23+
import asyncpg # type: ignore
24+
from asyncpg.cursor import ( # type: ignore
25+
BaseCursor,
26+
Cursor,
27+
CursorIterator,
28+
)
2529

2630
except ImportError:
2731
raise DidNotEnable("asyncpg not installed.")
@@ -169,6 +173,13 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
169173
return await f(*args, **kwargs)
170174

171175
cursor = args[0]
176+
if type(cursor) is CursorIterator:
177+
span_op_override_value = OP.DB_CURSOR_ITERATOR
178+
elif type(cursor) is Cursor:
179+
span_op_override_value = OP.DB_CURSOR_FETCH
180+
else:
181+
span_op_override_value = None
182+
172183
query = _normalize_query(cursor._query)
173184
with record_sql_queries(
174185
cursor=cursor,
@@ -178,6 +189,7 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
178189
executemany=False,
179190
record_cursor_repr=True,
180191
span_origin=AsyncPGIntegration.origin,
192+
span_op_override_value=span_op_override_value,
181193
) as span:
182194
_set_db_data(span, cursor._connection)
183195
res = await f(*args, **kwargs)

sentry_sdk/integrations/langchain.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ class LangchainIntegration(Integration):
210210
identifier = "langchain"
211211
origin = f"auto.ai.{identifier}"
212212

213+
_ignored_exceptions: "set[type[Exception]]" = set()
214+
213215
def __init__(
214216
self: "LangchainIntegration",
215217
include_prompts: bool = True,
@@ -262,17 +264,22 @@ def gc_span_map(self) -> None:
262264
self._exit_span(span, run_id)
263265

264266
def _handle_error(self, run_id: "UUID", error: "Any") -> None:
267+
is_ignored = isinstance(error, tuple(LangchainIntegration._ignored_exceptions))
268+
265269
with capture_internal_exceptions():
266270
if not run_id or run_id not in self.span_map:
267271
return
268272

269273
span = self.span_map[run_id]
270274

271-
sentry_sdk.capture_exception(
272-
error, span._scope if isinstance(span, StreamedSpan) else span.scope
273-
)
275+
if is_ignored:
276+
span.__exit__(None, None, None)
277+
else:
278+
sentry_sdk.capture_exception(
279+
error, span._scope if isinstance(span, StreamedSpan) else span.scope
280+
)
281+
span.__exit__(type(error), error, error.__traceback__)
274282

275-
span.__exit__(type(error), error, error.__traceback__)
276283
del self.span_map[run_id]
277284

278285
def _normalize_langchain_message(self, message: "BaseMessage") -> "Any":

sentry_sdk/integrations/langgraph.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
)
1111
from sentry_sdk.consts import OP, SPANDATA
1212
from sentry_sdk.integrations import DidNotEnable, Integration
13+
14+
# This is fine because langgraph depends on langchain-base, and LangchainIntegration only imports from langchain-base.
15+
from sentry_sdk.integrations.langchain import LangchainIntegration
1316
from sentry_sdk.scope import should_send_default_pii
1417
from sentry_sdk.traces import StreamedSpan
1518
from sentry_sdk.tracing_utils import (
@@ -19,6 +22,7 @@
1922
from sentry_sdk.utils import safe_serialize
2023

2124
try:
25+
from langgraph.errors import GraphBubbleUp
2226
from langgraph.graph import StateGraph
2327
from langgraph.pregel import Pregel
2428
except ImportError:
@@ -34,6 +38,7 @@ def __init__(self: "LanggraphIntegration", include_prompts: bool = True) -> None
3438

3539
@staticmethod
3640
def setup_once() -> None:
41+
LangchainIntegration._ignored_exceptions.add(GraphBubbleUp)
3742
# LangGraph lets users create agents using a StateGraph or the Functional API.
3843
# StateGraphs are then compiled to a CompiledStateGraph. Both CompiledStateGraph and
3944
# the functional API execute on a Pregel instance. Pregel is the runtime for the graph

sentry_sdk/tracing_utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ def record_sql_queries(
134134
executemany: bool,
135135
record_cursor_repr: bool = False,
136136
span_origin: str = "manual",
137+
span_op_override_value: "Optional[str]" = None,
137138
) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]":
138139
# TODO: Bring back capturing of params by default
139140
client = sentry_sdk.get_client()
@@ -167,13 +168,15 @@ def record_sql_queries(
167168
name="<unknown SQL query>" if query is None else query,
168169
attributes={
169170
"sentry.origin": span_origin,
170-
"sentry.op": OP.DB,
171+
"sentry.op": span_op_override_value
172+
if span_op_override_value
173+
else OP.DB,
171174
},
172175
) as span:
173176
yield span
174177
else:
175178
with sentry_sdk.start_span(
176-
op=OP.DB,
179+
op=span_op_override_value if span_op_override_value is not None else OP.DB,
177180
name=query,
178181
origin=span_origin,
179182
) as span:

tests/integrations/asyncpg/test_asyncpg.py

Lines changed: 35 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import sentry_sdk
2323
from sentry_sdk import capture_message, start_transaction
24-
from sentry_sdk.consts import SPANDATA
24+
from sentry_sdk.consts import OP, SPANDATA
2525
from sentry_sdk.integrations.asyncpg import AsyncPGIntegration
2626
from sentry_sdk.tracing_utils import record_sql_queries
2727
from tests.conftest import ApproxDict
@@ -1361,75 +1361,57 @@ async def test_query_source_prepare(
13611361

13621362
@pytest.mark.asyncio
13631363
@pytest.mark.parametrize("span_streaming", [True, False])
1364-
async def test_cursor__bind_exec_creates_spans(
1364+
async def test_cursor_iteration_creates_db_cursor_iter_spans(
13651365
sentry_init, capture_events, capture_items, span_streaming
13661366
) -> None:
13671367
"""
1368-
Exercises the bind_exec patch through the iterator that's created in asyncpg when "for record in conn.cursor" is called.
1369-
See https://github.com/MagicStack/asyncpg/blob/db8ecc2a38e16fb0c090aef6f5506547c2831c24/asyncpg/cursor.py#L234
1368+
Regression test for https://github.com/getsentry/sentry-python/issues/6576
1369+
1370+
When iterating a server-side cursor with a small prefetch, asyncpg fetches
1371+
rows in batches. Each batch triggers BaseCursor._bind_exec (on first query) and
1372+
BaseCursor._exec (second query onwards) through CursorIterator.__anext__, which creates a
1373+
span with the same query description. The resulting burst of identical spans
1374+
causes Sentry's N+1 query detector to raise a false positive.
1375+
1376+
To mitigate, we set the "op"/"sentry.op" to `db.cursor.iter` instead of `db`
1377+
so that the sentry backend can exclude these spans from n+1 detection.
13701378
"""
13711379
sentry_init(
13721380
integrations=[AsyncPGIntegration()],
13731381
traces_sample_rate=1.0,
1374-
enable_db_query_source=True,
1375-
db_query_source_threshold_ms=0,
13761382
_experiments={
13771383
"trace_lifecycle": "stream" if span_streaming else "static",
13781384
},
13791385
)
13801386

13811387
if span_streaming:
13821388
items = capture_items("span")
1389+
13831390
with sentry_sdk.traces.start_span(name="test_segment"):
13841391
conn: Connection = await connect(PG_CONNECTION_URI)
13851392

13861393
await conn.executemany(
13871394
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1388-
[
1389-
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
1390-
("Alice", "pw", datetime.date(1990, 12, 25)),
1391-
],
1395+
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
13921396
)
13931397

13941398
async with conn.transaction():
1395-
async for record in conn.cursor(
1396-
"SELECT * FROM users WHERE dob > $1",
1397-
datetime.date(1970, 1, 1),
1398-
):
1399+
async for _record in conn.cursor("SELECT * FROM users", prefetch=5):
13991400
pass
14001401

14011402
await conn.close()
1402-
sentry_sdk.flush()
14031403

1404-
spans = [item.payload for item in items]
1405-
1406-
assert len(spans) == 6
1407-
1408-
connect_span = spans[0]
1409-
executemany_span = spans[1]
1410-
begin_span = spans[2]
1411-
bind_exec_span = spans[3]
1412-
commit_span = spans[4]
1413-
segment = spans[5]
1404+
sentry_sdk.flush()
14141405

1415-
assert connect_span["name"] == "connect"
1416-
assert (
1417-
executemany_span["name"]
1418-
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
1419-
)
1420-
assert begin_span["name"] == "BEGIN;"
1421-
assert bind_exec_span["name"] == "SELECT * FROM users WHERE dob > $1"
1422-
assert commit_span["name"] == "COMMIT;"
1423-
assert segment["name"] == "test_segment"
1406+
cursor_iter_spans = [
1407+
item.payload
1408+
for item in items
1409+
if item.payload.get("name") == "SELECT * FROM users"
1410+
]
14241411

1425-
assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg"
1426-
assert bind_exec_span["attributes"]["sentry.op"] == "db"
1427-
assert bind_exec_span["attributes"]["db.system.name"] == "postgresql"
1428-
assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg"
1429-
assert bind_exec_span["attributes"]["server.address"] == PG_HOST
1430-
assert bind_exec_span["attributes"]["server.port"] == PG_PORT
1431-
assert bind_exec_span["attributes"]["db.namespace"] == PG_NAME
1432-
assert bind_exec_span["attributes"]["db.user"] == PG_USER
1412+
assert len(cursor_iter_spans) == 5
1413+
for span in cursor_iter_spans:
1414+
assert span["attributes"]["sentry.op"] == OP.DB_CURSOR_ITERATOR
14331415
else:
14341416
events = capture_events()
14351417

@@ -1438,57 +1420,28 @@ async def test_cursor__bind_exec_creates_spans(
14381420

14391421
await conn.executemany(
14401422
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1441-
[
1442-
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
1443-
("Alice", "pw", datetime.date(1990, 12, 25)),
1444-
],
1423+
[(f"user-{i}", "pw", datetime.date(1990, 1, 1)) for i in range(20)],
14451424
)
14461425

14471426
async with conn.transaction():
1448-
async for record in conn.cursor(
1449-
"SELECT * FROM users WHERE dob > $1",
1450-
datetime.date(1970, 1, 1),
1451-
):
1427+
async for _record in conn.cursor("SELECT * FROM users", prefetch=5):
14521428
pass
14531429

14541430
await conn.close()
14551431

14561432
(event,) = events
14571433

1458-
assert len(event["spans"]) == 5
1459-
1460-
connect_span = event["spans"][0]
1461-
executemany_span = event["spans"][1]
1462-
begin_span = event["spans"][2]
1463-
bind_exec_span = event["spans"][3]
1464-
commit_span = event["spans"][4]
1434+
cursor_iter_spans = [
1435+
s for s in event["spans"] if s.get("description") == "SELECT * FROM users"
1436+
]
14651437

1466-
assert connect_span["description"] == "connect"
1467-
assert (
1468-
executemany_span["description"]
1469-
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
1470-
)
1471-
assert begin_span["description"] == "BEGIN;"
1472-
assert bind_exec_span["description"] == "SELECT * FROM users WHERE dob > $1"
1473-
assert commit_span["description"] == "COMMIT;"
1474-
1475-
assert bind_exec_span["origin"] == "auto.db.asyncpg"
1476-
assert bind_exec_span["data"]["db.system"] == "postgresql"
1477-
assert bind_exec_span["data"]["db.driver.name"] == "asyncpg"
1478-
assert bind_exec_span["data"]["server.address"] == PG_HOST
1479-
assert bind_exec_span["data"]["server.port"] == PG_PORT
1480-
assert bind_exec_span["data"]["db.name"] == PG_NAME
1481-
assert bind_exec_span["data"]["db.user"] == PG_USER
1482-
1483-
_assert_query_source(
1484-
bind_exec_span,
1485-
span_streaming,
1486-
"test_cursor__bind_exec_creates_spans",
1487-
)
1438+
assert len(cursor_iter_spans) == 5
1439+
for span in cursor_iter_spans:
1440+
assert span["op"] == OP.DB_CURSOR_ITERATOR
14881441

14891442

14901443
@pytest.mark.asyncio
1491-
async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) -> None:
1444+
async def test_cursor_fetch_methods_create_spans(sentry_init, capture_events) -> None:
14921445
sentry_init(
14931446
integrations=[AsyncPGIntegration()],
14941447
traces_sample_rate=1.0,
@@ -1543,9 +1496,10 @@ async def test_cursor__exec_methods_create_spans(sentry_init, capture_events) ->
15431496
assert span["data"]["db.cursor"] is not None
15441497
assert span["data"]["db.system"] == "postgresql"
15451498
assert span["data"]["db.driver.name"] == "asyncpg"
1499+
assert span["op"] == OP.DB_CURSOR_FETCH
15461500
assert span["origin"] == "auto.db.asyncpg"
15471501
_assert_query_source(
15481502
span,
15491503
False,
1550-
"test_cursor__exec_methods_create_spans",
1504+
"test_cursor_fetch_methods_create_spans",
15511505
)

tests/integrations/langgraph/test_langgraph.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
from unittest.mock import MagicMock, patch
44

55
import pytest
6+
from langchain_core.language_models.chat_models import BaseChatModel
7+
from langchain_core.messages import HumanMessage
8+
from langchain_core.outputs import ChatResult
9+
from langgraph.errors import GraphBubbleUp
610

711
import sentry_sdk
812
from sentry_sdk import start_transaction
@@ -125,6 +129,15 @@ async def ainvoke(self, state, config=None):
125129
return {"messages": [MockMessage("Async Pregel response")]}
126130

127131

132+
class InterruptingChatModel(BaseChatModel):
133+
@property
134+
def _llm_type(self) -> str:
135+
return "interrupting-chat-model"
136+
137+
def _generate(self, messages, stop=None, run_manager=None, **kwargs) -> ChatResult:
138+
raise GraphBubbleUp("interrupt")
139+
140+
128141
def test_langgraph_integration_init():
129142
"""Test LanggraphIntegration initialization with different parameters."""
130143
integration = LanggraphIntegration()
@@ -2104,3 +2117,17 @@ def original_invoke(self, *args, **kwargs):
21042117
assert len(parsed_messages) == 1
21052118
assert "small message 5" in str(parsed_messages[0])
21062119
assert tx["_meta"]["spans"]["0"]["data"]["gen_ai.request.messages"][""]["len"] == 5
2120+
2121+
2122+
def test_graph_bubble_up_ignored(sentry_init, capture_items):
2123+
sentry_init(
2124+
integrations=[LanggraphIntegration()],
2125+
)
2126+
2127+
events = capture_items("event")
2128+
2129+
model = InterruptingChatModel()
2130+
with pytest.raises(GraphBubbleUp):
2131+
model.invoke([HumanMessage(content="hi")])
2132+
2133+
assert len(events) == 0

0 commit comments

Comments
 (0)