Skip to content

Commit edaa6d6

Browse files
authored
feat(pymongo): Add span streaming support (#6253)
Add span streaming support to the PyMongo integration. The legacy code path is preserved unchanged for non-streaming clients. Fixes PY-2349 Fixes #6047
1 parent 0e47ead commit edaa6d6

2 files changed

Lines changed: 281 additions & 53 deletions

File tree

sentry_sdk/integrations/pymongo.py

Lines changed: 99 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import json
33

44
import sentry_sdk
5-
from sentry_sdk.consts import SPANSTATUS, SPANDATA, OP
5+
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
66
from sentry_sdk.integrations import DidNotEnable, Integration
77
from sentry_sdk.scope import should_send_default_pii
8+
from sentry_sdk.traces import SpanStatus, StreamedSpan
89
from sentry_sdk.tracing import Span
10+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
911
from sentry_sdk.utils import capture_internal_exceptions
1012

1113
try:
@@ -87,12 +89,11 @@ def _strip_pii(command: "Dict[str, Any]") -> "Dict[str, Any]":
8789
def _get_db_data(event: "Any") -> "Dict[str, Any]":
8890
data = {}
8991

90-
data[SPANDATA.DB_SYSTEM] = "mongodb"
91-
data[SPANDATA.DB_DRIVER_NAME] = "pymongo"
92+
client = sentry_sdk.get_client()
93+
is_span_streaming_enabled = has_span_streaming_enabled(client.options)
9294

95+
data[SPANDATA.DB_DRIVER_NAME] = "pymongo"
9396
db_name = event.database_name
94-
if db_name is not None:
95-
data[SPANDATA.DB_NAME] = db_name
9697

9798
server_address = event.connection_id[0]
9899
if server_address is not None:
@@ -102,12 +103,23 @@ def _get_db_data(event: "Any") -> "Dict[str, Any]":
102103
if server_port is not None:
103104
data[SPANDATA.SERVER_PORT] = server_port
104105

106+
if is_span_streaming_enabled:
107+
data["db.system.name"] = "mongodb"
108+
109+
if db_name is not None:
110+
data["db.namespace"] = db_name
111+
else:
112+
data[SPANDATA.DB_SYSTEM] = "mongodb"
113+
114+
if db_name is not None:
115+
data[SPANDATA.DB_NAME] = db_name
116+
105117
return data
106118

107119

108120
class CommandTracer(monitoring.CommandListener):
109121
def __init__(self) -> None:
110-
self._ongoing_operations: "Dict[int, Span]" = {}
122+
self._ongoing_operations: "Dict[int, Union[Span, StreamedSpan]]" = {}
111123

112124
def _operation_key(
113125
self,
@@ -116,7 +128,8 @@ def _operation_key(
116128
return event.request_id
117129

118130
def started(self, event: "CommandStartedEvent") -> None:
119-
if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
131+
client = sentry_sdk.get_client()
132+
if client.get_integration(PyMongoIntegration) is None:
120133
return
121134

122135
with capture_internal_exceptions():
@@ -126,56 +139,88 @@ def started(self, event: "CommandStartedEvent") -> None:
126139
command.pop("$clusterTime", None)
127140
command.pop("$signature", None)
128141

129-
tags = {
130-
"db.name": event.database_name,
131-
SPANDATA.DB_SYSTEM: "mongodb",
132-
SPANDATA.DB_DRIVER_NAME: "pymongo",
133-
SPANDATA.DB_OPERATION: event.command_name,
134-
SPANDATA.DB_MONGODB_COLLECTION: command.get(event.command_name),
135-
}
136-
137-
try:
138-
tags["net.peer.name"] = event.connection_id[0]
139-
tags["net.peer.port"] = str(event.connection_id[1])
140-
except TypeError:
141-
pass
142+
db_data = _get_db_data(event)
142143

143-
data: "Dict[str, Any]" = {"operation_ids": {}}
144-
data["operation_ids"]["operation"] = event.operation_id
145-
data["operation_ids"]["request"] = event.request_id
146-
147-
data.update(_get_db_data(event))
148-
149-
try:
150-
lsid = command.pop("lsid")["id"]
151-
data["operation_ids"]["session"] = str(lsid)
152-
except KeyError:
153-
pass
144+
collection_name = command.get(event.command_name)
145+
operation_name = event.command_name
146+
db_name = event.database_name
154147

148+
lsid = command.pop("lsid", None)
155149
if not should_send_default_pii():
156150
command = _strip_pii(command)
157151

158152
query = json.dumps(command, default=str)
159-
span = sentry_sdk.start_span(
160-
op=OP.DB,
161-
name=query,
162-
origin=PyMongoIntegration.origin,
163-
)
164153

165-
for tag, value in tags.items():
166-
# set the tag for backwards-compatibility.
167-
# TODO: remove the set_tag call in the next major release!
168-
span.set_tag(tag, value)
154+
if has_span_streaming_enabled(client.options):
155+
span_first_data = {
156+
"db.operation.name": operation_name,
157+
"db.collection.name": collection_name,
158+
"sentry.op": OP.DB,
159+
"sentry.origin": PyMongoIntegration.origin,
160+
**db_data,
161+
}
162+
163+
span = sentry_sdk.traces.start_span(
164+
name=query, attributes=span_first_data
165+
)
166+
167+
with capture_internal_exceptions():
168+
sentry_sdk.add_breadcrumb(
169+
message=query,
170+
category="query",
171+
type=OP.DB,
172+
data=span_first_data,
173+
)
174+
175+
else:
176+
tags = {
177+
"db.name": db_name,
178+
SPANDATA.DB_SYSTEM: "mongodb",
179+
SPANDATA.DB_DRIVER_NAME: "pymongo",
180+
SPANDATA.DB_OPERATION: operation_name,
181+
# The below is a deprecated field, but leaving for legacy reasons.
182+
# The v2 spans will use `db.collection.name` instead.
183+
SPANDATA.DB_MONGODB_COLLECTION: collection_name,
184+
}
185+
186+
try:
187+
tags["net.peer.name"] = event.connection_id[0]
188+
tags["net.peer.port"] = str(event.connection_id[1])
189+
except TypeError:
190+
pass
191+
192+
data: "Dict[str, Any]" = {"operation_ids": {}}
193+
data["operation_ids"]["operation"] = event.operation_id
194+
data["operation_ids"]["request"] = event.request_id
195+
196+
data.update(db_data)
197+
198+
try:
199+
if lsid:
200+
lsid_id = lsid["id"]
201+
data["operation_ids"]["session"] = str(lsid_id)
202+
except KeyError:
203+
pass
204+
205+
span = sentry_sdk.start_span(
206+
op=OP.DB,
207+
name=query,
208+
origin=PyMongoIntegration.origin,
209+
)
169210

170-
span.set_data(tag, value)
211+
for tag, value in tags.items():
212+
# set the tag for backwards-compatibility.
213+
# TODO: remove the set_tag call in the next major release!
214+
span.set_tag(tag, value)
215+
span.set_data(tag, value)
171216

172-
for key, value in data.items():
173-
span.set_data(key, value)
217+
for key, value in data.items():
218+
span.set_data(key, value)
174219

175-
with capture_internal_exceptions():
176-
sentry_sdk.add_breadcrumb(
177-
message=query, category="query", type=OP.DB, data=tags
178-
)
220+
with capture_internal_exceptions():
221+
sentry_sdk.add_breadcrumb(
222+
message=query, category="query", type=OP.DB, data=tags
223+
)
179224

180225
self._ongoing_operations[self._operation_key(event)] = span.__enter__()
181226

@@ -185,7 +230,11 @@ def failed(self, event: "CommandFailedEvent") -> None:
185230

186231
try:
187232
span = self._ongoing_operations.pop(self._operation_key(event))
188-
span.set_status(SPANSTATUS.INTERNAL_ERROR)
233+
# Ignoring NoOpStreamedSpan as it will always have a status of "ok"
234+
if type(span) is StreamedSpan:
235+
span.status = SpanStatus.ERROR
236+
elif type(span) is Span:
237+
span.set_status(SPANSTATUS.INTERNAL_ERROR)
189238
span.__exit__(None, None, None)
190239
except KeyError:
191240
return
@@ -196,7 +245,8 @@ def succeeded(self, event: "CommandSucceededEvent") -> None:
196245

197246
try:
198247
span = self._ongoing_operations.pop(self._operation_key(event))
199-
span.set_status(SPANSTATUS.OK)
248+
if type(span) is Span:
249+
span.set_status(SPANSTATUS.OK)
200250
span.__exit__(None, None, None)
201251
except KeyError:
202252
pass

0 commit comments

Comments
 (0)