51 commits
3c9a985
Add Task Coordinators and Dag File Processor
jason810496 Apr 8, 2026
c3862e6
Add initial Java provider for Apache Airflow
jason810496 Apr 8, 2026
32276f0
Add common selector loop utilities for socket I/O handling for subpro…
jason810496 Apr 10, 2026
5167db1
Implement Java DAG file processor with TCP communication bridge
jason810496 Apr 9, 2026
f9c67b5
Make JavaDagFileProcessor.can_handle aware of jar file content
jason810496 Apr 9, 2026
6825e04
Fix java process startup issue
jason810496 Apr 9, 2026
564a0ab
Fix sockets binding
jason810496 Apr 9, 2026
83d0a3d
Refactor Java DAG file processor to use selector-based I/O multiplexi…
jason810496 Apr 10, 2026
adfb68e
Add BaseLocaleCoordinator for non-Python DAG file processing and task…
jason810496 Apr 10, 2026
9c61249
Implement JavaLocaleCoordinator
jason810496 Apr 10, 2026
d7c9e2d
Add Java task coordinator and entrypoint for locale-specific execution
jason810496 Apr 13, 2026
c2ec72a
Refactor Java provider to use generic process coordinators and updat…
jason810496 Apr 13, 2026
5fc2ae0
Fix Coordinator by getting the correct dag bundle and dag path
jason810496 Apr 14, 2026
bb6cdc1
Make @task.stub(language=java) work
jason810496 Apr 15, 2026
343b302
Make coordinator respect Jar bundle based on TI workload type
jason810496 Apr 16, 2026
4022ce7
Add java_sdk_setup script for Breeze
jason810496 Apr 16, 2026
4a63d24
Add get_code_from_file interface for BaseLocaleCoordinator
jason810496 Apr 16, 2026
08dea47
Fix the 'Pure Java Dag' disappearing from metadata DB issue
jason810496 Apr 17, 2026
cc5be91
Refactor process coordinators to runtime coordinators
jason810496 Apr 17, 2026
0e796a0
Rename stub operator language field as sdk
jason810496 Apr 23, 2026
94eee15
Rename languages.java provider to sdk.java
jason810496 Apr 23, 2026
38780e7
Add unit tests for socket handling and selector loop functionality
jason810496 Apr 23, 2026
02b22f1
Move TaskInstanceDTO to shared to make task_runner retrieve TI.queue
jason810496 Apr 21, 2026
869b931
Add [workers/queue_to_runtime_mapping]
jason810496 Apr 22, 2026
52f0f4c
Remove the sdk field from stub operator and respect [workers/queue_to…
jason810496 Apr 23, 2026
96a4fc5
Rename `[workers] queue_to_runtime_mapping` to `[sdk] queue_to_sdk`
jason810496 Apr 23, 2026
7096500
Simplify coordinator-related names (#1569)
uranusjr Apr 24, 2026
8e56e24
CI: Add mypy and unit tests for shared/workloads
jason810496 Apr 28, 2026
8f9c4e4
CI: Fix DB migration and breeze images
jason810496 Apr 28, 2026
913eab7
CI: Fix failing items
jason810496 Apr 28, 2026
26d3802
CI: Fix failing items
jason810496 Apr 28, 2026
95d40f1
CI: Add compat for create_runtime_ti pytest fixture
jason810496 Apr 28, 2026
68c72b2
CI: Fix Java provider test to include configuration options
jason810496 Apr 28, 2026
6f7b570
CI: Fix self-review nits
jason810496 Apr 28, 2026
82db9e7
Revert MappedOperator change
jason810496 Apr 28, 2026
e6b4c2d
CI: Fix failing items
jason810496 Apr 28, 2026
e5b24bb
CI: Fix Task SDK test_task_runner failures using TaskInstanceDTO
jason810496 Apr 28, 2026
a0dc159
CI: Skip non-JAR paths in JavaCoordinator.can_handle_dag_file
jason810496 Apr 28, 2026
bee63fc
CI: Drop literal Example: line from queue_to_sdk config description
jason810496 Apr 28, 2026
5c586dd
CI: Skip sdk-java provider in compat tests for older Airflow
jason810496 Apr 28, 2026
c40cf3e
CI: Fix MyPy Liskov violation in JavaCoordinator.task_execution_cmd
jason810496 Apr 28, 2026
8b2a565
CI: Fix sdk-java docs build warnings
jason810496 Apr 28, 2026
b8f6f4f
CI: Update SDK Java configuration and documentation references
jason810496 Apr 29, 2026
518b876
CI: Update map_index handling and add fixture to restore process cont…
jason810496 Apr 29, 2026
2a5d6f6
CI: Refactor map_index handling and update time travel decorators for…
jason810496 Apr 30, 2026
8fce19f
CI: Replace TaskInstance with TaskInstanceDTO and add additional para…
jason810496 Apr 30, 2026
23e3734
CI: Update test cases to use -1 for map_index and add additional para…
jason810496 Apr 30, 2026
53b83f0
Remove shared workloads dependency and refactor TaskInstanceDTO usage
jason810496 Apr 30, 2026
3d719d1
Move sdk.java out of provider as apache-airflow-coordinators-java dis…
jason810496 May 6, 2026
ed38952
Enhance documentation for BaseCoordinator lifecycle methods and IPC m…
jason810496 May 7, 2026
bc32136
CI: Fix docs spellcheck and code-block indent for sdk coordinators ex…
jason810496 May 7, 2026
1 change: 1 addition & 0 deletions .dockerignore
@@ -39,6 +39,7 @@
!task-sdk/
!airflow-ctl/
!go-sdk/
!sdk/

# Add all "test" distributions
!tests
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -492,6 +492,12 @@ repos:
language: python
pass_filenames: false
files: ^dev/registry/registry_tools/types\.py$|^registry/src/_data/types\.json$
- id: check-task-instance-dto-sync
name: Check BaseTaskInstanceDTO duplicate is in sync between core and task-sdk
entry: ./scripts/ci/prek/check_task_instance_dto_sync.py
language: python
pass_filenames: false
files: ^airflow-core/src/airflow/executors/workloads/task\.py$|^task-sdk/src/airflow/sdk/execution_time/workloads/task\.py$
- id: ruff
name: Run 'ruff' for extremely fast Python linting
description: "Run 'ruff' for extremely fast Python linting"
11 changes: 11 additions & 0 deletions airflow-core/docs/extra-packages-ref.rst
@@ -178,6 +178,17 @@ all the ``airflow`` packages together - similarly to what happened in Airflow 2.
``airflow-task-sdk`` separately, if you want to install providers, you need to install them separately as
``apache-airflow-providers-*`` distribution packages.

Multi-Language extras
=====================

These extras add dependencies needed for integration with other language runtimes. Currently there is only a Java SDK extra, but more may be added for other language runtimes in the future.
Contributor

Go SDK would not be listed here?

Member Author
@jason810496 jason810496 Apr 29, 2026

Once the go-sdk adapts the coordinator interface as a provider, I will update the description here to avoid confusion.


+----------+------------------------------------------+------------------------------------------------------------------+
| extra | install command | enables |
+==========+==========================================+==================================================================+
| sdk.java | ``pip install apache-airflow[sdk.java]`` | JavaCoordinator for both dag processing and workload execution. |
+----------+------------------------------------------+------------------------------------------------------------------+
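For illustration only (not part of the diff): once the extra is installed, the new coordinator is enabled through the ``[sdk]`` options introduced later in this PR. A minimal sketch using Airflow's standard ``AIRFLOW__SECTION__KEY`` environment-variable convention — the JDK path and queue name are placeholders:

    # Sketch: enable a Java coordinator and route one queue to it.
    import json
    import os

    os.environ["AIRFLOW__SDK__COORDINATORS"] = json.dumps(
        [
            {
                "name": "jdk-17",
                "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
                "kwargs": {"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java"},
            }
        ]
    )
    os.environ["AIRFLOW__SDK__QUEUE_TO_COORDINATOR"] = json.dumps({"modern-java": "jdk-17"})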

Apache Software extras
======================

37 changes: 37 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
@@ -1967,6 +1967,43 @@ workers:
type: integer
example: ~
default: "60"
sdk:
description: Settings for non-Python SDK runtime coordination
options:
coordinators:
description: |
JSON list of runtime coordinator entries.

Each entry is an object with ``name``, ``classpath`` and optional
``kwargs``. ``classpath`` is resolved via ``import_string`` and
constructed with ``kwargs`` once per process. Entries are
independent instances, so the same ``classpath`` can be configured
multiple times with different ``kwargs`` (for example, two
``JavaCoordinator`` instances pinned to different JDK versions).
version_added: 3.1.7
type: string
example: |
[
{
"name": "jdk-17",
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {"java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"]}
}
]
default: ~
queue_to_coordinator:
description: |
JSON mapping of queue names to coordinator ``name`` from
``[sdk] coordinators``.

When a task's ``language`` field is not set, this mapping is checked
to route the task to a configured coordinator instance based on its
queue. This is useful when queues are used as environment or
isolation identifiers (e.g. ``legacy-java``, ``modern-java``).
version_added: 3.1.7
type: string
example: '{"legacy-java": "jdk-11", "modern-java": "jdk-17"}'
default: ~
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
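To make the ``[sdk] coordinators`` contract above concrete, here is a minimal sketch of how such entries could be resolved. ``conf`` and ``import_string`` are real Airflow utilities; the function itself is illustrative — the actual logic lives in ``airflow.sdk.execution_time.coordinator.get_coordinator_manager``:

    import json

    from airflow.configuration import conf
    from airflow.utils.module_loading import import_string


    def load_coordinators() -> dict[str, object]:
        """Resolve each [sdk] coordinators entry into a constructed instance."""
        raw = conf.get("sdk", "coordinators", fallback=None)
        if not raw:
            return {}
        instances: dict[str, object] = {}
        for entry in json.loads(raw):
            cls = import_string(entry["classpath"])  # e.g. JavaCoordinator
            # Entries are independent instances, so the same classpath may
            # appear twice with different kwargs (e.g. two JDK versions).
            instances[entry["name"]] = cls(**entry.get("kwargs", {}))
        return instances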
19 changes: 18 additions & 1 deletion airflow-core/src/airflow/dag_processing/manager.py
@@ -267,6 +267,9 @@ class DagFileProcessorManager(LoggingMixin):
factory=_config_get_factory("dag_processor", "file_parsing_sort_mode")
)

_runtime_file_extensions: tuple[str, ...] | None = attrs.field(default=None, init=False)
"""File extensions registered by runtime coordinators (e.g. ".jar"). Lazily populated."""

_api_server: InProcessExecutionAPI = attrs.field(init=False, factory=InProcessExecutionAPI)
"""API server to interact with Metadata DB"""

@@ -815,14 +818,28 @@ def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:

return rel_paths

def _get_runtime_file_extensions(self) -> tuple[str, ...]:
"""Collect file extensions from configured runtime coordinators (cached after first call)."""
if self._runtime_file_extensions is not None:
return self._runtime_file_extensions

from airflow.sdk.execution_time.coordinator import get_coordinator_manager

self._runtime_file_extensions = get_coordinator_manager().file_extensions()
return self._runtime_file_extensions

def _get_observed_filelocs(self, present: set[DagFileInfo]) -> set[str]:
"""
Return observed DAG source paths for bundle entries.

For regular files this includes the relative file path.
For ZIP archives this includes DAG-like inner paths such as
``archive.zip/dag.py``.

Runtime coordinator file extensions (e.g. ``.jar``) are treated as
opaque files rather than ZIP archives.
"""
runtime_extensions = self._get_runtime_file_extensions()

def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
"""Yield absolute paths for DAG-like files inside a ZIP archive."""
@@ -837,7 +854,7 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
observed_filelocs: set[str] = set()
for info in present:
abs_path = str(info.absolute_path)
if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
if abs_path.endswith((".py", *runtime_extensions)) or not zipfile.is_zipfile(abs_path):
observed_filelocs.add(str(info.rel_path))
else:
if TYPE_CHECKING:
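One subtlety behind the ``endswith`` change above: a JAR file is itself a valid ZIP archive, so without the extension short-circuit ``zipfile.is_zipfile`` would report ``True`` and the file would be walked for inner DAG files. A standalone illustration (paths are placeholders):

    runtime_extensions = (".jar",)  # as collected from the coordinator manager
    path = "dags/pipeline.jar"

    # Checking registered runtime extensions before the zipfile test keeps
    # coordinator-owned files opaque instead of treating them as ZIP archives.
    treat_as_opaque = path.endswith((".py", *runtime_extensions))
    assert treat_as_opaque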
44 changes: 40 additions & 4 deletions airflow-core/src/airflow/dag_processing/processor.py
@@ -17,6 +17,7 @@
from __future__ import annotations

import contextlib
import functools
import importlib
import logging
import os
@@ -76,8 +77,6 @@
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from socket import socket

from structlog.typing import FilteringBoundLogger

from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
@@ -86,6 +85,7 @@
from airflow.sdk.definitions.context import Context
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.mappedoperator import MappedOperator
from airflow.sdk.execution_time.supervisor import SelectorCallback
from airflow.typing_compat import Self


@@ -553,7 +553,14 @@ def start( # type: ignore[override]
) -> Self:
logger = kwargs["logger"]

_pre_import_airflow_modules(os.fspath(path), logger)
# Check if a configured runtime coordinator should handle this file
logger.debug("Checking for runtime coordinator entrypoint for file", path=path)
resolved_target = cls._resolve_processor_target(path, bundle_name, bundle_path, logger)
if resolved_target is not None:
target = resolved_target
logger.debug("Resolved runtime coordinator entrypoint for file", path=path)
else:
_pre_import_airflow_modules(os.fspath(path), logger)

proc: Self = super().start(
target=target,
@@ -566,6 +573,35 @@
proc._on_child_started(callbacks, path, bundle_path, bundle_name)
return proc

@staticmethod
def _resolve_processor_target(
path: str | os.PathLike[str],
bundle_name: str,
bundle_path: Path,
log: FilteringBoundLogger,
) -> Callable[[], None] | None:
"""
Return the entrypoint of the first runtime coordinator that can handle *path*.

The returned callable is a ``functools.partial`` that binds *path*, *bundle_name*
and *bundle_path* so the supervisor can pass it as a no-arg ``target`` to
``WatchedSubprocess.start``.
"""
from airflow.sdk.execution_time.coordinator import get_coordinator_manager

coordinator = get_coordinator_manager().for_dag_file(bundle_name, path)
if coordinator is None:
log.debug("No runtime coordinator found for file %s, using default processor", path)
return None

log.debug("Using runtime coordinator %s for file %s", type(coordinator).__qualname__, path)
return functools.partial(
coordinator.run_dag_parsing,
path=os.fspath(path),
bundle_name=bundle_name,
bundle_path=os.fspath(bundle_path),
)

def _on_child_started(
self,
callbacks: list[CallbackRequest],
@@ -591,7 +627,7 @@ def _get_target_loggers(self) -> tuple[FilteringBoundLogger, ...]:

def _create_log_forwarder(
self, loggers: tuple[FilteringBoundLogger, ...], name: str, log_level: int = logging.INFO
) -> Callable[[socket], bool]:
) -> SelectorCallback:
return super()._create_log_forwarder(loggers, name.replace("task.", "dag_processor.", 1), log_level)

def _handle_request(self, msg: ToManager, log: FilteringBoundLogger, req_id: int) -> None:
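The ``functools.partial`` trick in ``_resolve_processor_target`` is worth a standalone sketch: binding all keyword arguments up front yields the no-arg ``target`` that ``WatchedSubprocess.start`` expects (the function body and paths below are placeholders):

    import functools


    def run_dag_parsing(*, path: str, bundle_name: str, bundle_path: str) -> None:
        print(f"parsing {path} (bundle {bundle_name} at {bundle_path})")


    # All arguments are bound ahead of time, so the supervisor can invoke
    # target() with no arguments in the child process.
    target = functools.partial(
        run_dag_parsing,
        path="dags/etl.jar",
        bundle_name="example-bundle",
        bundle_path="/bundles/example",
    )
    target()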
5 changes: 2 additions & 3 deletions airflow-core/src/airflow/executors/base_executor.py
@@ -650,11 +650,10 @@ def run_workload(

if isinstance(workload, ExecuteTask):
from airflow.sdk.execution_time.supervisor import supervise_task
from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO as SDKTaskInstanceDTO

# workload.ti is a TaskInstanceDTO which duck-types as TaskInstance.
# TODO: Create a protocol for this.
return supervise_task(
ti=workload.ti, # type: ignore[arg-type]
ti=SDKTaskInstanceDTO.model_validate(workload.ti, from_attributes=True),
bundle_info=workload.bundle_info,
dag_rel_path=workload.dag_rel_path,
token=workload.token,
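The ``model_validate(..., from_attributes=True)`` call above replaces the previous duck-typing ``# type: ignore``. A standalone sketch of the Pydantic v2 pattern, with toy models rather than the real DTOs:

    from pydantic import BaseModel


    class CoreTI(BaseModel):
        queue: str
        priority_weight: int


    class SDKTI(BaseModel):
        queue: str
        priority_weight: int


    core_ti = CoreTI(queue="default", priority_weight=1)
    # from_attributes=True reads plain attributes off the source object, so two
    # structurally compatible DTOs convert without sharing a base class.
    sdk_ti = SDKTI.model_validate(core_ti, from_attributes=True)
    assert sdk_ti.queue == "default"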
17 changes: 14 additions & 3 deletions airflow-core/src/airflow/executors/workloads/task.py
@@ -33,8 +33,14 @@
from airflow.models.taskinstancekey import TaskInstanceKey


class TaskInstanceDTO(BaseModel):
"""Schema for TaskInstance with minimal required fields needed for Executors and Task SDK."""
class BaseTaskInstanceDTO(BaseModel):
"""
Base schema for TaskInstance with the minimal fields shared by Executors and the Task SDK.

This definition is duplicated in :mod:`airflow.sdk.execution_time.workloads.task`
and the two are kept in sync by the ``check-task-instance-dto-sync`` prek
hook. Update both files together.
"""

id: uuid.UUID
dag_version_id: uuid.UUID
@@ -48,11 +54,16 @@ class TaskInstanceDTO(BaseModel):
queue: str
priority_weight: int
executor_config: dict | None = Field(default=None, exclude=True)
external_executor_id: str | None = Field(default=None, exclude=True)

parent_context_carrier: dict | None = None
context_carrier: dict | None = None


class TaskInstanceDTO(BaseTaskInstanceDTO):
"""TaskInstanceDTO with executor-specific ``external_executor_id`` field and ``key`` property."""

external_executor_id: str | None = Field(default=None, exclude=True)

# TODO: Task-SDK: Can we replace TaskInstanceKey with just the uuid across the codebase?
@property
def key(self) -> TaskInstanceKey:
8 changes: 8 additions & 0 deletions airflow-core/src/airflow/models/dagcode.py
@@ -119,6 +119,14 @@ def code(cls, dag_id, session: Session = NEW_SESSION) -> str:

@staticmethod
def get_code_from_file(fileloc):
# Try from runtime coordinator first.
from airflow.sdk.execution_time.coordinator import get_coordinator_manager

coordinator = get_coordinator_manager().for_dag_file("", fileloc)
if coordinator is not None:
return coordinator.get_code_from_file(fileloc)

# Then fallback to python native
try:
with open_maybe_zipped(fileloc, "r") as f:
code = f.read()
@@ -195,6 +195,7 @@ def get_serialized_fields(cls):
"ignore_first_depends_on_past",
"inlets",
"is_setup",
"sdk",
"is_teardown",
"map_index_template",
"max_active_tis_per_dag",
@@ -111,6 +111,7 @@
("scripts", "/opt/airflow/scripts"),
("uv.lock", "/opt/airflow/uv.lock"),
("scripts/docker/entrypoint_ci.sh", "/entrypoint"),
("sdk", "/opt/airflow/sdk"),
("shared", "/opt/airflow/shared"),
("task-sdk", "/opt/airflow/task-sdk"),
]
2 changes: 1 addition & 1 deletion devel-common/src/docs/provider_conf.py
@@ -151,7 +151,7 @@

# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
empty_subpackages = ["apache", "atlassian", "common", "cncf", "dbt", "microsoft"]
empty_subpackages = ["apache", "atlassian", "common", "cncf", "dbt", "microsoft", "sdk"]
exclude_patterns = [
"operators/_partials",
"_api/airflow/index.rst",
@@ -65,7 +65,7 @@
{% if default and "\n" in default %}
.. code-block::

{{ default }}
{{ default | indent(width=8) }}
{% else %}
``{{ "''" if default == "" else default }}``
{% endif %}
@@ -85,7 +85,7 @@
{% if "\n" in example %}
.. code-block::

{{ example }}
{{ example | indent(width=8) }}
{% else %}
``{{ example }}``
{% endif %}
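The template fix above uses Jinja's built-in ``indent`` filter, which indents every line after the first; without it, the continuation lines of a multi-line default or example would fall outside the ``.. code-block::`` directive and break the docs build. A quick standalone check:

    from jinja2 import Template

    example = '[\n  {"name": "jdk-17"}\n]'
    tmpl = Template(".. code-block::\n\n        {{ example | indent(width=8) }}")
    print(tmpl.render(example=example))
    # Lines after the first are indented by 8 spaces, keeping the whole
    # multi-line value inside the code-block directive.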
17 changes: 14 additions & 3 deletions devel-common/src/tests_common/pytest_plugin.py
@@ -2510,7 +2510,6 @@ def execute(self, context):
from uuid6 import uuid7

from airflow.sdk import DAG
from airflow.sdk.api.datamodels._generated import TaskInstance
from airflow.sdk.execution_time.comms import BundleInfo, StartupDetails
from airflow.timetables.base import TimeRestriction

@@ -2538,6 +2537,15 @@ def _create_task_instance(
should_retry: bool | None = None,
max_tries: int | None = None,
) -> RuntimeTaskInstance:
from tests_common.test_utils.version_compat import AIRFLOW_V_3_3_PLUS

if AIRFLOW_V_3_3_PLUS:
from airflow.sdk.execution_time.workloads.task import TaskInstanceDTO
else:
from airflow.sdk.api.datamodels._generated import ( # type: ignore[no-redef,assignment]
TaskInstance as TaskInstanceDTO,
)

from airflow.sdk.api.datamodels._generated import DagRun, DagRunState, TIRunContext
from airflow.utils.types import DagRunType

@@ -2615,14 +2623,17 @@ def _create_task_instance(
}

startup_details = StartupDetails(
ti=TaskInstance(
ti=TaskInstanceDTO(
id=ti_id,
task_id=task.task_id,
dag_id=dag_id,
run_id=run_id,
try_number=try_number,
map_index=map_index,
map_index=map_index, # type: ignore[arg-type]
dag_version_id=uuid7(),
pool_slots=1,
queue="default",
priority_weight=1,
),
dag_rel_path="",
bundle_info=BundleInfo(name="anything", version="any"),
4 changes: 4 additions & 0 deletions docs/spelling_wordlist.txt
@@ -857,6 +857,7 @@ iTerm
iterm
itertools
Jarek
JavaCoordinator
javascript
jaydebeapi
Jdbc
@@ -894,6 +895,7 @@ jsonl
juli
Jupyter
jupyter
jvm
jwks
JWT
jwt
@@ -1125,6 +1127,7 @@ openai
openapi
openfaas
OpenID
openjdk
openlineage
OpenSearch
opensearch
@@ -1852,6 +1855,7 @@ XComs
Xiaodong
xlarge
xml
Xmx
xpath
XSS
xyz