From 699bd0af2929cad6c33747624f1c6f50456006a9 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Wed, 17 Jun 2026 00:33:41 +0000 Subject: [PATCH 1/6] Exclude non-successful runs from AVERAGE_RUNTIME deadline calculation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DeadlineReference.AVERAGE_RUNTIME computes a deadline from the average duration of past DAG runs, but the query filtered only on dag_id + start/end-date present — with no DagRun.state filter. Failed runs (which may have died fast or hung before failing) were folded into the average, skewing the computed deadline: a fast-failing history makes it too short (spurious misses), a slow-then-failed history makes it too long (real slowness never trips it). Filter the duration query to successful runs only. Add tests asserting failed runs are excluded from the average and that the deadline is skipped when too few successful runs exist. --- .../serialization/definitions/deadline.py | 11 ++- .../tests/unit/models/test_deadline.py | 67 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py b/airflow-core/src/airflow/serialization/definitions/deadline.py index 89e231cba24dc..20fac2b54e874 100644 --- a/airflow-core/src/airflow/serialization/definitions/deadline.py +++ b/airflow-core/src/airflow/serialization/definitions/deadline.py @@ -207,6 +207,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None: from sqlalchemy import func, select, text from airflow.models import DagRun + from airflow.utils.state import DagRunState dag_id = kwargs["dag_id"] @@ -222,9 +223,17 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None: else: raise ValueError(f"Unsupported database dialect: {dialect}") + # Only SUCCESSFUL runs represent a "normal" runtime. A run that failed fast or hung + # before failing would otherwise skew the average and produce a misleading deadline + # (too short -> spurious misses, or too long -> real slowness never trips it). query = ( select(duration_expr) - .filter(DagRun.dag_id == dag_id, DagRun.start_date.isnot(None), DagRun.end_date.isnot(None)) + .filter( + DagRun.dag_id == dag_id, + DagRun.state == DagRunState.SUCCESS, + DagRun.start_date.isnot(None), + DagRun.end_date.isnot(None), + ) .order_by(DagRun.logical_date.desc()) .limit(self.max_runs) ) diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py index 3635bcadcbea5..873520062f21f 100644 --- a/airflow-core/tests/unit/models/test_deadline.py +++ b/airflow-core/tests/unit/models/test_deadline.py @@ -519,6 +519,73 @@ def test_average_runtime_min_runs_validation(self): with pytest.raises(ValueError, match="min_runs must be at least 1"): DeadlineReference.AVERAGE_RUNTIME(max_runs=10, min_runs=-1) + def test_average_runtime_excludes_non_successful_runs(self, session, dag_maker): + """Only SUCCESSFUL runs contribute to the average; FAILED runs must be ignored. + + A failed run's duration is not representative of a normal runtime, so including it + would skew the computed deadline. Seed an equal mix of fast-successful and + slow-failed runs and assert the average reflects only the successful ones. + """ + with dag_maker(DAG_ID): + EmptyOperator(task_id="test_task") + + base_time = DEFAULT_DATE + success_duration = 60 # the only durations that should count + failed_duration = 36000 # 10h — would massively skew the average if (wrongly) counted + + # Interleave 3 successful (60s) and 3 failed (36000s) runs. + specs = [ + (DagRunState.SUCCESS, success_duration), + (DagRunState.FAILED, failed_duration), + (DagRunState.SUCCESS, success_duration), + (DagRunState.FAILED, failed_duration), + (DagRunState.SUCCESS, success_duration), + (DagRunState.FAILED, failed_duration), + ] + for i, (state, duration) in enumerate(specs): + logical_date = base_time + timedelta(days=i) + start_time = logical_date + timedelta(minutes=5) + dagrun = dag_maker.create_dagrun( + logical_date=logical_date, run_id=f"mix_run_{i}", state=state + ) + dagrun.start_date = start_time + dagrun.end_date = start_time + timedelta(seconds=duration) + + session.commit() + + # min_runs=3 so the 3 successful runs alone satisfy the minimum. + reference = SerializedReferenceModels.AverageRuntimeDeadline(max_runs=10, min_runs=3) + interval = timedelta(hours=1) + + with mock.patch("airflow._shared.timezones.timezone.utcnow") as mock_utcnow: + mock_utcnow.return_value = DEFAULT_DATE + result = reference.evaluate_with(session=session, interval=interval, dag_id=DAG_ID) + + # Average must be over the 3 successful 60s runs only (not the 36000s failures). + expected = DEFAULT_DATE + timedelta(seconds=success_duration) + interval + assert result.replace(second=0, microsecond=0) == expected.replace(second=0, microsecond=0) + + def test_average_runtime_skips_when_too_few_successful_runs(self, session, dag_maker): + """If only FAILED runs exist (fewer than min_runs successful), no deadline is created.""" + with dag_maker(DAG_ID): + EmptyOperator(task_id="test_task") + + base_time = DEFAULT_DATE + for i in range(5): + logical_date = base_time + timedelta(days=i) + start_time = logical_date + timedelta(minutes=5) + dagrun = dag_maker.create_dagrun( + logical_date=logical_date, run_id=f"failed_run_{i}", state=DagRunState.FAILED + ) + dagrun.start_date = start_time + dagrun.end_date = start_time + timedelta(seconds=3600) + + session.commit() + + reference = SerializedReferenceModels.AverageRuntimeDeadline(max_runs=10, min_runs=3) + result = reference.evaluate_with(session=session, interval=timedelta(hours=1), dag_id=DAG_ID) + assert result is None + class TestDeadlineReference: """DeadlineReference lives in definitions/deadlines.py but properly testing them requires DB access.""" From b1c41cff7f87379c09920918a81d89a65ffd0ab2 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Wed, 17 Jun 2026 05:38:54 +0000 Subject: [PATCH 2/6] Reformat: collapse wrapped create_dagrun call to satisfy ruff-format --- airflow-core/tests/unit/models/test_deadline.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/models/test_deadline.py b/airflow-core/tests/unit/models/test_deadline.py index 873520062f21f..4fc1aada1cc02 100644 --- a/airflow-core/tests/unit/models/test_deadline.py +++ b/airflow-core/tests/unit/models/test_deadline.py @@ -545,9 +545,7 @@ def test_average_runtime_excludes_non_successful_runs(self, session, dag_maker): for i, (state, duration) in enumerate(specs): logical_date = base_time + timedelta(days=i) start_time = logical_date + timedelta(minutes=5) - dagrun = dag_maker.create_dagrun( - logical_date=logical_date, run_id=f"mix_run_{i}", state=state - ) + dagrun = dag_maker.create_dagrun(logical_date=logical_date, run_id=f"mix_run_{i}", state=state) dagrun.start_date = start_time dagrun.end_date = start_time + timedelta(seconds=duration) From 116959984c32857b3c04317591df6846b908c540 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Mon, 22 Jun 2026 19:58:41 +0000 Subject: [PATCH 3/6] Clarify AVERAGE_RUNTIME docs to say successful runs The average and the min_runs threshold count successful runs only, so "previous Dag runs" / "completed runs" were imprecise. Generated-by: Claude Code (Opus 4.8) following the guidelines --- airflow-core/docs/howto/deadline-alerts.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/docs/howto/deadline-alerts.rst b/airflow-core/docs/howto/deadline-alerts.rst index e36908009a0f1..a823cecd9f8a0 100644 --- a/airflow-core/docs/howto/deadline-alerts.rst +++ b/airflow-core/docs/howto/deadline-alerts.rst @@ -110,14 +110,14 @@ Airflow provides several built-in reference points that you can use with Deadlin Specifies a fixed point in time. Useful when Dags must complete by a specific time. ``DeadlineReference.AVERAGE_RUNTIME`` - Calculates deadlines based on the average runtime of previous Dag runs. This reference + Calculates deadlines based on the average runtime of previous successful Dag runs. This reference analyzes historical execution data to predict when the current run should complete. The deadline is set to the current time plus the calculated average runtime plus the interval. If insufficient historical data exists, no deadline is created. Parameters: * ``max_runs`` (int, optional): Maximum number of recent Dag runs to analyze. Defaults to 10. - * ``min_runs`` (int, optional): Minimum number of completed runs required to calculate average. Defaults to same value as ``max_runs``. + * ``min_runs`` (int, optional): Minimum number of successful runs required to calculate average. Defaults to same value as ``max_runs``. Example usage: From 64355c41ad62f545a1a6e2f5009d03a20c71af8a Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Mon, 22 Jun 2026 21:25:00 +0000 Subject: [PATCH 4/6] Re-trigger CI (flaky serialization tests) From c317eb44e268ae8a038e085d41e4419cf42e6519 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli <58916776+seanghaeli@users.noreply.github.com> Date: Mon, 22 Jun 2026 20:29:47 -0700 Subject: [PATCH 5/6] Update airflow-core/docs/howto/deadline-alerts.rst Co-authored-by: Ramit Kataria --- airflow-core/docs/howto/deadline-alerts.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/docs/howto/deadline-alerts.rst b/airflow-core/docs/howto/deadline-alerts.rst index a823cecd9f8a0..1decec432ab7b 100644 --- a/airflow-core/docs/howto/deadline-alerts.rst +++ b/airflow-core/docs/howto/deadline-alerts.rst @@ -116,7 +116,7 @@ Airflow provides several built-in reference points that you can use with Deadlin If insufficient historical data exists, no deadline is created. Parameters: - * ``max_runs`` (int, optional): Maximum number of recent Dag runs to analyze. Defaults to 10. + * ``max_runs`` (int, optional): Maximum number of successful recent Dag runs to analyze. Defaults to 10. * ``min_runs`` (int, optional): Minimum number of successful runs required to calculate average. Defaults to same value as ``max_runs``. Example usage: From c3308292e25573980b548f65087514fdea811c48 Mon Sep 17 00:00:00 2001 From: Sean Ghaeli <58916776+seanghaeli@users.noreply.github.com> Date: Mon, 22 Jun 2026 20:29:54 -0700 Subject: [PATCH 6/6] Update airflow-core/docs/howto/deadline-alerts.rst Co-authored-by: Ramit Kataria --- airflow-core/docs/howto/deadline-alerts.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/docs/howto/deadline-alerts.rst b/airflow-core/docs/howto/deadline-alerts.rst index 1decec432ab7b..8e729516fefcb 100644 --- a/airflow-core/docs/howto/deadline-alerts.rst +++ b/airflow-core/docs/howto/deadline-alerts.rst @@ -117,7 +117,7 @@ Airflow provides several built-in reference points that you can use with Deadlin Parameters: * ``max_runs`` (int, optional): Maximum number of successful recent Dag runs to analyze. Defaults to 10. - * ``min_runs`` (int, optional): Minimum number of successful runs required to calculate average. Defaults to same value as ``max_runs``. + * ``min_runs`` (int, optional): Minimum number of successful recent Dag runs required to calculate average. Defaults to same value as ``max_runs``. Example usage: