Skip to content

Commit 0e47ead

Browse files
feat(asyncpg): Add cursor span support via BaseCursor method patching (#6252)
Patch `BaseCursor._bind_exec`, `_bind`, and `_exec` instead of wrapping `Connection.cursor` creation. The old approach only captured cursor creation, not actual query execution. The new approach creates spans for each cursor operation (iteration via `_bind_exec`, explicit cursor usage via `_bind`/`_exec`) and supports both static and streaming span lifecycles. Builds on #6215 which added streaming span support to the asyncpg integration. Removes the patch on `asyncpg.Connection.cursor` because the patches on `BaseCursor` introduced in this pull request will create the correct span when the cursor is used. Fixes PY-2408 Fixes #6240 --------- Co-authored-by: Ivana Kellyer <[email protected]>
1 parent 17cc8c7 commit 0e47ead

2 files changed

Lines changed: 221 additions & 15 deletions

File tree

sentry_sdk/integrations/asyncpg.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
)
1717
from sentry_sdk.utils import (
1818
capture_internal_exceptions,
19-
ensure_integration_enabled,
2019
parse_version,
2120
)
2221

@@ -52,8 +51,12 @@ def setup_once() -> None:
5251
asyncpg.Connection._executemany = _wrap_connection_method(
5352
asyncpg.Connection._executemany, executemany=True
5453
)
55-
asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
5654
asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)
55+
56+
BaseCursor._bind_exec = _wrap_cursor_method(BaseCursor._bind_exec)
57+
BaseCursor._bind = _wrap_cursor_method(BaseCursor._bind)
58+
BaseCursor._exec = _wrap_cursor_method(BaseCursor._exec)
59+
5760
asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
5861
asyncpg.connect_utils._connect_addr
5962
)
@@ -150,20 +153,26 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
150153
return _inner
151154

152155

153-
def _wrap_cursor_creation(f: "Callable[..., T]") -> "Callable[..., T]":
154-
@ensure_integration_enabled(AsyncPGIntegration, f)
155-
def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807
156-
query = args[1]
157-
params_list = args[2] if len(args) > 2 else None
156+
def _wrap_cursor_method(
157+
f: "Callable[..., Awaitable[T]]",
158+
) -> "Callable[..., Awaitable[T]]":
159+
async def _inner(*args: "Any", **kwargs: "Any") -> "T":
160+
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
161+
return await f(*args, **kwargs)
158162

159-
with _record(
160-
None,
161-
query,
162-
params_list,
163+
cursor = args[0]
164+
query = _normalize_query(cursor._query)
165+
with record_sql_queries_supporting_streaming(
166+
cursor=cursor,
167+
query=query,
168+
params_list=None,
169+
paramstyle=None,
163170
executemany=False,
171+
record_cursor_repr=True,
172+
span_origin=AsyncPGIntegration.origin,
164173
) as span:
165-
_set_db_data(span, args[0])
166-
res = f(*args, **kwargs)
174+
_set_db_data(span, cursor._connection)
175+
res = await f(*args, **kwargs)
167176

168177
return res
169178

tests/integrations/asyncpg/test_asyncpg.py

Lines changed: 199 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ async def test_cursor(sentry_init, capture_events) -> None:
342342
{"category": "query", "data": {}, "message": "BEGIN;", "type": "default"},
343343
{
344344
"category": "query",
345-
"data": {},
345+
"data": {"db.cursor": mock.ANY},
346346
"message": "SELECT * FROM users WHERE dob > $1",
347347
"type": "default",
348348
},
@@ -400,7 +400,19 @@ async def test_cursor_manual(sentry_init, capture_events) -> None:
400400
{"category": "query", "data": {}, "message": "BEGIN;", "type": "default"},
401401
{
402402
"category": "query",
403-
"data": {},
403+
"data": {"db.cursor": mock.ANY},
404+
"message": "SELECT * FROM users WHERE dob > $1",
405+
"type": "default",
406+
},
407+
{
408+
"category": "query",
409+
"data": {"db.cursor": mock.ANY},
410+
"message": "SELECT * FROM users WHERE dob > $1",
411+
"type": "default",
412+
},
413+
{
414+
"category": "query",
415+
"data": {"db.cursor": mock.ANY},
404416
"message": "SELECT * FROM users WHERE dob > $1",
405417
"type": "default",
406418
},
@@ -1102,3 +1114,188 @@ def before_send_transaction(event, hint):
11021114

11031115
assert len(spans) == 1
11041116
assert spans[0]["description"] == "filtered"
1117+
1118+
1119+
@pytest.mark.asyncio
1120+
@pytest.mark.parametrize("span_streaming", [True, False])
1121+
async def test_cursor__bind_exec_creates_spans(
1122+
sentry_init, capture_events, capture_items, span_streaming
1123+
) -> None:
1124+
"""
1125+
Exercises the bind_exec patch through the iterator that's created in asyncpg when "for record in conn.cursor" is called.
1126+
See https://github.com/MagicStack/asyncpg/blob/db8ecc2a38e16fb0c090aef6f5506547c2831c24/asyncpg/cursor.py#L234
1127+
"""
1128+
sentry_init(
1129+
integrations=[AsyncPGIntegration()],
1130+
traces_sample_rate=1.0,
1131+
_experiments={
1132+
"trace_lifecycle": "stream" if span_streaming else "static",
1133+
},
1134+
)
1135+
1136+
if span_streaming:
1137+
items = capture_items("span")
1138+
with sentry_sdk.traces.start_span(name="test_transaction"):
1139+
conn: Connection = await connect(PG_CONNECTION_URI)
1140+
1141+
await conn.executemany(
1142+
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1143+
[
1144+
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
1145+
("Alice", "pw", datetime.date(1990, 12, 25)),
1146+
],
1147+
)
1148+
1149+
async with conn.transaction():
1150+
async for record in conn.cursor(
1151+
"SELECT * FROM users WHERE dob > $1",
1152+
datetime.date(1970, 1, 1),
1153+
):
1154+
pass
1155+
1156+
await conn.close()
1157+
sentry_sdk.flush()
1158+
1159+
spans = [item.payload for item in items]
1160+
1161+
assert len(spans) == 6
1162+
1163+
connect_span = spans[0]
1164+
executemany_span = spans[1]
1165+
begin_span = spans[2]
1166+
bind_exec_span = spans[3]
1167+
commit_span = spans[4]
1168+
segment = spans[5]
1169+
1170+
assert connect_span["name"] == "connect"
1171+
assert (
1172+
executemany_span["name"]
1173+
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
1174+
)
1175+
assert begin_span["name"] == "BEGIN;"
1176+
assert bind_exec_span["name"] == "SELECT * FROM users WHERE dob > $1"
1177+
assert commit_span["name"] == "COMMIT;"
1178+
assert segment["name"] == "test_transaction"
1179+
1180+
assert bind_exec_span["attributes"]["sentry.origin"] == "auto.db.asyncpg"
1181+
assert bind_exec_span["attributes"]["sentry.op"] == "db"
1182+
assert bind_exec_span["attributes"]["db.system"] == "postgresql"
1183+
assert bind_exec_span["attributes"]["db.driver.name"] == "asyncpg"
1184+
assert bind_exec_span["attributes"]["server.address"] == PG_HOST
1185+
assert bind_exec_span["attributes"]["server.port"] == PG_PORT
1186+
assert bind_exec_span["attributes"]["db.name"] == PG_NAME
1187+
assert bind_exec_span["attributes"]["db.user"] == PG_USER
1188+
else:
1189+
events = capture_events()
1190+
1191+
with start_transaction(name="test_transaction", sampled=True):
1192+
conn: Connection = await connect(PG_CONNECTION_URI)
1193+
1194+
await conn.executemany(
1195+
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1196+
[
1197+
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
1198+
("Alice", "pw", datetime.date(1990, 12, 25)),
1199+
],
1200+
)
1201+
1202+
async with conn.transaction():
1203+
async for record in conn.cursor(
1204+
"SELECT * FROM users WHERE dob > $1",
1205+
datetime.date(1970, 1, 1),
1206+
):
1207+
pass
1208+
1209+
await conn.close()
1210+
1211+
(event,) = events
1212+
1213+
assert len(event["spans"]) == 5
1214+
1215+
connect_span = event["spans"][0]
1216+
executemany_span = event["spans"][1]
1217+
begin_span = event["spans"][2]
1218+
bind_exec_span = event["spans"][3]
1219+
commit_span = event["spans"][4]
1220+
1221+
assert connect_span["description"] == "connect"
1222+
assert (
1223+
executemany_span["description"]
1224+
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
1225+
)
1226+
assert begin_span["description"] == "BEGIN;"
1227+
assert bind_exec_span["description"] == "SELECT * FROM users WHERE dob > $1"
1228+
assert commit_span["description"] == "COMMIT;"
1229+
1230+
assert bind_exec_span["origin"] == "auto.db.asyncpg"
1231+
assert bind_exec_span["data"]["db.system"] == "postgresql"
1232+
assert bind_exec_span["data"]["db.driver.name"] == "asyncpg"
1233+
assert bind_exec_span["data"]["server.address"] == PG_HOST
1234+
assert bind_exec_span["data"]["server.port"] == PG_PORT
1235+
assert bind_exec_span["data"]["db.name"] == PG_NAME
1236+
assert bind_exec_span["data"]["db.user"] == PG_USER
1237+
1238+
1239+
@pytest.mark.asyncio
1240+
async def test_cursor__bind_and__exec_methods_create_spans(
1241+
sentry_init, capture_events
1242+
) -> None:
1243+
sentry_init(
1244+
integrations=[AsyncPGIntegration()],
1245+
traces_sample_rate=1.0,
1246+
)
1247+
events = capture_events()
1248+
1249+
with start_transaction(name="test_transaction"):
1250+
conn: Connection = await connect(PG_CONNECTION_URI)
1251+
1252+
await conn.executemany(
1253+
"INSERT INTO users(name, password, dob) VALUES($1, $2, $3)",
1254+
[
1255+
("Bob", "secret_pw", datetime.date(1984, 3, 1)),
1256+
("Alice", "pw", datetime.date(1990, 12, 25)),
1257+
],
1258+
)
1259+
1260+
async with conn.transaction():
1261+
# This exercises the `_bind` patch and the `cursor` patch
1262+
cur = await conn.cursor(
1263+
"SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1)
1264+
)
1265+
# These exercise the `_exec` patch
1266+
await cur.fetchrow()
1267+
await cur.fetchrow()
1268+
1269+
await conn.close()
1270+
1271+
(event,) = events
1272+
1273+
assert len(event["spans"]) == 7
1274+
1275+
connect_span = event["spans"][0]
1276+
executemany_span = event["spans"][1]
1277+
begin_span = event["spans"][2]
1278+
cursor_creation_and_bind_span = event["spans"][3]
1279+
fetchrow_span_1 = event["spans"][4]
1280+
fetchrow_span_2 = event["spans"][5]
1281+
commit_span = event["spans"][6]
1282+
1283+
assert connect_span["description"] == "connect"
1284+
assert (
1285+
executemany_span["description"]
1286+
== "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)"
1287+
)
1288+
assert begin_span["description"] == "BEGIN;"
1289+
assert fetchrow_span_1["description"] == "SELECT * FROM users WHERE dob > $1"
1290+
assert (
1291+
cursor_creation_and_bind_span["description"]
1292+
== "SELECT * FROM users WHERE dob > $1"
1293+
)
1294+
assert fetchrow_span_2["description"] == "SELECT * FROM users WHERE dob > $1"
1295+
assert commit_span["description"] == "COMMIT;"
1296+
1297+
for span in (cursor_creation_and_bind_span, fetchrow_span_1, fetchrow_span_2):
1298+
assert span["data"]["db.cursor"] is not None
1299+
assert span["data"]["db.system"] == "postgresql"
1300+
assert span["data"]["db.driver.name"] == "asyncpg"
1301+
assert span["origin"] == "auto.db.asyncpg"

0 commit comments

Comments
 (0)