Skip to content

Commit b8ddc07

Browse files
committed
feat(pymongo): Add span streaming support
Add span streaming support to the PyMongo integration, mirroring the pattern used in the asyncpg integration. When trace_lifecycle=stream is enabled, spans are emitted using the new StreamedSpan API with OTel-style attributes. The legacy code path is preserved for non-streaming clients. Fixes PY-2349 Fixes #6047
1 parent b3c6226 commit b8ddc07

2 files changed

Lines changed: 270 additions & 50 deletions

File tree

sentry_sdk/integrations/pymongo.py

Lines changed: 86 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
import json
33

44
import sentry_sdk
5-
from sentry_sdk.consts import SPANSTATUS, SPANDATA, OP
5+
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
66
from sentry_sdk.integrations import DidNotEnable, Integration
77
from sentry_sdk.scope import should_send_default_pii
8+
from sentry_sdk.traces import SpanStatus, StreamedSpan
89
from sentry_sdk.tracing import Span
10+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
911
from sentry_sdk.utils import capture_internal_exceptions
1012

1113
try:
@@ -107,7 +109,7 @@ def _get_db_data(event: "Any") -> "Dict[str, Any]":
107109

108110
class CommandTracer(monitoring.CommandListener):
109111
def __init__(self) -> None:
110-
self._ongoing_operations: "Dict[int, Span]" = {}
112+
self._ongoing_operations: "Dict[int, Union[Span, StreamedSpan]]" = {}
111113

112114
def _operation_key(
113115
self,
@@ -116,7 +118,8 @@ def _operation_key(
116118
return event.request_id
117119

118120
def started(self, event: "CommandStartedEvent") -> None:
119-
if sentry_sdk.get_client().get_integration(PyMongoIntegration) is None:
121+
client = sentry_sdk.get_client()
122+
if client.get_integration(PyMongoIntegration) is None:
120123
return
121124

122125
with capture_internal_exceptions():
@@ -126,56 +129,85 @@ def started(self, event: "CommandStartedEvent") -> None:
126129
command.pop("$clusterTime", None)
127130
command.pop("$signature", None)
128131

129-
tags = {
130-
"db.name": event.database_name,
131-
SPANDATA.DB_SYSTEM: "mongodb",
132-
SPANDATA.DB_DRIVER_NAME: "pymongo",
133-
SPANDATA.DB_OPERATION: event.command_name,
134-
SPANDATA.DB_MONGODB_COLLECTION: command.get(event.command_name),
135-
}
136-
137-
try:
138-
tags["net.peer.name"] = event.connection_id[0]
139-
tags["net.peer.port"] = str(event.connection_id[1])
140-
except TypeError:
141-
pass
142-
143-
data: "Dict[str, Any]" = {"operation_ids": {}}
144-
data["operation_ids"]["operation"] = event.operation_id
145-
data["operation_ids"]["request"] = event.request_id
146-
147-
data.update(_get_db_data(event))
148-
149-
try:
150-
lsid = command.pop("lsid")["id"]
151-
data["operation_ids"]["session"] = str(lsid)
152-
except KeyError:
153-
pass
132+
db_data = _get_db_data(event)
154133

155134
if not should_send_default_pii():
156135
command = _strip_pii(command)
157136

158137
query = json.dumps(command, default=str)
159-
span = sentry_sdk.start_span(
160-
op=OP.DB,
161-
name=query,
162-
origin=PyMongoIntegration.origin,
163-
)
164138

165-
for tag, value in tags.items():
166-
# set the tag for backwards-compatibility.
167-
# TODO: remove the set_tag call in the next major release!
168-
span.set_tag(tag, value)
139+
if has_span_streaming_enabled(client.options):
140+
span_first_data = {
141+
"db.name": event.database_name,
142+
SPANDATA.DB_SYSTEM: "mongodb",
143+
SPANDATA.DB_DRIVER_NAME: "pymongo",
144+
SPANDATA.DB_OPERATION: event.command_name,
145+
"db.collection.name": command.get(event.command_name),
146+
"sentry.op": OP.DB,
147+
"sentry.origin": PyMongoIntegration.origin,
148+
**db_data,
149+
}
150+
151+
span = sentry_sdk.traces.start_span(
152+
name=query, attributes=span_first_data
153+
)
154+
155+
with capture_internal_exceptions():
156+
sentry_sdk.add_breadcrumb(
157+
message=query,
158+
category="query",
159+
type=OP.DB,
160+
data=span_first_data,
161+
)
162+
163+
else:
164+
tags = {
165+
"db.name": event.database_name,
166+
SPANDATA.DB_SYSTEM: "mongodb",
167+
SPANDATA.DB_DRIVER_NAME: "pymongo",
168+
SPANDATA.DB_OPERATION: event.command_name,
169+
# The below is a deprecated field, but leaving for legacy reasons.
170+
# The v2 spans will use `db.collection.name` instead.
171+
SPANDATA.DB_MONGODB_COLLECTION: command.get(event.command_name),
172+
}
173+
174+
try:
175+
tags["net.peer.name"] = event.connection_id[0]
176+
tags["net.peer.port"] = str(event.connection_id[1])
177+
except TypeError:
178+
pass
179+
180+
data: "Dict[str, Any]" = {"operation_ids": {}}
181+
data["operation_ids"]["operation"] = event.operation_id
182+
data["operation_ids"]["request"] = event.request_id
183+
184+
data.update(db_data)
185+
186+
try:
187+
lsid = command.pop("lsid")["id"]
188+
data["operation_ids"]["session"] = str(lsid)
189+
except KeyError:
190+
pass
191+
192+
span = sentry_sdk.start_span(
193+
op=OP.DB,
194+
name=query,
195+
origin=PyMongoIntegration.origin,
196+
)
169197

170-
span.set_data(tag, value)
198+
for tag, value in tags.items():
199+
# set the tag for backwards-compatibility.
200+
# TODO: remove the set_tag call in the next major release!
201+
span.set_tag(tag, value)
202+
span.set_data(tag, value)
171203

172-
for key, value in data.items():
173-
span.set_data(key, value)
204+
for key, value in data.items():
205+
span.set_data(key, value)
174206

175-
with capture_internal_exceptions():
176-
sentry_sdk.add_breadcrumb(
177-
message=query, category="query", type=OP.DB, data=tags
178-
)
207+
with capture_internal_exceptions():
208+
sentry_sdk.add_breadcrumb(
209+
message=query, category="query", type=OP.DB, data=tags
210+
)
179211

180212
self._ongoing_operations[self._operation_key(event)] = span.__enter__()
181213

@@ -185,7 +217,11 @@ def failed(self, event: "CommandFailedEvent") -> None:
185217

186218
try:
187219
span = self._ongoing_operations.pop(self._operation_key(event))
188-
span.set_status(SPANSTATUS.INTERNAL_ERROR)
220+
# Ignoring NoOpStreamedSpan as it will always have a status of "ok"
221+
if type(span) is StreamedSpan:
222+
span.status = SpanStatus.ERROR
223+
elif type(span) is Span:
224+
span.set_status(SPANSTATUS.INTERNAL_ERROR)
189225
span.__exit__(None, None, None)
190226
except KeyError:
191227
return
@@ -196,7 +232,11 @@ def succeeded(self, event: "CommandSucceededEvent") -> None:
196232

197233
try:
198234
span = self._ongoing_operations.pop(self._operation_key(event))
199-
span.set_status(SPANSTATUS.OK)
235+
# Ignoring NoOpStreamedSpan as it will always have a status of "ok"
236+
if type(span) is StreamedSpan:
237+
span.status = SpanStatus.OK
238+
elif type(span) is Span:
239+
span.set_status(SPANSTATUS.OK)
200240
span.__exit__(None, None, None)
201241
except KeyError:
202242
pass

tests/integrations/pymongo/test_pymongo.py

Lines changed: 184 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1+
import pytest
2+
from mockupdb import MockupDB, OpQuery
3+
from pymongo import MongoClient
4+
5+
import sentry_sdk
16
from sentry_sdk import capture_message, start_transaction
27
from sentry_sdk.consts import SPANDATA
38
from sentry_sdk.integrations.pymongo import PyMongoIntegration, _strip_pii
49

5-
from mockupdb import MockupDB, OpQuery
6-
from pymongo import MongoClient
7-
import pytest
8-
910

1011
@pytest.fixture(scope="session")
1112
def mongo_server():
@@ -109,6 +110,74 @@ def test_transactions(sentry_init, capture_events, mongo_server, with_pii):
109110
assert insert_fail["tags"]["status"] == "internal_error"
110111

111112

113+
@pytest.mark.parametrize("with_pii", [False, True])
114+
def test_transactions_span_streaming(
115+
sentry_init, capture_items, mongo_server, with_pii
116+
):
117+
sentry_init(
118+
integrations=[PyMongoIntegration()],
119+
traces_sample_rate=1.0,
120+
send_default_pii=with_pii,
121+
_experiments={"trace_lifecycle": "stream"},
122+
)
123+
items = capture_items("span")
124+
125+
connection = MongoClient(mongo_server.uri)
126+
127+
with sentry_sdk.traces.start_span(name="test_transaction"):
128+
list(
129+
connection["test_db"]["test_collection"].find({"foobar": 1})
130+
) # force query execution
131+
connection["test_db"]["test_collection"].insert_one({"foo": 2})
132+
try:
133+
connection["test_db"]["erroneous"].insert_many([{"bar": 3}, {"baz": 4}])
134+
pytest.fail("Request should raise")
135+
except Exception:
136+
pass
137+
sentry_sdk.flush()
138+
139+
spans = [item.payload for item in items]
140+
assert len(spans) == 4
141+
142+
(find, insert_success, insert_fail, segment) = spans
143+
assert segment["name"] == "test_transaction"
144+
145+
for span in find, insert_success, insert_fail:
146+
attrs = span["attributes"]
147+
assert attrs[SPANDATA.DB_SYSTEM] == "mongodb"
148+
assert attrs[SPANDATA.DB_DRIVER_NAME] == "pymongo"
149+
assert attrs["db.name"] == "test_db"
150+
assert attrs[SPANDATA.SERVER_ADDRESS] == "localhost"
151+
assert attrs[SPANDATA.SERVER_PORT] == mongo_server.port
152+
assert attrs["sentry.op"] == "db"
153+
assert attrs["sentry.origin"] == "auto.db.pymongo"
154+
155+
assert find["attributes"][SPANDATA.DB_OPERATION] == "find"
156+
assert insert_success["attributes"][SPANDATA.DB_OPERATION] == "insert"
157+
assert insert_fail["attributes"][SPANDATA.DB_OPERATION] == "insert"
158+
159+
assert find["name"].startswith('{"find')
160+
assert insert_success["name"].startswith('{"insert')
161+
assert insert_fail["name"].startswith('{"insert')
162+
163+
assert find["attributes"]["db.collection.name"] == "test_collection"
164+
assert insert_success["attributes"]["db.collection.name"] == "test_collection"
165+
assert insert_fail["attributes"]["db.collection.name"] == "erroneous"
166+
167+
if with_pii:
168+
assert "1" in find["name"]
169+
assert "2" in insert_success["name"]
170+
assert "3" in insert_fail["name"] and "4" in insert_fail["name"]
171+
else:
172+
assert "1" not in find["name"]
173+
assert "2" not in insert_success["name"]
174+
assert "3" not in insert_fail["name"] and "4" not in insert_fail["name"]
175+
176+
assert find["status"] == "ok"
177+
assert insert_success["status"] == "ok"
178+
assert insert_fail["status"] == "error"
179+
180+
112181
@pytest.mark.parametrize("with_pii", [False, True])
113182
def test_breadcrumbs(sentry_init, capture_events, mongo_server, with_pii):
114183
sentry_init(
@@ -146,6 +215,46 @@ def test_breadcrumbs(sentry_init, capture_events, mongo_server, with_pii):
146215
}
147216

148217

218+
@pytest.mark.parametrize("with_pii", [False, True])
219+
def test_breadcrumbs_span_streaming(sentry_init, capture_items, mongo_server, with_pii):
220+
sentry_init(
221+
integrations=[PyMongoIntegration()],
222+
traces_sample_rate=1.0,
223+
send_default_pii=with_pii,
224+
_experiments={"trace_lifecycle": "stream"},
225+
)
226+
items = capture_items("event")
227+
228+
connection = MongoClient(mongo_server.uri)
229+
230+
list(
231+
connection["test_db"]["test_collection"].find({"foobar": 1})
232+
) # force query execution
233+
capture_message("hi")
234+
235+
event = items[0].payload
236+
(crumb,) = event["breadcrumbs"]["values"]
237+
238+
assert crumb["category"] == "query"
239+
assert crumb["message"].startswith('{"find')
240+
if with_pii:
241+
assert "1" in crumb["message"]
242+
else:
243+
assert "1" not in crumb["message"]
244+
assert crumb["type"] == "db"
245+
246+
data = crumb["data"]
247+
assert data["db.name"] == "test_db"
248+
assert data[SPANDATA.DB_SYSTEM] == "mongodb"
249+
assert data[SPANDATA.DB_DRIVER_NAME] == "pymongo"
250+
assert data[SPANDATA.DB_OPERATION] == "find"
251+
assert data["db.collection.name"] == "test_collection"
252+
assert data["sentry.op"] == "db"
253+
assert data["sentry.origin"] == "auto.db.pymongo"
254+
assert data[SPANDATA.SERVER_ADDRESS] == "localhost"
255+
assert data[SPANDATA.SERVER_PORT] == mongo_server.port
256+
257+
149258
@pytest.mark.parametrize(
150259
"testcase",
151260
[
@@ -460,3 +569,74 @@ def test_span_origin(sentry_init, capture_events, mongo_server):
460569

461570
assert event["contexts"]["trace"]["origin"] == "manual"
462571
assert event["spans"][0]["origin"] == "auto.db.pymongo"
572+
573+
574+
def test_span_origin_span_streaming(sentry_init, capture_items, mongo_server):
575+
sentry_init(
576+
integrations=[PyMongoIntegration()],
577+
traces_sample_rate=1.0,
578+
_experiments={"trace_lifecycle": "stream"},
579+
)
580+
items = capture_items("span")
581+
582+
connection = MongoClient(mongo_server.uri)
583+
584+
with sentry_sdk.traces.start_span(name="test_transaction"):
585+
list(
586+
connection["test_db"]["test_collection"].find({"foobar": 1})
587+
) # force query execution
588+
sentry_sdk.flush()
589+
590+
spans = [item.payload for item in items]
591+
assert len(spans) == 2
592+
(db_span, segment) = spans
593+
assert segment["name"] == "test_transaction"
594+
assert db_span["attributes"]["sentry.origin"] == "auto.db.pymongo"
595+
596+
597+
def test_span_streaming_status_on_success(sentry_init, capture_items, mongo_server):
598+
sentry_init(
599+
integrations=[PyMongoIntegration()],
600+
traces_sample_rate=1.0,
601+
_experiments={"trace_lifecycle": "stream"},
602+
)
603+
items = capture_items("span")
604+
605+
connection = MongoClient(mongo_server.uri)
606+
607+
with sentry_sdk.traces.start_span(name="test_transaction"):
608+
connection["test_db"]["test_collection"].insert_one({"foo": 1})
609+
sentry_sdk.flush()
610+
611+
spans = [item.payload for item in items]
612+
assert len(spans) == 2
613+
(db_span, segment) = spans
614+
assert segment["name"] == "test_transaction"
615+
assert db_span["status"] == "ok"
616+
617+
618+
def test_span_streaming_status_on_failure(sentry_init, capture_items, mongo_server):
619+
sentry_init(
620+
integrations=[PyMongoIntegration()],
621+
traces_sample_rate=1.0,
622+
_experiments={"trace_lifecycle": "stream"},
623+
)
624+
items = capture_items("span")
625+
626+
connection = MongoClient(mongo_server.uri)
627+
628+
with sentry_sdk.traces.start_span(name="test_transaction"):
629+
try:
630+
connection["test_db"]["erroneous"].insert_many([{"bar": 3}])
631+
pytest.fail("Request should raise")
632+
except Exception:
633+
pass
634+
sentry_sdk.flush()
635+
636+
spans = [item.payload for item in items]
637+
638+
assert len(spans) == 2
639+
(db_span, segment) = spans
640+
641+
assert segment["name"] == "test_transaction"
642+
assert db_span["status"] == "error"

0 commit comments

Comments
 (0)