Add Coordinator Layer and Java Coordinator #65958
jason810496 wants to merge 51 commits into apache:main from
Conversation
Force-pushed 688d569 to 59d5a47
> Multi-Language extras
> =====================
>
> These are extras that add dependencies needed for integration with other language runtimes. Currently we have only a Java SDK-related extra, but in the future we might add more extras for other language runtimes.
Go SDK would not be listed here?
Once the go-sdk adopts the coordinator interface as a provider, I will update the description here to avoid confusion.
> from airflow.providers_manager import ProvidersManager
>
> extensions: list[str] = []
> for coordinator_cls in ProvidersManager().coordinators:
Would we assume that (if multiple) all DAG parsers load the language interpreters? I could well imagine spinning up one (or several) DAG parsers for Python and an additional one for Java, then deploying the JAR and JDK only to the instances that need them; the Python DAG parser would add the GitSyncBundle (which is probably not used on the Java side).
Not sure everybody wants to deploy a JDK into every DAG parser environment.
The answer is similar to the #65956 (comment) thread: DAG parsing for a pure-<lang> DAG is enabled only if the dag-processor installs the target sdk.<lang> provider.
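In other words, a dag-processor without the Java provider installed simply never registers a coordinator that matches `.jar` files. A minimal sketch of that first-match routing idea (the class and function names here are illustrative, not Airflow's actual API):

```python
from dataclasses import dataclass


@dataclass
class FakeCoordinator:
    """Stand-in for a registered coordinator (hypothetical shape)."""
    name: str
    extensions: tuple[str, ...]

    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        return path.endswith(self.extensions)


def resolve_coordinator(registered, bundle_name, path):
    # First match wins; no match means the file falls back to the
    # built-in Python parsing path.
    for coordinator in registered:
        if coordinator.can_handle_dag_file(bundle_name, path):
            return coordinator
    return None


python_only = [FakeCoordinator("python", (".py",))]
with_java = python_only + [FakeCoordinator("java", (".jar",))]

assert resolve_coordinator(python_only, "b", "dags/demo.jar") is None
assert resolve_coordinator(with_java, "b", "dags/demo.jar").name == "java"
```

So the deployment question answers itself: instances without the JDK just don't install the `sdk-java` provider, and `.jar` files are never dispatched there.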
> def _start_server() -> socket.socket:
>     """Create a TCP server socket bound to a random port on localhost."""
>     server = socket.socket()
As in the other PR, I am sceptical that TCP sockets should be used, as I do not think it is a good idea to define a proprietary protocol.
On the other hand, the Python implementation in use since 3.0 already relies on the same mechanism. (It just creates the TCP sockets in another way.)
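For context, binding to port 0 on localhost is the standard way to get an OS-assigned ephemeral port, which is presumably what the `_start_server` helper in the diff does; a minimal self-contained sketch:

```python
import socket


def start_server() -> socket.socket:
    """Bind a TCP listening socket to an OS-assigned port on localhost."""
    server = socket.socket()  # AF_INET / SOCK_STREAM by default
    server.bind(("127.0.0.1", 0))  # port 0 -> the kernel picks a free port
    server.listen(1)
    return server


server = start_server()
host, port = server.getsockname()
assert host == "127.0.0.1" and port > 0  # the assigned port is now known
server.close()
```

The assigned port can then be passed to the spawned runtime subprocess (e.g. via a `--comm=` argument), which connects back to it.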
Force-pushed d2f28c8 to 52dcb2a
>     ],
> )
> -@time_machine.travel("2025-01-01 00:00:00", tick=False)
> +@time_machine.travel(datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc), tick=False)

Suggested change:

-@time_machine.travel(datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc), tick=False)
+@time_machine.travel(datetime(2025, 1, 1, tzinfo=timezone.utc), tick=False)
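The suggested shorthand is safe because `datetime` defaults the hour, minute, and second arguments to zero, so both decorators receive the exact same tz-aware instant:

```python
from datetime import datetime, timezone

explicit = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
short = datetime(2025, 1, 1, tzinfo=timezone.utc)

# Identical instants; the trailing zeros are redundant.
assert explicit == short
```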
…_runtime_mapping] config
Tweak coordinator class names, attribute names, and method names to be shorter and avoid the term 'runtime'.
- Remove Java SDK setup in Dockerfile
- Add multi-language extras documentation
- Update TaskInstanceDTO description, and adjust API version in generated files
- Update JavaCoordinator to use TaskInstanceDTO
- Add compatibility check for Airflow >= 3.3.0
- Updated the Airflow issue template to include 'sdk-java' as an option.
- Added unit tests for JavaCoordinator functionality.
- Created a new test file for Java bundle scanning.
- Updated uv.lock to reflect new dependency requirements for tomli.
Replace TaskInstance with TaskInstanceDTO in StartupDetails fixtures and add the required pool_slots, queue, and priority_weight fields.
DagCode.get_code_from_file probes every coordinator's can_handle_dag_file on each fileloc, including .py paths nested inside ZIP DAGs (e.g. test_zip.zip/test_zip.py). The Java coordinator opened these as JAR files, raising NotADirectoryError because the parent path is a ZIP file rather than a directory. Short-circuit on the .jar suffix and add NotADirectoryError to the suppressed exceptions for safety.
The config.yml description duplicated the example field as a literal "Example:" line in the description text. With --include-descriptions this rendered as "# Example:", which trips test_cli_show_config_shows_descriptions. The example is already in the dedicated example field, so remove the duplicate from the description.
apache-airflow-providers-sdk-java requires apache-airflow>=3.3.0, so installing it against the 2.11.1 / 3.0.6 / 3.1.8 / 3.2.1 compat targets fails dependency resolution. Add it to remove-providers for each older-Airflow row in PROVIDERS_COMPATIBILITY_TESTS_MATRIX. Also silence mypy no-redef on dev/registry tomli fallback imports, which now trip the mypy-dev hook because tomli is resolvable in the mypy environment after recent uv.lock updates.
Import TaskInstanceDTO from the same airflow.sdk._shared.workloads namespace that BaseCoordinator uses. The previous import via airflow._shared.workloads pointed at the same physical file via a symlink but mypy treated the two namespaces as distinct types, flagging the override as a Liskov violation.
* Add 'sdk' to empty_subpackages in provider_conf so the autoapi-generated _api/airflow/providers/sdk/index.rst is excluded the same way as the other namespace-only directories. Without this, Sphinx warned that the document was not in any toctree.
* Fix the relative include paths in security.rst and installing-providers-from-sources.rst. Nested providers (those under a namespace package like sdk/) sit one directory deeper than flat providers, so the include needs four ../ segments instead of three to reach devel-common/src/sphinx_exts/includes/.
… timezone awareness in tests
…meters in test_supervisor
…meters in TestCommsDecoder
- Removed the shared workloads dependency from pyproject.toml and related files.
- Deleted the workloads directory and its references in the codebase.
- Refactored imports of TaskInstanceDTO to point to the new location in execution_time.workloads.task.
- Introduced new files for TaskInstanceDTO and its base class in the execution_time module.
- Updated tests to reflect the changes in TaskInstanceDTO imports.
Force-pushed b1125a1 to 3d719d1
…ample Add JavaCoordinator, jvm, openjdk, Xmx to the docs spelling wordlist so the rendered configurations-ref doesn't fail Sphinx spellcheck on the [sdk] coordinators example. Also indent multi-line example/default values by 8 spaces in the shared sections-and-options template so the rendered RST code-block keeps consistent indentation and doesn't break the field list.
Add Coordinator Layer and Java Coordinator
Why
Airflow's DAG file processor and task runner only understand Python. To run DAGs and tasks authored in other languages (Java now, Go/Rust later), both the parsing pipeline and the execution pipeline need a language-agnostic extension point that delegates to an external runtime subprocess.
How
The Coordinator Abstraction
A new `BaseCoordinator` base class in the Task SDK (`task-sdk/src/airflow/sdk/execution_time/coordinator.py`) defines the extension point. Language providers subclass it and implement three methods:

- `can_handle_dag_file(bundle_name, path)`
- `dag_parsing_runtime_cmd(...)`
- `task_execution_runtime_cmd(...)`

The base class owns the full subprocess lifecycle: TCP server creation, subprocess spawning, connection acceptance, and a selector-based byte-forwarding bridge between the Airflow supervisor (fd 0) and the language runtime (TCP socket). The shared I/O loop is extracted into `selector_loop.py` and reused by `WatchedSubprocess`.

Discovery and Routing
Providers register coordinators in `provider.yaml` under a new `coordinators` key. `ProvidersManager` (airflow-core) and `ProvidersManagerTaskRuntime` (task-sdk) both discover them:

- `DagFileProcessorProcess._resolve_processor_target()` iterates registered coordinators; the first whose `can_handle_dag_file()` returns `True` handles the file.
- `task_runner._resolve_runtime_entrypoint()` uses a two-step resolution: first it consults the `[sdk] queue_to_sdk` mapping (queue name to coordinator runtime name), then it falls back to matching DAG file extensions against registered coordinators.

Queue-Based Runtime Routing
Tasks are routed to non-Python runtimes via their `queue` assignment and a configuration mapping. Operators set `queue="java-queue"` (or any custom queue name), and the `[sdk] queue_to_sdk` config maps queue names to coordinator runtime names. This avoids adding new columns or API fields: the existing `queue` field carries the routing signal from scheduling to execution, and the mapping is resolved at task execution time.

Java Provider
A new `apache-airflow-providers-sdk-java` provider implements `JavaCoordinator`:

- `can_handle_dag_file`: checks if the file is a JAR with valid Airflow Java SDK manifest attributes
- `dag_parsing_runtime_cmd`: constructs `java -classpath <bundle>/* <MainClass> --comm=... --logs=...`
- `task_execution_runtime_cmd`: handles both pure Java DAGs (JAR path) and Python stub DAGs (resolves bundle from `[java] bundles_folder` config)
- `get_code_from_file`: extracts embedded `.java` source from the JAR for Airflow UI display

What
Task SDK (`task-sdk/`)

- `BaseCoordinator` abstract base class with full subprocess bridge lifecycle
- `selector_loop.py`: shared selector-based I/O utilities, refactored out of `supervisor.py`
- `_resolve_runtime_entrypoint()` to `task_runner.py` with queue-based and file-extension-based dispatch
- `QueueToCoordinatorMapper` for resolving queue names to coordinators via `[sdk] queue_to_sdk` config
- `resolve_bundle()` helper for reuse by both Python and coordinator paths
- `coordinators` discovery in `ProvidersManagerTaskRuntime`

Airflow Core (`airflow-core/`)

- `[sdk] queue_to_sdk` configuration option for queue-to-runtime mapping
- `DagFileProcessorProcess.start()` with `_resolve_processor_target()` for coordinator delegation
- `DagFileProcessorManager` to recognize runtime file extensions (e.g., `.jar`) and skip ZIP inspection for them
- `DagCode.get_code_from_file()` to delegate to coordinator's `get_code_from_file()`
- `coordinators` extension point to `provider.yaml.schema.json` and `provider_info.schema.json`
- `coordinators` discovery in `ProvidersManager`

Java Provider (`providers/sdk/java/`)

- `JavaCoordinator` with DAG parsing, task execution, and code extraction
- `BundleScanner` for JAR manifest inspection and bundle resolution
- `provider.yaml` with `coordinators` registration and `[java] bundles_folder` config
- Packaging files (`pyproject.toml`, docs, LICENSE, NOTICE)
- `java_sdk_setup.sh` for Breeze development environment

Was generative AI tooling used to co-author this PR?
Co-authored-by: Tzu-ping Chung uranusjr@gmail.com