Skip to content

Commit 9e54e14

Browse files
feat(arq): Support span streaming (#6506)
Replace the `arq_task_id` tag with the `messaging.message.id` attribute in the streaming lifecycle mode. Do not set the `arq_task_retry` tag or entries in the `rq-job` extra as attributes in the streaming mode.
1 parent baae950 commit 9e54e14

2 files changed

Lines changed: 337 additions & 105 deletions

File tree

sentry_sdk/integrations/arq.py

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import sys
22

33
import sentry_sdk
4-
from sentry_sdk.consts import OP, SPANSTATUS
4+
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
55
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
66
from sentry_sdk.integrations.logging import ignore_logger
77
from sentry_sdk.scope import should_send_default_pii
8+
from sentry_sdk.traces import SegmentSource
89
from sentry_sdk.tracing import Transaction, TransactionSource
10+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
911
from sentry_sdk.utils import (
1012
SENSITIVE_DATA_SUBSTITUTE,
13+
_register_control_flow_exception,
1114
capture_internal_exceptions,
1215
ensure_integration_enabled,
1316
event_from_exception,
@@ -59,6 +62,8 @@ def setup_once() -> None:
5962
patch_run_job()
6063
patch_create_worker()
6164

65+
_register_control_flow_exception(ARQ_CONTROL_FLOW_EXCEPTIONS) # type: ignore
66+
6267
ignore_logger("arq.worker")
6368

6469

@@ -69,10 +74,20 @@ def patch_enqueue_job() -> None:
6974
async def _sentry_enqueue_job(
7075
self: "ArqRedis", function: str, *args: "Any", **kwargs: "Any"
7176
) -> "Optional[Job]":
72-
integration = sentry_sdk.get_client().get_integration(ArqIntegration)
73-
if integration is None:
77+
client = sentry_sdk.get_client()
78+
if client.get_integration(ArqIntegration) is None:
7479
return await old_enqueue_job(self, function, *args, **kwargs)
7580

81+
if has_span_streaming_enabled(client.options):
82+
with sentry_sdk.traces.start_span(
83+
name=function,
84+
attributes={
85+
"sentry.op": OP.QUEUE_SUBMIT_ARQ,
86+
"sentry.origin": ArqIntegration.origin,
87+
},
88+
):
89+
return await old_enqueue_job(self, function, *args, **kwargs)
90+
7691
with sentry_sdk.start_span(
7792
op=OP.QUEUE_SUBMIT_ARQ, name=function, origin=ArqIntegration.origin
7893
):
@@ -86,14 +101,27 @@ def patch_run_job() -> None:
86101
old_run_job = Worker.run_job
87102

88103
async def _sentry_run_job(self: "Worker", job_id: str, score: int) -> None:
89-
integration = sentry_sdk.get_client().get_integration(ArqIntegration)
90-
if integration is None:
104+
client = sentry_sdk.get_client()
105+
if client.get_integration(ArqIntegration) is None:
91106
return await old_run_job(self, job_id, score)
92107

93108
with sentry_sdk.isolation_scope() as scope:
94109
scope._name = "arq"
95110
scope.clear_breadcrumbs()
96111

112+
if has_span_streaming_enabled(client.options):
113+
with sentry_sdk.traces.start_span(
114+
name="unknown arq task",
115+
attributes={
116+
"sentry.op": OP.QUEUE_TASK_ARQ,
117+
"sentry.origin": ArqIntegration.origin,
118+
"sentry.span.source": SegmentSource.TASK,
119+
SPANDATA.MESSAGING_MESSAGE_ID: job_id,
120+
},
121+
parent_span=None,
122+
):
123+
return await old_run_job(self, job_id, score)
124+
97125
transaction = Transaction(
98126
name="unknown arq task",
99127
status="ok",
@@ -163,10 +191,19 @@ def _wrap_coroutine(name: str, coroutine: "WorkerCoroutine") -> "WorkerCoroutine
163191
async def _sentry_coroutine(
164192
ctx: "Dict[Any, Any]", *args: "Any", **kwargs: "Any"
165193
) -> "Any":
166-
integration = sentry_sdk.get_client().get_integration(ArqIntegration)
194+
client = sentry_sdk.get_client()
195+
integration = client.get_integration(ArqIntegration)
167196
if integration is None:
168197
return await coroutine(ctx, *args, **kwargs)
169198

199+
if has_span_streaming_enabled(client.options):
200+
scope = sentry_sdk.get_current_scope()
201+
span = scope.streamed_span
202+
if span is not None:
203+
span.name = name
204+
205+
scope.set_transaction_name(name)
206+
170207
sentry_sdk.get_isolation_scope().add_event_processor(
171208
_make_event_processor({**ctx, "job_name": name}, *args, **kwargs)
172209
)

0 commit comments

Comments
 (0)