Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ repos:
^java-sdk/gradlew$|
^java-sdk/gradlew\.bat$|
^java-sdk/gradle|
^java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample\.scala$|

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which word in the file triggers this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `master("local[1]")` call triggers the inclusive-language check.

^task-sdk/tests/|
^.*changelog\.(rst|txt)$|
^.*CHANGELOG\.(rst|txt)$|
Expand Down
5 changes: 4 additions & 1 deletion airflow-e2e-tests/docker/Dockerfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

# Extends the standard Airflow image with a headless JRE so JavaCoordinator
# can spawn JVM subprocesses for @task.stub tasks.
#
# Pin Java 17 (rather than default-jre-headless): the Scala Spark example runs
# Apache Spark 3.5.x, which supports Java 8/11/17 but not Java 21.
ARG DOCKER_IMAGE
FROM ${DOCKER_IMAGE}

USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends default-jre-headless \
&& apt-get install -y --no-install-recommends openjdk-17-jre-headless \
&& rm -rf /var/lib/apt/lists/*
USER airflow
12 changes: 7 additions & 5 deletions airflow-e2e-tests/docker/java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
#
# Replaces the stock airflow-worker image with one that has a JRE installed
# (built by conftest._setup_java_sdk_integration via Dockerfile.java), mounts
# the pre-built example bundle JARs under /opt/airflow/jars, and configures
# the worker to consume the "java" Celery queue where @task.stub tasks are
# routed.
# the pre-built bundle JARs (the Java example under /opt/airflow/java-jars and
# the Scala Spark example under /opt/airflow/scala-jars), and configures the
# worker to consume the "java" and "scala" Celery queues where @task.stub tasks
# are routed.
---
services:
airflow-worker:
image: airflow-java-worker
volumes:
- ./jars:/opt/airflow/jars:ro
command: celery worker -q java,default
- ./java-jars:/opt/airflow/java-jars:ro
- ./scala-jars:/opt/airflow/scala-jars:ro
command: celery worker -q java,scala,default
111 changes: 101 additions & 10 deletions airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import subprocess
from datetime import datetime
from pathlib import Path
from shutil import copyfile, copytree
from shutil import copyfile, copytree, rmtree

import pytest
from rich.console import Console
Expand Down Expand Up @@ -54,6 +54,8 @@
OPENSEARCH_PATH,
PROVIDERS_MOUNT_CONTAINER_PATH,
PROVIDERS_ROOT_PATH,
SCALA_SPARK_EXAMPLE_DAGS_PATH,
SCALA_SPARK_EXAMPLE_LIBS_PATH,
TEST_REPORT_FILE,
XCOM_BUCKET,
)
Expand Down Expand Up @@ -247,6 +249,34 @@ def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)


# JVM options that Spark's own launcher would normally inject. JavaCoordinator
# starts the JVM directly and bypasses that launcher, so the bundle has to carry
# the options itself.
#
# The resulting list mirrors
# org.apache.spark.launcher.JavaModuleOptions.defaultModuleOptions() for the
# pinned Spark 3.5.8 (java-sdk/scala_spark_example/build.gradle), in the same
# order. Keep it in sync if Spark is bumped. A partial set is enough for the toy
# aggregation here but breaks real Spark code paths (Kryo -> java.lang.reflect,
# off-heap cleaner -> jdk.internal.ref, charset -> sun.nio.cs,
# Kerberos -> sun.security.krb5).
_SPARK_JAVA_MODULE_OPTIONS = [
    "-XX:+IgnoreUnrecognizedVMOptions",
    # One "--add-opens=<module>/<package>=ALL-UNNAMED" flag per target below.
    *(
        f"--add-opens={module_and_package}=ALL-UNNAMED"
        for module_and_package in (
            "java.base/java.lang",
            "java.base/java.lang.invoke",
            "java.base/java.lang.reflect",
            "java.base/java.io",
            "java.base/java.net",
            "java.base/java.nio",
            "java.base/java.util",
            "java.base/java.util.concurrent",
            "java.base/java.util.concurrent.atomic",
            "java.base/jdk.internal.ref",
            "java.base/sun.nio.ch",
            "java.base/sun.nio.cs",
            "java.base/sun.security.action",
            "java.base/sun.util.calendar",
            "java.security.jgss/sun.security.krb5",
        )
    ),
    "-Djdk.reflect.useDirectMethodHandle=false",
]


def _setup_java_sdk_integration(dot_env_file, tmp_dir):
"""Set up the java_sdk E2E test mode.

Expand Down Expand Up @@ -290,6 +320,12 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
],
check=True,
)
# TODO: Make the following build steps parallel
# The Gradle `bundle` task is a Copy that never prunes its destination, so
# JARs from an earlier build linger. A stale dependency JAR with its own
# Main-Class would make JavaCoordinator's Main-Class discovery ambiguous, so
# start each bundle from an empty directory.
rmtree(JAVA_SDK_EXAMPLE_LIBS_PATH, ignore_errors=True)
console.print("[yellow]Building Java SDK example bundle (eclipse-temurin:17-jdk)...")
subprocess.run(
[
Expand All @@ -315,17 +351,54 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
],
check=True,
)
rmtree(SCALA_SPARK_EXAMPLE_LIBS_PATH, ignore_errors=True)
console.print("[yellow]Building Scala Spark example bundle (eclipse-temurin:17-jdk)...")
subprocess.run(
[
"docker",
"run",
"--rm",
"--user",
f"{os.getuid()}:{os.getgid()}",
"-e",
"GRADLE_USER_HOME=/repo/java-sdk/.gradle",
"-e",
"HOME=/workspace-home",
"-v",
f"{JAVA_SDK_MAVEN_CACHE_PATH}:/workspace-home/.m2",
"-v",
f"{AIRFLOW_ROOT_PATH}:/repo",
"-w",
"/repo/java-sdk/scala_spark_example",
"eclipse-temurin:17-jdk",
"../gradlew",
"bundle",
"--no-daemon",
],
check=True,
)

# Copy compose override and Dockerfile into the temp directory.
copyfile(JAVA_COMPOSE_PATH, tmp_dir / "java.yml")
copyfile(JAVA_DOCKERFILE_PATH, tmp_dir / "Dockerfile.java")

# Copy all JARs from installDist output so the compose bind-mount ./jars
# gives the worker everything JavaCoordinator needs to build a classpath.
copytree(JAVA_SDK_EXAMPLE_LIBS_PATH, tmp_dir / "jars")
# Copy each bundle's JARs into its own directory; the compose bind-mounts
# expose them to the worker, and each JavaCoordinator globs its own dir.
copytree(JAVA_SDK_EXAMPLE_LIBS_PATH, tmp_dir / "java-jars")
copytree(SCALA_SPARK_EXAMPLE_LIBS_PATH, tmp_dir / "scala-jars")

# Copy the Java SDK example Dag file so Airflow can discover it.
# Copy the Java SDK example Dag files so Airflow can discover them.
copyfile(JAVA_SDK_EXAMPLE_DAGS_PATH / "java_examples.py", tmp_dir / "dags" / "java_examples.py")
copyfile(
SCALA_SPARK_EXAMPLE_DAGS_PATH / "scala_spark_examples.py",
tmp_dir / "dags" / "scala_spark_examples.py",
)

# Keep the bundle JARs out of the build context: Dockerfile.java only adds a
# JRE and copies nothing from the context, so without this docker build would
# tar and stream the bundles (hundreds of MB of Spark JARs) to the daemon for
# nothing. The JARs reach the worker via the compose bind-mounts, not the image.
(tmp_dir / ".dockerignore").write_text("java-jars/\nscala-jars/\n")

# Build a local Docker image that extends DOCKER_IMAGE with a JRE.
# We do this explicitly so testcontainers' DockerCompose.start() does not
Expand All @@ -347,17 +420,35 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
check=True,
)

# Coordinator registry: maps the logical name "java-jdk" to JavaCoordinator.
# Queue mapping: routes tasks on the "java" Celery queue to "java-jdk".
# Coordinator registry: two JavaCoordinators on the same worker image,
# serving different bundles on different queues.
# java-jdk -> the lean Java example bundle (queue "java")
# scala-jdk -> the Scala Spark bundle (queue "scala"), which needs Spark's
# Java 17 module openings, a small driver heap, and a longer
# startup timeout for its large dependency classpath.
coordinator_config = json.dumps(
{
"java-jdk": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {"jars_root": ["/opt/airflow/jars"]},
}
"kwargs": {"jars_root": ["/opt/airflow/java-jars"]},
},
"scala-jdk": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {
"jars_root": ["/opt/airflow/scala-jars"],
# Pin the entry point so Spark's large dependency classpath
# cannot make Main-Class discovery ambiguous.
"main_class": "org.apache.airflow.example.ScalaSparkBundleBuilder",
# Small driver heap plus Spark's full Java 17 module openings.
"jvm_args": ["-Xmx512m", *_SPARK_JAVA_MODULE_OPTIONS],
# Booting a JVM over the large Spark classpath takes longer
# than the 10 s default before it connects back.
"task_startup_timeout": 60.0,
},
},
}
)
queue_to_coordinator = json.dumps({"java": "java-jdk"})
queue_to_coordinator = json.dumps({"java": "java-jdk", "scala": "scala-jdk"})

# Connection expected by the Java example bundle tasks. The JSON form
# covers all connection fields, in particular the port: wire integers
Expand Down
5 changes: 5 additions & 0 deletions airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
JAVA_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "java.yml"
JAVA_DOCKERFILE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "Dockerfile.java"

# Scala Spark example is a separate Java-SDK bundle, served by its own
# coordinator/queue but exercised within the same java_sdk E2E mode.
# Directory containing the example's Dag file (scala_spark_examples.py), which
# the java_sdk E2E setup copies into the test dags folder.
SCALA_SPARK_EXAMPLE_DAGS_PATH = JAVA_SDK_ROOT_PATH / "scala_spark_example" / "src" / "resources" / "dags"
# Output directory of the example's Gradle `bundle` task; the E2E setup copies
# its JARs so the worker can mount them.
SCALA_SPARK_EXAMPLE_LIBS_PATH = JAVA_SDK_ROOT_PATH / "scala_spark_example" / "build" / "bundle"

# Go SDK E2E test paths
GO_SDK_ROOT_PATH = AIRFLOW_ROOT_PATH / "go-sdk"
GO_SDK_DAGS_PATH = GO_SDK_ROOT_PATH / "dags"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,72 @@ def test_application_logs_preserve_their_level(self):
assert str(record.get("level", "")).lower() == "info", (
f"application INFO log should keep its level, got {record.get('level')!r}; record: {record}"
)


# Timeout handed to ``wait_for_dag_run`` (presumably seconds - confirm against
# AirflowClient). It is generous because every Scala task boots its own local
# SparkSession, so three JVM + Spark startups run back to back inside a
# constrained CI container.
_SPARK_TASK_TIMEOUT = 20 * 60

# Expected results for the fixed dataset owned by ScalaSparkExample.scala
# (``SalesData.rows``), which is the single source of truth: 5 sales rows whose
# amounts sum to 1000. Update these if that dataset changes.
_SPARK_EXPECTED_ROW_COUNT = 5
_SPARK_EXPECTED_TOTAL_REVENUE = 100 + 200 + 300 + 150 + 250


class TestJavaSDKScalaSparkExample:
    """End-to-end check that the Scala + Apache Spark ETL example bundle runs correctly."""

    airflow_client = AirflowClient()

    def test_spark_etl_pipeline(self):
        """Run ``scala_spark_example`` and verify task states and XCom results.

        The Dag's three Scala Spark stubs (extract, transform, load) must all
        finish in the ``success`` state, and each must push the expected scalar
        as its ``return_value`` XCom. Each stub runs in its own JVM through
        ``JavaCoordinator`` with real Spark.
        """
        dag_id = "scala_spark_example"
        client = self.airflow_client

        trigger_resp = client.trigger_dag(
            dag_id,
            json={"logical_date": datetime.now(timezone.utc).isoformat()},
        )
        run_id = trigger_resp["dag_run_id"]

        dag_state = client.wait_for_dag_run(
            dag_id=dag_id,
            run_id=run_id,
            timeout=_SPARK_TASK_TIMEOUT,
        )

        # Index the run's task instances by task id for the state checks below.
        task_instances = client.get_task_instances(dag_id=dag_id, run_id=run_id).get("task_instances", [])
        ti_map = {ti["task_id"]: ti for ti in task_instances}

        for task_id in ("spark_extract", "spark_transform", "spark_load"):
            task_state = ti_map.get(task_id, {}).get("state")
            assert task_state == "success", (
                f"Scala Spark {task_id!r} task did not succeed.\n"
                f" task state : {task_state!r}\n"
                f" dag state : {dag_state!r}\n"
                f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }"
            )

        # (task id, expected XCom value, what the task is expected to do with it)
        xcom_expectations = (
            ("spark_extract", _SPARK_EXPECTED_ROW_COUNT, "push row count"),
            ("spark_transform", _SPARK_EXPECTED_TOTAL_REVENUE, "aggregate total revenue"),
            ("spark_load", _SPARK_EXPECTED_TOTAL_REVENUE, "return total revenue"),
        )
        for task_id, expected_value, action in xcom_expectations:
            xcom = client.get_xcom_value(dag_id=dag_id, task_id=task_id, run_id=run_id, key="return_value")
            actual_value = xcom.get("value")
            assert actual_value == expected_value, (
                f"Expected {task_id} to {action} {expected_value}, got {actual_value!r}"
            )
44 changes: 44 additions & 0 deletions java-sdk/scala_spark_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Scala Spark example bundle

A Scala + Apache Spark bundle for the Java SDK, exercised by the `java_sdk`
end-to-end test. It shows a non-Java JVM language driving Spark from a
`@task.stub` task and routing Log4j 2 logs into Airflow via `airflow-sdk-log4j2`.

The `scala_spark_example` Dag chains three tasks, each running in its own JVM
with a local `SparkSession` and passing scalar results over XCom:

- `spark_extract` - builds a DataFrame, pushes its row count.
- `spark_transform` - aggregates total revenue.
- `spark_load` - returns the persisted total.

## Build

```bash
# From java-sdk/: publish the SDK to the local Maven repository first.
./gradlew publishToMavenLocal -PskipSigning=true

cd scala_spark_example
../gradlew bundle
```

`fatJar` is disabled, so `build/bundle/` holds the bundle JAR plus every runtime
JAR (Spark included) — copy the directory's contents into a Java coordinator's `jars_root`.
Loading
Loading