From 8dad3bdee1d4751a80d3794e79139a3997a2b028 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 24 Jun 2026 19:01:15 +0900 Subject: [PATCH 1/4] Add Scala Spark ETL example bundle to the Java SDK e2e tests ## Why Demonstrate and regression-test that the Java SDK can run a real Scala + Apache Spark workload, with task logs routed into Airflow via Log4j 2. ## What - Add `java-sdk/scala_spark_example`: a standalone Scala + Spark 3.5 (local mode) ETL bundle whose three tasks pass scalar results over XCom and log through Log4j 2 (`airflow-sdk-log4j2`). - Run it inside the existing `java_sdk` e2e via a second coordinator and queue (`scala-jdk` / `scala`) with its own `jars_root`, keeping the Java example bundle Spark-free. - Pin the e2e worker JRE to Java 17 and pass Spark's `--add-opens` JVM args. - Add `TestJavaSDKScalaSparkExample` asserting the tasks succeed and the XComs match the fixed dataset (5 rows, total revenue 1000). --- .pre-commit-config.yaml | 1 + airflow-e2e-tests/docker/Dockerfile.java | 5 +- airflow-e2e-tests/docker/java.yml | 12 +- .../tests/airflow_e2e_tests/conftest.py | 89 +++++++++-- .../tests/airflow_e2e_tests/constants.py | 5 + .../java_sdk_tests/test_java_sdk_dag.py | 60 +++++++ java-sdk/scala_spark_example/README.md | 44 +++++ java-sdk/scala_spark_example/build.gradle | 64 ++++++++ .../scala_spark_example/gradle.properties | 1 + java-sdk/scala_spark_example/settings.gradle | 33 ++++ .../src/main/resources/log4j2.xml | 37 +++++ .../resources/dags/scala_spark_examples.py | 40 +++++ .../airflow/example/ScalaSparkExample.scala | 150 ++++++++++++++++++ 13 files changed, 525 insertions(+), 16 deletions(-) create mode 100644 java-sdk/scala_spark_example/README.md create mode 100644 java-sdk/scala_spark_example/build.gradle create mode 120000 java-sdk/scala_spark_example/gradle.properties create mode 100644 java-sdk/scala_spark_example/settings.gradle create mode 100644 java-sdk/scala_spark_example/src/main/resources/log4j2.xml create mode 100644 java-sdk/scala_spark_example/src/resources/dags/scala_spark_examples.py create mode 100644 java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f3461ed417cd4..e42e987109ac5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -730,6 +730,7 @@ repos: ^java-sdk/gradlew$| ^java-sdk/gradlew\.bat$| ^java-sdk/gradle| + ^java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample\.scala$| ^task-sdk/tests/| ^.*changelog\.(rst|txt)$| ^.*CHANGELOG\.(rst|txt)$| diff --git a/airflow-e2e-tests/docker/Dockerfile.java b/airflow-e2e-tests/docker/Dockerfile.java index 7fc3363825ecb..68ccff525a359 100644 --- a/airflow-e2e-tests/docker/Dockerfile.java +++ b/airflow-e2e-tests/docker/Dockerfile.java @@ -17,11 +17,14 @@ # Extends the standard Airflow image with a headless JRE so JavaCoordinator # can spawn JVM subprocesses for @task.stub tasks. +# +# Pin Java 17 (rather than default-jre-headless): the Scala Spark example runs +# Apache Spark 3.5.x, which supports Java 8/11/17 but not Java 21. ARG DOCKER_IMAGE FROM ${DOCKER_IMAGE} USER root RUN apt-get update \ - && apt-get install -y --no-install-recommends default-jre-headless \ + && apt-get install -y --no-install-recommends openjdk-17-jre-headless \ && rm -rf /var/lib/apt/lists/* USER airflow diff --git a/airflow-e2e-tests/docker/java.yml b/airflow-e2e-tests/docker/java.yml index 3a01c66dd181b..d3609f4e8bee0 100644 --- a/airflow-e2e-tests/docker/java.yml +++ b/airflow-e2e-tests/docker/java.yml @@ -19,13 +19,15 @@ # # Replaces the stock airflow-worker image with one that has a JRE installed # (built by conftest._setup_java_sdk_integration via Dockerfile.java), mounts -# the pre-built example bundle JARs under /opt/airflow/jars, and configures -# the worker to consume the "java" Celery queue where @task.stub tasks are -# routed. +# the pre-built bundle JARs (the Java example under /opt/airflow/java-jars and +# the Scala Spark example under /opt/airflow/scala-jars), and configures the +# worker to consume the "java" and "scala" Celery queues where @task.stub tasks +# are routed. --- services: airflow-worker: image: airflow-java-worker volumes: - - ./jars:/opt/airflow/jars:ro - command: celery worker -q java,default + - ./java-jars:/opt/airflow/java-jars:ro + - ./scala-jars:/opt/airflow/scala-jars:ro + command: celery worker -q java,scala,default diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py index ce65c6fc9a420..895fc1ff5289c 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py @@ -21,7 +21,7 @@ import subprocess from datetime import datetime from pathlib import Path -from shutil import copyfile, copytree +from shutil import copyfile, copytree, rmtree import pytest from rich.console import Console @@ -54,6 +54,8 @@ OPENSEARCH_PATH, PROVIDERS_MOUNT_CONTAINER_PATH, PROVIDERS_ROOT_PATH, + SCALA_SPARK_EXAMPLE_DAGS_PATH, + SCALA_SPARK_EXAMPLE_LIBS_PATH, TEST_REPORT_FILE, XCOM_BUCKET, ) @@ -290,6 +292,12 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir): ], check=True, ) + # TODO: Make the following build steps parallel + # The Gradle `bundle` task is a Copy that never prunes its destination, so + # JARs from an earlier build linger. A stale dependency JAR with its own + # Main-Class would make JavaCoordinator's Main-Class discovery ambiguous, so + # start each bundle from an empty directory. + rmtree(JAVA_SDK_EXAMPLE_LIBS_PATH, ignore_errors=True) console.print("[yellow]Building Java SDK example bundle (eclipse-temurin:17-jdk)...") subprocess.run( [ @@ -315,17 +323,48 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir): ], check=True, ) + rmtree(SCALA_SPARK_EXAMPLE_LIBS_PATH, ignore_errors=True) + console.print("[yellow]Building Scala Spark example bundle (eclipse-temurin:17-jdk)...") + subprocess.run( + [ + "docker", + "run", + "--rm", + "--user", + f"{os.getuid()}:{os.getgid()}", + "-e", + "GRADLE_USER_HOME=/repo/java-sdk/.gradle", + "-e", + "HOME=/workspace-home", + "-v", + f"{JAVA_SDK_MAVEN_CACHE_PATH}:/workspace-home/.m2", + "-v", + f"{AIRFLOW_ROOT_PATH}:/repo", + "-w", + "/repo/java-sdk/scala_spark_example", + "eclipse-temurin:17-jdk", + "../gradlew", + "bundle", + "--no-daemon", + ], + check=True, + ) # Copy compose override and Dockerfile into the temp directory. copyfile(JAVA_COMPOSE_PATH, tmp_dir / "java.yml") copyfile(JAVA_DOCKERFILE_PATH, tmp_dir / "Dockerfile.java") - # Copy all JARs from installDist output so the compose bind-mount ./jars - # gives the worker everything JavaCoordinator needs to build a classpath. - copytree(JAVA_SDK_EXAMPLE_LIBS_PATH, tmp_dir / "jars") + # Copy each bundle's JARs into its own directory; the compose bind-mounts + # expose them to the worker, and each JavaCoordinator globs its own dir. + copytree(JAVA_SDK_EXAMPLE_LIBS_PATH, tmp_dir / "java-jars") + copytree(SCALA_SPARK_EXAMPLE_LIBS_PATH, tmp_dir / "scala-jars") - # Copy the Java SDK example Dag file so Airflow can discover it. + # Copy the Java SDK example Dag files so Airflow can discover them. copyfile(JAVA_SDK_EXAMPLE_DAGS_PATH / "java_examples.py", tmp_dir / "dags" / "java_examples.py") + copyfile( + SCALA_SPARK_EXAMPLE_DAGS_PATH / "scala_spark_examples.py", + tmp_dir / "dags" / "scala_spark_examples.py", + ) # Build a local Docker image that extends DOCKER_IMAGE with a JRE. # We do this explicitly so testcontainers' DockerCompose.start() does not @@ -347,17 +386,47 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir): check=True, ) - # Coordinator registry: maps the logical name "java-jdk" to JavaCoordinator. - # Queue mapping: routes tasks on the "java" Celery queue to "java-jdk". + # Coordinator registry: two JavaCoordinators on the same worker image, + # serving different bundles on different queues. + # java-jdk -> the lean Java example bundle (queue "java") + # scala-jdk -> the Scala Spark bundle (queue "scala"), which needs Spark's + # Java 17 module openings, a small driver heap, and a longer + # startup timeout for its large dependency classpath. coordinator_config = json.dumps( { "java-jdk": { "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", - "kwargs": {"jars_root": ["/opt/airflow/jars"]}, - } + "kwargs": {"jars_root": ["/opt/airflow/java-jars"]}, + }, + "scala-jdk": { + "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", + "kwargs": { + "jars_root": ["/opt/airflow/scala-jars"], + # Pin the entry point so Spark's large dependency classpath + # cannot make Main-Class discovery ambiguous. + "main_class": "org.apache.airflow.example.ScalaSparkBundleBuilder", + "jvm_args": [ + "-Xmx512m", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.security.action=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", + ], + # Booting a JVM over the large Spark classpath takes longer + # than the 10 s default before it connects back. + "task_startup_timeout": 60.0, + }, + }, } ) - queue_to_coordinator = json.dumps({"java": "java-jdk"}) + queue_to_coordinator = json.dumps({"java": "java-jdk", "scala": "scala-jdk"}) # Connection expected by the Java example bundle tasks. The JSON form # covers all connection fields, in particular the port: wire integers diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py index ae3b64a94faf6..0f347b8cfa19f 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py @@ -58,6 +58,11 @@ JAVA_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "java.yml" JAVA_DOCKERFILE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "Dockerfile.java" +# Scala Spark example is a separate Java-SDK bundle, served by its own +# coordinator/queue but exercised within the same java_sdk E2E mode. +SCALA_SPARK_EXAMPLE_DAGS_PATH = JAVA_SDK_ROOT_PATH / "scala_spark_example" / "src" / "resources" / "dags" +SCALA_SPARK_EXAMPLE_LIBS_PATH = JAVA_SDK_ROOT_PATH / "scala_spark_example" / "build" / "bundle" + # Go SDK E2E test paths GO_SDK_ROOT_PATH = AIRFLOW_ROOT_PATH / "go-sdk" GO_SDK_DAGS_PATH = GO_SDK_ROOT_PATH / "dags" diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py index 709987cb2c744..c2b0634643ea3 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py @@ -241,3 +241,63 @@ def test_application_logs_preserve_their_level(self): assert str(record.get("level", "")).lower() == "info", ( f"application INFO log should keep its level, got {record.get('level')!r}; record: {record}" ) + + +# Each Scala task spins up its own local SparkSession; allow generous time for +# three sequential JVM + Spark startups in a constrained CI container. +_SPARK_TASK_TIMEOUT = 1200 + +# Derived from the fixed in-memory dataset in ScalaSparkExample.scala: 5 sales +# rows whose amounts (100+200+300+150+250) sum to 1000. +_SPARK_EXPECTED_ROW_COUNT = 5 +_SPARK_EXPECTED_TOTAL_REVENUE = 1000 + + +class TestJavaSDKScalaSparkExample: + """Verify the Scala + Apache Spark ETL example bundle executes correctly.""" + + airflow_client = AirflowClient() + + def test_spark_etl_pipeline(self): + """The three Scala Spark stubs run in order and pass scalar results via XCom. + + Each runs in its own JVM through ``JavaCoordinator`` with real Spark. + """ + resp = self.airflow_client.trigger_dag( + "scala_spark_example", + json={"logical_date": datetime.now(timezone.utc).isoformat()}, + ) + run_id = resp["dag_run_id"] + + dag_state = self.airflow_client.wait_for_dag_run( + dag_id="scala_spark_example", + run_id=run_id, + timeout=_SPARK_TASK_TIMEOUT, + ) + + ti_resp = self.airflow_client.get_task_instances(dag_id="scala_spark_example", run_id=run_id) + ti_map = {ti["task_id"]: ti for ti in ti_resp.get("task_instances", [])} + + for task_id in ("spark_extract", "spark_transform", "spark_load"): + assert ti_map.get(task_id, {}).get("state") == "success", ( + f"Scala Spark {task_id!r} task did not succeed.\n" + f" task state : {ti_map.get(task_id, {}).get('state')!r}\n" + f" dag state : {dag_state!r}\n" + f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }" + ) + + extract_xcom = self.airflow_client.get_xcom_value( + dag_id="scala_spark_example", task_id="spark_extract", run_id=run_id, key="return_value" + ) + assert extract_xcom.get("value") == _SPARK_EXPECTED_ROW_COUNT, ( + f"Expected spark_extract to push row count {_SPARK_EXPECTED_ROW_COUNT}, " + f"got {extract_xcom.get('value')!r}" + ) + + load_xcom = self.airflow_client.get_xcom_value( + dag_id="scala_spark_example", task_id="spark_load", run_id=run_id, key="return_value" + ) + assert load_xcom.get("value") == _SPARK_EXPECTED_TOTAL_REVENUE, ( + f"Expected spark_load to return total revenue {_SPARK_EXPECTED_TOTAL_REVENUE}, " + f"got {load_xcom.get('value')!r}" + ) diff --git a/java-sdk/scala_spark_example/README.md b/java-sdk/scala_spark_example/README.md new file mode 100644 index 0000000000000..352672d7d2c42 --- /dev/null +++ b/java-sdk/scala_spark_example/README.md @@ -0,0 +1,44 @@ + + +# Scala Spark example bundle + +A Scala + Apache Spark bundle for the Java SDK, exercised by the `java_sdk` +end-to-end test. It shows a non-Java JVM language driving Spark from a +`@task.stub` task and routing Log4j 2 logs into Airflow via `airflow-sdk-log4j2`. + +The `scala_spark_example` Dag chains three tasks, each running in its own JVM +with a local `SparkSession` and passing scalar results over XCom: + +- `spark_extract` - builds a DataFrame, pushes its row count. +- `spark_transform` - aggregates total revenue. +- `spark_load` - returns the persisted total. + +## Build + +```bash +# From java-sdk/: publish the SDK to the local Maven repository first. +./gradlew publishToMavenLocal -PskipSigning=true + +cd scala_spark_example +../gradlew bundle +``` + +`fatJar` is disabled, so `build/bundle/` holds the bundle JAR plus every runtime +JAR (Spark included) — copy it into a Java coordinator's `jars_root`. diff --git a/java-sdk/scala_spark_example/build.gradle b/java-sdk/scala_spark_example/build.gradle new file mode 100644 index 0000000000000..34ede7dd580fe --- /dev/null +++ b/java-sdk/scala_spark_example/build.gradle @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("scala") + id("org.apache.airflow.sdk") version "${projectVersion}" +} + +repositories { + mavenLocal() // Not needed for your own project. + mavenCentral() +} + +dependencies { + implementation("org.apache.airflow:airflow-sdk:${projectVersion}") + + // Routes this bundle's Log4j 2 logging (and Spark's own) into Airflow task logs. + implementation("org.apache.airflow:airflow-sdk-log4j2:${projectVersion}") + + // Spark 3.5.x ships its 2.13 artifacts built against Scala 2.13.8. + implementation("org.scala-lang:scala-library:2.13.8") + implementation("org.apache.spark:spark-sql_2.13:3.5.8") + + // Spark pins Log4j 2 to 2.20.0 and airflow-sdk-log4j2 to 2.26.0; align the + // whole family onto one version so the AirflowAppender's api/core agree. + implementation(platform("org.apache.logging.log4j:log4j-bom:2.26.0")) +} + +java { + // Spark 3.5.x supports Java 8/11/17; match the Java 17 worker runtime. + toolchain { + languageVersion.set(JavaLanguageVersion.of(17)) + } +} + +sourceSets { + main { + scala.srcDir("src/scala") + } +} + +airflowBundle { + mainClass = "org.apache.airflow.example.ScalaSparkBundleBuilder" + // Spark drags in hundreds of dependency JARs; a thin bundle copies every + // runtime JAR alongside the bundle JAR (JavaCoordinator globs them all) + // rather than shadow-merging, which collides on Log4j2Plugins.dat. + fatJar = false +} diff --git a/java-sdk/scala_spark_example/gradle.properties b/java-sdk/scala_spark_example/gradle.properties new file mode 120000 index 0000000000000..7677fb73be823 --- /dev/null +++ b/java-sdk/scala_spark_example/gradle.properties @@ -0,0 +1 @@ +../gradle.properties \ No newline at end of file diff --git a/java-sdk/scala_spark_example/settings.gradle b/java-sdk/scala_spark_example/settings.gradle new file mode 100644 index 0000000000000..1d3179d8af02d --- /dev/null +++ b/java-sdk/scala_spark_example/settings.gradle @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// This is only needed since we want to route the plugin to the local build. +// You don't need it in your own project. +pluginManagement { + repositories { + mavenLocal() + gradlePluginPortal() + } +} + +plugins { + id("org.gradle.toolchains.foojay-resolver-convention") version "0.10.0" +} + +rootProject.name = "airflow-java-sdk-scala-spark-example" diff --git a/java-sdk/scala_spark_example/src/main/resources/log4j2.xml b/java-sdk/scala_spark_example/src/main/resources/log4j2.xml new file mode 100644 index 0000000000000..9eaab8293cc00 --- /dev/null +++ b/java-sdk/scala_spark_example/src/main/resources/log4j2.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + diff --git a/java-sdk/scala_spark_example/src/resources/dags/scala_spark_examples.py b/java-sdk/scala_spark_example/src/resources/dags/scala_spark_examples.py new file mode 100644 index 0000000000000..4d9855ddd5edf --- /dev/null +++ b/java-sdk/scala_spark_example/src/resources/dags/scala_spark_examples.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflow.sdk import dag, task + + +@task.stub(queue="scala") +def spark_extract(): ... + + +@task.stub(queue="scala") +def spark_transform(): ... + + +@task.stub(queue="scala") +def spark_load(): ... + + +@dag(dag_id="scala_spark_example") +def scala_spark_example(): + spark_extract() >> spark_transform() >> spark_load() + + +scala_spark_example() diff --git a/java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala b/java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala new file mode 100644 index 0000000000000..8f501610743e2 --- /dev/null +++ b/java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.example + +import org.apache.airflow.sdk.{Bundle, BundleBuilder, Client, Context, Dag, Server, Task} +import org.apache.logging.log4j.{LogManager, Logger} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.sum + +/** + * A Scala + Apache Spark ETL bundle for the Java SDK. Each task runs in its own + * JVM with a fresh local `SparkSession` and passes scalar results over XCom. + * See README.md for the overview. + */ + +/** Deterministic in-memory sales dataset so downstream assertions are stable. */ +private object SalesData { + // (id, category, amount) + val rows: Seq[(Int, String, Long)] = Seq( + (1, "electronics", 100L), + (2, "books", 200L), + (3, "electronics", 300L), + (4, "clothing", 150L), + (5, "books", 250L), + ) + + val rowCount: Long = rows.size.toLong // 5 + val totalRevenue: Long = rows.map(_._3).sum // 1000 +} + +private object SparkEtl { + val DagId = "scala_spark_example" + val ExtractTaskId = "spark_extract" + val TransformTaskId = "spark_transform" + val LoadTaskId = "spark_load" + + // Loopback driver + no UI so Spark binds no ports and skips hostname lookup. + def newSession(appName: String): SparkSession = + SparkSession + .builder() + .appName(appName) + .master("local[1]") + .config("spark.ui.enabled", "false") + .config("spark.sql.shuffle.partitions", "1") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .getOrCreate() + + // 3-arg getXCom(key, dagId, taskId) reads the upstream return value; wire + // integers arrive boxed. + def readUpstreamLong(client: Client, context: Context, taskId: String): Long = + client.getXCom("return_value", context.dagRun.dagId, taskId).asInstanceOf[Number].longValue() + + def pushLong(client: Client, value: Long): Unit = + client.setXCom("return_value", java.lang.Long.valueOf(value)) +} + +/** Builds the Spark DataFrame and reports how many records were extracted. */ +class SparkExtract extends Task { + private val log: Logger = LogManager.getLogger(classOf[SparkExtract]) + + override def execute(context: Context, client: Client): Unit = { + log.info("Starting Scala Spark extract task") + val spark = SparkEtl.newSession("scala-spark-etl-extract") + try { + import spark.implicits._ + val raw = SalesData.rows.toDF("id", "category", "amount") + val count = raw.count() + log.info("Extracted {} sales records with Spark", java.lang.Long.valueOf(count)) + SparkEtl.pushLong(client, count) + } finally { + spark.stop() + } + } +} + +/** Aggregates total revenue across the extracted records with Spark. */ +class SparkTransform extends Task { + private val log: Logger = LogManager.getLogger(classOf[SparkTransform]) + + override def execute(context: Context, client: Client): Unit = { + val extractedCount = SparkEtl.readUpstreamLong(client, context, SparkEtl.ExtractTaskId) + log.info("Transform received {} records from extract", java.lang.Long.valueOf(extractedCount)) + + val spark = SparkEtl.newSession("scala-spark-etl-transform") + try { + import spark.implicits._ + val raw = SalesData.rows.toDF("id", "category", "amount") + val total = raw.agg(sum($"amount")).first().getLong(0) + log.info("Computed total revenue {} with Spark", java.lang.Long.valueOf(total)) + SparkEtl.pushLong(client, total) + } finally { + spark.stop() + } + } +} + +/** "Loads" the aggregated revenue, returning the persisted value. */ +class SparkLoad extends Task { + private val log: Logger = LogManager.getLogger(classOf[SparkLoad]) + + override def execute(context: Context, client: Client): Unit = { + val total = SparkEtl.readUpstreamLong(client, context, SparkEtl.TransformTaskId) + log.info("Loading aggregated revenue {}", java.lang.Long.valueOf(total)) + + val spark = SparkEtl.newSession("scala-spark-etl-load") + try { + import spark.implicits._ + val loaded = Seq(("total_revenue", total)).toDF("metric", "value") + val value = loaded.first().getLong(1) + log.info("Load complete; persisted total_revenue={}", java.lang.Long.valueOf(value)) + SparkEtl.pushLong(client, value) + } finally { + spark.stop() + } + } +} + +object ScalaSparkExample { + def build(): Dag = + new Dag(SparkEtl.DagId) + .addTask(SparkEtl.ExtractTaskId, classOf[SparkExtract]) + .addTask(SparkEtl.TransformTaskId, classOf[SparkTransform]) + .addTask(SparkEtl.LoadTaskId, classOf[SparkLoad]) +} + +/** Bundle entry point served to Airflow's Java coordinator. */ +object ScalaSparkBundleBuilder extends BundleBuilder { + override def getDags(): java.lang.Iterable[Dag] = java.util.List.of(ScalaSparkExample.build()) + + def main(args: Array[String]): Unit = + Server.create(args).serve(new Bundle(getDags())) +} From 00c5ae29050b938c7836d8398196acca72ab2eb6 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Wed, 24 Jun 2026 19:28:29 +0900 Subject: [PATCH 2/4] Avoid copying jars for docker build --- airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py index 895fc1ff5289c..6e1876b689833 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py @@ -366,6 +366,12 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir): tmp_dir / "dags" / "scala_spark_examples.py", ) + # Keep the bundle JARs out of the build context: Dockerfile.java only adds a + # JRE and copies nothing from the context, so without this docker build would + # tar and stream the bundles (hundreds of MB of Spark JARs) to the daemon for + # nothing. The JARs reach the worker via the compose bind-mounts, not the image. + (tmp_dir / ".dockerignore").write_text("java-jars/\nscala-jars/\n") + # Build a local Docker image that extends DOCKER_IMAGE with a JRE. # We do this explicitly so testcontainers' DockerCompose.start() does not # need to handle the build itself (which avoids --no-build vs --build flag From f46122ace1e7a6a21b150b0d55dd54340f6e6e57 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 25 Jun 2026 15:08:32 +0900 Subject: [PATCH 3/4] Verify the Scala Spark transform stage in the Java-SDK e2e test The e2e test asserted only the extract and load XComs, so the aggregation stage in the middle of the pipeline could regress without the test noticing. Assert its XCom as well, drop the unused dataset constants, and note why the transform reads the upstream count it does not reuse. --- .../java_sdk_tests/test_java_sdk_dag.py | 13 +++++++++++-- .../apache/airflow/example/ScalaSparkExample.scala | 5 ++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py index c2b0634643ea3..1e26a872ef5ca 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py @@ -247,8 +247,9 @@ def test_application_logs_preserve_their_level(self): # three sequential JVM + Spark startups in a constrained CI container. _SPARK_TASK_TIMEOUT = 1200 -# Derived from the fixed in-memory dataset in ScalaSparkExample.scala: 5 sales -# rows whose amounts (100+200+300+150+250) sum to 1000. +# Mirror the fixed dataset that is the single source of truth in +# ScalaSparkExample.scala (``SalesData.rows``): 5 sales rows whose amounts +# (100+200+300+150+250) sum to 1000. Keep these in sync if that dataset changes. _SPARK_EXPECTED_ROW_COUNT = 5 _SPARK_EXPECTED_TOTAL_REVENUE = 1000 @@ -294,6 +295,14 @@ def test_spark_etl_pipeline(self): f"got {extract_xcom.get('value')!r}" ) + transform_xcom = self.airflow_client.get_xcom_value( + dag_id="scala_spark_example", task_id="spark_transform", run_id=run_id, key="return_value" + ) + assert transform_xcom.get("value") == _SPARK_EXPECTED_TOTAL_REVENUE, ( + f"Expected spark_transform to aggregate total revenue {_SPARK_EXPECTED_TOTAL_REVENUE}, " + f"got {transform_xcom.get('value')!r}" + ) + load_xcom = self.airflow_client.get_xcom_value( dag_id="scala_spark_example", task_id="spark_load", run_id=run_id, key="return_value" ) diff --git a/java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala b/java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala index 8f501610743e2..ae779bf02135a 100644 --- a/java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala +++ b/java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample.scala @@ -40,9 +40,6 @@ private object SalesData { (4, "clothing", 150L), (5, "books", 250L), ) - - val rowCount: Long = rows.size.toLong // 5 - val totalRevenue: Long = rows.map(_._3).sum // 1000 } private object SparkEtl { @@ -96,6 +93,8 @@ class SparkTransform extends Task { private val log: Logger = LogManager.getLogger(classOf[SparkTransform]) override def execute(context: Context, client: Client): Unit = { + // Read the upstream count only to demonstrate XCom passing between JVM + // tasks; the aggregation below recomputes from the source dataset. val extractedCount = SparkEtl.readUpstreamLong(client, context, SparkEtl.ExtractTaskId) log.info("Transform received {} records from extract", java.lang.Long.valueOf(extractedCount)) From 05770a1946def4d3b7fa433594e32316b40d978d Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 25 Jun 2026 16:06:09 +0900 Subject: [PATCH 4/4] Carry Spark's full Java 17 module options in the Scala Spark e2e test The Scala Spark coordinator launched the bundle JVM with a hand-curated subset of Spark's Java 17 module openings. Spark normally injects its full default set through its own launcher, which the raw JavaCoordinator launch bypasses. The subset is enough for the toy aggregation but omits openings that real Spark code paths need (Kryo reflection, off-heap cleaner, charset decoding, Kerberos), so the example would mislead anyone copying it for a non-trivial Spark workload. Mirror Spark 3.5.8's full default module option set instead. --- .../tests/airflow_e2e_tests/conftest.py | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py index 6e1876b689833..f150b6237dfb4 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py @@ -249,6 +249,34 @@ def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir): os.environ["ENV_FILE_PATH"] = str(dot_env_file) +# Spark normally injects these JVM options through its own launcher; the raw +# JavaCoordinator launch bypasses that, so the bundle must carry them itself. +# This mirrors org.apache.spark.launcher.JavaModuleOptions.defaultModuleOptions() +# verbatim for the pinned Spark 3.5.8 (java-sdk/scala_spark_example/build.gradle). +# A partial set passes the toy aggregation here but breaks real Spark code paths +# (Kryo -> java.lang.reflect, off-heap cleaner -> jdk.internal.ref, charset -> +# sun.nio.cs, Kerberos -> sun.security.krb5); keep it in sync if Spark is bumped. +_SPARK_JAVA_MODULE_OPTIONS = [ + "-XX:+IgnoreUnrecognizedVMOptions", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", + "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", + "--add-opens=java.base/java.io=ALL-UNNAMED", + "--add-opens=java.base/java.net=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", + "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", + "--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED", + "--add-opens=java.base/sun.security.action=ALL-UNNAMED", + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", + "--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED", + "-Djdk.reflect.useDirectMethodHandle=false", +] + + def _setup_java_sdk_integration(dot_env_file, tmp_dir): """Set up the java_sdk E2E test mode. @@ -411,20 +439,8 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir): # Pin the entry point so Spark's large dependency classpath # cannot make Main-Class discovery ambiguous. "main_class": "org.apache.airflow.example.ScalaSparkBundleBuilder", - "jvm_args": [ - "-Xmx512m", - "--add-opens=java.base/java.lang=ALL-UNNAMED", - "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", - "--add-opens=java.base/java.io=ALL-UNNAMED", - "--add-opens=java.base/java.net=ALL-UNNAMED", - "--add-opens=java.base/java.nio=ALL-UNNAMED", - "--add-opens=java.base/java.util=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED", - "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED", - "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", - "--add-opens=java.base/sun.security.action=ALL-UNNAMED", - "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", - ], + # Small driver heap plus Spark's full Java 17 module openings. + "jvm_args": ["-Xmx512m", *_SPARK_JAVA_MODULE_OPTIONS], # Booting a JVM over the large Spark classpath takes longer # than the 10 s default before it connects back. "task_startup_timeout": 60.0,