Skip to content

Commit 7e9bdb2

Browse files
authored
Remove dead AIP-44 trigger-over-BaseSerialization path (DAT.BASE_TRIGGER) (#68528)
The DAT.BASE_TRIGGER encode/decode (serializing a BaseTrigger instance via BaseSerialization) is a vestige of AIP-44's Internal API. Live deferral uses the structured DeferTask/TIDeferredStatePayload (classpath+kwargs); the execution API stores the classpath opaquely; the triggerer imports it from the DB row. No external producer/consumer of DAT.BASE_TRIGGER remains. Keeps generic AirflowException serialization (live). Generated-by: Claude Opus 4.8 (1M context)
1 parent a42fdb9 commit 7e9bdb2

3 files changed

Lines changed: 3 additions & 47 deletions

File tree

airflow-core/src/airflow/serialization/enums.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ class DagAttributeTypes(str, Enum):
7070
TIMEDELTA = "timedelta"
7171
TIMEZONE = "timezone"
7272
RELATIVEDELTA = "relativedelta"
73-
BASE_TRIGGER = "base_trigger"
7473
AIRFLOW_EXC_SER = "airflow_exc_ser"
7574
BASE_EXC_SER = "base_exc_ser"
7675
DICT = "dict"

airflow-core/src/airflow/serialization/serialized_objects.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
validate_and_load_priority_weight_strategy,
107107
)
108108
from airflow.timetables.base import DagRunInfo, Timetable
109-
from airflow.triggers.base import BaseTrigger, StartTriggerArgs
109+
from airflow.triggers.base import StartTriggerArgs
110110
from airflow.utils.code_utils import get_python_source
111111
from airflow.utils.db import LazySelectSequence
112112

@@ -470,7 +470,6 @@ def serialize(
470470
:meta private:
471471
"""
472472
from airflow.sdk.definitions._internal.types import is_arg_set
473-
from airflow.sdk.exceptions import TaskDeferred
474473

475474
if not is_arg_set(var):
476475
return cls._encode(None, type_=DAT.ARG_NOT_SET)
@@ -535,7 +534,7 @@ def serialize(
535534
var._asdict(),
536535
type_=DAT.TASK_INSTANCE_KEY,
537536
)
538-
elif isinstance(var, (AirflowException, TaskDeferred)) and hasattr(var, "serialize"):
537+
elif isinstance(var, AirflowException) and hasattr(var, "serialize"):
539538
exc_cls_name, args, kwargs = var.serialize()
540539
return cls._encode(
541540
cls.serialize(
@@ -556,14 +555,6 @@ def serialize(
556555
),
557556
type_=DAT.BASE_EXC_SER,
558557
)
559-
elif isinstance(var, BaseTrigger):
560-
return cls._encode(
561-
cls.serialize(
562-
var.serialize(),
563-
strict=strict,
564-
),
565-
type_=DAT.BASE_TRIGGER,
566-
)
567558
elif callable(var):
568559
return str(get_python_source(var))
569560
elif isinstance(var, set):
@@ -672,10 +663,6 @@ def deserialize(cls, encoded_var: Any) -> Any:
672663
else:
673664
exc_cls = import_string(f"builtins.{exc_cls_name}")
674665
return exc_cls(*args, **kwargs)
675-
elif type_ == DAT.BASE_TRIGGER:
676-
tr_cls_name, kwargs = cls.deserialize(var)
677-
tr_cls = import_string(tr_cls_name)
678-
return tr_cls(**kwargs)
679666
elif type_ == DAT.SET:
680667
return {cls.deserialize(v) for v in var}
681668
elif type_ == DAT.TUPLE:

airflow-core/tests/unit/serialization/test_serialized_objects.py

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
AirflowFailException,
4040
AirflowRescheduleException,
4141
SerializationError,
42-
TaskDeferred,
4342
)
4443
from airflow.models.connection import Connection
4544
from airflow.models.dag import DAG
@@ -104,7 +103,6 @@
104103
LazyDeserializedDAG,
105104
_has_kubernetes,
106105
)
107-
from airflow.triggers.base import BaseTrigger
108106
from airflow.utils.db import LazySelectSequence
109107

110108
from unit.models import DEFAULT_DATE
@@ -570,42 +568,14 @@ def test_ser_of_asset_event_accessor():
570568
assert d[Asset(name="yo", uri="test://yo")].extra == {"this": "that", "the": "other"}
571569

572570

573-
class MyTrigger(BaseTrigger):
574-
def __init__(self, hi):
575-
self.hi = hi
576-
577-
def serialize(self):
578-
return "unit.serialization.test_serialized_objects.MyTrigger", {"hi": self.hi}
579-
580-
async def run(self):
581-
yield
582-
583-
584571
def test_roundtrip_exceptions():
585-
"""
586-
This is for AIP-44 when we need to send certain non-error exceptions
587-
as part of an RPC call e.g. TaskDeferred or AirflowRescheduleException.
588-
"""
572+
"""Non-error AirflowExceptions (e.g. AirflowRescheduleException) round-trip through BaseSerialization."""
589573
some_date = pendulum.now()
590574
resched_exc = AirflowRescheduleException(reschedule_date=some_date)
591575
ser = BaseSerialization.serialize(resched_exc)
592576
deser = BaseSerialization.deserialize(ser)
593577
assert isinstance(deser, AirflowRescheduleException)
594578
assert deser.reschedule_date == some_date
595-
del ser
596-
del deser
597-
exc = TaskDeferred(
598-
trigger=MyTrigger(hi="yo"),
599-
method_name="meth_name",
600-
kwargs={"have": "pie"},
601-
timeout=timedelta(seconds=30),
602-
)
603-
ser = BaseSerialization.serialize(exc)
604-
deser = BaseSerialization.deserialize(ser)
605-
assert deser.trigger.hi == "yo"
606-
assert deser.method_name == "meth_name"
607-
assert deser.kwargs == {"have": "pie"}
608-
assert deser.timeout == timedelta(seconds=30)
609579

610580

611581
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)