diff --git a/.github/ISSUE_TEMPLATE/1-airflow_bug_report.yml b/.github/ISSUE_TEMPLATE/1-airflow_bug_report.yml
index 633ef433f4b63..e8ef6d7af228e 100644
--- a/.github/ISSUE_TEMPLATE/1-airflow_bug_report.yml
+++ b/.github/ISSUE_TEMPLATE/1-airflow_bug_report.yml
@@ -192,6 +192,7 @@ body:
- redis
- salesforce
- samba
+ - sdk-java
- segment
- sendgrid
- sftp
diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
index 21b44a9fb840f..3f5477f4512c5 100644
--- a/.github/boring-cyborg.yml
+++ b/.github/boring-cyborg.yml
@@ -189,6 +189,9 @@ labelPRBasedOnFilePath:
provider:keycloak:
- providers/keycloak/**
+ provider:sdk-java:
+ - providers/sdk/java/**
+
provider:microsoft-azure:
- providers/microsoft/azure/**
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 295688d115f4c..6605d28649497 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -463,6 +463,12 @@ repos:
language: python
pass_filenames: false
files: ^dev/registry/registry_tools/types\.py$|^registry/src/_data/types\.json$
+ - id: check-task-instance-dto-sync
+ name: Check that the BaseTaskInstanceDTO duplicate is in sync between core and task-sdk
+ entry: ./scripts/ci/prek/check_task_instance_dto_sync.py
+ language: python
+ pass_filenames: false
+ files: ^airflow-core/src/airflow/executors/workloads/task\.py$|^task-sdk/src/airflow/sdk/execution_time/workloads/task\.py$
- id: ruff
name: Run 'ruff' for extremely fast Python linting
description: "Run 'ruff' for extremely fast Python linting"
diff --git a/airflow-core/docs/extra-packages-ref.rst b/airflow-core/docs/extra-packages-ref.rst
index 2646b0a7c3079..9fb579c9b08ec 100644
--- a/airflow-core/docs/extra-packages-ref.rst
+++ b/airflow-core/docs/extra-packages-ref.rst
@@ -178,6 +178,17 @@ all the ``airflow`` packages together - similarly to what happened in Airflow 2.
``airflow-task-sdk`` separately, if you want to install providers, you need to install them separately as
``apache-airflow-providers-*`` distribution packages.
+Multi-Language extras
+=====================
+
+These are extras that add dependencies needed for integration with other language runtimes. Currently we only have a Java SDK related extra, but in the future we may add more extras for other language runtimes.
+
++----------+------------------------------------------+------------------------------------------------------------------+
+| extra | install command | enables |
++==========+==========================================+==================================================================+
+| sdk.java | ``pip install apache-airflow[sdk.java]`` | JavaCoordinator for both dag processing and workload execution. |
++----------+------------------------------------------+------------------------------------------------------------------+
+
Apache Software extras
======================
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index c37989b9b7486..e173080b69062 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1967,6 +1967,21 @@ workers:
type: integer
example: ~
default: "60"
+sdk:
+ description: Settings for non-Python SDK runtime coordination
+ options:
+ queue_to_sdk:
+ description: |
+ JSON mapping of queue names to SDK runtime coordinator names.
+
+ When a task's ``language`` field is not set, this mapping is checked
+ to route the task to a non-Python runtime coordinator based on its
+ queue. This is useful when queues are used as environment or
+ isolation identifiers (e.g. ``foo``, ``bar``).
+ version_added: 3.1.7
+ type: string
+ example: '{"foo": "java", "bar": "java", "go-queue": "go"}'
+ default: ~
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
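
A minimal sketch of how scheduler- or executor-side routing code might consume the new ``[sdk] queue_to_sdk`` option; the ``resolve_sdk_for_queue`` helper is a hypothetical name used only for illustration and is not introduced by this change:

```python
from __future__ import annotations

import json

from airflow.configuration import conf


def resolve_sdk_for_queue(queue: str) -> str | None:
    """Return the SDK runtime name configured for *queue*, or None for plain Python tasks."""
    # Hypothetical helper: only the config option itself comes from this change.
    raw = conf.get("sdk", "queue_to_sdk", fallback=None)
    if not raw:
        return None
    mapping: dict[str, str] = json.loads(raw)
    return mapping.get(queue)


# With queue_to_sdk = {"foo": "java", "bar": "java", "go-queue": "go"}, this prints "java".
print(resolve_sdk_for_queue("foo"))
```
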
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 8d497ca7508e3..c2dab6e3fe6d5 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -66,7 +66,7 @@
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.typing_compat import assert_never
-from airflow.utils.file import list_py_file_paths, might_contain_dag
+from airflow.utils.file import might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import (
@@ -88,6 +88,9 @@
from airflow.sdk.api.client import Client
+log = logging.getLogger(__name__)
+
+
class DagParsingStat(NamedTuple):
"""Information on processing progress."""
@@ -158,6 +161,62 @@ def utc_epoch() -> datetime:
return result
+def discover_dag_file_paths(
+ directory: str | os.PathLike[str] | None,
+ bundle_name: str = "",
+ safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
+) -> list[str]:
+ """
+ Discover paths of DAG files within a directory.
+
+ Walks ``directory`` (honouring ``.airflowignore``) and returns each file that is
+ either a Python DAG candidate (``.py`` source or ZIP archive that passes
+ :func:`~airflow.utils.file.might_contain_dag`) or accepted by a registered coordinator's
+ :meth:`~airflow.sdk.execution_time.coordinator.BaseCoordinator.can_handle_dag_file`
+ (e.g. a ``.jar`` for the Java SDK, a self-contained executable for the Go SDK).
+
+ Coordinator handling takes precedence over the generic ZIP heuristic so that,
+ for example, a ``.jar`` is delegated to its coordinator rather than being
+ scanned for embedded ``.py`` modules.
+
+ :param directory: Directory to scan, or a single file path. ``None`` returns
+ an empty list. A single file is returned as-is without filtering.
+ :param bundle_name: Bundle name forwarded to ``can_handle_dag_file``.
+ :param safe_mode: Whether to apply the Python DAG heuristic; see
+ :func:`~airflow.utils.file.might_contain_dag`.
+ :return: Absolute paths discovered as DAG sources.
+ """
+ if directory is None:
+ return []
+ if os.path.isfile(directory):
+ return [str(directory)]
+ if not os.path.isdir(directory):
+ return []
+
+ from airflow._shared.module_loading.file_discovery import find_path_from_directory
+ from airflow.providers_manager import ProvidersManager
+
+ coordinators = ProvidersManager().coordinators
+ ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
+
+ file_paths: list[str] = []
+ for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
+ path = Path(file_path)
+ try:
+ if not path.is_file():
+ continue
+ if path.suffix == ".py":
+ if might_contain_dag(file_path, safe_mode):
+ file_paths.append(file_path)
+ elif any(c.can_handle_dag_file(bundle_name, file_path) for c in coordinators):
+ file_paths.append(file_path)
+ elif zipfile.is_zipfile(path) and might_contain_dag(file_path, safe_mode):
+ file_paths.append(file_path)
+ except Exception:
+ log.exception("Error while examining %s", file_path)
+ return file_paths
+
+
class _StubSelector(selectors.BaseSelector):
"""
Stub to stand in until the real selector is created.
@@ -808,9 +867,11 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
- # Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s at %s", bundle.name, bundle.path)
- rel_paths = [Path(x).relative_to(bundle.path) for x in list_py_file_paths(bundle.path)]
+ rel_paths = [
+ Path(x).relative_to(bundle.path)
+ for x in discover_dag_file_paths(bundle.path, bundle_name=bundle.name)
+ ]
self.log.info("Found %s files for bundle %s", len(rel_paths), bundle.name)
return rel_paths
@@ -822,7 +883,13 @@ def _get_observed_filelocs(self, present: set[DagFileInfo]) -> set[str]:
For regular files this includes the relative file path.
For ZIP archives this includes DAG-like inner paths such as
``archive.zip/dag.py``.
+
+ Files claimed by a registered runtime coordinator (e.g. ``.jar``)
+ are treated as opaque files rather than ZIP archives.
"""
+ from airflow.providers_manager import ProvidersManager
+
+ coordinators = ProvidersManager().coordinators
def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
"""Yield absolute paths for DAG-like files inside a ZIP archive."""
@@ -837,7 +904,10 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
observed_filelocs: set[str] = set()
for info in present:
abs_path = str(info.absolute_path)
- if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
+ handled_by_coordinator = any(
+ c.can_handle_dag_file(info.bundle_name, abs_path) for c in coordinators
+ )
+ if abs_path.endswith(".py") or handled_by_coordinator or not zipfile.is_zipfile(abs_path):
observed_filelocs.add(str(info.rel_path))
else:
if TYPE_CHECKING:
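
A usage sketch of the new discovery helper; the directory and bundle name below are placeholders:

```python
from airflow.dag_processing.manager import discover_dag_file_paths

# Walks the directory honouring .airflowignore: .py files and DAG-like ZIPs are kept via
# the Python heuristic, while other files (e.g. a .jar) are kept only when a registered
# runtime coordinator claims them via can_handle_dag_file.
for path in discover_dag_file_paths("/opt/airflow/dags", bundle_name="example_bundle"):
    print(path)
```
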
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 30ad827ede798..b1fb4d48d9cb4 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import contextlib
+import functools
import importlib
import logging
import os
@@ -75,8 +76,6 @@
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
- from socket import socket
-
from structlog.typing import FilteringBoundLogger
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
@@ -85,6 +84,7 @@
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.mappedoperator import MappedOperator
+ from airflow.sdk.execution_time.supervisor import SelectorCallback
from airflow.typing_compat import Self
@@ -552,7 +552,14 @@ def start( # type: ignore[override]
) -> Self:
logger = kwargs["logger"]
- _pre_import_airflow_modules(os.fspath(path), logger)
+ # Check if a provider-registered runtime coordinator should handle this file
+ logger.debug("Checking for provider-registered runtime coordinator entrypoint for file", path=path)
+ resolved_target = cls._resolve_processor_target(path, bundle_name, bundle_path, logger)
+ if resolved_target is not None:
+ target = resolved_target
+ logger.debug("Resolved provider-registered runtime coordinator entrypoint for file", path=path)
+ else:
+ _pre_import_airflow_modules(os.fspath(path), logger)
proc: Self = super().start(
target=target,
@@ -565,6 +572,53 @@ def start( # type: ignore[override]
proc._on_child_started(callbacks, path, bundle_path, bundle_name)
return proc
+ @staticmethod
+ def _resolve_processor_target(
+ path: str | os.PathLike[str],
+ bundle_name: str,
+ bundle_path: Path,
+ log: FilteringBoundLogger,
+ ) -> Callable[[], None] | None:
+ """
+ Return the entrypoint of the first provider runtime coordinator that can handle *path*.
+
+ The returned callable is a ``functools.partial`` that binds *path*, *bundle_name*
+ and *bundle_path* so the supervisor can pass it as a no-arg ``target`` to
+ ``WatchedSubprocess.start``.
+ """
+ from airflow.providers_manager import ProvidersManager
+
+ for coordinator_cls in ProvidersManager().coordinators:
+ try:
+ log.debug(
+ "Checking runtime coordinator %s for file %s",
+ coordinator_cls,
+ path,
+ )
+ if coordinator_cls.can_handle_dag_file(bundle_name, path):
+ log.debug(
+ "Using runtime coordinator %s for file %s",
+ coordinator_cls,
+ path,
+ )
+ return functools.partial(
+ coordinator_cls.run_dag_parsing,
+ path=os.fspath(path),
+ bundle_name=bundle_name,
+ bundle_path=os.fspath(bundle_path),
+ )
+ log.debug(
+ "Runtime coordinator %s cannot handle file %s with bundle name %s",
+ coordinator_cls,
+ path,
+ bundle_name,
+ )
+ except Exception:
+ log.warning("Failed to check runtime coordinator %s", coordinator_cls, exc_info=True)
+
+ log.debug("No runtime coordinator found for file %s, using default processor", path)
+ return None
+
def _on_child_started(
self,
callbacks: list[CallbackRequest],
@@ -590,7 +644,7 @@ def _get_target_loggers(self) -> tuple[FilteringBoundLogger, ...]:
def _create_log_forwarder(
self, loggers: tuple[FilteringBoundLogger, ...], name: str, log_level: int = logging.INFO
- ) -> Callable[[socket], bool]:
+ ) -> SelectorCallback:
return super()._create_log_forwarder(loggers, name.replace("task.", "dag_processor.", 1), log_level)
def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int) -> None:
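
To make the coordinator contract the processor relies on concrete, here is a hedged sketch of a coordinator class. Only the ``BaseCoordinator`` module path and the three call sites (``can_handle_dag_file``, ``run_dag_parsing``, ``get_code_from_file``) come from this diff; the class name and the classmethod form are assumptions:

```python
from __future__ import annotations

import os
from pathlib import Path

# Module path taken from this diff; the full BaseCoordinator API beyond the three
# methods used at the call sites is assumed here.
from airflow.sdk.execution_time.coordinator import BaseCoordinator


class JarCoordinator(BaseCoordinator):
    """Illustrative coordinator that claims .jar files; not shipped by this change."""

    @classmethod
    def can_handle_dag_file(cls, bundle_name: str, path: str | os.PathLike[str]) -> bool:
        # Queried during discovery, in _get_observed_filelocs and in DagCode.get_code_from_file.
        return Path(path).suffix == ".jar"

    @classmethod
    def run_dag_parsing(cls, path: str, bundle_name: str, bundle_path: str) -> None:
        # Bound as the no-arg target via functools.partial in _resolve_processor_target;
        # a real implementation would launch the external (e.g. JVM) parser here.
        raise NotImplementedError

    @classmethod
    def get_code_from_file(cls, fileloc: str) -> str:
        # What the UI "Code" view would show for a binary DAG bundle.
        return f"# Compiled DAG bundle: {fileloc}"
```
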
diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py
index 9c9487c5377bc..bced160a5f4c7 100644
--- a/airflow-core/src/airflow/executors/base_executor.py
+++ b/airflow-core/src/airflow/executors/base_executor.py
@@ -650,11 +650,10 @@ def run_workload(
if isinstance(workload, ExecuteTask):
from airflow.sdk.execution_time.supervisor import supervise_task
+ from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO as SDKTaskInstanceDTO
- # workload.ti is a TaskInstanceDTO which duck-types as TaskInstance.
- # TODO: Create a protocol for this.
return supervise_task(
- ti=workload.ti, # type: ignore[arg-type]
+ ti=SDKTaskInstanceDTO.model_validate(workload.ti, from_attributes=True),
bundle_info=workload.bundle_info,
dag_rel_path=workload.dag_rel_path,
token=workload.token,
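
The conversion relies on Pydantic's ``from_attributes`` validation; a reduced, self-contained sketch of the same pattern (field set trimmed for illustration):

```python
from __future__ import annotations

from pydantic import BaseModel


class CoreDTO(BaseModel):
    id: str
    queue: str


class SdkDTO(BaseModel):
    id: str
    queue: str


core = CoreDTO(id="ti-1", queue="default")
# from_attributes reads fields via attribute access instead of requiring a mapping,
# which is how the executor-side DTO is re-validated into the Task SDK DTO above.
sdk = SdkDTO.model_validate(core, from_attributes=True)
assert sdk.queue == "default"
```
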
diff --git a/airflow-core/src/airflow/executors/workloads/task.py b/airflow-core/src/airflow/executors/workloads/task.py
index d05affe433096..9af3f33c10efd 100644
--- a/airflow-core/src/airflow/executors/workloads/task.py
+++ b/airflow-core/src/airflow/executors/workloads/task.py
@@ -33,8 +33,14 @@
from airflow.models.taskinstancekey import TaskInstanceKey
-class TaskInstanceDTO(BaseModel):
- """Schema for TaskInstance with minimal required fields needed for Executors and Task SDK."""
+class BaseTaskInstanceDTO(BaseModel):
+ """
+ Base schema for TaskInstance with the minimal fields shared by Executors and the Task SDK.
+
+ This definition is duplicated in :mod:`airflow.sdk.execution_time.workloads.task`
+ and the two are kept in sync by the ``check-task-instance-dto-sync`` prek
+ hook. Update both files together.
+ """
id: uuid.UUID
dag_version_id: uuid.UUID
@@ -48,11 +54,16 @@ class TaskInstanceDTO(BaseModel):
queue: str
priority_weight: int
executor_config: dict | None = Field(default=None, exclude=True)
- external_executor_id: str | None = Field(default=None, exclude=True)
parent_context_carrier: dict | None = None
context_carrier: dict | None = None
+
+class TaskInstanceDTO(BaseTaskInstanceDTO):
+ """TaskInstanceDTO with executor-specific ``external_executor_id`` field and ``key`` property."""
+
+ external_executor_id: str | None = Field(default=None, exclude=True)
+
# TODO: Task-SDK: Can we replace TaskInstanceKey with just the uuid across the codebase?
@property
def key(self) -> TaskInstanceKey:
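
A minimal sketch of the kind of comparison the ``check-task-instance-dto-sync`` hook could perform over the duplicated ``BaseTaskInstanceDTO`` definitions; the actual script may use a different strategy (e.g. a textual diff), so this is illustrative only. The two file paths are taken from the hook's ``files:`` pattern in this diff:

```python
from __future__ import annotations

import ast
from pathlib import Path


def dto_field_names(source: str, class_name: str) -> list[str]:
    """Collect the annotated field names of *class_name* from a module's source."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ClassDef) and node.name == class_name:
            return [
                stmt.target.id
                for stmt in node.body
                if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name)
            ]
    raise LookupError(f"{class_name} not found")


core = Path("airflow-core/src/airflow/executors/workloads/task.py").read_text()
sdk = Path("task-sdk/src/airflow/sdk/execution_time/workloads/task.py").read_text()
# Both copies must declare the same fields, in the same order.
assert dto_field_names(core, "BaseTaskInstanceDTO") == dto_field_names(sdk, "BaseTaskInstanceDTO")
```
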
diff --git a/airflow-core/src/airflow/models/dagcode.py b/airflow-core/src/airflow/models/dagcode.py
index 60ee91c8b59b5..528859f4cd311 100644
--- a/airflow-core/src/airflow/models/dagcode.py
+++ b/airflow-core/src/airflow/models/dagcode.py
@@ -119,6 +119,16 @@ def code(cls, dag_id, session: Session = NEW_SESSION) -> str:
@staticmethod
def get_code_from_file(fileloc):
+ # Try runtime coordinators first (classes are pre-loaded by ProvidersManager)
+ from airflow.providers_manager import ProvidersManager
+
+ for coordinator_cls in ProvidersManager().coordinators:
+ # TODO: Perhaps the `can_handle_dag_file` interface should just accept `path` only?
+ # Or maybe we can offer two granularities: one taking bundle + path, another taking just the path
+ if coordinator_cls.can_handle_dag_file("", fileloc):
+ return coordinator_cls.get_code_from_file(fileloc)
+
+ # Then fall back to the native Python handling
try:
with open_maybe_zipped(fileloc, "r") as f:
code = f.read()
diff --git a/airflow-core/src/airflow/provider.yaml.schema.json b/airflow-core/src/airflow/provider.yaml.schema.json
index 5714b8db658c5..1c41b906289cf 100644
--- a/airflow-core/src/airflow/provider.yaml.schema.json
+++ b/airflow-core/src/airflow/provider.yaml.schema.json
@@ -624,6 +624,13 @@
}
}
},
+ "coordinators": {
+ "type": "array",
+ "description": "Runtime Coordinator class names (BaseCoordinator subclasses)",
+ "items": {
+ "type": "string"
+ }
+ },
"source-date-epoch": {
"type": "integer",
"description": "Source date epoch - seconds since epoch (gmtime) when the release documentation was prepared. Used to generate reproducible package builds with flint.",
diff --git a/airflow-core/src/airflow/provider_info.schema.json b/airflow-core/src/airflow/provider_info.schema.json
index 86fc726a05168..92601fc58af74 100644
--- a/airflow-core/src/airflow/provider_info.schema.json
+++ b/airflow-core/src/airflow/provider_info.schema.json
@@ -446,6 +446,13 @@
"type": "string"
}
}
+ },
+ "coordinators": {
+ "type": "array",
+ "description": "Runtime Coordinator class names (BaseCoordinator subclasses)",
+ "items": {
+ "type": "string"
+ }
}
},
"definitions": {
diff --git a/airflow-core/src/airflow/providers_manager.py b/airflow-core/src/airflow/providers_manager.py
index 6fefcbc39b06d..8945589b4b046 100644
--- a/airflow-core/src/airflow/providers_manager.py
+++ b/airflow-core/src/airflow/providers_manager.py
@@ -41,6 +41,7 @@
if TYPE_CHECKING:
from airflow.cli.cli_config import CLICommand
+ from airflow.sdk.execution_time.coordinator import BaseCoordinator
log = logging.getLogger(__name__)
@@ -448,6 +449,7 @@ def __init__(self):
)
# Set of plugins contained in providers
self._plugins_set: set[PluginInfo] = set()
+ self._coordinators: list[type[BaseCoordinator]] = []
self._init_airflow_core_hooks()
self._runtime_manager = None
@@ -625,6 +627,12 @@ def initialize_providers_configuration(self):
self.initialize_providers_list()
self._discover_config()
+ @provider_info_cache("coordinators")
+ def initialize_providers_coordinators(self):
+ """Lazy initialization of providers runtime coordinators."""
+ self.initialize_providers_list()
+ self._discover_coordinators()
+
@provider_info_cache("plugins")
def initialize_providers_plugins(self):
self.initialize_providers_list()
@@ -1280,6 +1288,19 @@ def _discover_config(self) -> None:
if provider.data.get("config"):
self._provider_configs[provider_package] = provider.data.get("config") # type: ignore[assignment]
+ def _discover_coordinators(self) -> None:
+ """Retrieve and pre-load all coordinators defined in the providers."""
+ seen: set[str] = set()
+ for provider_package, provider in self._provider_dict.items():
+ for coordinator_class_path in provider.data.get("coordinators", []):
+ if coordinator_class_path in seen:
+ continue
+ coordinator_cls = _correctness_check(provider_package, coordinator_class_path, provider)
+ if coordinator_cls:
+ seen.add(coordinator_class_path)
+ self._coordinators.append(coordinator_cls)
+ self._coordinators = sorted(self._coordinators, key=lambda c: c.__qualname__)
+
def _discover_plugins(self) -> None:
"""Retrieve all plugins defined in the providers."""
for provider_package, provider in self._provider_dict.items():
@@ -1477,6 +1498,12 @@ def db_managers(self) -> list[str]:
self.initialize_providers_db_managers()
return sorted(self._db_manager_class_name_set)
+ @property
+ def coordinators(self) -> list[type[BaseCoordinator]]:
+ """Returns pre-loaded coordinator classes available in providers."""
+ self.initialize_providers_coordinators()
+ return self._coordinators
+
@property
def filesystem_module_names(self) -> list[str]:
self.initialize_providers_filesystems()
@@ -1548,6 +1575,7 @@ def _cleanup(self):
self._trigger_info_set.clear()
self._notification_info_set.clear()
self._plugins_set.clear()
+ self._coordinators.clear()
self._cli_command_functions_set.clear()
self._cli_command_provider_name_set.clear()
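
Consumer-side usage of the new ``coordinators`` property, mirroring how the DAG-processing changes above query it:

```python
from airflow.providers_manager import ProvidersManager

# Coordinator classes listed under the new "coordinators" key of a provider.yaml are
# imported once, de-duplicated, sorted by qualified name, and then exposed here.
for coordinator_cls in ProvidersManager().coordinators:
    print(coordinator_cls.__qualname__)
```
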
diff --git a/airflow-core/src/airflow/serialization/definitions/baseoperator.py b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
index 6bafc5891235a..9eaf9cc3ed906 100644
--- a/airflow-core/src/airflow/serialization/definitions/baseoperator.py
+++ b/airflow-core/src/airflow/serialization/definitions/baseoperator.py
@@ -195,6 +195,7 @@ def get_serialized_fields(cls):
"ignore_first_depends_on_past",
"inlets",
"is_setup",
+ "sdk",
"is_teardown",
"map_index_template",
"max_active_tis_per_dag",
diff --git a/airflow-core/src/airflow/utils/file.py b/airflow-core/src/airflow/utils/file.py
index c614cfff0ad96..25e191cdccfd8 100644
--- a/airflow-core/src/airflow/utils/file.py
+++ b/airflow-core/src/airflow/utils/file.py
@@ -19,7 +19,6 @@
import ast
import hashlib
-import logging
import os
import re
import zipfile
@@ -30,8 +29,6 @@
from airflow.configuration import conf
-log = logging.getLogger(__name__)
-
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
@@ -74,49 +71,6 @@ def open_maybe_zipped(fileloc, mode="r"):
return open(fileloc, mode=mode)
-def list_py_file_paths(
- directory: str | os.PathLike[str] | None,
- safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
-) -> list[str]:
- """
- Traverse a directory and look for Python files.
-
- :param directory: the directory to traverse
- :param safe_mode: whether to use a heuristic to determine whether a file
- contains Airflow DAG definitions. If not provided, use the
- core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
- to safe.
- :return: a list of paths to Python files in the specified directory
- """
- file_paths: list[str] = []
- if directory is None:
- file_paths = []
- elif os.path.isfile(directory):
- file_paths = [str(directory)]
- elif os.path.isdir(directory):
- file_paths.extend(find_dag_file_paths(directory, safe_mode))
- return file_paths
-
-
-def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
- """Find file paths of all DAG files."""
- from airflow._shared.module_loading.file_discovery import find_path_from_directory
-
- file_paths = []
- ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
-
- for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
- path = Path(file_path)
- try:
- if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
- if might_contain_dag(file_path, safe_mode):
- file_paths.append(file_path)
- except Exception:
- log.exception("Error while examining %s", file_path)
-
- return file_paths
-
-
COMMENT_PATTERN = re.compile(r"\s*#.*")
diff --git a/airflow-core/tests/unit/always/test_providers_manager.py b/airflow-core/tests/unit/always/test_providers_manager.py
index afa473e80a4f0..b13930c98d1c2 100644
--- a/airflow-core/tests/unit/always/test_providers_manager.py
+++ b/airflow-core/tests/unit/always/test_providers_manager.py
@@ -258,6 +258,34 @@ def test_dialects(self):
assert len(dialect_class_names) == 3
assert dialect_class_names == ["default", "mssql", "postgresql"]
+ @patch("airflow.providers_manager.import_string")
+ def test_coordinators(self, mock_import_string):
+ class ACoordinator:
+ pass
+
+ class ZCoordinator:
+ pass
+
+ mock_import_string.side_effect = lambda path: {
+ "airflow.providers.sdk.java.coordinator.ACoordinator": ACoordinator,
+ "airflow.providers.sdk.java.coordinator.ZCoordinator": ZCoordinator,
+ }[path]
+ providers_manager = ProvidersManager()
+ providers_manager._provider_dict = LazyDictWithCache()
+ providers_manager._provider_dict["apache-airflow-providers-sdk-java"] = ProviderInfo(
+ version="0.0.1",
+ data={
+ "coordinators": [
+ "airflow.providers.sdk.java.coordinator.ZCoordinator",
+ "airflow.providers.sdk.java.coordinator.ACoordinator",
+ "airflow.providers.sdk.java.coordinator.ZCoordinator",
+ ]
+ },
+ )
+
+ with patch.object(providers_manager, "initialize_providers_list"):
+ assert providers_manager.coordinators == [ACoordinator, ZCoordinator]
+
class TestWithoutCheckProviderManager:
@pytest.fixture(autouse=True)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index c10e73473bfef..281a378b26c1e 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -31,6 +31,7 @@
from collections import defaultdict, deque
from datetime import datetime, timedelta
from pathlib import Path
+from pprint import pformat
from socket import socket, socketpair
from unittest import mock
from unittest.mock import MagicMock
@@ -41,6 +42,7 @@
from sqlalchemy import func, select
from uuid6 import uuid7
+from airflow._shared.module_loading import find_path_from_directory
from airflow._shared.timezones import timezone
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.dag_processing.bundles.base import BaseDagBundle
@@ -51,6 +53,7 @@
DagFileInfo,
DagFileProcessorManager,
DagFileStat,
+ discover_dag_file_paths,
)
from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
from airflow.models import DagModel, DbCallbackRequest
@@ -60,6 +63,7 @@
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.team import Team
+from airflow.utils import file as file_utils
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
@@ -85,6 +89,11 @@
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
+def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None):
+ """Custom callable injected via conf_vars in TestDagFileDiscovery.test_might_contain_dag."""
+ return False
+
+
def _get_file_infos(files: list[str | Path]) -> list[DagFileInfo]:
return [DagFileInfo(bundle_name="testing", bundle_path=TEST_DAGS_FOLDER, rel_path=Path(f)) for f in files]
@@ -114,6 +123,34 @@ def encode_mtime_in_filename(val):
return out
+class _FakeCoordinator:
+ """Test double recording every can_handle_dag_file call and matching by extension."""
+
+ file_extension: str = ".fakeext"
+ invocations: list[tuple[str, str]] = []
+
+ @classmethod
+ def reset(cls) -> None:
+ cls.invocations = []
+
+ @classmethod
+ def can_handle_dag_file(cls, bundle_name: str, path) -> bool:
+ cls.invocations.append((bundle_name, str(path)))
+ return str(path).endswith(cls.file_extension)
+
+
+@pytest.fixture
+def fake_coordinator():
+ """Inject a fake coordinator into ProvidersManager.coordinators for the duration of a test."""
+ _FakeCoordinator.reset()
+ with mock.patch(
+ "airflow.providers_manager.ProvidersManager.coordinators",
+ new_callable=mock.PropertyMock,
+ return_value=[_FakeCoordinator],
+ ):
+ yield _FakeCoordinator
+
+
def _create_zip_bundle_with_valid_and_broken_dags(zip_path: Path) -> None:
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr(
@@ -285,6 +322,43 @@ def test_get_observed_filelocs_expands_zip_inner_paths(self, tmp_path):
"test_zip.zip/broken_dag.py",
}
+ def test_get_observed_filelocs_treats_coordinator_handled_zip_as_opaque(self, tmp_path, fake_coordinator):
+ """A coordinator-claimed file that happens to be a ZIP must NOT be expanded into inner paths."""
+ # Coordinator handles ".fakeext"; the file is a real ZIP archive so
+ # without the coordinator check it would be enumerated like a dag-zip.
+ bundle_file = tmp_path / "bundle.fakeext"
+ _create_zip_bundle_with_valid_and_broken_dags(bundle_file)
+
+ manager = DagFileProcessorManager(max_runs=1)
+ observed_filelocs = manager._get_observed_filelocs(
+ {
+ DagFileInfo(
+ bundle_name="testing",
+ rel_path=Path("bundle.fakeext"),
+ bundle_path=tmp_path,
+ )
+ }
+ )
+
+ assert observed_filelocs == {"bundle.fakeext"}
+
+ def test_get_observed_filelocs_forwards_bundle_name_to_coordinator(self, tmp_path, fake_coordinator):
+ bundle_file = tmp_path / "bundle.fakeext"
+ bundle_file.write_bytes(b"opaque payload")
+
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._get_observed_filelocs(
+ {
+ DagFileInfo(
+ bundle_name="my_bundle",
+ rel_path=Path("bundle.fakeext"),
+ bundle_path=tmp_path,
+ )
+ }
+ )
+
+ assert fake_coordinator.invocations == [("my_bundle", str(bundle_file))]
+
@pytest.mark.usefixtures("clear_parse_import_errors")
def test_refresh_dag_bundles_keeps_zip_inner_file_errors(self, session, tmp_path, configure_dag_bundles):
bundle_path = tmp_path / "bundleone"
@@ -2462,3 +2536,162 @@ def test_refresh_dag_bundles_update_bundle_state_failure_still_scans_files(self)
# _bundle_versions must NOT advance — DB still holds the old version, so the next
# iteration will see a version mismatch and re-refresh rather than skip incorrectly
assert "mock_bundle" not in manager._bundle_versions
+
+
+class TestDagFileDiscovery:
+ def test_find_path_from_directory_regex_ignore(self):
+ should_ignore = [
+ "test_invalid_cron.py",
+ "test_invalid_param.py",
+ "test_ignore_this.py",
+ ]
+ files = find_path_from_directory(TEST_DAGS_FOLDER, ".airflowignore")
+
+ assert files
+ assert all(os.path.basename(file) not in should_ignore for file in files)
+
+ def test_find_path_from_directory_glob_ignore(self):
+ should_ignore = {
+ "should_ignore_this.py",
+ "test_explicit_ignore.py",
+ "test_invalid_cron.py",
+ "test_invalid_param.py",
+ "test_ignore_this.py",
+ "test_prev_dagrun_dep.py",
+ "test_nested_dag.py",
+ ".airflowignore",
+ }
+ should_not_ignore = {
+ "test_on_kill.py",
+ "test_negate_ignore.py",
+ "test_dont_ignore_this.py",
+ "test_nested_negate_ignore.py",
+ "test_explicit_dont_ignore.py",
+ }
+ actual_files = list(find_path_from_directory(TEST_DAGS_FOLDER, ".airflowignore_glob", "glob"))
+
+ assert actual_files
+ assert all(os.path.basename(file) not in should_ignore for file in actual_files)
+ actual_included_filenames = {
+ os.path.basename(f) for f in actual_files if os.path.basename(f) in should_not_ignore
+ }
+ assert actual_included_filenames == should_not_ignore, (
+ f"actual_included_filenames: {pformat(actual_included_filenames)}\nexpected_included_filenames: {pformat(should_not_ignore)}"
+ )
+
+ def test_might_contain_dag_with_default_callable(self):
+ file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, "test_scheduler_dags.py")
+
+ assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=True)
+
+ @conf_vars({("core", "might_contain_dag_callable"): "unit.dag_processing.test_manager.might_contain_dag"})
+ def test_might_contain_dag(self):
+ """Test might_contain_dag_callable"""
+ file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, "test_scheduler_dags.py")
+
+ # There is a DAG defined in file_path_with_dag; however, the configured
+ # might_contain_dag_callable returns False unconditionally, which verifies that the
+ # callable actually overrides the default heuristic
+ assert not file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=True)
+
+ # With safe_mode is False, the user defined callable won't be invoked
+ assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False)
+
+ def test_get_modules(self):
+ file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.py")
+
+ modules = list(file_utils.iter_airflow_imports(file_path))
+
+ assert len(modules) == 4
+ assert "airflow.utils" in modules
+ assert "airflow.decorators" in modules
+ assert "airflow.models" in modules
+ assert "airflow.sensors" in modules
+ # this one is a local import, we don't want it.
+ assert "airflow.local_import" not in modules
+ # this one is in a comment, we don't want it
+ assert "airflow.in_comment" not in modules
+ # we don't want imports under conditions
+ assert "airflow.if_branch" not in modules
+ assert "airflow.else_branch" not in modules
+
+ def test_get_modules_from_invalid_file(self):
+ file_path = os.path.join(TEST_DAGS_FOLDER, "README.md") # just getting a non-python file
+
+ # should not error
+ modules = list(file_utils.iter_airflow_imports(file_path))
+
+ assert len(modules) == 0
+
+ def test_discover_dag_file_paths(self, test_zip_path):
+ expected_files = set()
+ # No_dags is empty, _invalid_ is ignored by .airflowignore
+ ignored_files = {
+ "no_dags.py",
+ "should_ignore_this.py",
+ "test_explicit_ignore.py",
+ "test_invalid_cron.py",
+ "test_invalid_dup_task.py",
+ "test_ignore_this.py",
+ "test_invalid_param.py",
+ "test_invalid_param2.py",
+ "test_invalid_param3.py",
+ "test_invalid_param4.py",
+ "test_nested_dag.py",
+ "test_imports.py",
+ "test_nested_negate_ignore.py",
+ "file_no_airflow_dag.py", # no_dag test case in test_zip folder
+ "test.py", # no_dag test case in test_zip_module folder
+ "__init__.py",
+ }
+ for root, _, files in os.walk(TEST_DAGS_FOLDER):
+ for file_name in files:
+ if file_name.endswith((".py", ".zip")):
+ if file_name not in ignored_files:
+ expected_files.add(f"{root}/{file_name}")
+ detected_files = set(discover_dag_file_paths(str(TEST_DAGS_FOLDER)))
+ assert detected_files == expected_files, (
+ f"Detected files mismatched expected files:\ndetected_files: {pformat(detected_files)}\nexpected_files: {pformat(expected_files)}"
+ )
+
+ def test_discover_returns_empty_for_none(self):
+ assert discover_dag_file_paths(None) == []
+
+ def test_discover_returns_empty_for_missing_path(self, tmp_path):
+ assert discover_dag_file_paths(tmp_path / "does_not_exist") == []
+
+ def test_discover_returns_single_file_as_is(self, tmp_path):
+ single = tmp_path / "anything.bin"
+ single.write_bytes(b"opaque")
+ assert discover_dag_file_paths(single) == [str(single)]
+
+ def test_discover_includes_coordinator_handled_files(self, tmp_path, fake_coordinator):
+ coord_file = tmp_path / "bundle.fakeext"
+ coord_file.write_bytes(b"opaque payload")
+ py_file = tmp_path / "dag.py"
+ py_file.write_text("from airflow.sdk import DAG\nDAG('d')")
+
+ assert set(discover_dag_file_paths(tmp_path)) == {str(coord_file), str(py_file)}
+
+ def test_discover_coordinator_takes_precedence_over_zip_heuristic(self, tmp_path, fake_coordinator):
+ """A coordinator-claimed file that is also a ZIP must NOT also be included via the generic ZIP path."""
+ coord_zip = tmp_path / "bundle.fakeext"
+ _create_zip_bundle_with_valid_and_broken_dags(coord_zip)
+
+ # File appears exactly once: claimed by coordinator, generic zip branch skipped.
+ assert discover_dag_file_paths(tmp_path) == [str(coord_zip)]
+
+ def test_discover_forwards_bundle_name_to_coordinator(self, tmp_path, fake_coordinator):
+ coord_file = tmp_path / "bundle.fakeext"
+ coord_file.write_bytes(b"opaque payload")
+
+ discover_dag_file_paths(tmp_path, bundle_name="my_bundle")
+
+ # Only one non-.py file, so exactly one coordinator invocation, with the bundle name.
+ assert fake_coordinator.invocations == [("my_bundle", str(coord_file))]
+
+ def test_discover_skips_non_matching_unknown_file(self, tmp_path, fake_coordinator):
+ """A file no coordinator claims and that isn't .py / a ZIP must not appear in results."""
+ (tmp_path / "random.bin").write_bytes(b"unknown payload")
+
+ assert discover_dag_file_paths(tmp_path) == []
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index b34ab12dd4aef..5666ad9dd3212 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -41,6 +41,7 @@
from airflow._shared.timezones.timezone import datetime as datetime_tz
from airflow.configuration import conf
from airflow.dag_processing.dagbag import BundleDagBag, DagBag
+from airflow.dag_processing.manager import discover_dag_file_paths
from airflow.exceptions import AirflowException
from airflow.models.asset import (
AssetAliasModel,
@@ -91,7 +92,6 @@
NullTimetable,
OnceTimetable,
)
-from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
@@ -1135,7 +1135,7 @@ def test_dag_is_deactivated_upon_dagfile_deletion(self, dag_maker):
DagModel.deactivate_deleted_dags(
bundle_name=orm_dag.bundle_name,
- rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER),
+ rel_filelocs=discover_dag_file_paths(settings.DAGS_FOLDER),
)
orm_dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
diff --git a/airflow-core/tests/unit/utils/test_file.py b/airflow-core/tests/unit/utils/test_file.py
index cc55c1ac0632e..a1e14d22a45f0 100644
--- a/airflow-core/tests/unit/utils/test_file.py
+++ b/airflow-core/tests/unit/utils/test_file.py
@@ -18,29 +18,18 @@
from __future__ import annotations
import os
-import zipfile
-from pprint import pformat
from unittest import mock
import pytest
-from airflow._shared.module_loading import find_path_from_directory
from airflow.utils import file as file_utils
from airflow.utils.file import (
correct_maybe_zipped,
- list_py_file_paths,
open_maybe_zipped,
)
-from tests_common.test_utils.config import conf_vars
from unit.models import TEST_DAGS_FOLDER
-TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
-
-
-def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None):
- return False
-
class TestCorrectMaybeZipped:
@mock.patch("zipfile.is_zipfile")
@@ -95,124 +84,6 @@ def test_open_maybe_zipped_archive(self, test_zip_path):
assert isinstance(content, str)
-class TestListPyFilesPath:
- def test_find_path_from_directory_regex_ignore(self):
- should_ignore = [
- "test_invalid_cron.py",
- "test_invalid_param.py",
- "test_ignore_this.py",
- ]
- files = find_path_from_directory(TEST_DAGS_FOLDER, ".airflowignore")
-
- assert files
- assert all(os.path.basename(file) not in should_ignore for file in files)
-
- def test_find_path_from_directory_glob_ignore(self):
- should_ignore = {
- "should_ignore_this.py",
- "test_explicit_ignore.py",
- "test_invalid_cron.py",
- "test_invalid_param.py",
- "test_ignore_this.py",
- "test_prev_dagrun_dep.py",
- "test_nested_dag.py",
- ".airflowignore",
- }
- should_not_ignore = {
- "test_on_kill.py",
- "test_negate_ignore.py",
- "test_dont_ignore_this.py",
- "test_nested_negate_ignore.py",
- "test_explicit_dont_ignore.py",
- }
- actual_files = list(find_path_from_directory(TEST_DAGS_FOLDER, ".airflowignore_glob", "glob"))
-
- assert actual_files
- assert all(os.path.basename(file) not in should_ignore for file in actual_files)
- actual_included_filenames = set(
- [os.path.basename(f) for f in actual_files if os.path.basename(f) in should_not_ignore]
- )
- assert actual_included_filenames == should_not_ignore, (
- f"actual_included_filenames: {pformat(actual_included_filenames)}\nexpected_included_filenames: {pformat(should_not_ignore)}"
- )
-
- def test_might_contain_dag_with_default_callable(self):
- file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, "test_scheduler_dags.py")
-
- assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=True)
-
- @conf_vars({("core", "might_contain_dag_callable"): "unit.utils.test_file.might_contain_dag"})
- def test_might_contain_dag(self):
- """Test might_contain_dag_callable"""
- file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, "test_scheduler_dags.py")
-
- # There is a DAG defined in the file_path_with_dag, however, the might_contain_dag_callable
- # returns False no matter what, which is used to test might_contain_dag_callable actually
- # overrides the default function
- assert not file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=True)
-
- # With safe_mode is False, the user defined callable won't be invoked
- assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False)
-
- def test_get_modules(self):
- file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.py")
-
- modules = list(file_utils.iter_airflow_imports(file_path))
-
- assert len(modules) == 4
- assert "airflow.utils" in modules
- assert "airflow.decorators" in modules
- assert "airflow.models" in modules
- assert "airflow.sensors" in modules
- # this one is a local import, we don't want it.
- assert "airflow.local_import" not in modules
- # this one is in a comment, we don't want it
- assert "airflow.in_comment" not in modules
- # we don't want imports under conditions
- assert "airflow.if_branch" not in modules
- assert "airflow.else_branch" not in modules
-
- def test_get_modules_from_invalid_file(self):
- file_path = os.path.join(TEST_DAGS_FOLDER, "README.md") # just getting a non-python file
-
- # should not error
- modules = list(file_utils.iter_airflow_imports(file_path))
-
- assert len(modules) == 0
-
- def test_list_py_file_paths(self, test_zip_path):
- detected_files = set()
- expected_files = set()
- # No_dags is empty, _invalid_ is ignored by .airflowignore
- ignored_files = {
- "no_dags.py",
- "should_ignore_this.py",
- "test_explicit_ignore.py",
- "test_invalid_cron.py",
- "test_invalid_dup_task.py",
- "test_ignore_this.py",
- "test_invalid_param.py",
- "test_invalid_param2.py",
- "test_invalid_param3.py",
- "test_invalid_param4.py",
- "test_nested_dag.py",
- "test_imports.py",
- "test_nested_negate_ignore.py",
- "file_no_airflow_dag.py", # no_dag test case in test_zip folder
- "test.py", # no_dag test case in test_zip_module folder
- "__init__.py",
- }
- for root, _, files in os.walk(TEST_DAG_FOLDER):
- for file_name in files:
- if file_name.endswith((".py", ".zip")):
- if file_name not in ignored_files:
- expected_files.add(f"{root}/{file_name}")
- detected_files = set(list_py_file_paths(TEST_DAG_FOLDER))
- assert detected_files == expected_files, (
- f"Detected files mismatched expected files:\ndetected_files: {pformat(detected_files)}\nexpected_files: {pformat(expected_files)}"
- )
-
-
@pytest.mark.parametrize(
("edge_filename", "expected_modification"),
[
diff --git a/dev/breeze/doc/images/output_build-docs.svg b/dev/breeze/doc/images/output_build-docs.svg
index 1858bbb097e91..2a0812c05163e 100644
--- a/dev/breeze/doc/images/output_build-docs.svg
+++ b/dev/breeze/doc/images/output_build-docs.svg
@@ -240,8 +240,8 @@
| hashicorp | helm-chart | http | imap | influxdb | informatica | jdbc | jenkins | keycloak | microsoft.azure |
microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage |
opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |
-salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | standard |
-tableau | task-sdk | telegram | teradata | trino | vertica | vespa | weaviate | yandex | ydb | zendesk]...
+salesforce | samba | sdk.java | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh |
+standard | tableau | task-sdk | telegram | teradata | trino | vertica | vespa | weaviate | yandex | ydb | zendesk]...
Build documents.
diff --git a/dev/breeze/doc/images/output_build-docs.txt b/dev/breeze/doc/images/output_build-docs.txt
index 54d8d4e3f39bb..247bee9e56ff6 100644
--- a/dev/breeze/doc/images/output_build-docs.txt
+++ b/dev/breeze/doc/images/output_build-docs.txt
@@ -1 +1 @@
-c5f2067ec852773089ed0ca7b8d1d533
+b4c249b4d1f7605a443774262109694a
diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.svg b/dev/breeze/doc/images/output_release-management_add-back-references.svg
index f17f7f47ed43b..37e9086660253 100644
--- a/dev/breeze/doc/images/output_release-management_add-back-references.svg
+++ b/dev/breeze/doc/images/output_release-management_add-back-references.svg
@@ -155,8 +155,8 @@
| hashicorp | helm-chart | http | imap | influxdb | informatica | jdbc | jenkins | keycloak | microsoft.azure |
microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage |
opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |
-salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | standard |
-tableau | task-sdk | telegram | teradata | trino | vertica | vespa | weaviate | yandex | ydb | zendesk]...
+salesforce | samba | sdk.java | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh |
+standard | tableau | task-sdk | telegram | teradata | trino | vertica | vespa | weaviate | yandex | ydb | zendesk]...
Command to add back references for documentation to make it backward compatible.
diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.txt b/dev/breeze/doc/images/output_release-management_add-back-references.txt
index ffc7eeea6018b..a43ec033fc2a6 100644
--- a/dev/breeze/doc/images/output_release-management_add-back-references.txt
+++ b/dev/breeze/doc/images/output_release-management_add-back-references.txt
@@ -1 +1 @@
-3df401aef0085547b08fe896a9a65381
+a44de0a6fcf0ad832e0b2a73a883f0a0
diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg
index 8fe24cdf434e6..6566b6c97716f 100644
--- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg
+++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg
@@ -149,9 +149,9 @@
github | google | grpc | hashicorp | http | imap | influxdb | informatica | jdbc | jenkins | keycloak |
microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai |
openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres |
-presto | qdrant | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake |
-sqlite | ssh | standard | tableau | telegram | teradata | trino | vertica | vespa | weaviate | yandex | ydb |
-zendesk]...
+presto | qdrant | redis | salesforce | samba | sdk.java | segment | sendgrid | sftp | singularity | slack | smtp |
+snowflake | sqlite | ssh | standard | tableau | telegram | teradata | trino | vertica | vespa | weaviate | yandex |
+ydb | zendesk]...
Generates content for issue to test the release.
diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt
index c6189be26338f..0c327de82828f 100644
--- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt
+++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt
@@ -1 +1 @@
-a85c889b710aa347eb6c47fc36b11720
+ee99c790838efb1d5e5a3b06e6c49846
diff --git a/dev/breeze/doc/images/output_release-management_generate-providers-metadata.svg b/dev/breeze/doc/images/output_release-management_generate-providers-metadata.svg
index 867b9fedc0357..742e316f5a754 100644
--- a/dev/breeze/doc/images/output_release-management_generate-providers-metadata.svg
+++ b/dev/breeze/doc/images/output_release-management_generate-providers-metadata.svg
@@ -1,4 +1,4 @@
-