Skip to content

Commit 2707b43

Browse files
feat(rq): Support span streaming
1 parent 0527c03 commit 2707b43

2 files changed

Lines changed: 392 additions & 130 deletions

File tree

sentry_sdk/integrations/rq.py

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22

33
import sentry_sdk
44
from sentry_sdk.api import continue_trace
5-
from sentry_sdk.consts import OP
5+
from sentry_sdk.consts import OP, SPANDATA
66
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
77
from sentry_sdk.integrations.logging import ignore_logger
8-
from sentry_sdk.scope import should_send_default_pii
8+
from sentry_sdk.scope import Scope, should_send_default_pii
99
from sentry_sdk.tracing import TransactionSource
10+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
1011
from sentry_sdk.utils import (
1112
SENSITIVE_DATA_SUBSTITUTE,
1213
capture_internal_exceptions,
13-
ensure_integration_enabled,
1414
event_from_exception,
1515
format_timestamp,
1616
parse_version,
@@ -61,30 +61,57 @@ def setup_once() -> None:
6161

6262
old_perform_job = worker_cls.perform_job
6363

64-
@ensure_integration_enabled(RqIntegration, old_perform_job)
6564
def sentry_patched_perform_job(
6665
self: "Any", job: "Job", *args: "Queue", **kwargs: "Any"
6766
) -> bool:
67+
client = sentry_sdk.get_client()
68+
if client.get_integration(RqIntegration) is None:
69+
return old_perform_job(self, job, *args, **kwargs)
70+
6871
with sentry_sdk.new_scope() as scope:
6972
scope.clear_breadcrumbs()
7073
scope.add_event_processor(_make_event_processor(weakref.ref(job)))
7174

72-
transaction = continue_trace(
73-
job.meta.get("_sentry_trace_headers") or {},
74-
op=OP.QUEUE_TASK_RQ,
75-
name="unknown RQ task",
76-
source=TransactionSource.TASK,
77-
origin=RqIntegration.origin,
78-
)
79-
80-
with capture_internal_exceptions():
81-
transaction.name = job.func_name
82-
83-
with sentry_sdk.start_transaction(
84-
transaction,
85-
custom_sampling_context={"rq_job": job},
86-
):
87-
rv = old_perform_job(self, job, *args, **kwargs)
75+
if has_span_streaming_enabled(client.options):
76+
sentry_sdk.traces.continue_trace(
77+
job.meta.get("_sentry_trace_headers") or {}
78+
)
79+
80+
Scope.set_custom_sampling_context({"rq_job": job})
81+
82+
span_name = "unknown RQ task"
83+
with capture_internal_exceptions():
84+
span_name = job.func_name
85+
86+
with sentry_sdk.traces.start_span(
87+
name=span_name,
88+
attributes={
89+
"sentry.op": OP.QUEUE_TASK_RQ,
90+
"sentry.origin": RqIntegration.origin,
91+
"sentry.span.source": TransactionSource.TASK,
92+
SPANDATA.MESSAGING_MESSAGE_ID: job.id,
93+
SPANDATA.CODE_FUNCTION_NAME: job.func_name,
94+
},
95+
parent_span=None,
96+
):
97+
rv = old_perform_job(self, job, *args, **kwargs)
98+
else:
99+
transaction = continue_trace(
100+
job.meta.get("_sentry_trace_headers") or {},
101+
op=OP.QUEUE_TASK_RQ,
102+
name="unknown RQ task",
103+
source=TransactionSource.TASK,
104+
origin=RqIntegration.origin,
105+
)
106+
107+
with capture_internal_exceptions():
108+
transaction.name = job.func_name
109+
110+
with sentry_sdk.start_transaction(
111+
transaction,
112+
custom_sampling_context={"rq_job": job},
113+
):
114+
rv = old_perform_job(self, job, *args, **kwargs)
88115

89116
if self.is_horse:
90117
# We're inside of a forked process and RQ is
@@ -116,12 +143,20 @@ def sentry_patched_handle_exception(
116143

117144
old_enqueue_job = Queue.enqueue_job
118145

119-
@ensure_integration_enabled(RqIntegration, old_enqueue_job)
120146
def sentry_patched_enqueue_job(
121147
self: "Queue", job: "Any", **kwargs: "Any"
122148
) -> "Any":
149+
client = sentry_sdk.get_client()
150+
if client.get_integration(RqIntegration) is None:
151+
return old_enqueue_job(self, job, **kwargs)
152+
123153
scope = sentry_sdk.get_current_scope()
124-
if scope.span is not None:
154+
span_streaming = has_span_streaming_enabled(client.options)
155+
if span_streaming and scope.streamed_span is not None:
156+
job.meta["_sentry_trace_headers"] = dict(
157+
scope.iter_trace_propagation_headers()
158+
)
159+
elif not span_streaming and scope.span is not None:
125160
job.meta["_sentry_trace_headers"] = dict(
126161
scope.iter_trace_propagation_headers()
127162
)

0 commit comments

Comments
 (0)