diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index c2d67926a21e8..6d315a6bac52d 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -40,6 +40,7 @@ from sqlalchemy.sql import select from structlog.contextvars import bind_contextvars +from airflow._shared.observability.metrics import stats from airflow._shared.observability.traces import override_ids from airflow._shared.state import TaskScope from airflow._shared.timezones import timezone @@ -151,6 +152,9 @@ def ti_run( TI.hostname, TI.unixname, TI.pid, + TI.queue, + TI.queued_dttm, + TI.end_date, # This selects the raw JSON value, bypassing the deserialization -- we want that to happen on the # client column("next_kwargs", JSON), @@ -230,6 +234,17 @@ def ti_run( extra=json.dumps({"host_name": ti_run_payload.hostname}) if ti_run_payload.hostname else None, ) ) + # Emit task.queued_duration on the first QUEUED -> RUNNING transition. In Airflow 2 the + # worker emitted this from TaskInstance._check_and_change_state_before_execution; in + # Airflow 3 the worker reaches RUNNING through this endpoint, so emit here instead. + # Skip on retries (end_date already set from a previous attempt) to keep the legacy + # "only on first try" semantics from TaskInstance.emit_state_change_metric. + if ti.end_date is None and ti.queued_dttm is not None: + stats.timing( + "task.queued_duration", + timezone.utcnow() - ti.queued_dttm, + tags={"task_id": ti.task_id, "dag_id": ti.dag_id, "queue": ti.queue}, + ) # Ensure there is no end date set and clear retry policy overrides from the previous attempt. query = query.values( end_date=None, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 2d15e548a13a4..c05559a71dfff 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -289,6 +289,90 @@ def test_ti_run_state_to_running( ) assert response.status_code == 409 + def test_ti_run_emits_queued_duration_metric(self, client, session, create_task_instance, time_machine): + """ + On the first QUEUED -> RUNNING transition the run endpoint should emit + ``task.queued_duration`` (regression test for #66067 — the metric stopped + being emitted in Airflow 3 after the worker-side path was replaced by the + Execution API call). + """ + run_instant = timezone.parse("2024-09-30T12:00:30Z") + queued_instant = timezone.parse("2024-09-30T12:00:00Z") + time_machine.move_to(run_instant, tick=False) + + ti = create_task_instance( + task_id="test_ti_run_queued_duration_metric", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=run_instant, + dag_id=str(uuid4()), + ) + ti.queued_dttm = queued_instant + ti.end_date = None + session.commit() + + with mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.stats") as mock_stats: + response = client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "random-hostname", + "unixname": "random-unixname", + "pid": 100, + "start_date": run_instant.to_iso8601_string(), + }, + ) + + assert response.status_code == 200 + mock_stats.timing.assert_any_call( + "task.queued_duration", + run_instant - queued_instant, + tags={"task_id": ti.task_id, "dag_id": ti.dag_id, "queue": ti.queue}, + ) + + def test_ti_run_skips_queued_duration_metric_on_retry( + self, client, session, create_task_instance, time_machine + ): + """ + A retry transition (previous attempt left ``end_date`` populated before + the update zeros it) should not emit ``task.queued_duration`` — preserves + the legacy "only on first try" semantics from + ``TaskInstance.emit_state_change_metric``. + """ + run_instant = timezone.parse("2024-09-30T12:05:00Z") + time_machine.move_to(run_instant, tick=False) + + ti = create_task_instance( + task_id="test_ti_run_queued_duration_retry", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=run_instant, + dag_id=str(uuid4()), + ) + ti.queued_dttm = run_instant.subtract(seconds=30) + ti.end_date = run_instant.subtract(seconds=60) + session.commit() + + with mock.patch("airflow.api_fastapi.execution_api.routes.task_instances.stats") as mock_stats: + response = client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "random-hostname", + "unixname": "random-unixname", + "pid": 100, + "start_date": run_instant.to_iso8601_string(), + }, + ) + + assert response.status_code == 200 + timing_calls = [ + c for c in mock_stats.timing.call_args_list if c.args and c.args[0] == "task.queued_duration" + ] + assert timing_calls == [] + def test_ti_run_returns_execution_token( self, client, exec_app, session, create_task_instance, time_machine ):