From a84e1b9ef6cb6ecc1b51c425b2b929a1b7fb5229 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 28 Apr 2026 12:35:24 +0530 Subject: [PATCH 1/9] Do not deserialize trigger_kwargs when loading serialized DAGs --- .../serialization/serialized_objects.py | 10 ++---- .../serialization/test_dag_serialization.py | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 9a0c7ab11d7e1..2fc3aaa38e9f1 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -251,17 +251,11 @@ def serialize_kwargs(key: str) -> Any: def _decode_start_trigger_args(var: dict[str, Any]) -> StartTriggerArgs: """Decode a StartTriggerArgs.""" - - def deserialize_kwargs(key: str) -> Any: - if (val := var[key]) is None: - return None - return BaseSerialization.deserialize(val) - return StartTriggerArgs( trigger_cls=var["trigger_cls"], - trigger_kwargs=deserialize_kwargs("trigger_kwargs"), + trigger_kwargs=var["trigger_kwargs"], next_method=var["next_method"], - next_kwargs=deserialize_kwargs("next_kwargs"), + next_kwargs=var["next_kwargs"], timeout=datetime.timedelta(seconds=var["timeout"]) if var["timeout"] else None, ) diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 375b13dea3561..fb2c4da457934 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -2627,6 +2627,38 @@ def execute_complete(self): } assert tasks[1]["__var"]["start_from_trigger"] is True + def test_trigger_kwargs_not_deserialised_through_serdag(self): + """trigger_kwargs are not deserialized when loading a serialized DAG.""" + + class TestOperator(BaseOperator): + start_trigger_args = 
StartTriggerArgs( + trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger", + trigger_kwargs={"delta": timedelta(seconds=2)}, + next_method="execute_complete", + next_kwargs=None, + timeout=None, + ) + start_from_trigger = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def execute_complete(self): + pass + + dag = DAG(dag_id="test_dag_kwargs_raw", schedule=None, start_date=datetime(2023, 11, 9)) + with dag: + TestOperator(task_id="test_task") + + serialized = DagSerialization.to_dict(dag) + deserialized_dag = DagSerialization.from_dict(serialized) + + task = deserialized_dag.get_task("test_task") + assert task.start_trigger_args.trigger_kwargs == { + "__type": "dict", + "__var": {"delta": {"__type": "timedelta", "__var": 2.0}}, + } + def test_kubernetes_optional(): """Test that serialization module loads without kubernetes, but deserialization of PODs requires it""" From a3a6bba61ecbf6ec8f476ec7af0049cba380799e Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 28 Apr 2026 14:30:01 +0530 Subject: [PATCH 2/9] Do not deserialize trigger_kwargs when loading serialized DAGs --- airflow-core/src/airflow/serialization/enums.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow-core/src/airflow/serialization/enums.py b/airflow-core/src/airflow/serialization/enums.py index 5fdd4d6698793..d84acd6388a72 100644 --- a/airflow-core/src/airflow/serialization/enums.py +++ b/airflow-core/src/airflow/serialization/enums.py @@ -30,6 +30,9 @@ class Encoding(str, Enum): TYPE = "__type" VAR = "__var" + def __str__(self) -> str: + return self.value + # Supported types for encoding. primitives and list are not encoded. 
@unique From b1717e43d94790b5ed16a03cff3fd9f0a4611f28 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Tue, 28 Apr 2026 15:35:02 +0530 Subject: [PATCH 3/9] fixing broken tests --- airflow-core/tests/unit/jobs/test_triggerer_job.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 8355fa1326254..60f5d21f133de 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -646,15 +646,7 @@ async def test_trigger_kwargs_serialization_cleanup(self, session): session.commit() stored_kwargs = trigger_orm.kwargs - assert stored_kwargs == { - "Encoding.TYPE": "dict", - "Encoding.VAR": { - "dict": {"Encoding.TYPE": "dict", "Encoding.VAR": {}}, - "list": [], - "simple": "test", - "tuple": {"Encoding.TYPE": "tuple", "Encoding.VAR": []}, - }, - } + assert stored_kwargs == kw runner = TriggerRunner() runner.to_create.append( From 63e8e2f0724eb0d7612a5889839cc8364bec42ef Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Thu, 30 Apr 2026 12:57:08 +0530 Subject: [PATCH 4/9] undoing the changes to Encoding and manually hooking in --- airflow-core/src/airflow/models/taskinstance.py | 14 +++++++++++++- airflow-core/src/airflow/serialization/enums.py | 3 --- .../unit/serialization/test_dag_serialization.py | 16 ++++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 03c4c2c9b8b4a..5b6bf9dc72b2f 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1633,7 +1633,19 @@ def defer_task(self, session: Session = NEW_SESSION) -> bool: assert isinstance(self.task, Operator) if start_trigger_args := self.start_trigger_args: - trigger_kwargs = start_trigger_args.trigger_kwargs or {} + from 
airflow.serialization.enums import Encoding + + def _normalize(d: Any) -> Any: + # trigger_kwargs arrives with Encoding enum keys from BaseSerialization. + # On Python 3.10, str(Encoding.TYPE) returns "Encoding.TYPE" not "__type", + # so we convert enum keys to their values before passing to serde.serialize. + if isinstance(d, dict): + return { + (k.value if isinstance(k, Encoding) else str(k)): _normalize(v) for k, v in d.items() + } + return d + + trigger_kwargs = _normalize(start_trigger_args.trigger_kwargs or {}) timeout = start_trigger_args.timeout # Calculate timeout too if it was passed diff --git a/airflow-core/src/airflow/serialization/enums.py b/airflow-core/src/airflow/serialization/enums.py index d84acd6388a72..5fdd4d6698793 100644 --- a/airflow-core/src/airflow/serialization/enums.py +++ b/airflow-core/src/airflow/serialization/enums.py @@ -30,9 +30,6 @@ class Encoding(str, Enum): TYPE = "__type" VAR = "__var" - def __str__(self) -> str: - return self.value - # Supported types for encoding. primitives and list are not encoded. @unique diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index fb2c4da457934..2654ec3408b36 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -815,6 +815,8 @@ def validate_deserialized_task( "on_failure_fail_dagrun", "_needs_expansion", "_is_sensor", + # trigger_kwargs is kept as raw JSON after deserialization; checked separately + "start_trigger_args", } else: # Promised to be mapped by the assert above. 
assert isinstance(serialized_task, SerializedMappedOperator) @@ -855,6 +857,20 @@ def validate_deserialized_task( else: assert serialized_task.resources == task.resources + # start_trigger_args: trigger_kwargs is kept as raw JSON after deserialization; + # compare after deserializing both sides + if task.start_trigger_args is not None: + from airflow.serialization.serialized_objects import BaseSerialization + + s = serialized_task.start_trigger_args + o = task.start_trigger_args + assert s.trigger_cls == o.trigger_cls + assert s.next_method == o.next_method + assert s.timeout == o.timeout + assert BaseSerialization.deserialize(s.trigger_kwargs or {}) == BaseSerialization.deserialize( + o.trigger_kwargs or {} + ) + assert [ensure_serialized_asset(i) for i in task.inlets] == serialized_task.inlets assert [ensure_serialized_asset(o) for o in task.outlets] == serialized_task.outlets From 130bf586d8760301b1be74b4178d8361973be6e3 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 4 May 2026 17:13:12 +0530 Subject: [PATCH 5/9] reverting older change --- airflow-core/tests/unit/jobs/test_triggerer_job.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 60f5d21f133de..1f6bbf01cf782 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -645,9 +645,6 @@ async def test_trigger_kwargs_serialization_cleanup(self, session): session.add(trigger_orm) session.commit() - stored_kwargs = trigger_orm.kwargs - assert stored_kwargs == kw - runner = TriggerRunner() runner.to_create.append( workloads.RunTrigger.model_construct( From b54a9f49d68806e9600ce4b17723cb109e881570 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 4 May 2026 17:47:54 +0530 Subject: [PATCH 6/9] fixing failing tests --- .../src/airflow/models/taskinstance.py | 14 +---------- airflow-core/src/airflow/models/trigger.py | 23 
++++++++++++++++++- airflow-core/src/airflow/triggers/base.py | 13 ++++++++--- .../tests/unit/jobs/test_triggerer_job.py | 3 +++ airflow-core/tests/unit/models/test_dagrun.py | 1 - .../serialization/test_dag_serialization.py | 4 +--- 6 files changed, 37 insertions(+), 21 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5b6bf9dc72b2f..03c4c2c9b8b4a 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1633,19 +1633,7 @@ def defer_task(self, session: Session = NEW_SESSION) -> bool: assert isinstance(self.task, Operator) if start_trigger_args := self.start_trigger_args: - from airflow.serialization.enums import Encoding - - def _normalize(d: Any) -> Any: - # trigger_kwargs arrives with Encoding enum keys from BaseSerialization. - # On Python 3.10, str(Encoding.TYPE) returns "Encoding.TYPE" not "__type", - # so we convert enum keys to their values before passing to serde.serialize. - if isinstance(d, dict): - return { - (k.value if isinstance(k, Encoding) else str(k)): _normalize(v) for k, v in d.items() - } - return d - - trigger_kwargs = _normalize(start_trigger_args.trigger_kwargs or {}) + trigger_kwargs = start_trigger_args.trigger_kwargs or {} timeout = start_trigger_args.timeout # Calculate timeout too if it was passed diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 589725ced2c09..333d28523114c 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -71,6 +71,27 @@ class TriggerFailureReason(str, Enum): TRIGGER_FAILURE = "Trigger failure" +def _stringify_encoding_keys(d: Any) -> Any: + """ + Convert BaseSerialization Encoding enum keys to their string values recursively. + + On Python 3.10, str(Encoding.TYPE) returns "Encoding.TYPE" instead of "__type__", + so serde.serialize confuses it. 
Converting keys to their value strings + here ensures serde stores and retrieves them correctly on all Python versions. + """ + from airflow.serialization.enums import Encoding + + if isinstance(d, dict): + return { + (k.value if isinstance(k, Encoding) else str(k)): _stringify_encoding_keys(v) + for k, v in d.items() + } + if isinstance(d, (list, tuple)): + converted = [_stringify_encoding_keys(i) for i in d] + return type(d)(converted) + return d + + class Trigger(Base): """ Base Trigger class. @@ -146,7 +167,7 @@ def encrypt_kwargs(kwargs: dict[str, Any]) -> str: from airflow.models.crypto import get_fernet from airflow.sdk.serde import serialize - serialized_kwargs = serialize(kwargs) + serialized_kwargs = serialize(_stringify_encoding_keys(kwargs)) return get_fernet().encrypt(json.dumps(serialized_kwargs).encode("utf-8")).decode("utf-8") @staticmethod diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index b37448edfea19..5bf7f2f20eaac 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -120,9 +120,16 @@ def task_instance(self, value: TaskInstance | None) -> None: # does not build a template context, so render_template_fields is # never called and empty template_fields is safe. 
start_trigger_args = getattr(self.task, "start_trigger_args", None) - trigger_kwarg_keys = ( - set((start_trigger_args.trigger_kwargs or {}).keys()) if start_trigger_args else set() - ) + if start_trigger_args: + from airflow.serialization.enums import Encoding + + raw = start_trigger_args.trigger_kwargs or {} + # trigger_kwargs may be BaseSerialization-encoded; extract inner dict keys + if isinstance(raw, dict) and Encoding.TYPE in raw: + raw = raw.get(Encoding.VAR) or {} + trigger_kwarg_keys = set(raw.keys()) + else: + trigger_kwarg_keys = set() if trigger_kwarg_keys: self.template_fields = tuple( f for f in self.task.template_fields if f in trigger_kwarg_keys and hasattr(self, f) diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py b/airflow-core/tests/unit/jobs/test_triggerer_job.py index 1f6bbf01cf782..60f5d21f133de 100644 --- a/airflow-core/tests/unit/jobs/test_triggerer_job.py +++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py @@ -645,6 +645,9 @@ async def test_trigger_kwargs_serialization_cleanup(self, session): session.add(trigger_orm) session.commit() + stored_kwargs = trigger_orm.kwargs + assert stored_kwargs == kw + runner = TriggerRunner() runner.to_create.append( workloads.RunTrigger.model_construct( diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index dd34c2d10e7ea..9ce3af636771a 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -2260,7 +2260,6 @@ def test_schedule_tis_only_one_scheduler_update_succeeds_when_competing(dag_make assert refreshed_ti.try_number == 1 -@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom") @pytest.mark.need_serialized_dag def test_schedule_tis_start_trigger(dag_maker, session): """ diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 
2654ec3408b36..b69d5c6d3a7a3 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -867,9 +867,7 @@ def validate_deserialized_task( assert s.trigger_cls == o.trigger_cls assert s.next_method == o.next_method assert s.timeout == o.timeout - assert BaseSerialization.deserialize(s.trigger_kwargs or {}) == BaseSerialization.deserialize( - o.trigger_kwargs or {} - ) + assert BaseSerialization.deserialize(s.trigger_kwargs or {}) == (o.trigger_kwargs or {}) assert [ensure_serialized_asset(i) for i in task.inlets] == serialized_task.inlets assert [ensure_serialized_asset(o) for o in task.outlets] == serialized_task.outlets From 94915715b86735a229ee418a1cf62782794bde77 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 4 May 2026 17:56:10 +0530 Subject: [PATCH 7/9] fixing failing tests --- .../src/airflow/models/taskinstance.py | 4 ++- airflow-core/src/airflow/models/trigger.py | 8 +++-- airflow-core/tests/unit/models/test_dagrun.py | 36 +++++++++++++++++++ .../serialization/test_dag_serialization.py | 16 ++++++--- 4 files changed, 55 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 03c4c2c9b8b4a..fea6cfc9a1d68 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1658,7 +1658,9 @@ def defer_task(self, session: Session = NEW_SESSION) -> bool: self.state = TaskInstanceState.DEFERRED self.trigger_id = trigger_row.id self.next_method = start_trigger_args.next_method - self.next_kwargs = start_trigger_args.next_kwargs or {} + from airflow.models.trigger import _stringify_encoding_keys + + self.next_kwargs = _stringify_encoding_keys(start_trigger_args.next_kwargs or {}) self.start_date = timezone.utcnow() # If an execution_timeout is set, set the timeout to the minimum of diff --git 
a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 333d28523114c..29c3693e983cf 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -75,9 +75,11 @@ def _stringify_encoding_keys(d: Any) -> Any: """ Convert BaseSerialization Encoding enum keys to their string values recursively. - On Python 3.10, str(Encoding.TYPE) returns "Encoding.TYPE" instead of "__type__", - so serde.serialize confuses it. Converting keys to their value strings - here ensures serde stores and retrieves them correctly on all Python versions. + Python 3.10 compatibility: str(Encoding.TYPE) returns "Encoding.TYPE" on 3.10 + instead of "__type__" (3.10 is still the default CI target). serde.serialize + uses str(k) for dict keys, so without this conversion the encrypted blob ends up + with "Encoding.TYPE" keys that neither serde._convert nor the BaseSerialization + fallback can read back. """ from airflow.serialization.enums import Encoding diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 9ce3af636771a..de72ff1d2e7fe 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -2260,6 +2260,7 @@ def test_schedule_tis_only_one_scheduler_update_succeeds_when_competing(dag_make assert refreshed_ti.try_number == 1 +@pytest.mark.xfail(reason="We can't keep this behaviour with remote workers where scheduler can't reach xcom") @pytest.mark.need_serialized_dag def test_schedule_tis_start_trigger(dag_maker, session): """ @@ -2294,6 +2295,41 @@ def execute_complete(self): assert ti.state == TaskInstanceState.DEFERRED +@pytest.mark.need_serialized_dag +def test_schedule_tis_start_trigger_next_kwargs_round_trip(dag_maker, session): + """next_kwargs with encoded values (timedelta) must survive the defer_task round-trip.""" + import datetime + + from airflow.sdk.serde import deserialize + + class 
TestOperator(BaseOperator): + start_trigger_args = StartTriggerArgs( + trigger_cls="airflow.triggers.testing.SuccessTrigger", + trigger_kwargs={}, + next_method="execute_complete", + next_kwargs={"delay": datetime.timedelta(seconds=30)}, + timeout=None, + ) + start_from_trigger = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def execute_complete(self): + pass + + with dag_maker(session=session): + TestOperator(task_id="test_task") + + dr: DagRun = dag_maker.create_dagrun() + ti = dr.get_task_instance("test_task") + ti.task = dr.dag.get_task("test_task") + dr.schedule_tis((ti,), session=session) + + assert ti.state == TaskInstanceState.DEFERRED + assert deserialize(ti.next_kwargs) == {"delay": datetime.timedelta(seconds=30)} + + def test_schedule_tis_empty_operator_try_number(dag_maker, session: Session): """ When empty operator is not actually run, then we need to increment the try_number, diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index b69d5c6d3a7a3..bb850e6ed573c 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -857,8 +857,10 @@ def validate_deserialized_task( else: assert serialized_task.resources == task.resources - # start_trigger_args: trigger_kwargs is kept as raw JSON after deserialization; - # compare after deserializing both sides + # start_trigger_args: trigger_kwargs is kept as raw BaseSerialization-encoded form + # after deserialization. Compare the encoded forms directly — s.trigger_kwargs is + # exactly BaseSerialization.serialize(o.trigger_kwargs) since _encode_start_trigger_args + # serializes it and _decode_start_trigger_args keeps it raw. 
if task.start_trigger_args is not None: from airflow.serialization.serialized_objects import BaseSerialization @@ -867,7 +869,7 @@ def validate_deserialized_task( assert s.trigger_cls == o.trigger_cls assert s.next_method == o.next_method assert s.timeout == o.timeout - assert BaseSerialization.deserialize(s.trigger_kwargs or {}) == (o.trigger_kwargs or {}) + assert s.trigger_kwargs == BaseSerialization.serialize(o.trigger_kwargs or {}) assert [ensure_serialized_asset(i) for i in task.inlets] == serialized_task.inlets assert [ensure_serialized_asset(o) for o in task.outlets] == serialized_task.outlets @@ -2642,14 +2644,14 @@ def execute_complete(self): assert tasks[1]["__var"]["start_from_trigger"] is True def test_trigger_kwargs_not_deserialised_through_serdag(self): - """trigger_kwargs are not deserialized when loading a serialized DAG.""" + """trigger_kwargs and next_kwargs are kept as raw BaseSerialization JSON when loading a serialized DAG.""" class TestOperator(BaseOperator): start_trigger_args = StartTriggerArgs( trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger", trigger_kwargs={"delta": timedelta(seconds=2)}, next_method="execute_complete", - next_kwargs=None, + next_kwargs={"resume_after": timedelta(seconds=5)}, timeout=None, ) start_from_trigger = True @@ -2672,6 +2674,10 @@ def execute_complete(self): "__type": "dict", "__var": {"delta": {"__type": "timedelta", "__var": 2.0}}, } + assert task.start_trigger_args.next_kwargs == { + "__type": "dict", + "__var": {"resume_after": {"__type": "timedelta", "__var": 5.0}}, + } def test_kubernetes_optional(): From 1278f3b14787926636be15559ed06d90a1f94e87 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Mon, 4 May 2026 19:26:22 +0530 Subject: [PATCH 8/9] making hash predictable --- .../src/airflow/models/taskinstance.py | 4 ++-- airflow-core/src/airflow/models/trigger.py | 24 +------------------ .../src/airflow/serialization/enums.py | 21 ++++++++++++++++ 
airflow-core/src/airflow/triggers/base.py | 4 +++- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index fea6cfc9a1d68..b5174643c52f5 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1658,9 +1658,9 @@ def defer_task(self, session: Session = NEW_SESSION) -> bool: self.state = TaskInstanceState.DEFERRED self.trigger_id = trigger_row.id self.next_method = start_trigger_args.next_method - from airflow.models.trigger import _stringify_encoding_keys + from airflow.serialization.enums import stringify_encoding_keys - self.next_kwargs = _stringify_encoding_keys(start_trigger_args.next_kwargs or {}) + self.next_kwargs = stringify_encoding_keys(start_trigger_args.next_kwargs or {}) self.start_date = timezone.utcnow() # If an execution_timeout is set, set the timeout to the minimum of diff --git a/airflow-core/src/airflow/models/trigger.py b/airflow-core/src/airflow/models/trigger.py index 29c3693e983cf..17584a594fdfb 100644 --- a/airflow-core/src/airflow/models/trigger.py +++ b/airflow-core/src/airflow/models/trigger.py @@ -35,6 +35,7 @@ from airflow.models.asset import AssetWatcherModel from airflow.models.base import Base from airflow.models.taskinstance import TaskInstance +from airflow.serialization.enums import stringify_encoding_keys as _stringify_encoding_keys from airflow.triggers.base import BaseTaskEndEvent from airflow.utils.retries import run_with_db_retries from airflow.utils.session import NEW_SESSION, provide_session @@ -71,29 +72,6 @@ class TriggerFailureReason(str, Enum): TRIGGER_FAILURE = "Trigger failure" -def _stringify_encoding_keys(d: Any) -> Any: - """ - Convert BaseSerialization Encoding enum keys to their string values recursively. 
- - Python 3.10 compatibility: str(Encoding.TYPE) returns "Encoding.TYPE" on 3.10 - instead of "__type__" (3.10 is still the default CI target). serde.serialize - uses str(k) for dict keys, so without this conversion the encrypted blob ends up - with "Encoding.TYPE" keys that neither serde._convert nor the BaseSerialization - fallback can read back. - """ - from airflow.serialization.enums import Encoding - - if isinstance(d, dict): - return { - (k.value if isinstance(k, Encoding) else str(k)): _stringify_encoding_keys(v) - for k, v in d.items() - } - if isinstance(d, (list, tuple)): - converted = [_stringify_encoding_keys(i) for i in d] - return type(d)(converted) - return d - - class Trigger(Base): """ Base Trigger class. diff --git a/airflow-core/src/airflow/serialization/enums.py b/airflow-core/src/airflow/serialization/enums.py index 5fdd4d6698793..738006807c708 100644 --- a/airflow-core/src/airflow/serialization/enums.py +++ b/airflow-core/src/airflow/serialization/enums.py @@ -20,6 +20,7 @@ from __future__ import annotations from enum import Enum, unique +from typing import Any # Fields of an encoded object in serialization. @@ -31,6 +32,26 @@ class Encoding(str, Enum): VAR = "__var" +def stringify_encoding_keys(d: Any) -> Any: + """ + Convert BaseSerialization Encoding enum keys to their string values recursively. + + Python 3.10 compatibility: str(Encoding.TYPE) returns "Encoding.TYPE" on 3.10 + instead of "__type" (3.10 is still the default CI target). serde.serialize + uses str(k) for dict keys, so without this conversion the encrypted blob ends up + with "Encoding.TYPE" keys that neither serde._convert nor the BaseSerialization + fallback can read back.
+ """ + if isinstance(d, dict): + return { + (k.value if isinstance(k, Encoding) else str(k)): stringify_encoding_keys(v) for k, v in d.items() + } + if isinstance(d, (list, tuple)): + converted = [stringify_encoding_keys(i) for i in d] + return type(d)(converted) + return d + + # Supported types for encoding. primitives and list are not encoded. @unique class DagAttributeTypes(str, Enum): diff --git a/airflow-core/src/airflow/triggers/base.py b/airflow-core/src/airflow/triggers/base.py index 5bf7f2f20eaac..f39b62facf7b2 100644 --- a/airflow-core/src/airflow/triggers/base.py +++ b/airflow-core/src/airflow/triggers/base.py @@ -263,9 +263,11 @@ def hash(classpath: str, kwargs: dict[str, Any]) -> int: We do not want to have this logic in ``BaseTrigger`` because, when used to defer tasks, 2 triggers can have the same classpath and kwargs. This is not true for event driven scheduling. """ + from airflow.serialization.encoders import encode_trigger from airflow.serialization.serialized_objects import BaseSerialization - return hash((classpath, json.dumps(BaseSerialization.serialize(kwargs)).encode("utf-8"))) + normalized = encode_trigger({"classpath": classpath, "kwargs": kwargs})["kwargs"] + return hash((classpath, json.dumps(BaseSerialization.serialize(normalized)).encode("utf-8"))) class TriggerEvent(BaseModel): From a65df33af92b10ae49f3841635f0cc034bed8dc5 Mon Sep 17 00:00:00 2001 From: Amogh Desai Date: Wed, 6 May 2026 15:47:19 +0530 Subject: [PATCH 9/9] comments from ephraim --- .../src/airflow/models/taskinstance.py | 3 +- .../src/airflow/serialization/enums.py | 9 +++- airflow-core/tests/unit/models/test_dagrun.py | 47 +++++++++++++++++++ 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index b5174643c52f5..b395b154a40b3 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -87,6 +87,7 @@ 
from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule from airflow.models.xcom import XCOM_RETURN_KEY, LazyXComSelectSequence, XComModel +from airflow.serialization.enums import stringify_encoding_keys from airflow.settings import task_instance_mutation_hook from airflow.task.priority_strategy import validate_and_load_priority_weight_strategy from airflow.ti_deps.dep_context import DepContext @@ -1658,8 +1659,6 @@ def defer_task(self, session: Session = NEW_SESSION) -> bool: self.state = TaskInstanceState.DEFERRED self.trigger_id = trigger_row.id self.next_method = start_trigger_args.next_method - from airflow.serialization.enums import stringify_encoding_keys - self.next_kwargs = stringify_encoding_keys(start_trigger_args.next_kwargs or {}) self.start_date = timezone.utcnow() diff --git a/airflow-core/src/airflow/serialization/enums.py b/airflow-core/src/airflow/serialization/enums.py index 738006807c708..ae4c1249cab09 100644 --- a/airflow-core/src/airflow/serialization/enums.py +++ b/airflow-core/src/airflow/serialization/enums.py @@ -46,9 +46,14 @@ def stringify_encoding_keys(d: Any) -> Any: return { (k.value if isinstance(k, Encoding) else str(k)): stringify_encoding_keys(v) for k, v in d.items() } - if isinstance(d, (list, tuple)): + if isinstance(d, list): + return [stringify_encoding_keys(i) for i in d] + if isinstance(d, tuple): converted = [stringify_encoding_keys(i) for i in d] - return type(d)(converted) + # namedtuples require positional args, not a single list argument + if hasattr(d, "_fields"): + return type(d)(*converted) + return tuple(converted) return d diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index de72ff1d2e7fe..b16735659df0a 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -54,6 +54,7 @@ from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, 
clear_task_instances from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule +from airflow.models.trigger import Trigger from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator @@ -2330,6 +2331,52 @@ def execute_complete(self): assert deserialize(ti.next_kwargs) == {"delay": datetime.timedelta(seconds=30)} +@pytest.mark.need_serialized_dag +def test_schedule_tis_start_trigger_kwargs_e2e(dag_maker, session): + """ + End to end test of scheduler defer_task with non-trivial trigger_kwargs (timedelta) -> + Trigger row -> Trigger.kwargs returns correct Python objects. + + Covers the path: BaseSerialization encodes trigger_kwargs with Encoding enum keys, + defer_task passes them to Trigger as kwargs which calls encrypt_kwargs -> _stringify_encoding_keys + -> serde.serialize -> stores them. + + On reading, serde.deserialize + _convert must reconstruct the original values. 
+ """ + import datetime + + class TestOperator(BaseOperator): + start_trigger_args = StartTriggerArgs( + trigger_cls="airflow.triggers.testing.SuccessTrigger", + trigger_kwargs={"delta": datetime.timedelta(seconds=2)}, + next_method="execute_complete", + next_kwargs=None, + timeout=None, + ) + start_from_trigger = True + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def execute_complete(self): + pass + + with dag_maker(session=session): + TestOperator(task_id="test_task") + + dr: DagRun = dag_maker.create_dagrun() + ti = dr.get_task_instance("test_task") + ti.task = dr.dag.get_task("test_task") + dr.schedule_tis((ti,), session=session) + + assert ti.state == TaskInstanceState.DEFERRED + + trigger_row = session.get(Trigger, ti.trigger_id) + assert trigger_row is not None + # trigger_kwargs must round-trip correctly through encrypt_kwargs → _decrypt_kwargs + assert trigger_row.kwargs == {"delta": datetime.timedelta(seconds=2)} + + def test_schedule_tis_empty_operator_try_number(dag_maker, session: Session): """ When empty operator is not actually run, then we need to increment the try_number,