Skip to content

Commit c753ca2

Browse files
committed
Prevent using trigger_rule="always" in a dynamic mapped task
1 parent 752f933 commit c753ca2

File tree

3 files changed

+47
-5
lines changed

3 files changed

+47
-5
lines changed

airflow/utils/task_group.py

+18-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from airflow.models.taskmixin import DAGNode
3838
from airflow.serialization.enums import DagAttributeTypes
3939
from airflow.utils.helpers import validate_group_key, validate_instance_args
40+
from airflow.utils.trigger_rule import TriggerRule
4041

4142
if TYPE_CHECKING:
4243
from sqlalchemy.orm import Session
@@ -220,10 +221,15 @@ def parent_group(self) -> TaskGroup | None:
220221

221222
def __iter__(self):
222223
for child in self.children.values():
223-
if isinstance(child, TaskGroup):
224-
yield from child
225-
else:
226-
yield child
224+
yield from self._iter_child(child)
225+
226+
@staticmethod
227+
def _iter_child(child):
228+
"""Iterate over the children of this TaskGroup."""
229+
if isinstance(child, TaskGroup):
230+
yield from child
231+
else:
232+
yield child
227233

228234
def add(self, task: DAGNode) -> DAGNode:
229235
"""
@@ -593,6 +599,14 @@ def __init__(self, *, expand_input: ExpandInput, **kwargs: Any) -> None:
593599
super().__init__(**kwargs)
594600
self._expand_input = expand_input
595601

602+
def __iter__(self):
603+
from airflow.models.abstractoperator import AbstractOperator
604+
605+
for child in self.children.values():
606+
if isinstance(child, AbstractOperator) and child.trigger_rule == TriggerRule.ALWAYS:
607+
raise ValueError("Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'")
608+
yield from self._iter_child(child)
609+
596610
def iter_mapped_dependencies(self) -> Iterator[Operator]:
597611
"""Upstream dependencies that provide XComs used by this mapped task group."""
598612
from airflow.models.xcom_arg import XComArg

docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst

+5
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ The grid view also provides visibility into your mapped tasks in the details pan
8484

8585
Although we show a "reduce" task here (``sum_it``) you don't have to have one, the mapped tasks will still be executed even if they have no downstream tasks.
8686

87+
.. warning:: ``TriggerRule.ALWAYS`` cannot be utilized in expanded tasks
88+
89+
Assigning ``trigger_rule=TriggerRule.ALWAYS`` in expanded tasks is forbidden, as expanded parameters will be undefined with the task's immediate execution.
90+
This is enforced at the time of the DAG parsing, and will raise an error if you try to use it.
91+
8792
Task-generated Mapping
8893
----------------------
8994

tests/decorators/test_task_group.py

+24-1
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
import pendulum
2323
import pytest
2424

25-
from airflow.decorators import dag, task_group
25+
from airflow.decorators import dag, task, task_group
2626
from airflow.models.expandinput import DictOfListsExpandInput, ListOfDictsExpandInput, MappedArgument
2727
from airflow.operators.empty import EmptyOperator
2828
from airflow.utils.task_group import MappedTaskGroup
29+
from airflow.utils.trigger_rule import TriggerRule
2930

3031

3132
def test_task_group_with_overridden_kwargs():
@@ -133,6 +134,28 @@ def tg():
133134
assert str(ctx.value) == "no arguments to expand against"
134135

135136

137+
@pytest.mark.db_test
138+
def test_expand_fail_trigger_rule_always(dag_maker, session):
139+
@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
140+
def pipeline():
141+
@task
142+
def get_param():
143+
return ["a", "b", "c"]
144+
145+
@task(trigger_rule=TriggerRule.ALWAYS)
146+
def t1(param):
147+
return param
148+
149+
@task_group()
150+
def tg(param):
151+
t1(param)
152+
153+
with pytest.raises(
154+
ValueError, match="Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'"
155+
):
156+
tg.expand(param=get_param())
157+
158+
136159
def test_expand_create_mapped():
137160
saved = {}
138161

0 commit comments

Comments
 (0)