Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow-core/docs/howto/deadline-alerts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ Airflow provides several built-in reference points that you can use with Deadlin
Specifies a fixed point in time. Useful when Dags must complete by a specific time.

``DeadlineReference.AVERAGE_RUNTIME``
Calculates deadlines based on the average runtime of previous Dag runs. This reference
Calculates deadlines based on the average runtime of previous successful Dag runs. This reference
analyzes historical execution data to predict when the current run should complete.
The deadline is set to the current time plus the calculated average runtime plus the interval.
If insufficient historical data exists, no deadline is created.

Parameters:
* ``max_runs`` (int, optional): Maximum number of recent Dag runs to analyze. Defaults to 10.
* ``min_runs`` (int, optional): Minimum number of completed runs required to calculate average. Defaults to same value as ``max_runs``.
* ``max_runs`` (int, optional): Maximum number of the most recent successful Dag runs to analyze. Defaults to 10.
* ``min_runs`` (int, optional): Minimum number of successful recent Dag runs required to calculate the average. Defaults to the same value as ``max_runs``.

Example usage:

Expand Down
11 changes: 10 additions & 1 deletion airflow-core/src/airflow/serialization/definitions/deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
from sqlalchemy import func, select, text

from airflow.models import DagRun
from airflow.utils.state import DagRunState

dag_id = kwargs["dag_id"]

Expand All @@ -222,9 +223,17 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
else:
raise ValueError(f"Unsupported database dialect: {dialect}")

# Only SUCCESSFUL runs represent a "normal" runtime. A run that failed fast or hung
Comment thread
seanghaeli marked this conversation as resolved.
# before failing would otherwise skew the average and produce a misleading deadline
# (too short -> spurious misses, or too long -> real slowness never trips it).
query = (
select(duration_expr)
.filter(DagRun.dag_id == dag_id, DagRun.start_date.isnot(None), DagRun.end_date.isnot(None))
.filter(
DagRun.dag_id == dag_id,
DagRun.state == DagRunState.SUCCESS,
DagRun.start_date.isnot(None),
DagRun.end_date.isnot(None),
)
.order_by(DagRun.logical_date.desc())
.limit(self.max_runs)
)
Expand Down
65 changes: 65 additions & 0 deletions airflow-core/tests/unit/models/test_deadline.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,71 @@ def test_average_runtime_min_runs_validation(self):
with pytest.raises(ValueError, match="min_runs must be at least 1"):
DeadlineReference.AVERAGE_RUNTIME(max_runs=10, min_runs=-1)

def test_average_runtime_excludes_non_successful_runs(self, session, dag_maker):
    """Only SUCCESSFUL runs contribute to the average; FAILED runs must be ignored.

    A failed run's duration is not representative of a normal runtime, so including it
    would skew the computed deadline. Seed an equal mix of fast-successful and
    slow-failed runs and assert the average reflects only the successful ones.
    """
    with dag_maker(DAG_ID):
        EmptyOperator(task_id="test_task")

    base_time = DEFAULT_DATE
    success_duration = 60  # the only durations that should count
    failed_duration = 36000  # 10h — would massively skew the average if (wrongly) counted

    # Interleave 3 successful (60s) and 3 failed (36000s) runs.
    specs = [
        (DagRunState.SUCCESS, success_duration),
        (DagRunState.FAILED, failed_duration),
        (DagRunState.SUCCESS, success_duration),
        (DagRunState.FAILED, failed_duration),
        (DagRunState.SUCCESS, success_duration),
        (DagRunState.FAILED, failed_duration),
    ]
    for i, (state, duration) in enumerate(specs):
        logical_date = base_time + timedelta(days=i)
        start_time = logical_date + timedelta(minutes=5)
        dagrun = dag_maker.create_dagrun(logical_date=logical_date, run_id=f"mix_run_{i}", state=state)
        dagrun.start_date = start_time
        dagrun.end_date = start_time + timedelta(seconds=duration)

    session.commit()

    # min_runs=3 so the 3 successful runs alone satisfy the minimum.
    reference = SerializedReferenceModels.AverageRuntimeDeadline(max_runs=10, min_runs=3)
    interval = timedelta(hours=1)

    with mock.patch("airflow._shared.timezones.timezone.utcnow") as mock_utcnow:
        mock_utcnow.return_value = DEFAULT_DATE
        result = reference.evaluate_with(session=session, interval=interval, dag_id=DAG_ID)

    # Fail with a clear assertion (not an AttributeError further down) if no deadline
    # was produced, e.g. because the successful runs were not counted towards min_runs.
    assert result is not None

    # Average must be over the 3 successful 60s runs only (not the 36000s failures).
    expected = DEFAULT_DATE + timedelta(seconds=success_duration) + interval
    # Compare with a symmetric tolerance instead of truncating to the minute: the expected
    # value sits exactly on a minute boundary, so a sub-second shortfall in the computed
    # average would land in the previous minute and fail a truncation-based comparison.
    assert abs(result - expected) < timedelta(seconds=1)

def test_average_runtime_skips_when_too_few_successful_runs(self, session, dag_maker):
    """No deadline is produced when the history contains only FAILED runs.

    Five failed runs are seeded and the reference is built with ``min_runs=3``; with
    no successful runs available the evaluation is expected to return ``None``.
    """
    with dag_maker(DAG_ID):
        EmptyOperator(task_id="test_task")

    run_length = timedelta(seconds=3600)
    for offset in range(5):
        run_date = DEFAULT_DATE + timedelta(days=offset)
        began = run_date + timedelta(minutes=5)
        failed_run = dag_maker.create_dagrun(
            logical_date=run_date,
            run_id=f"failed_run_{offset}",
            state=DagRunState.FAILED,
        )
        failed_run.start_date = began
        failed_run.end_date = began + run_length

    session.commit()

    deadline_reference = SerializedReferenceModels.AverageRuntimeDeadline(max_runs=10, min_runs=3)
    outcome = deadline_reference.evaluate_with(
        session=session,
        interval=timedelta(hours=1),
        dag_id=DAG_ID,
    )
    assert outcome is None


class TestDeadlineReference:
"""DeadlineReference lives in definitions/deadlines.py but properly testing them requires DB access."""
Expand Down
Loading