Draft
Changes from all commits
49 commits
1ed753d
Add Task Coordinators and Dag File Processor
jason810496 Apr 8, 2026
45538bc
Add initial Java provider for Apache Airflow
jason810496 Apr 8, 2026
6c08821
Add common selector loop utilities for socket I/O handling for subpro…
jason810496 Apr 10, 2026
b31e7c2
Implement Java DAG file processor with TCP communication bridge
jason810496 Apr 9, 2026
09b6cdd
Make JavaDagFileProcessor.can_handle aware of jar file content
jason810496 Apr 9, 2026
01b132b
Fix java process startup issue
jason810496 Apr 9, 2026
e71aaf7
Fix sockets binding
jason810496 Apr 9, 2026
f0a53ec
Refactor Java DAG file processor to use selector-based I/O multiplexi…
jason810496 Apr 10, 2026
9acb172
Add BaseLocaleCoordinator for non-Python DAG file processing and task…
jason810496 Apr 10, 2026
119f3bd
Implement JavaLocaleCoordinator
jason810496 Apr 10, 2026
eaa81bf
Add Java task coordinator and entrypoint for locale-specific execution
jason810496 Apr 13, 2026
e1b84de
Refactor Java provider to with generic process coordinators and updat…
jason810496 Apr 13, 2026
42469c1
Fix Coordinator by getting the correct dag bundle and dag path
jason810496 Apr 14, 2026
8b0d96f
Make @task.stub(language=java) work
jason810496 Apr 15, 2026
83f65ff
Make coordinator respect Jar bundle based on TI workload type
jason810496 Apr 16, 2026
388ea94
Add java_sdk_setup script for Breeze
jason810496 Apr 16, 2026
0cad359
Add get_code_from_file interface for BaseLocaleCoordinator
jason810496 Apr 16, 2026
79e741d
Fix the 'Pure Java Dag' disappear in metadata DB issue
jason810496 Apr 17, 2026
b7f249f
Refactor process coordinators to runtime coordinators
jason810496 Apr 17, 2026
a50c53a
Rename stub operator language field as sdk
jason810496 Apr 23, 2026
f23647b
Rename languages.java provider to sdk.java
jason810496 Apr 23, 2026
39b02ef
Add unit tests for socket handling and selector loop functionality
jason810496 Apr 23, 2026
8a4e00a
Move TaskInstanceDTO to share to make task_runner retrieve TI.queue
jason810496 Apr 21, 2026
269b4d7
Add [workers/queue_to_runtime_mapping]
jason810496 Apr 22, 2026
ff18ddd
Remove the sdk field from stub operator and respect [workers/queue_to…
jason810496 Apr 23, 2026
3e819ea
Rename `[workers] queue_to_runtime_mapping` to `[sdk] queue_to_sdk`
jason810496 Apr 23, 2026
24e2b2e
Simplify coordinator-related names (#1569)
uranusjr Apr 24, 2026
bce5a55
CI: Add mypy and unit tests for shared/workloads
jason810496 Apr 28, 2026
da975a9
CI: Fix DB migration and breeze images
jason810496 Apr 28, 2026
0e2b17c
CI: Fix failing items
jason810496 Apr 28, 2026
5b18fcc
CI: Fix failing items
jason810496 Apr 28, 2026
19e160f
CI: Add compat for create_runtime_ti pytest fixture
jason810496 Apr 28, 2026
1af9bb2
CI: Fix Java provider test to include configuration options
jason810496 Apr 28, 2026
45ae7e7
CI: Fix self-review nits
jason810496 Apr 28, 2026
70f97ea
Revert MappedOperator change
jason810496 Apr 28, 2026
c1af2d2
CI: Fix failing items
jason810496 Apr 28, 2026
81d0ba1
CI: Fix Task SDK test_task_runner failures using TaskInstanceDTO
jason810496 Apr 28, 2026
0fa36e4
CI: Skip non-JAR paths in JavaCoordinator.can_handle_dag_file
jason810496 Apr 28, 2026
ba88ec7
CI: Drop literal Example: line from queue_to_sdk config description
jason810496 Apr 28, 2026
5c401bb
CI: Skip sdk-java provider in compat tests for older Airflow
jason810496 Apr 28, 2026
2f5acbf
CI: Fix MyPy Liskov violation in JavaCoordinator.task_execution_cmd
jason810496 Apr 28, 2026
672e963
CI: Fix sdk-java docs build warnings
jason810496 Apr 28, 2026
e76520c
CI: Update SDK Java configuration and documentation references
jason810496 Apr 29, 2026
40bede2
CI: Update map_index handling and add fixture to restore process cont…
jason810496 Apr 29, 2026
f8f46ba
CI: Refactor map_index handling and update time travel decorators for…
jason810496 Apr 30, 2026
e3817e1
CI: Replace TaskInstance with TaskInstanceDTO and add additional para…
jason810496 Apr 30, 2026
23882b9
CI: Update test cases to use -1 for map_index and add additional para…
jason810496 Apr 30, 2026
7f76aad
Remove shared workloads dependency and refactor TaskInstanceDTO usage
jason810496 Apr 30, 2026
9dee1dd
Refactor DAG file discovery by respecting coordinator
jason810496 May 5, 2026
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/1-airflow_bug_report.yml
@@ -192,6 +192,7 @@ body:
- redis
- salesforce
- samba
- sdk-java
- segment
- sendgrid
- sftp
3 changes: 3 additions & 0 deletions .github/boring-cyborg.yml
@@ -189,6 +189,9 @@ labelPRBasedOnFilePath:
provider:keycloak:
- providers/keycloak/**

provider:sdk-java:
- providers/sdk/java/**

provider:microsoft-azure:
- providers/microsoft/azure/**

6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -463,6 +463,12 @@ repos:
language: python
pass_filenames: false
files: ^dev/registry/registry_tools/types\.py$|^registry/src/_data/types\.json$
- id: check-task-instance-dto-sync
name: Check BaseTaskInstanceDTO duplicate is in sync between core and task-sdk
entry: ./scripts/ci/prek/check_task_instance_dto_sync.py
language: python
pass_filenames: false
files: ^airflow-core/src/airflow/executors/workloads/task\.py$|^task-sdk/src/airflow/sdk/execution_time/workloads/task\.py$
- id: ruff
name: Run 'ruff' for extremely fast Python linting
description: "Run 'ruff' for extremely fast Python linting"
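Reviewer note: the hook script itself is not part of this diff. As a rough illustration only, a sync check of this shape could compare the two duplicated class bodies with ``ast`` (the file paths come from the ``files`` pattern above; everything else here is hypothetical):

import ast
import sys

CORE = "airflow-core/src/airflow/executors/workloads/task.py"
SDK = "task-sdk/src/airflow/sdk/execution_time/workloads/task.py"

def class_body_dump(path: str, name: str) -> str:
    """Return a formatting-insensitive dump of the named class body."""
    tree = ast.parse(open(path, encoding="utf-8").read())
    for node in ast.walk(tree):
        if isinstance(node, ast.ClassDef) and node.name == name:
            return "\n".join(ast.dump(stmt) for stmt in node.body)
    raise SystemExit(f"{name} not found in {path}")

if class_body_dump(CORE, "BaseTaskInstanceDTO") != class_body_dump(SDK, "BaseTaskInstanceDTO"):
    sys.exit("BaseTaskInstanceDTO copies have drifted; update both files together")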
11 changes: 11 additions & 0 deletions airflow-core/docs/extra-packages-ref.rst
@@ -178,6 +178,17 @@ all the ``airflow`` packages together - similarly to what happened in Airflow 2.
``airflow-task-sdk`` separately, if you want to install providers, you need to install them separately as
``apache-airflow-providers-*`` distribution packages.

Multi-Language extras
=====================

These are extras that add the dependencies needed for integration with other language runtimes. Currently there is only a Java SDK extra, but more extras for other language runtimes may be added in the future.

+----------+------------------------------------------+------------------------------------------------------------------+
| extra | install command | enables |
+==========+==========================================+==================================================================+
| sdk.java | ``pip install apache-airflow[sdk.java]`` | JavaCoordinator for both DAG processing and workload execution.  |
+----------+------------------------------------------+------------------------------------------------------------------+

Apache Software extras
======================

15 changes: 15 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
@@ -1967,6 +1967,21 @@ workers:
type: integer
example: ~
default: "60"
sdk:
description: Settings for non-Python SDK runtime coordination
options:
queue_to_sdk:
description: |
JSON mapping of queue names to SDK runtime coordinator names.

When a task's ``language`` field is not set, this mapping is checked
to route the task to a non-Python runtime coordinator based on its
queue. This is useful when queues are used as environment or
isolation identifiers (e.g. ``foo``, ``bar``).
version_added: 3.1.7
type: string
example: '{"foo": "java", "bar": "java", "go-queue": "go"}'
default: ~
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
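Reviewer note: for concreteness, a scheduler-side helper could consume this option roughly as follows. This is a minimal sketch, not part of the patch; the function name is hypothetical, and only the ``[sdk] queue_to_sdk`` key is taken from the diff above:

import json

from airflow.configuration import conf

def sdk_for_queue(queue: str) -> str | None:
    """Return the SDK runtime name mapped to *queue*, or None for the default Python runtime."""
    raw = conf.get("sdk", "queue_to_sdk", fallback=None)
    if not raw:
        return None
    mapping: dict[str, str] = json.loads(raw)  # e.g. {"foo": "java", "bar": "java"}
    return mapping.get(queue)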
78 changes: 74 additions & 4 deletions airflow-core/src/airflow/dag_processing/manager.py
@@ -66,7 +66,7 @@
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.typing_compat import assert_never
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.file import might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import (
@@ -88,6 +88,9 @@
from airflow.sdk.api.client import Client


log = logging.getLogger(__name__)


class DagParsingStat(NamedTuple):
"""Information on processing progress."""

@@ -158,6 +161,62 @@ def utc_epoch() -> datetime:
return result


def discover_dag_file_paths(
directory: str | os.PathLike[str] | None,
bundle_name: str = "",
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
) -> list[str]:
"""
Discover paths of DAG files within a directory.

Walks ``directory`` (honouring ``.airflowignore``) and returns each file that is
either a Python DAG candidate (``.py`` source or ZIP archive that passes
:func:`~airflow.utils.file.might_contain_dag`) or accepted by a registered coordinator's
:meth:`~airflow.sdk.execution_time.coordinator.BaseCoordinator.can_handle_dag_file`
(e.g. a ``.jar`` for the Java SDK, a self-contained executable for the Go SDK).

Coordinator handling takes precedence over the generic ZIP heuristic so that,
for example, a ``.jar`` is delegated to its coordinator rather than being
scanned for embedded ``.py`` modules.

:param directory: Directory to scan, or a single file path. ``None`` returns
an empty list. A single file is returned as-is without filtering.
:param bundle_name: Bundle name forwarded to ``can_handle_dag_file``.
:param safe_mode: Whether to apply the Python DAG heuristic; see
:func:`~airflow.utils.file.might_contain_dag`.
:return: Absolute paths discovered as DAG sources.
"""
if directory is None:
return []
if os.path.isfile(directory):
return [str(directory)]
if not os.path.isdir(directory):
return []

from airflow._shared.module_loading.file_discovery import find_path_from_directory
from airflow.providers_manager import ProvidersManager

coordinators = ProvidersManager().coordinators
ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")

file_paths: list[str] = []
for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
path = Path(file_path)
try:
if not path.is_file():
continue
if path.suffix == ".py":
if might_contain_dag(file_path, safe_mode):
file_paths.append(file_path)
elif any(c.can_handle_dag_file(bundle_name, file_path) for c in coordinators):
file_paths.append(file_path)
elif zipfile.is_zipfile(path) and might_contain_dag(file_path, safe_mode):
file_paths.append(file_path)
except Exception:
log.exception("Error while examining %s", file_path)
return file_paths


class _StubSelector(selectors.BaseSelector):
"""
Stub to stand in until the real selector is created.
@@ -808,9 +867,11 @@ def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):

def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s at %s", bundle.name, bundle.path)
rel_paths = [Path(x).relative_to(bundle.path) for x in list_py_file_paths(bundle.path)]
rel_paths = [
Path(x).relative_to(bundle.path)
for x in discover_dag_file_paths(bundle.path, bundle_name=bundle.name)
]
self.log.info("Found %s files for bundle %s", len(rel_paths), bundle.name)

return rel_paths
@@ -822,7 +883,13 @@ def _get_observed_filelocs(self, present: set[DagFileInfo]) -> set[str]:
For regular files this includes the relative file path.
For ZIP archives this includes DAG-like inner paths such as
``archive.zip/dag.py``.

Files claimed by a registered runtime coordinator (e.g. ``.jar``)
are treated as opaque files rather than ZIP archives.
"""
from airflow.providers_manager import ProvidersManager

coordinators = ProvidersManager().coordinators

def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
"""Yield absolute paths for DAG-like files inside a ZIP archive."""
@@ -837,7 +904,10 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
observed_filelocs: set[str] = set()
for info in present:
abs_path = str(info.absolute_path)
if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
handled_by_coordinator = any(
c.can_handle_dag_file(info.bundle_name, abs_path) for c in coordinators
)
if abs_path.endswith(".py") or handled_by_coordinator or not zipfile.is_zipfile(abs_path):
observed_filelocs.add(str(info.rel_path))
else:
if TYPE_CHECKING:
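Reviewer note: the coordinator contract that ``discover_dag_file_paths`` and ``_get_observed_filelocs`` rely on reduces to a boolean claim over (bundle, path). A hedged sketch of what a subclass looks like, with the signature assumed from the call sites above rather than taken from ``BaseCoordinator`` itself:

import os
from pathlib import Path

class JarCoordinator:
    """Stand-in for a BaseCoordinator subclass such as JavaCoordinator (illustrative only)."""

    @classmethod
    def can_handle_dag_file(cls, bundle_name: str, path: str | os.PathLike) -> bool:
        # Claim .jar files so they are delegated to this coordinator instead of
        # falling through to the generic ZIP heuristic (a .jar is a valid ZIP archive).
        return Path(path).suffix == ".jar"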
62 changes: 58 additions & 4 deletions airflow-core/src/airflow/dag_processing/processor.py
@@ -17,6 +17,7 @@
from __future__ import annotations

import contextlib
import functools
import importlib
import logging
import os
@@ -75,8 +76,6 @@
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from socket import socket

from structlog.typing import FilteringBoundLogger

from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
@@ -85,6 +84,7 @@
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.execution_time.supervisor import SelectorCallback
from airflow.typing_compat import Self


@@ -552,7 +552,14 @@ def start( # type: ignore[override]
) -> Self:
logger = kwargs["logger"]

_pre_import_airflow_modules(os.fspath(path), logger)
# Check if a provider-registered runtime coordinator should handle this file
logger.debug("Checking for provider-registered runtime coordinator entrypoint for file", path=path)
resolved_target = cls._resolve_processor_target(path, bundle_name, bundle_path, logger)
if resolved_target is not None:
target = resolved_target
logger.debug("Resolved provider-registered runtime coordinator entrypoint for file", path=path)
else:
_pre_import_airflow_modules(os.fspath(path), logger)

proc: Self = super().start(
target=target,
@@ -565,6 +572,53 @@
proc._on_child_started(callbacks, path, bundle_path, bundle_name)
return proc

@staticmethod
def _resolve_processor_target(
path: str | os.PathLike[str],
bundle_name: str,
bundle_path: Path,
log: FilteringBoundLogger,
) -> Callable[[], None] | None:
"""
Return the entrypoint of the first provider runtime coordinator that can handle *path*.

The returned callable is a ``functools.partial`` that binds *path*, *bundle_name*
and *bundle_path* so the supervisor can pass it as a no-arg ``target`` to
``WatchedSubprocess.start``.
"""
from airflow.providers_manager import ProvidersManager

for coordinator_cls in ProvidersManager().coordinators:
try:
log.debug(
"Checking runtime coordinator %s for file %s",
coordinator_cls,
path,
)
if coordinator_cls.can_handle_dag_file(bundle_name, path):
log.debug(
"Using runtime coordinator %s for file %s",
coordinator_cls,
path,
)
return functools.partial(
coordinator_cls.run_dag_parsing,
path=os.fspath(path),
bundle_name=bundle_name,
bundle_path=os.fspath(bundle_path),
)
log.debug(
"Runtime coordinator %s cannot handle file %s with bundle name %s",
coordinator_cls,
path,
bundle_name,
)
except Exception:
log.warning("Failed to check runtime coordinator %s", coordinator_cls, exc_info=True)

log.debug("No runtime coordinator found for file %s, using default processor", path)
return None

def _on_child_started(
self,
callbacks: list[CallbackRequest],
@@ -590,7 +644,7 @@ def _get_target_loggers(self) -> tuple[FilteringBoundLogger, ...]:

def _create_log_forwarder(
self, loggers: tuple[FilteringBoundLogger, ...], name: str, log_level: int = logging.INFO
) -> Callable[[socket], bool]:
) -> SelectorCallback:
return super()._create_log_forwarder(loggers, name.replace("task.", "dag_processor.", 1), log_level)

def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int) -> None:
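Reviewer note: the ``functools.partial`` returned by ``_resolve_processor_target`` is simply a no-argument callable with the file context frozen in, which is what the supervisor needs for ``target``. In isolation the pattern looks like this (names and paths are illustrative):

import functools

def run_dag_parsing(*, path: str, bundle_name: str, bundle_path: str) -> None:
    print(f"parsing {path} from bundle {bundle_name!r} at {bundle_path}")

target = functools.partial(
    run_dag_parsing,
    path="dags/demo.jar",
    bundle_name="example",
    bundle_path="/bundles/example",
)
target()  # callable with no arguments, as WatchedSubprocess.start expects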
5 changes: 2 additions & 3 deletions airflow-core/src/airflow/executors/base_executor.py
@@ -650,11 +650,10 @@ def run_workload(

if isinstance(workload, ExecuteTask):
from airflow.sdk.execution_time.supervisor import supervise_task
from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO as SDKTaskInstanceDTO

# workload.ti is a TaskInstanceDTO which duck-types as TaskInstance.
# TODO: Create a protocol for this.
return supervise_task(
ti=workload.ti, # type: ignore[arg-type]
ti=SDKTaskInstanceDTO.model_validate(workload.ti, from_attributes=True),
bundle_info=workload.bundle_info,
dag_rel_path=workload.dag_rel_path,
token=workload.token,
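Reviewer note: the ``model_validate(..., from_attributes=True)`` call is standard Pydantic v2: it builds the SDK-side DTO from the attributes of the executor-side object, so the two DTO classes never have to share a module. A toy version of the pattern:

from pydantic import BaseModel

class CoreTI(BaseModel):  # stand-in for the executor-side TaskInstanceDTO
    task_id: str
    queue: str

class SDKTI(BaseModel):  # stand-in for the Task SDK TaskInstanceDTO
    task_id: str
    queue: str

core_ti = CoreTI(task_id="extract", queue="default")
sdk_ti = SDKTI.model_validate(core_ti, from_attributes=True)
assert sdk_ti.queue == "default"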
17 changes: 14 additions & 3 deletions airflow-core/src/airflow/executors/workloads/task.py
@@ -33,8 +33,14 @@
from airflow.models.taskinstancekey import TaskInstanceKey


class TaskInstanceDTO(BaseModel):
"""Schema for TaskInstance with minimal required fields needed for Executors and Task SDK."""
class BaseTaskInstanceDTO(BaseModel):
"""
Base schema for TaskInstance with the minimal fields shared by Executors and the Task SDK.

This definition is duplicated in :mod:`airflow.sdk.execution_time.workloads.task`
and the two are kept in sync by the ``check-task-instance-dto-sync`` prek
hook. Update both files together.
"""

id: uuid.UUID
dag_version_id: uuid.UUID
@@ -48,11 +54,16 @@ class TaskInstanceDTO(BaseModel):
queue: str
priority_weight: int
executor_config: dict | None = Field(default=None, exclude=True)
external_executor_id: str | None = Field(default=None, exclude=True)

parent_context_carrier: dict | None = None
context_carrier: dict | None = None


class TaskInstanceDTO(BaseTaskInstanceDTO):
"""TaskInstanceDTO with executor-specific ``external_executor_id`` field and ``key`` property."""

external_executor_id: str | None = Field(default=None, exclude=True)

# TODO: Task-SDK: Can we replace TaskInstanceKey with just the uuid across the codebase?
@property
def key(self) -> TaskInstanceKey:
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/models/dagcode.py
@@ -119,6 +119,16 @@ def code(cls, dag_id, session: Session = NEW_SESSION) -> str:

@staticmethod
def get_code_from_file(fileloc):
# Try from runtime coordinator first (classes are pre-loaded by ProvidersManager)
from airflow.providers_manager import ProvidersManager

for coordinator_cls in ProvidersManager().coordinators:
# TODO: Perhaps the `can_handle_dag_file` interface should accept only `path`?
# Or maybe we can have different granularities: one variant with bundle + path, another with just path.
if coordinator_cls.can_handle_dag_file("", fileloc):
return coordinator_cls.get_code_from_file(fileloc)

# Then fallback to python native
try:
with open_maybe_zipped(fileloc, "r") as f:
code = f.read()
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/provider.yaml.schema.json
@@ -624,6 +624,13 @@
}
}
},
"coordinators": {
"type": "array",
"description": "Runtime Coordinator class names (BaseCoordinator subclasses)",
"items": {
"type": "string"
}
},
"source-date-epoch": {
"type": "integer",
"description": "Source date epoch - seconds since epoch (gmtime) when the release documentation was prepared. Used to generate reproducible package builds with flint.",
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/provider_info.schema.json
@@ -446,6 +446,13 @@
"type": "string"
}
}
},
"coordinators": {
"type": "array",
"description": "Runtime Coordinator class names (BaseCoordinator subclasses)",
"items": {
"type": "string"
}
}
},
"definitions": {
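Reviewer note: with the new ``coordinators`` field in both schemas, a provider can advertise its coordinator classes from ``provider.yaml``; a fragment like the following would validate (the exact class path is illustrative, not taken from this diff):

coordinators:
  - airflow.providers.sdk.java.coordinators.JavaCoordinator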