Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ repos:
^java-sdk/gradlew$|
^java-sdk/gradlew\.bat$|
^java-sdk/gradle|
^java-sdk/scala_spark_example/src/scala/org/apache/airflow/example/ScalaSparkExample\.scala$|

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which word in the file triggers this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The master("local[1]") call triggers the inclusive-language.

^task-sdk/tests/|
^.*changelog\.(rst|txt)$|
^.*CHANGELOG\.(rst|txt)$|
Expand Down
5 changes: 4 additions & 1 deletion airflow-e2e-tests/docker/Dockerfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

# Extends the standard Airflow image with a headless JRE so JavaCoordinator
# can spawn JVM subprocesses for @task.stub tasks.
#
# Pin Java 17 (rather than default-jre-headless): the Scala Spark example runs
# Apache Spark 3.5.x, which supports Java 8/11/17 but not Java 21.
ARG DOCKER_IMAGE
FROM ${DOCKER_IMAGE}

USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends default-jre-headless \
&& apt-get install -y --no-install-recommends openjdk-17-jre-headless \
&& rm -rf /var/lib/apt/lists/*
USER airflow
12 changes: 7 additions & 5 deletions airflow-e2e-tests/docker/java.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
#
# Replaces the stock airflow-worker image with one that has a JRE installed
# (built by conftest._setup_java_sdk_integration via Dockerfile.java), mounts
# the pre-built example bundle JARs under /opt/airflow/jars, and configures
# the worker to consume the "java" Celery queue where @task.stub tasks are
# routed.
# the pre-built bundle JARs (the Java example under /opt/airflow/java-jars and
# the Scala Spark example under /opt/airflow/scala-jars), and configures the
# worker to consume the "java" and "scala" Celery queues where @task.stub tasks
# are routed.
---
services:
airflow-worker:
image: airflow-java-worker
volumes:
- ./jars:/opt/airflow/jars:ro
command: celery worker -q java,default
- ./java-jars:/opt/airflow/java-jars:ro
- ./scala-jars:/opt/airflow/scala-jars:ro
command: celery worker -q java,scala,default
95 changes: 85 additions & 10 deletions airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import subprocess
from datetime import datetime
from pathlib import Path
from shutil import copyfile, copytree
from shutil import copyfile, copytree, rmtree

import pytest
from rich.console import Console
Expand Down Expand Up @@ -54,6 +54,8 @@
OPENSEARCH_PATH,
PROVIDERS_MOUNT_CONTAINER_PATH,
PROVIDERS_ROOT_PATH,
SCALA_SPARK_EXAMPLE_DAGS_PATH,
SCALA_SPARK_EXAMPLE_LIBS_PATH,
TEST_REPORT_FILE,
XCOM_BUCKET,
)
Expand Down Expand Up @@ -290,6 +292,12 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
],
check=True,
)
# TODO: Make the following build steps parallel
# The Gradle `bundle` task is a Copy that never prunes its destination, so
# JARs from an earlier build linger. A stale dependency JAR with its own
# Main-Class would make JavaCoordinator's Main-Class discovery ambiguous, so
# start each bundle from an empty directory.
rmtree(JAVA_SDK_EXAMPLE_LIBS_PATH, ignore_errors=True)
console.print("[yellow]Building Java SDK example bundle (eclipse-temurin:17-jdk)...")
subprocess.run(
[
Expand All @@ -315,17 +323,54 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
],
check=True,
)
rmtree(SCALA_SPARK_EXAMPLE_LIBS_PATH, ignore_errors=True)
console.print("[yellow]Building Scala Spark example bundle (eclipse-temurin:17-jdk)...")
subprocess.run(
[
"docker",
"run",
"--rm",
"--user",
f"{os.getuid()}:{os.getgid()}",
"-e",
"GRADLE_USER_HOME=/repo/java-sdk/.gradle",
"-e",
"HOME=/workspace-home",
"-v",
f"{JAVA_SDK_MAVEN_CACHE_PATH}:/workspace-home/.m2",
"-v",
f"{AIRFLOW_ROOT_PATH}:/repo",
"-w",
"/repo/java-sdk/scala_spark_example",
"eclipse-temurin:17-jdk",
"../gradlew",
"bundle",
"--no-daemon",
],
check=True,
)

# Copy compose override and Dockerfile into the temp directory.
copyfile(JAVA_COMPOSE_PATH, tmp_dir / "java.yml")
copyfile(JAVA_DOCKERFILE_PATH, tmp_dir / "Dockerfile.java")

# Copy all JARs from installDist output so the compose bind-mount ./jars
# gives the worker everything JavaCoordinator needs to build a classpath.
copytree(JAVA_SDK_EXAMPLE_LIBS_PATH, tmp_dir / "jars")
# Copy each bundle's JARs into its own directory; the compose bind-mounts
# expose them to the worker, and each JavaCoordinator globs its own dir.
copytree(JAVA_SDK_EXAMPLE_LIBS_PATH, tmp_dir / "java-jars")
copytree(SCALA_SPARK_EXAMPLE_LIBS_PATH, tmp_dir / "scala-jars")

# Copy the Java SDK example Dag file so Airflow can discover it.
# Copy the Java SDK example Dag files so Airflow can discover them.
copyfile(JAVA_SDK_EXAMPLE_DAGS_PATH / "java_examples.py", tmp_dir / "dags" / "java_examples.py")
copyfile(
SCALA_SPARK_EXAMPLE_DAGS_PATH / "scala_spark_examples.py",
tmp_dir / "dags" / "scala_spark_examples.py",
)

# Keep the bundle JARs out of the build context: Dockerfile.java only adds a
# JRE and copies nothing from the context, so without this docker build would
# tar and stream the bundles (hundreds of MB of Spark JARs) to the daemon for
# nothing. The JARs reach the worker via the compose bind-mounts, not the image.
(tmp_dir / ".dockerignore").write_text("java-jars/\nscala-jars/\n")

# Build a local Docker image that extends DOCKER_IMAGE with a JRE.
# We do this explicitly so testcontainers' DockerCompose.start() does not
Expand All @@ -347,17 +392,47 @@ def _setup_java_sdk_integration(dot_env_file, tmp_dir):
check=True,
)

# Coordinator registry: maps the logical name "java-jdk" to JavaCoordinator.
# Queue mapping: routes tasks on the "java" Celery queue to "java-jdk".
# Coordinator registry: two JavaCoordinators on the same worker image,
# serving different bundles on different queues.
# java-jdk -> the lean Java example bundle (queue "java")
# scala-jdk -> the Scala Spark bundle (queue "scala"), which needs Spark's
# Java 17 module openings, a small driver heap, and a longer
# startup timeout for its large dependency classpath.
coordinator_config = json.dumps(
{
"java-jdk": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {"jars_root": ["/opt/airflow/jars"]},
}
"kwargs": {"jars_root": ["/opt/airflow/java-jars"]},
},
"scala-jdk": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {
"jars_root": ["/opt/airflow/scala-jars"],
# Pin the entry point so Spark's large dependency classpath
# cannot make Main-Class discovery ambiguous.
"main_class": "org.apache.airflow.example.ScalaSparkBundleBuilder",
"jvm_args": [
"-Xmx512m",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.base/java.io=ALL-UNNAMED",
"--add-opens=java.base/java.net=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/java.util=ALL-UNNAMED",
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED",
"--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
],
# Booting a JVM over the large Spark classpath takes longer
# than the 10 s default before it connects back.
"task_startup_timeout": 60.0,
},
},
}
)
queue_to_coordinator = json.dumps({"java": "java-jdk"})
queue_to_coordinator = json.dumps({"java": "java-jdk", "scala": "scala-jdk"})

# Connection expected by the Java example bundle tasks. The JSON form
# covers all connection fields, in particular the port: wire integers
Expand Down
5 changes: 5 additions & 0 deletions airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
JAVA_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "java.yml"
JAVA_DOCKERFILE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "Dockerfile.java"

# Scala Spark example is a separate Java-SDK bundle, served by its own
# coordinator/queue but exercised within the same java_sdk E2E mode.
SCALA_SPARK_EXAMPLE_DAGS_PATH = JAVA_SDK_ROOT_PATH / "scala_spark_example" / "src" / "resources" / "dags"
SCALA_SPARK_EXAMPLE_LIBS_PATH = JAVA_SDK_ROOT_PATH / "scala_spark_example" / "build" / "bundle"

# Go SDK E2E test paths
GO_SDK_ROOT_PATH = AIRFLOW_ROOT_PATH / "go-sdk"
GO_SDK_DAGS_PATH = GO_SDK_ROOT_PATH / "dags"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,63 @@ def test_application_logs_preserve_their_level(self):
assert str(record.get("level", "")).lower() == "info", (
f"application INFO log should keep its level, got {record.get('level')!r}; record: {record}"
)


# Each Scala task spins up its own local SparkSession; allow generous time for
# three sequential JVM + Spark startups in a constrained CI container.
_SPARK_TASK_TIMEOUT = 1200

# Derived from the fixed in-memory dataset in ScalaSparkExample.scala: 5 sales
# rows whose amounts (100+200+300+150+250) sum to 1000.
_SPARK_EXPECTED_ROW_COUNT = 5
_SPARK_EXPECTED_TOTAL_REVENUE = 1000


class TestJavaSDKScalaSparkExample:
"""Verify the Scala + Apache Spark ETL example bundle executes correctly."""

airflow_client = AirflowClient()

def test_spark_etl_pipeline(self):
"""The three Scala Spark stubs run in order and pass scalar results via XCom.

Each runs in its own JVM through ``JavaCoordinator`` with real Spark.
"""
resp = self.airflow_client.trigger_dag(
"scala_spark_example",
json={"logical_date": datetime.now(timezone.utc).isoformat()},
)
run_id = resp["dag_run_id"]

dag_state = self.airflow_client.wait_for_dag_run(
dag_id="scala_spark_example",
run_id=run_id,
timeout=_SPARK_TASK_TIMEOUT,
)

ti_resp = self.airflow_client.get_task_instances(dag_id="scala_spark_example", run_id=run_id)
ti_map = {ti["task_id"]: ti for ti in ti_resp.get("task_instances", [])}

for task_id in ("spark_extract", "spark_transform", "spark_load"):
assert ti_map.get(task_id, {}).get("state") == "success", (
f"Scala Spark {task_id!r} task did not succeed.\n"
f" task state : {ti_map.get(task_id, {}).get('state')!r}\n"
f" dag state : {dag_state!r}\n"
f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }"
)

extract_xcom = self.airflow_client.get_xcom_value(
dag_id="scala_spark_example", task_id="spark_extract", run_id=run_id, key="return_value"
)
assert extract_xcom.get("value") == _SPARK_EXPECTED_ROW_COUNT, (
f"Expected spark_extract to push row count {_SPARK_EXPECTED_ROW_COUNT}, "
f"got {extract_xcom.get('value')!r}"
)

load_xcom = self.airflow_client.get_xcom_value(
dag_id="scala_spark_example", task_id="spark_load", run_id=run_id, key="return_value"
)
assert load_xcom.get("value") == _SPARK_EXPECTED_TOTAL_REVENUE, (
f"Expected spark_load to return total revenue {_SPARK_EXPECTED_TOTAL_REVENUE}, "
f"got {load_xcom.get('value')!r}"
)
44 changes: 44 additions & 0 deletions java-sdk/scala_spark_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Scala Spark example bundle

A Scala + Apache Spark bundle for the Java SDK, exercised by the `java_sdk`
end-to-end test. It shows a non-Java JVM language driving Spark from a
`@task.stub` task and routing Log4j 2 logs into Airflow via `airflow-sdk-log4j2`.

The `scala_spark_example` Dag chains three tasks, each running in its own JVM
with a local `SparkSession` and passing scalar results over XCom:

- `spark_extract` - builds a DataFrame, pushes its row count.
- `spark_transform` - aggregates total revenue.
- `spark_load` - returns the persisted total.

## Build

```bash
# From java-sdk/: publish the SDK to the local Maven repository first.
./gradlew publishToMavenLocal -PskipSigning=true

cd scala_spark_example
../gradlew bundle
```

`fatJar` is disabled, so `build/bundle/` holds the bundle JAR plus every runtime
JAR (Spark included) — copy it into a Java coordinator's `jars_root`.
64 changes: 64 additions & 0 deletions java-sdk/scala_spark_example/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

plugins {
id("scala")
id("org.apache.airflow.sdk") version "${projectVersion}"
}

repositories {
mavenLocal() // Not needed for your own project.
mavenCentral()
}

dependencies {
implementation("org.apache.airflow:airflow-sdk:${projectVersion}")

// Routes this bundle's Log4j 2 logging (and Spark's own) into Airflow task logs.
implementation("org.apache.airflow:airflow-sdk-log4j2:${projectVersion}")

// Spark 3.5.x ships its 2.13 artifacts built against Scala 2.13.8.
implementation("org.scala-lang:scala-library:2.13.8")
implementation("org.apache.spark:spark-sql_2.13:3.5.8")

// Spark pins Log4j 2 to 2.20.0 and airflow-sdk-log4j2 to 2.26.0; align the
// whole family onto one version so the AirflowAppender's api/core agree.
implementation(platform("org.apache.logging.log4j:log4j-bom:2.26.0"))
}

java {
// Spark 3.5.x supports Java 8/11/17; match the Java 17 worker runtime.
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}

sourceSets {
main {
scala.srcDir("src/scala")
}
}

airflowBundle {
mainClass = "org.apache.airflow.example.ScalaSparkBundleBuilder"
// Spark drags in hundreds of dependency JARs; a thin bundle copies every
// runtime JAR alongside the bundle JAR (JavaCoordinator globs them all)
// rather than shadow-merging, which collides on Log4j2Plugins.dat.
fatJar = false
}
1 change: 1 addition & 0 deletions java-sdk/scala_spark_example/gradle.properties
Loading
Loading