Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from sqlalchemy.sql import select
from structlog.contextvars import bind_contextvars

from airflow._shared.observability.metrics import stats
from airflow._shared.observability.traces import override_ids
from airflow._shared.state import TaskScope
from airflow._shared.timezones import timezone
Expand Down Expand Up @@ -151,6 +152,9 @@ def ti_run(
TI.hostname,
TI.unixname,
TI.pid,
TI.queue,
TI.queued_dttm,
TI.end_date,
# This selects the raw JSON value, bypassing the deserialization -- we want that to happen on the
# client
column("next_kwargs", JSON),
Expand Down Expand Up @@ -230,6 +234,17 @@ def ti_run(
extra=json.dumps({"host_name": ti_run_payload.hostname}) if ti_run_payload.hostname else None,
)
)
# Emit task.queued_duration on the first QUEUED -> RUNNING transition. In Airflow 2 the
# worker emitted this from TaskInstance._check_and_change_state_before_execution; in
# Airflow 3 the worker reaches RUNNING through this endpoint, so emit here instead.
# Skip on retries (end_date already set from a previous attempt) to keep the legacy
# "only on first try" semantics from TaskInstance.emit_state_change_metric.
if ti.end_date is None and ti.queued_dttm is not None:
stats.timing(
"task.queued_duration",
timezone.utcnow() - ti.queued_dttm,
tags={"task_id": ti.task_id, "dag_id": ti.dag_id, "queue": ti.queue},
)
# Ensure there is no end date set and clear retry policy overrides from the previous attempt.
query = query.values(
end_date=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,90 @@ def test_ti_run_state_to_running(
)
assert response.status_code == 409

def test_ti_run_emits_queued_duration_metric(self, client, session, create_task_instance, time_machine):
    """
    The run endpoint reports ``task.queued_duration`` for a first-attempt TI.

    Regression test for #66067: the metric stopped being emitted in Airflow 3
    once the worker-side path was replaced by the Execution API call. A queued
    task instance with no ``end_date`` is moved to RUNNING and the timing
    metric is expected to carry the gap between ``queued_dttm`` and "now".
    """
    queued_at = timezone.parse("2024-09-30T12:00:00Z")
    started_at = timezone.parse("2024-09-30T12:00:30Z")
    # Freeze the clock at the start instant so the expected duration below is exact.
    time_machine.move_to(started_at, tick=False)

    ti = create_task_instance(
        task_id="test_ti_run_queued_duration_metric",
        state=State.QUEUED,
        dagrun_state=DagRunState.RUNNING,
        session=session,
        start_date=started_at,
        dag_id=str(uuid4()),
    )
    # First attempt: queued 30 seconds before the frozen clock, never finished.
    ti.queued_dttm = queued_at
    ti.end_date = None
    session.commit()

    run_payload = {
        "state": "running",
        "hostname": "random-hostname",
        "unixname": "random-unixname",
        "pid": 100,
        "start_date": started_at.to_iso8601_string(),
    }
    stats_target = "airflow.api_fastapi.execution_api.routes.task_instances.stats"

    with mock.patch(stats_target) as mock_stats:
        response = client.patch(f"/execution/task-instances/{ti.id}/run", json=run_payload)

    assert response.status_code == 200
    expected_duration = started_at - queued_at
    expected_tags = {"task_id": ti.task_id, "dag_id": ti.dag_id, "queue": ti.queue}
    mock_stats.timing.assert_any_call(
        "task.queued_duration",
        expected_duration,
        tags=expected_tags,
    )

def test_ti_run_skips_queued_duration_metric_on_retry(
    self, client, session, create_task_instance, time_machine
):
    """
    The run endpoint stays silent about ``task.queued_duration`` on a retry.

    When a previous attempt has left ``end_date`` populated (before the update
    clears it), the transition to RUNNING must not emit the metric. This keeps
    the legacy "only on first try" semantics from
    ``TaskInstance.emit_state_change_metric``.
    """
    frozen_now = timezone.parse("2024-09-30T12:05:00Z")
    time_machine.move_to(frozen_now, tick=False)

    ti = create_task_instance(
        task_id="test_ti_run_queued_duration_retry",
        state=State.QUEUED,
        dagrun_state=DagRunState.RUNNING,
        session=session,
        start_date=frozen_now,
        dag_id=str(uuid4()),
    )
    # Simulate a retry: an earlier attempt ended 60s ago and the TI was re-queued 30s ago.
    ti.queued_dttm = frozen_now.subtract(seconds=30)
    ti.end_date = frozen_now.subtract(seconds=60)
    session.commit()

    run_payload = {
        "state": "running",
        "hostname": "random-hostname",
        "unixname": "random-unixname",
        "pid": 100,
        "start_date": frozen_now.to_iso8601_string(),
    }
    stats_target = "airflow.api_fastapi.execution_api.routes.task_instances.stats"

    with mock.patch(stats_target) as mock_stats:
        response = client.patch(f"/execution/task-instances/{ti.id}/run", json=run_payload)

    assert response.status_code == 200

    # Other timing metrics may be recorded; only the queued-duration one must be absent.
    queued_duration_calls = []
    for recorded in mock_stats.timing.call_args_list:
        if recorded.args and recorded.args[0] == "task.queued_duration":
            queued_duration_calls.append(recorded)
    assert queued_duration_calls == []

def test_ti_run_returns_execution_token(
self, client, exec_app, session, create_task_instance, time_machine
):
Expand Down
Loading