
Add Java SDK #65956

Draft
jason810496 wants to merge 16 commits into apache:main from astronomer:feature/java-sdk

Conversation

Member

@jason810496 jason810496 commented Apr 27, 2026

Add Java SDK

Merge order: This is PR 1 of 3 — merge this first.

  1. Add Java SDK #65956 (this PR)
  2. Add Coordinator Layer and Java Coordinator #65958
  3. Add CI, E2E Tests, and Pre-commit Hooks for Java SDK #65959
  • Try it out: A combined PoC branch with all changes cherry-picked is available at [DON'T MERGE] Java SDK All #65960 for reviewers who want to test the full integration end-to-end.

Why

To enable DAGs and tasks to be written entirely in Java (or mixed with Python via @task.stub), Airflow needs a Java SDK that speaks the Task SDK protocol — msgpack-framed communication over TCP for DAG parsing and task execution.

How

The Java SDK is a standalone Gradle project (java-sdk/) that implements the Airflow Task SDK protocol in Kotlin/Java. It communicates with the Airflow task runner through the same TCP comm/logs channels that the Python SDK uses, producing byte-compatible serialized DAG JSON.

Architecture

The SDK follows a client-server model where the JVM process connects to the coordinator's TCP servers:

Airflow Task Runner (Python)              Java SDK Subprocess
────────────────────────────              ────────────────────
                                          
  TCP comm server (127.0.0.1:random) ◄──── CoordinatorComm (msgpack framing)
  TCP logs server (127.0.0.1:random) ◄──── Logger (structured JSON)
  • DAG Parsing: On DagFileParseRequest, the SDK calls DagBundle.getDags(), serializes to DagSerialization v3 format, and returns a DagFileParsingResult.
  • Task Execution: On StartupDetails, the SDK looks up the task, executes Task.execute(Client), and sends SucceedTask / TaskState. During execution, the task can call client.getConnection(), client.getVariable(), client.getXCom(), client.setXCom() — each is a synchronous request/response over the comm channel.
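The synchronous request/response calls above (client.getConnection(), client.getVariable(), ...) ride on framed messages over the comm socket. A minimal framing sketch in Python — note the 4-byte big-endian length prefix is an illustrative assumption, not the documented wire format, and the real payloads are msgpack rather than the raw bytes shown here:

```python
# Sketch of length-prefixed framing over a stream socket. The real Task SDK
# comm channel carries msgpack payloads; the 4-byte big-endian length prefix
# used here is an illustrative assumption, not the actual wire format.
import socket
import struct

def write_frame(sock: socket.socket, payload: bytes) -> None:
    # Prefix the payload with its length so the peer knows how much to read.
    sock.sendall(struct.pack(">I", len(payload)) + payload)

def read_frame(sock: socket.socket) -> bytes:
    # Read the 4-byte length header, then exactly that many payload bytes.
    header = b""
    while len(header) < 4:
        header += sock.recv(4 - len(header))
    (length,) = struct.unpack(">I", header)
    payload = b""
    while len(payload) < length:
        payload += sock.recv(length - len(payload))
    return payload
```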

Serialization Compatibility

Both SDKs share test cases in test_dags.yaml. A field-by-field comparison (compare.py) ensures the Java SDK produces structurally equivalent output to Python's DagSerialization.serialize_dag().
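A field-by-field comparison in the spirit of compare.py can be sketched as a recursive diff that reports the path of every mismatch (illustrative only; this is not the actual compare.py code):

```python
# Illustrative recursive field-by-field diff of two serialized DAG structures.
def diff(expected, actual, path="$"):
    """Collect human-readable paths where the two structures differ."""
    problems = []
    if isinstance(expected, dict) and isinstance(actual, dict):
        for key in expected.keys() | actual.keys():
            if key not in actual:
                problems.append(f"{path}.{key}: missing in actual")
            elif key not in expected:
                problems.append(f"{path}.{key}: unexpected in actual")
            else:
                problems += diff(expected[key], actual[key], f"{path}.{key}")
    elif isinstance(expected, list) and isinstance(actual, list):
        if len(expected) != len(actual):
            problems.append(f"{path}: length {len(expected)} != {len(actual)}")
        else:
            for i, (e, a) in enumerate(zip(expected, actual)):
                problems += diff(e, a, f"{path}[{i}]")
    elif expected != actual:
        problems.append(f"{path}: {expected!r} != {actual!r}")
    return problems
```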

What

  • Add java-sdk/ directory with the full SDK implementation:
    • Public API: Task interface, Client, Dag, DagBundle, Bundle, Connection
    • Execution internals (Kotlin): CoordinatorComm (msgpack TCP framing), Serde (DagSerialization v3 serializer), TaskSdkFrames (protocol message encoding/decoding), TaskRunner, DagParser, Server, Supervisor, Logger
    • Build-time tooling: BundleInspector (generates airflow-metadata.yaml), BundleScanner (discovers bundles from JAR manifests)
    • Configuration: Config (reads airflow.cfg for Java bundles folder, execution API URL, etc.)
  • Add java-sdk/example/ with a complete JavaExample bundle demonstrating pure Java DAG authoring and @task.stub interop
  • Add java-sdk/dags/stub_dag.py demonstrating mixed Python + Java DAGs via @task.stub(sdk="java")
  • Add cross-language serialization validation infrastructure (test_dags.yaml, serialize_python.py, compare.py)
  • Add unit tests for BundleScanner, CoordinatorComm, Serde, TaskRunner, DagParser, Config

Was generative AI tooling used to co-author this PR?

Co-authored-by: Tzu-ping Chung uranusjr@gmail.com

@jason810496 jason810496 changed the title Add Java SDK for Apache Airflow Add Java SDK Apr 27, 2026
@jason810496 jason810496 self-assigned this Apr 27, 2026
@uranusjr uranusjr added the AIP-108: java-sdk Change this to an 'area:' label after AIP acceptance. label Apr 28, 2026
public class JavaExample implements DagBundle {

    public static class Extract implements Task {
        public void execute(Client client) throws Exception {
Contributor

Structural question: Why is the parameter in the task named `client` and not `context`, like we have it in Python? Would it not be better to keep this consistent?

And are the execute methods always void, or can the return value also produce an XCom like in Python? (Here it looks like the implication is that an XCom result must always be pushed explicitly.)

Comment on lines +97 to +98
@dag(dag_id="java_example")
def simple_dag():
Contributor

Should we not make this consistent for an example?

Suggested change
@dag(dag_id="java_example")
def simple_dag():
@dag(dag_id="java_example")
def java_example():

Alternative:

Suggested change
@dag(dag_id="java_example")
def simple_dag():
@dag
def java_example():

Comment on lines +104 to +105
> **Note:** The current `DagBundle` interface used in pure Java DAGs is subject to review before the SDK reaches 1.0. Subclassing `Dag` directly may be a more natural fit and is being considered for post-OSS-integration.

Contributor

I also think subclassing is closer to the Python world and I'd favor it. But then there needs to be a registration mechanism, so a DagBag would still be needed to register/expose the defined Dag classes.

# Discovery (called by DAG File Processor)

@classmethod
def can_handle_dag_file(cls, bundle_name: str, path: str | os.PathLike) -> bool:
Contributor

Could this mechanism also be leveraged for YAML-based Dag definitions/generators like dag-factory?

Member Author

This method is designed for pure target-language Dags (e.g. authoring the whole Dag in Java or Go, without Python @task.stub involved at all).

In this case, the JavaCoordinator will check whether the file is a valid .jar to parse.
So I don't think dag-factory is related to this method, or could you elaborate more on it? Thanks.

Contributor

Today when using dag-factory you need to add a pseudo Dag file (Python) which calls the dag-factory YAML parser... this then globs for YAML files and produces the Dags.

I thought (not in this PR) ... would this hook also be an option for a YAML dag parser to hook in for all *.yaml files and return the parsed dag structure, such that no pseudo Dag is needed? (see https://astronomer.github.io/dag-factory/latest/configuration/load_yaml_dags/)

Member Author

I got your point now. Yes, it's possible to leverage the coordinator interface to port the current dag-factory into a language runtime that parses YAML and returns the DagParsingResult, so users don't need a pseudo Python Dag anymore.

This is one of the cool directions for the new coordinator interface.

Member

Jens you’re spoiling my next AIP! But in short, yes. My end goal is for Airflow to be able to parse anything with the right plugin.

comm_addr: str, # host:port for msgpack comm channel
logs_addr: str, # host:port for structured JSON log channel
) -> list[str]:
"""Return the subprocess command for DAG file parsing."""
Contributor

Why does the Coordinator pass back a CMD for executing the parsing? Why not call a method which, inside the coordinator, triggers the parsing and monitors it?

(The same probably applies to the task execution CMD below.)

Member Author

We want to keep the Coordinator interface a) as simple as possible, b) leveraging the existing TaskSDK implementation as much as possible, so that each language coordinator implementation only needs to "translate" the Airflow-Core terms/info into the correct command to start the target-language process.

Take the task_execution_cmd method of JavaCoordinator as an example: we need to compose the corresponding commands based on the given dag_file_path.

https://github.com/apache/airflow/pull/65958/changes#diff-8f8ad59d831afc67a103291c4d49dabd2ec11f963d96b7fe05f301ae6cb2895cR85-R86
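As a sketch of that "translate info into a command" idea: the java -classpath ... --comm=... --logs=... shape appears later in this thread, but the function signature and the entry-point class name below are assumptions, not the actual JavaCoordinator API:

```python
# Illustrative sketch: compose the subprocess command that starts the
# target-language runtime. The entry-point class name is hypothetical.
def task_execution_cmd(jar_path: str, comm_addr: str, logs_addr: str) -> list[str]:
    return [
        "java",
        "-classpath", jar_path,
        "org.apache.airflow.sdk.TaskRunner",  # hypothetical main class
        f"--comm={comm_addr}",
        f"--logs={logs_addr}",
    ]
```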

Contributor

Yes, I understand you want to make it easy. But is it not "simpler", in terms of interface, to call a Python method that makes the CLI call itself? As it stands, you define a function that returns a command line which the coordinator then needs to call.

From a coupling perspective this is like 1980.

With a function you could have specific exceptions for problems, and also cover a case (future thought) where e.g. no CMD is called but a REST API to a Dag repository is used instead. (Not in this scope.)

Member Author (@jason810496, Apr 30, 2026)

The initial goal of the coordinator interface is to let Airflow-Core / TaskSDK start the ephemeral runtime of another language as a subprocess.

Even with the later use cases you mentioned (handling via a REST API, or actions other than starting a subprocess), users could still override the whole run_dag_parsing and run_task_execution public methods themselves instead of relying on those public *_cmd methods.

Member

Since dag files written in a programming language (such as Python) can do too much, Airflow currently always launches a child process to run them. So personally I feel this is the correct coupling, since it covers the use case exactly.

What I think you're looking for (from the comment thread above) is for Airflow to be able to parse non-programming-language (I'm going to call them static) files such as YAML (dag-factory format), which can be parsed in-process without worrying about side effects (although YAML is an inherently problematic format... that's a different topic). However, this would not fit into the current design of DagFileProcessorManager → DagFileProcessorProcess → child process, because the decision to launch a child process is already made in the first (processor manager) layer, while the new coordinator layer is going to be added in the second (processor process) layer to decide what kind of child process to launch. To satisfy the need to parse static files, we need to hook into the first (manager) layer instead; this can probably still be done with the coordinator class, but needs to be a separate function. It is not a good idea IMO to do this as part of the Java SDK addition, since the concern is separate.

Contributor

I had not thought about this, but... yes, you are right: plugging in some kind of YAML Dag parser would require launching a process, even if you just want to drop your next AIP hooking in here.

I rather thought of something like... assume you'd be able to load the Java runtime as a pybind and run Java natively in-process. With the current interface you force and assume a process is always spawned.

Anyway, I drop my point. But it might be debatable. Just some thinking, no hard blocker. I just see specific coupling here, and a function call would leave more room for technical options in the future compared to forcing a process to be spawned.

def my_java_task(): ...
```

**2. Serialization — Each Language SDK Produces SDK-Compatible Serialized DAG JSON**
Contributor

"SDK Compatible" - is the schema defined somewhere?

Member Author

Yes, we already have airflow-core/src/airflow/serialization/schema.json.

Actually, the above serialization/schema.json is not accurate enough to capture all the detailed rules. I tried generating the data models from schema.json (similar to generating Pydantic models from an OpenAPI spec YAML), and it turned out the auto-generating approach was missing a lot of the data models necessary to represent the SDK-serialized JSON format, so I had to wire up another data-model layer to fill the gap between the auto-generated data models and the expected SDK-serialized JSON format.

That's why we went with implementing the SDK serialization for each target language and added the "SDK Compatible" check to guard compatibility.


The matched coordinator's `run_dag_parsing()` (a concrete method on `BaseCoordinator`) delegates to `_runtime_subprocess_entrypoint()`, which handles all the TCP/process plumbing:

1. Creates two TCP servers on `127.0.0.1` with random ports (comm + logs)
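Step 1 can be sketched with plain sockets: binding to port 0 lets the OS pick the "random" port. This is a sketch of the socket setup only, not the actual _runtime_subprocess_entrypoint() implementation:

```python
# Sketch of step 1: two loopback TCP listeners on OS-assigned ports.
import socket

def make_server() -> tuple[socket.socket, str]:
    srv = socket.create_server(("127.0.0.1", 0))  # port 0: OS picks a free port
    host, port = srv.getsockname()
    return srv, f"{host}:{port}"

comm_server, comm_addr = make_server()
logs_server, logs_addr = make_server()
```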
Contributor

Why not Unix sockets? 127.0.0.1 smells like it will also cause problems on pure IPv6 systems soon. Similar to how Docker talks HTTP over /var/run/docker.sock.

```
Airflow Supervisor Bridge Language Runtime
│ │ │
├── DagFileParseRequest ──────────┼──────────────────────►│
Contributor

This sounds very low level. Why do we invent a custom protocol here? Would HTTP REST or gRPC not be better suited for the communication?

Member Author

This is how the current TaskSDK works (more specifically, task-sdk/src/airflow/sdk/execution_time/supervisor.py).

We didn't invent a custom protocol here. Instead, we follow the existing protocol (msgpack over IPC, task-sdk/src/airflow/sdk/execution_time/comms.py). The "Bridge" here only wires the Language Runtime sockets to the existing Airflow TaskSDK sockets.


### DagFileParsingResult Format

The language runtime must produce a `DagFileParsingResult` that matches Python Airflow's DagSerialization format exactly. The Airflow scheduler deserializes this into its internal model — any divergence causes parsing failures.
Contributor

Can we express the model also as Pydantic model and generate a JSON schema out of this? (Or other technical means as well accepted but we use Pydantic on FastAPI already. Would also be favored when having HTTP as communication as well.)

Member Author

The Python side of the DagFileParseRequest definition is using a Pydantic model. However, the LazyDeserializedDAG model relies heavily on our Airflow-internal serialization. This also echoes why we need to implement "SDK-Compatible Serialized DAG JSON" for each language.

```python
class DagFileParsingResult(BaseModel):
    """
    Result of DAG File Parsing.

    This is the result of a successful DAG parse, in this class, we gather all serialized DAGs,
    import errors and warnings to send back to the scheduler to store in the DB.
    """

    fileloc: str
    serialized_dags: list[LazyDeserializedDAG]
    warnings: list | None = None
    import_errors: dict[str, str] | None = None
    type: Literal["DagFileParsingResult"] = "DagFileParsingResult"


class LazyDeserializedDAG(pydantic.BaseModel):
    """
    Lazily build information from the serialized DAG structure.

    An object that will present "enough" of the DAG like interface to update DAG db models etc,
    without having to deserialize the full DAG and Task hierarchy.
    """

    data: dict
    last_loaded: datetime.datetime | None = None

    NULLABLE_PROPERTIES: ClassVar[set[str]] = {
        # Non attr fields that should be nullable, or attrs with a different default
        "owner",
        "owner_links",
        "dag_display_name",
        "has_on_success_callback",
        "has_on_failure_callback",
        "tags",
        # Attr properties that are nullable, or have a default that loads from config
        "description",
        "start_date",
        "end_date",
        "template_searchpath",
        "user_defined_macros",
        "user_defined_filters",
        "max_active_tasks",
        "max_active_runs",
        "max_consecutive_failed_dag_runs",
        "dagrun_timeout",
        "deadline",
        "allowed_run_types",
        "catchup",
        "doc_md",
        "access_control",
        "is_paused_upon_creation",
        "jinja_environment_kwargs",
        "relative_fileloc",
        "disable_bundle_versioning",
        "fail_fast",
        "last_loaded",
    }

    @classmethod
    def from_dag(cls, dag: DAG | LazyDeserializedDAG) -> LazyDeserializedDAG:
        if isinstance(dag, LazyDeserializedDAG):
            return dag
        return cls(data=DagSerialization.to_dict(dag))

    @property
    def hash(self) -> str:
        from airflow.models.serialized_dag import SerializedDagModel

        return SerializedDagModel.hash(self.data)

    def next_dagrun_info(self, *args, **kwargs) -> DagRunInfo | None:
        # This function is complex to implement; for now we deserialize the dag and delegate to that.
        return self._real_dag.next_dagrun_info(*args, **kwargs)

    @property
    def access_control(self) -> Mapping[str, Mapping[str, Collection[str]] | Collection[str]] | None:
        return BaseSerialization.deserialize(self.data["dag"].get("access_control"))

    @cached_property
    def _real_dag(self):
        try:
            return DagSerialization.from_dict(self.data)
        except Exception:
            log.exception("Failed to deserialize DAG")
            raise

    def __getattr__(self, name: str, /) -> Any:
        if name in self.NULLABLE_PROPERTIES:
            return self.data["dag"].get(name)
        try:
            return self.data["dag"][name]
        except KeyError:
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}") from None

    @property
    def timetable(self) -> Timetable:
        return decode_timetable(self.data["dag"]["timetable"])

    @property
    def has_task_concurrency_limits(self) -> bool:
        return any(
            task[Encoding.VAR].get("max_active_tis_per_dag") is not None
            or task[Encoding.VAR].get("max_active_tis_per_dagrun") is not None
            or task[Encoding.VAR].get("partial_kwargs", {}).get("max_active_tis_per_dag") is not None
            or task[Encoding.VAR].get("partial_kwargs", {}).get("max_active_tis_per_dagrun") is not None
            for task in self.data["dag"]["tasks"]
        )

    @property
    def owner(self) -> str:
        return ", ".join(
            set(filter(None, (task[Encoding.VAR].get("owner") for task in self.data["dag"]["tasks"])))
        )
```

Member

The internal representation is actually defined in airflow-core/src/airflow/serialization/schema.json, although I don’t know if there’s an established tool that generates POJO from a schema (reverse tools exist).

We can probably use a pre-commit check to make sure the serialized DAG definition in Java matches the JSON schema?

Regarding DagFileParsingResult (and also StartupDetails really), I don’t think they are currently documented anywhere. Now that they are no longer used for Python-to-Python, we should probably generate schema files for them too. This probably should be done separately since it’s not really in-scope of this feature specifically.

When `task_runner.main()` starts, before any Python task execution:

```
task_runner.main()
Contributor

I would not assume every runner would be capable of running all languages. I'd assume I need to deploy a worker with a JDK installed that can run Java tasks, but that one might not execute Go code.

So we should be able to configure (potentially per worker) which SDK coordinators are supported. Workload distribution must be intentional (e.g. the JAR of tasks must be available at the respective worker, in the right version, as well).

Member Author

I would not assume every runner would be capable of running all languages. I'd assume I need to deploy a worker with a JDK installed that can run Java tasks but that one might not execute Go code.

That's correct. Only if the sdk.<lang> provider is installed on that runner will it be able to leverage the corresponding coordinator.

So we should be able to configure (per worker potentially) which SDK coordinators are supported.

There's the [sdk] queue_to_sdk = '{"foo": "java", "bar": "java", "go-queue": "go"}' config option, which should match what you mentioned here.

(e.g. JAR of tasks must be available at respective worker in the right version as well)

Yes, the Airflow-Java-SDK-Version metadata in the JAR should match the corresponding Java coordinator version. (Though I haven't added the validation for this yet.)

https://github.com/apache/airflow/pull/65958/changes#diff-7b5ede5793d4407e3d13b6bb0d6db4f11586fe5eebfe66c74b700fe3edf16f16R38
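For illustration, consuming the queue_to_sdk option could look like the following. The option value is taken from the comment above; the lookup helper and its default are a sketch, not Airflow code:

```python
# Illustrative only: read the queue -> SDK mapping and consult it.
# The raw value mirrors the [sdk] queue_to_sdk example from the thread.
import json

raw = '{"foo": "java", "bar": "java", "go-queue": "go"}'
queue_to_sdk = json.loads(raw)

def sdk_for_queue(queue: str, default: str = "python") -> str:
    # Queues not listed in the mapping fall back to the default (Python) SDK.
    return queue_to_sdk.get(queue, default)
```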

Member

Since we use the existing executor mechanism in Airflow, the idea of a worker varies depending on the executor in use. LocalExecutor for example does not really have a long-running worker concept; CeleryExecutor would require you to have at least the same number of worker processes as you want parallelism.

I think another way to put it is this design does not really go into how workers are managed. It simply assumes you have JRE set up to run Java, the same way as Python and needed provider packages are assumed when you run Python tasks.

(This brings up the interesting case of Kubernetes, where we can have different images for different languages, but currently the design requires you to have an image that can handle both. This is a possible future improvement work.)

BaseCoordinator._runtime_subprocess_entrypoint(TaskExecutionInfo)
├─ 1. Create TCP comm_server + logs_server on 127.0.0.1:random
Contributor

Same comments for communication and protocol like above.

```
Airflow Supervisor Bridge Language Runtime
│ │ │
│ [StartupDetails sent by bridge directly] │
Contributor (@jscheffl, Apr 28, 2026)

We assume one Language Runtime executes (only) 1 task at a time, correct? So if I want to pool 5 tasks on a single worker, I'd need to ensure that 5 Java runtimes are started and running in parallel.

Also, once a task is completed, I assume the (other-language) runtime will stay alive and pick up the next task? Or is it stopped/killed and restarted for the next task?
(Or in other words: what is the runtime lifecycle?)

Member Author

So if I want to pool 5 tasks on a single worker I'd need to ensure that 5 Java runtimes are started and running in parallel.

This is correct.

(Or in other words: what is the runtime lifecycle?)

The Language Runtime lifecycle is the same as the "Task" itself. If there are 5 Java tasks running, we should expect to see 5 java -classpath ... --comm=... --logs=... processes.


| Message | Fields | Purpose |
|---|---|---|
| `GetConnection` | `conn_id` | Fetch an Airflow connection by ID |
Contributor

Will the interface structure be similar to the Execution API on the central side?

Member (@uranusjr, Apr 29, 2026)

Not similar: this is the Execution API. The Java SDK generates these message models from the public Execution API spec with org.openapi.generator.

Generation configs: https://github.com/apache/airflow/pull/65956/changes#diff-d42e52421dbb6f0599bf438e46041a190c981aec2b69860c53bdd64404b6f0e1R65-R93


```java
// sdk: org.apache.airflow.sdk.Client
public class Client {
Contributor

Alongside the comment above: I propose naming it "Context" to make it similar to the Python world.

Member

There is already a Context type to hold most data from Airflow (ds, ti, etc.)

Client is separated from Context since Java is less flexible than Python and it's more difficult to put in magic objects like outlet_events that do API calls in the background. (It's possible, I just didn't want to jump through that many hoops.) So instead Client exposes the Execution API mostly directly (just wrapping the models to make them nicer to use), while Context is used for static data that is cheap to fetch.

The Go SDK uses the same pattern. (Although I don’t know if it’s for the same reasons.)

Comment thread java-sdk/adr/0004-pure-java-dags.md Outdated
- Build-time metadata generation means DAG IDs can be discovered without JVM startup — important for `BundleScanner` and tooling.
- Source code packaging enables Airflow UI display with no changes to Airflow Core's `DagCode` infrastructure.
- The manifest convention (`Airflow-Java-SDK-*` attributes) is extensible — future attributes can carry additional metadata without breaking existing tooling.
- The build-time `BundleInspector` step adds a compile-time dependency on the SDK and requires the `DagBundle` class to be instantiable without side effects (no I/O, no connections in the constructor).
Contributor

How will the deployment and lifecycle look? If new versions of Dag and task packages/JARs are created, would it just swap files? Or does the worker need to be drained and restarted?

In the Python world we are quite flexible because everything is source-based, but if switching to JAR files and potentially long-living processes running on the JAR files... how do updates work?

Member Author

Users just need to swap the JAR files; there is no need to restart the dag-processor or worker at all.

a) For Dag-processing:

The dag-processor consists of a DagFileProcessorManager that spawns DagFileProcessorProcess sub-processes. Each DagFileProcessorProcess sub-process parses a dag file, then the DagFileProcessorManager collects the DagFileParsingResult from the sub-process.

Additionally, the lifecycle of the language runtime that parses a pure-Java Dag is the same as DagFileProcessorProcess, and each DagFileProcessorProcess sub-process is ephemeral (it is killed after returning the result).

https://github.com/apache/airflow/pull/65958/changes#diff-564fd0a8fbe4cc47864a8043fcc1389b33120c88bb35852b26f45c36b902f70bR540-R562

This means each DagFileProcessorManager (the airflow dag-processor process) will always be alive, receive results from the ephemeral sub-processes, and update the DB if necessary.

b) For Workload-execution:

The worker will execute each task with the supervise entrypoint, which is the task-runner under the hood.

def supervise(**kwargs) -> int:

The task-runner will get the StartupDetails that contains dag_id, task_id, queue, etc. Here's where the coordinator comes into play: it will check whether any JAR in the [java] bundles_folder directory matches the given StartupDetails. In other words, the task-runner will check the metadata of the JAR at runtime. (By the metadata of the JAR I mean airflow-metadata.yaml, which contains dag_id and task_id: https://github.com/apache/airflow/pull/65956/changes/BASE..aa124668ae5041ba51f7058b0212ceb5b25d678b#diff-0bd84e58e5f97fbca586e41335d4786f53c10c7703fd17abfad5efa20cfe2b97R142-R155 )

https://github.com/apache/airflow/pull/65958/changes#diff-5bef10ab2956abf7360dbf9b509b6e1113407874d24abcc1b276475051f13abfR1881
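A sketch of that runtime JAR-metadata lookup. Only the airflow-metadata.yaml file name and its dag_id/task_id contents come from the comment above; the matching logic and file layout here are assumptions:

```python
# Hypothetical sketch: scan candidate JARs for one whose bundled
# airflow-metadata.yaml mentions the requested dag_id/task_id.
import zipfile

def find_jar_for_task(jar_paths, dag_id: str, task_id: str):
    for jar in jar_paths:
        with zipfile.ZipFile(jar) as zf:
            meta = zf.read("airflow-metadata.yaml").decode()
        # Naive substring scan; a real implementation would parse the YAML.
        if dag_id in meta and task_id in meta:
            return jar
    return None
```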

Contributor

Can we prevent binary JAR files from being checked in and rather keep the source tree only?

Member

It’s best practice to check in the Gradle wrapper JAR. It is more like a pre-compiled build script, not an artifact.

https://docs.gradle.org/current/userguide/gradle_wrapper.html#sec:adding_wrapper

Contributor

Why is the Java SDK written in Kotlin and not pure Java? Sounds a bit strange.

Member Author

I had the same question when I started this project; here's a comparison between Kotlin and Java that I asked Claude for earlier.

Perhaps naming the SDK and corresponding provider "jvm" might be more accurate?


🐍 → ☕ Your Mental Map: Python vs Java vs Kotlin

What Are They?

|  | Python | Java | Kotlin |
  | -- | -- | -- | -- |
  | Type System | Dynamic (types checked at runtime) | Static (types checked at compile time) | Static (but smarter, often inferred) |
  | Compilation | Interpreted | Compiled to JVM bytecode | Compiled to JVM bytecode (or JS/Native) |
  | Verbosity | Low | High | Medium (Java's "fix") |
  | Performance | Slower | Fast | Fast (same as Java) |
  | Null Safety | None can crash anywhere | null can crash anywhere (NullPointerException) | Null safety built into the type system |
  | Age | 1991 | 1995 | 2011 (by JetBrains) |

🤝 Why Are Java & Kotlin Used Together?

This is the key insight: Kotlin compiles to the same JVM bytecode as Java. They are 100% interoperable.

Your Kotlin Code (.kt)  ──┐
                           ├──► JVM Bytecode (.class) ──► Runs on JVM
Your Java Code   (.java) ──┘

Real-world reasons they coexist:

  1. Legacy — Millions of lines of Java exist in production. Companies can't rewrite everything overnight.
  2. Gradual migration — Teams adopt Kotlin file-by-file, calling Java from Kotlin and vice versa.
  3. Android — Google made Kotlin the official Android language in 2017, but all Android libraries were written in Java.
  4. Libraries — The entire Java ecosystem (Spring, Hibernate, etc.) is available to Kotlin for free.

import org.apache.airflow.sdk.execution.api.route.XComsApi
import java.time.LocalDate

interface Client {
Contributor

For product maturity we should add documentation to the public interfaces, a.k.a. JavaDoc, so that SDK documentation can be generated from source, which also helps in IDEs. I assume this will come?

Comment on lines +46 to +52
* Primitives pass through; complex types are wrapped in {"__type": ..., "__var": ...}.
* This matches the Python BaseSerialization.serialize() output exactly:
* - dict -> {"__type": "dict", "__var": {k: serialize(v), ...}}
* - set -> {"__type": "set", "__var": [sorted items]}
* - datetime -> {"__type": "datetime", "__var": epoch_seconds}
* - timedelta -> {"__type": "timedelta", "__var": total_seconds}
*/
Contributor

As above: before implementing a custom communication protocol and serialization, I'd rather recommend considering gRPC and ProtoBuf or a similar serialization framework. There are mature frameworks available and we do not need to reinvent the wheel here. (Otherwise it's a recipe for security vulnerabilities...)

Member Author

IMHO, having a multi-language serialization framework like gRPC wouldn't help here, because of Airflow's internal SDK serialization.

The goal of SDK serialization in each language SDK is to translate the user-facing API (e.g. the @task decorator for Python, func transform(ctx context.Context, client sdk.VariableClient, log *slog.Logger) error for Go, public static class Transform implements Task { for Java) into a format that represents the Dag and Task structure.

There are a couple of reasons why SDK-compatible serialization is the only way to achieve this:

  1. The IPC format of DagFileParseRequest for the dag-processor is already fixed.
  2. If we used ProtoBuf as the IPC format, we would be designing yet another intermediate format on top of the final SDK-serialized DAG JSON format.
  3. Each language SDK would still have to implement the serialization between its user-facing API and the ProtoBuf messages.

The #65956 (comment) comment also points out the current restriction (we're already using custom serialization for the Python SDK) and should answer part of the question.
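For illustration, the `{"__type": ..., "__var": ...}` wrapping scheme described in the quoted comment can be sketched in a few lines of Python. This is a minimal sketch: the function name and the small set of handled types are my assumptions; the real `BaseSerialization` covers many more cases.

```python
from datetime import datetime, timedelta

def serialize(value):
    """Sketch of the {"__type": ..., "__var": ...} wrapping scheme."""
    # Primitives pass through unchanged.
    if value is None or isinstance(value, (bool, int, float, str)):
        return value
    # Complex types are wrapped with a type tag.
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: serialize(v) for k, v in value.items()}}
    if isinstance(value, set):
        return {"__type": "set", "__var": sorted(serialize(v) for v in value)}
    if isinstance(value, datetime):
        return {"__type": "datetime", "__var": value.timestamp()}
    if isinstance(value, timedelta):
        return {"__type": "timedelta", "__var": value.total_seconds()}
    raise TypeError(f"unsupported type: {type(value).__name__}")

print(serialize({"retries": 3, "tags": {"b", "a"}}))
# → {'__type': 'dict', '__var': {'retries': 3, 'tags': {'__type': 'set', '__var': ['a', 'b']}}}
```

The point of the comment thread above is that any Java implementation must reproduce this output byte-for-byte, which is why a generic ProtoBuf schema would not remove the per-language serialization work.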

)

# Floating-point tolerance for timestamp / duration comparisons.
FLOAT_TOLERANCE = 1e-6
Contributor

WTF? Are there different precisions in the interpreters?

Member Author

I will go through the comparison thoroughly later; I'll blame Claude Code for now :D
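For context on why a tolerance exists at all: timestamps and durations serialize as epoch seconds in a binary double, so a serialize/parse round trip between two runtimes can introduce noise in the last bits. A sketch of the comparison, assuming an absolute-tolerance check with `math.isclose` (my choice for the sketch; the actual validation script may compare differently):

```python
import math

FLOAT_TOLERANCE = 1e-6  # same constant as in the quoted snippet

def floats_match(a: float, b: float, tol: float = FLOAT_TOLERANCE) -> bool:
    # Absolute tolerance: for epoch-second timestamps (~1.7e9), a double's
    # resolution is roughly 2e-7 seconds, so genuine round-trip noise stays
    # well below 1e-6 while real mismatches do not.
    return math.isclose(a, b, rel_tol=0.0, abs_tol=tol)

print(floats_match(1767225600.0, 1767225600.0000001))  # → True
print(floats_match(1767225600.0, 1767225600.01))       # → False
```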

Comment thread java-sdk/gradlew.bat
Comment thread java-sdk/README.md
validation/serialization/test_dags.yaml \
validation/serialization/serialized_python.json

# 3. Compare
Contributor

Have you considered implementing this via a pytest?

Member Author

@jason810496 jason810496 Apr 29, 2026

I plan to finish it in #65959.

It should be integrated into CI, either as a prek static-check hook or as a pytest, to guard the compatibility between each language SDK and to catch breaking changes in Airflow's internal serialization format.

@jscheffl
Contributor

Wow, a lot of code and a couple of fundamental decisions. I'd propose writing an AIP for discussion; I'm not sure code review is the best place to "discuss" and adjust?

For some details, especially process lifecycle and inter-process communication, I'm missing an elaboration of the options, and I'm reluctant to implement our own communication protocol. Rather than making all the beginner mistakes, we should in my view pick one of the mature frameworks and solutions that already exist.

Contributor

@1fanwang 1fanwang left a comment


+1 on the direction, posted minor comments mostly around enforcing compatibility

Comment thread java-sdk/adr/0003-workload-execution.md
@uranusjr uranusjr mentioned this pull request May 5, 2026
uranusjr and others added 7 commits May 6, 2026 07:41
The existing DagBundle class is renamed to BundleBuilder with a new
'build()' method to abstract away bundle creation details from the user.

The dag declaration in the example bundle is moved to a separate class
to better mirror a real-world use case. In practice, a bundle would
contain more than one dag, and the previous example is unclear about how
the user should structure the code to do that. The new example structure
uses a builder pattern to create one dag from a class declaration, while
the bundle builder implementation aggregates all dags and acts as the
executable's entry point.
I ran ./gradlew ktlintFormat through the code base and it changed some
files. These were probably missed since we have not migrated the
pre-commit hooks to the Airflow repo, so unformatted code isn't
flagged in the Java SDK PR.
Since we're not (explicitly) supporting pure Java Dags for the moment,
these params are not useful. Removing them makes it easier to implement
the annotation-based authoring interface.

We will revert this commit when we come back to finish pure Java Dag
support.
- cut-date: 2026, May 6th
- add sections on public API surface, IPC forward-compatibility, and deployment updates
- introduce new ADR for coordinator packaging and registration.
- Updated the Java SDK Airflow integration documentation (ADR-0001) to clarify the role of the Coordinator layer, including the new structure for coordinator instances and their registration in Airflow configuration.
- Revised the DAG parsing documentation (ADR-0002) to reflect the shift from provider-based registration to instance-based configuration for coordinators.
- Enhanced the workload execution documentation (ADR-0003) to detail the task execution process and the opt-in nature of worker capabilities for different language runtimes.
- Established a new packaging and registration model for coordinators (ADR-0005), distinguishing them from traditional Airflow providers and introducing a namespace package for language-specific coordinators.
- Updated configuration examples to demonstrate the new `[sdk] coordinators` and `queue_to_coordinator` settings, allowing for multiple instances of the same coordinator with different runtime configurations.
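To make the last bullet concrete, a hypothetical airflow.cfg fragment: the `[sdk]` section, `coordinators`, and `queue_to_coordinator` option names come from the commit message above, but the value syntax and the coordinator class path are assumptions, not taken from the PR.

```ini
[sdk]
; Two instances of the same (hypothetical) Java coordinator with
; different runtime configurations. Class path and option syntax
; are illustrative only.
coordinators =
    java_default = airflow.sdk.coordinators.java.JavaCoordinator
    java_17 = airflow.sdk.coordinators.java.JavaCoordinator(java_home=/opt/jdk-17)
queue_to_coordinator =
    default = java_default
    jdk17-queue = java_17
```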
uranusjr added 2 commits May 6, 2026 20:29
This allows the task to access runtime information similar to Python's
template context.

We should be more stringent about what we put in this (things like conn and
var should use the client instead), but these probably must go in.

This should be done before the initial release of the SDK to avoid
backward compatibility issues.
* Introduce DagBuilder annotations

This provides two annotations, DagBuilder and DagBuilder.Task, that
*slightly* reduce the boilerplate needed to define tasks in Java (about
two lines per task, and two more lines for the dag).

The way this works is you do

    @DagBuilder
    public class MyDag {
      @DagBuilder.Task
      public void myTask(...) { ... }
    }

and the compiler uses our annotation processor to generate a wrapper
class named MyDagBuilder with the needed tasks and dependencies defined.
You then can register the dag to the bundle via the builder by calling

    MyDagBuilder.build()

in the BundleBuilder's getDags().

A lot of code for a little benefit for the moment, but this should make
taskflow-style XCom a lot easier. (This is not implemented yet.)

* Push XCom from return value

* Support auto XCom set and get

Generate extra code around the annotated function to pass in XCom
references, and set XCom from the return value.

* Move builder annotations to 'Builder'

* Allow customizing generated builder class name

* Add tests for annotation processor

Labels

AIP-108: java-sdk Change this to an 'area:' label after AIP acceptance.


5 participants