diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 8ccdef2c6390c..bb90d80ec5bcd 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -27,6 +27,7 @@ import os import sys import types +import warnings from pathlib import Path from typing import TYPE_CHECKING, Any, Iterable @@ -431,6 +432,17 @@ def initialize_ti_deps_plugins(): registered_ti_dep_classes = {} for plugin in plugins: + if not plugin.ti_deps: + continue + + from airflow.exceptions import RemovedInAirflow3Warning + + warnings.warn( + "Using custom `ti_deps` on operators has been removed in Airflow 3.0", + RemovedInAirflow3Warning, + stacklevel=1, + ) + registered_ti_dep_classes.update( {qualname(ti_dep.__class__): ti_dep.__class__ for ti_dep in plugin.ti_deps} ) diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py index 2426352fc8531..aca1599fed031 100644 --- a/tests/plugins/test_plugins_manager.py +++ b/tests/plugins/test_plugins_manager.py @@ -28,6 +28,7 @@ import pytest +from airflow.exceptions import RemovedInAirflow3Warning from airflow.hooks.base import BaseHook from airflow.listeners.listener import get_listener_manager from airflow.plugins_manager import AirflowPlugin @@ -270,6 +271,17 @@ class AirflowAdminMenuLinksPlugin(AirflowPlugin): ), ] + def test_deprecate_ti_deps(self): + class DeprecatedTIDeps(AirflowPlugin): + name = "ti_deps" + + ti_deps = [mock.MagicMock()] + + with mock_plugin_manager(plugins=[DeprecatedTIDeps()]), pytest.warns(RemovedInAirflow3Warning): + from airflow import plugins_manager + + plugins_manager.initialize_ti_deps_plugins() + def test_should_not_warning_about_fab_plugins(self, caplog): class AirflowAdminViewsPlugin(AirflowPlugin): name = "test_admin_views_plugin" diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index d7f09c20ff9d3..266d80e481b2c 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1612,7 +1612,7 @@ class DummyTask(BaseOperator): ) as dag: DummyTask(task_id="task1") - with pytest.raises(SerializationError): + with pytest.raises(SerializationError), pytest.warns(RemovedInAirflow3Warning): SerializedBaseOperator.serialize_operator(dag.task_dict["task1"]) def test_error_on_unregistered_ti_dep_deserialization(self): @@ -1644,7 +1644,8 @@ class DummyTask(BaseOperator): with DAG(dag_id="test_serialize_custom_ti_deps", schedule=None, start_date=execution_date) as dag: DummyTask(task_id="task1") - serialize_op = SerializedBaseOperator.serialize_operator(dag.task_dict["task1"]) + with pytest.warns(RemovedInAirflow3Warning): + serialize_op = SerializedBaseOperator.serialize_operator(dag.task_dict["task1"]) assert serialize_op["deps"] == [ "airflow.ti_deps.deps.mapped_task_upstream_dep.MappedTaskUpstreamDep",