Skip to content

Emit task.queued_duration metric on QUEUED -> RUNNING in Task SDK path#67190

Open
guhyunwoo wants to merge 10 commits into
apache:mainfrom
guhyunwoo:fix/task-queued-duration-metric-66067
Open

Emit task.queued_duration metric on QUEUED -> RUNNING in Task SDK path#67190
guhyunwoo wants to merge 10 commits into
apache:mainfrom
guhyunwoo:fix/task-queued-duration-metric-66067

Conversation

@guhyunwoo
Copy link
Copy Markdown
Contributor

closes: #66067

task.queued_duration (legacy: dag.<dag_id>.<task_id>.queued_duration) stopped being emitted in Airflow 3 regardless of executor (LocalExecutor, CeleryExecutor, KubernetesExecutor). The companion (LocalExecutor, CeleryExecutor, KubernetesExecutor). The companion metric task.scheduled_duration still emits.

In Airflow 2 the worker emitted task.queued_duration from TaskInstance._check_and_change_state_before_execution (airflow-core/src/airflow/models/taskinstance.py:1366) on the QUEUED → RUNNING transition. In Airflow 3 the worker no longer touches the DB directly — it reaches RUNNING through the Execution API endpoint PATCH /execution/task-instances/{id}/run (airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py), which never called TaskInstance.emit_state_change_metric. The companion metric kept working because the scheduler still emits task.scheduled_duration directly from scheduler_job_runner.py:990 when it queues the TI — that path was not affected by the worker/API split.

Emit task.queued_duration inline from the run endpoint on the first QUEUED → RUNNING transition. The legacy TaskInstance.emit_state_change_metric is not reused here because the handler operates on a partial-column Row (not a TI ORM instance) and the update later resets end_date to None, which would defeat the method's "only on first try" guard for retries. Mirroring the guard inline (end_date is None and queued_dttm is not None) preserves the legacy semantics — including not emitting on retries — and uses the same tag set (task_id, dag_id, queue).

Two regression tests in airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:

  • test_ti_run_emits_queued_duration_metric — first transition emits with the expected metric name, timing (now - queued_dttm), and tags.
  • test_ti_run_skips_queued_duration_metric_on_retry — a retry (previous attempt's end_date populated) does not emit, matching legacy behavior.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

In Airflow 2 the worker emitted task.queued_duration from
TaskInstance._check_and_change_state_before_execution on the
QUEUED -> RUNNING transition. In Airflow 3 the worker reaches RUNNING
through the Execution API endpoint
PATCH /execution/task-instances/{id}/run, which never called
TaskInstance.emit_state_change_metric — so task.queued_duration stopped
being emitted on every executor while task.scheduled_duration (still
emitted by the scheduler-side path) kept working.

Emit task.queued_duration inline from the run endpoint on the first
QUEUED -> RUNNING transition, preserving the legacy
"only on first try" semantics (skip when end_date is already set from
a previous attempt) and the same tag set (task_id, dag_id, queue).

closes: apache#66067
@boring-cyborg boring-cyborg Bot added area:API Airflow's REST/HTTP API area:task-sdk labels May 19, 2026
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label May 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:task-sdk ready for maintainer review Set after triaging when all criteria pass.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

task.queued_duration metric not emitted in Airflow 3 (all executors)

2 participants