Skip to content

Commit e65b1c2

Browse files
committed
Merge branch 'master' into ivana/span-first-before-send-span
2 parents 9221b12 + f92c803 commit e65b1c2

14 files changed

Lines changed: 561 additions & 81 deletions

File tree

.github/workflows/ai-integration-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
token: ${{ secrets.GITHUB_TOKEN }}
3535

3636
- name: Run Python SDK Tests
37-
uses: getsentry/testing-ai-sdk-integrations@1dd9ee2a2d821f41473fc90809a758e8394c689c
37+
uses: getsentry/testing-ai-sdk-integrations@18229d1da4bb8120ce923bdf7aac00fcf6f38311
3838
env:
3939
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
4040
with:

sentry_sdk/_batcher.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,10 @@ def _add_to_envelope(self, envelope: "Envelope") -> None:
153153
},
154154
payload=PayloadRef(
155155
json={
156+
"version": 2,
156157
"items": [
157158
self._to_transport_format(item) for item in self._buffer
158-
]
159+
],
159160
}
160161
),
161162
)

sentry_sdk/_log_batcher.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ def _record_lost(self, item: "Log") -> None:
4545
headers={
4646
"item_count": 1,
4747
},
48-
payload=PayloadRef(json={"items": [self._to_transport_format(item)]}),
48+
payload=PayloadRef(
49+
json={
50+
"version": 2,
51+
"items": [self._to_transport_format(item)],
52+
}
53+
),
4954
)
5055

5156
self._record_lost_func(

sentry_sdk/_span_batcher.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,11 @@ def _flush(self, only_pending: bool = False) -> None:
214214
},
215215
payload=PayloadRef(
216216
json={
217+
"version": 2,
217218
"items": [
218219
self._to_transport_format(spans[j])
219220
for j in range(start, end)
220-
]
221+
],
221222
}
222223
),
223224
)

sentry_sdk/integrations/asyncpg.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
)
1717
from sentry_sdk.utils import (
1818
capture_internal_exceptions,
19-
ensure_integration_enabled,
2019
parse_version,
2120
)
2221

@@ -52,8 +51,12 @@ def setup_once() -> None:
5251
asyncpg.Connection._executemany = _wrap_connection_method(
5352
asyncpg.Connection._executemany, executemany=True
5453
)
55-
asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
5654
asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)
55+
56+
BaseCursor._bind_exec = _wrap_cursor_method(BaseCursor._bind_exec)
57+
BaseCursor._bind = _wrap_cursor_method(BaseCursor._bind)
58+
BaseCursor._exec = _wrap_cursor_method(BaseCursor._exec)
59+
5760
asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
5861
asyncpg.connect_utils._connect_addr
5962
)
@@ -150,20 +153,26 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T":
150153
return _inner
151154

152155

153-
def _wrap_cursor_creation(f: "Callable[..., T]") -> "Callable[..., T]":
154-
@ensure_integration_enabled(AsyncPGIntegration, f)
155-
def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807
156-
query = args[1]
157-
params_list = args[2] if len(args) > 2 else None
156+
def _wrap_cursor_method(
157+
f: "Callable[..., Awaitable[T]]",
158+
) -> "Callable[..., Awaitable[T]]":
159+
async def _inner(*args: "Any", **kwargs: "Any") -> "T":
160+
if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None:
161+
return await f(*args, **kwargs)
158162

159-
with _record(
160-
None,
161-
query,
162-
params_list,
163+
cursor = args[0]
164+
query = _normalize_query(cursor._query)
165+
with record_sql_queries_supporting_streaming(
166+
cursor=cursor,
167+
query=query,
168+
params_list=None,
169+
paramstyle=None,
163170
executemany=False,
171+
record_cursor_repr=True,
172+
span_origin=AsyncPGIntegration.origin,
164173
) as span:
165-
_set_db_data(span, args[0])
166-
res = f(*args, **kwargs)
174+
_set_db_data(span, cursor._connection)
175+
res = await f(*args, **kwargs)
167176

168177
return res
169178

sentry_sdk/integrations/pymongo.py

Lines changed: 99 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import json
33

44
import sentry_sdk
5-
from sentry_sdk.consts import SPANSTATUS, SPANDATA, OP
5+
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
66
from sentry_sdk.integrations import DidNotEnable, Integration
77
from sentry_sdk.scope import should_send_default_pii
8+
from sentry_sdk.traces import SpanStatus, StreamedSpan
89
from sentry_sdk.tracing import Span
10+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
911
from sentry_sdk.utils import capture_internal_exceptions
1012

1113
try:
@@ -87,12 +89,11 @@ def _strip_pii(command: "Dict[str, Any]") -> "Dict[str, Any]":
8789
def _get_db_data(event: "Any") -> "Dict[str, Any]":
8890
data = {}
8991

90-
data[SPANDATA.DB_SYSTEM] = "mongodb"
91-
data[SPANDATA.DB_DRIVER_NAME] = "pymongo"
92+
client = sentry_sdk.get_client()
93+
is_span_streaming_enabled = has_span_streaming_enabled(client.options)
9294

95+
data[SPANDATA.DB_DRIVER_NAME] = "pymongo"
9396
db_name = event.database_name
94-
if db_name is not None:
95-
data[SPANDATA.DB_NAME] = db_name
9697

9798
server_address = event.connection_id[0]
9899
if server_address is not None:
@@ -102,12 +103,23 @@ def _get_db_data(event: "Any") -> "Dict[str, Any]":
102103
if server_port is not None:
103104
data[SPANDATA.SERVER_PORT] = server_port
104105

106+
if is_span_streaming_enabled:
107+
data["db.system.name"] = "mongodb"
108+
109+
if db_name is not None:
110+
data["db.namespace"] = db_name
111+
else:
112+
data[SPANDATA.DB_SYSTEM] = "mongodb"
113+
114+
if db_name is not None:
115+
data[SPANDATA.DB_NAME] = db_name
116+
105117
return data
106118

107119

108120
class CommandTracer(monitoring.CommandListener):
109121
def __init__(self) -> None:
110-
self._ongoing_operations: "Dict[int, Span]" = {}
122+
self._ongoing_operations: "Dict[int, Union[Span, StreamedSpan]]" = {}
111123

112124
def _operation_key(
113125
self,
@@ -116,7 +128,8 @@ def _operation_key(
116128
return event.request_id
117129

118130
def started(self, event: "CommandStartedEvent") -> None:
119-
if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
131+
client = sentry_sdk.get_client()
132+
if client.get_integration(PyMongoIntegration) is None:
120133
return
121134

122135
with capture_internal_exceptions():
@@ -126,56 +139,88 @@ def started(self, event: "CommandStartedEvent") -> None:
126139
command.pop("$clusterTime", None)
127140
command.pop("$signature", None)
128141

129-
tags = {
130-
"db.name": event.database_name,
131-
SPANDATA.DB_SYSTEM: "mongodb",
132-
SPANDATA.DB_DRIVER_NAME: "pymongo",
133-
SPANDATA.DB_OPERATION: event.command_name,
134-
SPANDATA.DB_MONGODB_COLLECTION: command.get(event.command_name),
135-
}
136-
137-
try:
138-
tags["net.peer.name"] = event.connection_id[0]
139-
tags["net.peer.port"] = str(event.connection_id[1])
140-
except TypeError:
141-
pass
142+
db_data = _get_db_data(event)
142143

143-
data: "Dict[str, Any]" = {"operation_ids": {}}
144-
data["operation_ids"]["operation"] = event.operation_id
145-
data["operation_ids"]["request"] = event.request_id
146-
147-
data.update(_get_db_data(event))
148-
149-
try:
150-
lsid = command.pop("lsid")["id"]
151-
data["operation_ids"]["session"] = str(lsid)
152-
except KeyError:
153-
pass
144+
collection_name = command.get(event.command_name)
145+
operation_name = event.command_name
146+
db_name = event.database_name
154147

148+
lsid = command.pop("lsid", None)
155149
if not should_send_default_pii():
156150
command = _strip_pii(command)
157151

158152
query = json.dumps(command, default=str)
159-
span = sentry_sdk.start_span(
160-
op=OP.DB,
161-
name=query,
162-
origin=PyMongoIntegration.origin,
163-
)
164153

165-
for tag, value in tags.items():
166-
# set the tag for backwards-compatibility.
167-
# TODO: remove the set_tag call in the next major release!
168-
span.set_tag(tag, value)
154+
if has_span_streaming_enabled(client.options):
155+
span_first_data = {
156+
"db.operation.name": operation_name,
157+
"db.collection.name": collection_name,
158+
"sentry.op": OP.DB,
159+
"sentry.origin": PyMongoIntegration.origin,
160+
**db_data,
161+
}
162+
163+
span = sentry_sdk.traces.start_span(
164+
name=query, attributes=span_first_data
165+
)
166+
167+
with capture_internal_exceptions():
168+
sentry_sdk.add_breadcrumb(
169+
message=query,
170+
category="query",
171+
type=OP.DB,
172+
data=span_first_data,
173+
)
174+
175+
else:
176+
tags = {
177+
"db.name": db_name,
178+
SPANDATA.DB_SYSTEM: "mongodb",
179+
SPANDATA.DB_DRIVER_NAME: "pymongo",
180+
SPANDATA.DB_OPERATION: operation_name,
181+
# The below is a deprecated field, but leaving for legacy reasons.
182+
# The v2 spans will use `db.collection.name` instead.
183+
SPANDATA.DB_MONGODB_COLLECTION: collection_name,
184+
}
185+
186+
try:
187+
tags["net.peer.name"] = event.connection_id[0]
188+
tags["net.peer.port"] = str(event.connection_id[1])
189+
except TypeError:
190+
pass
191+
192+
data: "Dict[str, Any]" = {"operation_ids": {}}
193+
data["operation_ids"]["operation"] = event.operation_id
194+
data["operation_ids"]["request"] = event.request_id
195+
196+
data.update(db_data)
197+
198+
try:
199+
if lsid:
200+
lsid_id = lsid["id"]
201+
data["operation_ids"]["session"] = str(lsid_id)
202+
except KeyError:
203+
pass
204+
205+
span = sentry_sdk.start_span(
206+
op=OP.DB,
207+
name=query,
208+
origin=PyMongoIntegration.origin,
209+
)
169210

170-
span.set_data(tag, value)
211+
for tag, value in tags.items():
212+
# set the tag for backwards-compatibility.
213+
# TODO: remove the set_tag call in the next major release!
214+
span.set_tag(tag, value)
215+
span.set_data(tag, value)
171216

172-
for key, value in data.items():
173-
span.set_data(key, value)
217+
for key, value in data.items():
218+
span.set_data(key, value)
174219

175-
with capture_internal_exceptions():
176-
sentry_sdk.add_breadcrumb(
177-
message=query, category="query", type=OP.DB, data=tags
178-
)
220+
with capture_internal_exceptions():
221+
sentry_sdk.add_breadcrumb(
222+
message=query, category="query", type=OP.DB, data=tags
223+
)
179224

180225
self._ongoing_operations[self._operation_key(event)] = span.__enter__()
181226

@@ -185,7 +230,11 @@ def failed(self, event: "CommandFailedEvent") -> None:
185230

186231
try:
187232
span = self._ongoing_operations.pop(self._operation_key(event))
188-
span.set_status(SPANSTATUS.INTERNAL_ERROR)
233+
# Ignoring NoOpStreamedSpan as it will always have a status of "ok"
234+
if type(span) is StreamedSpan:
235+
span.status = SpanStatus.ERROR
236+
elif type(span) is Span:
237+
span.set_status(SPANSTATUS.INTERNAL_ERROR)
189238
span.__exit__(None, None, None)
190239
except KeyError:
191240
return
@@ -196,7 +245,8 @@ def succeeded(self, event: "CommandSucceededEvent") -> None:
196245

197246
try:
198247
span = self._ongoing_operations.pop(self._operation_key(event))
199-
span.set_status(SPANSTATUS.OK)
248+
if type(span) is Span:
249+
span.set_status(SPANSTATUS.OK)
200250
span.__exit__(None, None, None)
201251
except KeyError:
202252
pass

sentry_sdk/scrubber.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ def scrub_extra(self, event: "Event") -> None:
137137
def scrub_user(self, event: "Event") -> None:
138138
with capture_internal_exceptions():
139139
if "user" in event:
140-
self.scrub_dict(event["user"])
140+
user = event["user"]
141+
if "ip_address" in self.denylist and isinstance(user, dict):
142+
user.pop("ip_address", None)
143+
self.scrub_dict(user)
141144

142145
def scrub_breadcrumbs(self, event: "Event") -> None:
143146
with capture_internal_exceptions():

0 commit comments

Comments
 (0)