
Commit 26ef518

[mq] working branch - merge 00b7640 on top of main at 1501bff
{"baseBranch":"main","baseCommit":"1501bff342c55fca79d2687e75f1f1b3382d2fb9","createdAt":"2024-12-18T23:20:16.713593Z","headSha":"00b76400e6565fe94a9dc91108885ccf41f91c75","id":"08473384-2b72-44ff-9215-045d36f9e95b","priority":"200","pullRequestNumber":"11498","queuedAt":"2024-12-18T23:20:16.712998Z","status":"STATUS_QUEUED"}
2 parents 0546c73 + 00b7640 commit 26ef518

6 files changed, +85 -14 lines changed

ddtrace/contrib/internal/celery/app.py

Lines changed: 0 additions & 11 deletions
@@ -133,10 +133,6 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
             if task_span:
                 task_span.set_exc_info(*sys.exc_info())
 
-            prerun_span = core.get_item("prerun_span")
-            if prerun_span:
-                prerun_span.set_exc_info(*sys.exc_info())
-
             raise
         finally:
             task_span = core.get_item("task_span")
@@ -147,11 +143,4 @@ def _traced_apply_async_inner(func, instance, args, kwargs):
                 )
                 task_span.finish()
 
-            prerun_span = core.get_item("prerun_span")
-            if prerun_span:
-                log.debug(
-                    "The task_postrun signal was not called, so manually closing span: %s", prerun_span._pprint()
-                )
-                prerun_span.finish()
-
     return _traced_apply_async_inner
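For orientation, here is a minimal sketch (not the verbatim library code) of the error and cleanup path that remains after this change, assuming the surrounding try/except/finally from the hunks above. Only the ``celery.apply`` task span is finished here; the worker-side prerun span is now left for the task_postrun signal handler to close, which is what preserves its tags.

try:
    result = func(*args, **kwargs)
except Exception:
    # Record the exception on the apply span only; the prerun span is
    # deliberately left alone so task_postrun can close it with full tags.
    task_span = core.get_item("task_span")
    if task_span:
        task_span.set_exc_info(*sys.exc_info())
    raise
finally:
    task_span = core.get_item("task_span")
    if task_span:
        task_span.finish()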

ddtrace/contrib/internal/celery/signals.py

Lines changed: 0 additions & 3 deletions
@@ -54,9 +54,6 @@ def trace_prerun(*args, **kwargs):
     service = config.celery["worker_service_name"]
     span = pin.tracer.trace(c.WORKER_ROOT_SPAN, service=service, resource=task.name, span_type=SpanTypes.WORKER)
 
-    # Store an item called "prerun span" in case task_postrun doesn't get called
-    core.set_item("prerun_span", span)
-
     # set span.kind to the type of request being performed
     span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
 
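For readers unfamiliar with the hook points involved: trace_prerun and its counterpart trace_postrun are handlers attached to Celery's task signals, so after this change the postrun handler is the sole owner of the worker root span's lifecycle. A minimal sketch of that wiring, using Celery's real signal API (the exact connect calls in this integration may differ):

from celery import signals

# trace_prerun opens the worker root span; trace_postrun closes it.
# weak=False keeps the handlers referenced for the life of the process.
signals.task_prerun.connect(trace_prerun, weak=False)
signals.task_postrun.connect(trace_postrun, weak=False)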

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+---
+fixes:
+  - |
+    tracing(celery): Fixes an issue where ``celery.apply`` spans from Celery prerun were closed too soon, leading to missing span tags.

tests/contrib/celery/run_tasks.py

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+from tasks import fn_a
+from tasks import fn_b
+
+
+(fn_a.si() | fn_b.si()).delay()
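The pipe operator on the last line is Celery shorthand for a chain of immutable signatures: fn_a.si() ignores any upstream result, and | builds a celery.chain. An equivalent, more explicit spelling for comparison:

from celery import chain

from tasks import fn_a
from tasks import fn_b

# .si() creates an immutable signature, so fn_b does not receive fn_a's
# return value; chain(...) is what the | operator constructs under the hood.
chain(fn_a.si(), fn_b.si()).delay()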

tests/contrib/celery/tasks.py

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+from celery import Celery
+
+
+app = Celery("tasks")
+
+
+@app.task(name="tests.contrib.celery.tasks.fn_a")
+def fn_a():
+    return "a"
+
+
+@app.task(name="tests.contrib.celery.tasks.fn_b")
+def fn_b():
+    return "b"
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+import os
+import re
+import subprocess
+import time
+
+from celery import Celery
+
+
+# Ensure that when we call Celery chains, the root span has Celery-specific span tags.
+# The test_integration.py setup doesn't perfectly mimic the conditions of a running worker process,
+# so this test runs the worker as a side process and checks the tracer logs afterwards for the expected spans.
+# See https://github.com/DataDog/dd-trace-py/issues/11479
+def test_task_chain_task_call_task():
+    app = Celery("tasks")
+
+    celery_worker_cmd = "ddtrace-run celery -A tasks worker -c 1 -l DEBUG -n uniquename1 -P solo"
+    celery_task_runner_cmd = "ddtrace-run python run_tasks.py"
+
+    # The commands need to run from the directory where this test file lives
+    current_directory = str(os.path.dirname(__file__))
+
+    worker_process = subprocess.Popen(
+        celery_worker_cmd.split(),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        preexec_fn=os.setsid,
+        close_fds=True,
+        cwd=current_directory,
+    )
+
+    max_wait_time = 10
+    waited_so_far = 0
+    # app.control.inspect().active() returns {'celery@uniquename1': []} once the worker is running
+    while app.control.inspect().active() is None and waited_so_far < max_wait_time:
+        time.sleep(1)
+        waited_so_far += 1
+
+    # The tasks should only run after the Celery worker has had sufficient time to start up
+    task_runner_process = subprocess.Popen(
+        celery_task_runner_cmd.split(),
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        preexec_fn=os.setsid,
+        close_fds=True,
+        cwd=current_directory,
+    )
+
+    task_runner_process.wait()
+    # Kill the worker so it shuts down, flushes its traces to the Trace Agent,
+    # and its log output can be read
+    worker_process.kill()
+    worker_logs = worker_process.stderr.read()
+
+    # Check that the root span was created with one of the Celery-specific tags, such as celery.correlation_id.
+    # Some Python versions escape the quotes in the log output, so match both forms:
+    old_pattern_match = r"resource=\\'tests.contrib.celery.tasks.fn_a\\' type=\\'worker\\' .* tags=.*correlation_id.*"
+    new_pattern_match = r"resource=\'tests.contrib.celery.tasks.fn_a\' type=\'worker\' .* tags=.*correlation_id.*"
+
+    pattern_exists = (
+        re.search(old_pattern_match, str(worker_logs)) is not None
+        or re.search(new_pattern_match, str(worker_logs)) is not None
+    )
+    assert pattern_exists
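The readiness poll above (looping until app.control.inspect().active() returns non-None) is a common pattern when driving a Celery worker from a subprocess. A small reusable helper along the same lines, hypothetical and not part of the commit:

import time

from celery import Celery


def wait_for_worker(app: Celery, timeout: float = 10.0) -> bool:
    """Poll until a worker responds to inspection, or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # active() returns a {worker_name: [task, ...]} mapping once a worker
        # is reachable, and None while none are.
        if app.control.inspect(timeout=1.0).active() is not None:
            return True
        time.sleep(0.5)
    return False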
