1 change: 1 addition & 0 deletions .dockerignore
@@ -39,6 +39,7 @@
!task-sdk/
!airflow-ctl/
!go-sdk/
!sdk/

# Add all "test" distributions
!tests
16 changes: 16 additions & 0 deletions .pre-commit-config.yaml
@@ -1057,6 +1057,22 @@ repos:
pass_filenames: false
files: ^shared/.*|^scripts/ci/prek/check_shared_mypy_hooks\.py$
require_serial: true
- id: check-java-serialization-compatibility
name: Check Java SDK serialization compatibility
description: >-
Verify that the Java and Python SDKs produce identical DAG serialization
for all test cases defined in java-sdk/validation/serialization/test_dags.yaml
entry: ./scripts/ci/prek/check_java_serialization_compatibility.py
language: python
stages: ['manual']
pass_filenames: false
require_serial: true
files: >
(?x)
^java-sdk/sdk/src/.*\.(kt|java)$|
^java-sdk/validation/serialization/.*$|
^airflow-core/src/airflow/serialization/.*\.py$|
^task-sdk/src/airflow/sdk/definitions/.*\.py$
## ADD MOST PREK HOOKS ABOVE THAT LINE
# The below prek hooks are those requiring CI image to be built
## ONLY ADD PREK HOOKS HERE THAT REQUIRE CI IMAGE
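Reviewer note: for readers who do not open the script, the check described above reduces to a per-test-case canonical-JSON comparison. A minimal sketch of that core assertion, assuming both SDK outputs are already parsed into dicts (the helper name is illustrative, not the script's actual API):

import json

def assert_identical_serialization(python_dag: dict, java_dag: dict, dag_id: str) -> None:
    # Key-sorted JSON gives a canonical form, so dict-ordering differences
    # between the two SDKs cannot cause false mismatches.
    python_json = json.dumps(python_dag, sort_keys=True, indent=2)
    java_json = json.dumps(java_dag, sort_keys=True, indent=2)
    if python_json != java_json:
        raise AssertionError(f"Java/Python serialization mismatch for DAG {dag_id!r}")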
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1048,8 +1048,8 @@ function install_airflow_and_providers_from_docker_context_files(){
install_airflow_core_distribution=("apache-airflow-core==${AIRFLOW_VERSION}")
fi

# Find Provider/TaskSDK/CTL distributions in docker-context files
readarray -t airflow_distributions< <(python /scripts/docker/get_distribution_specs.py /docker-context-files/apache?airflow?{providers,task?sdk,airflowctl}*.{whl,tar.gz} 2>/dev/null || true)
# Find Provider/TaskSDK/CTL/Coordinator distributions in docker-context files
readarray -t airflow_distributions< <(python /scripts/docker/get_distribution_specs.py /docker-context-files/apache?airflow?{providers,task?sdk,airflowctl,coordinators?java}*.{whl,tar.gz} 2>/dev/null || true)
echo
echo "${COLOR_BLUE}Found provider distributions in docker-context-files folder: ${airflow_distributions[*]}${COLOR_RESET}"
echo
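Reviewer note: the ? single-character globs make the pattern tolerant of - vs _ in distribution file names, so the new coordinators?java entry matches artifacts named like apache_airflow_coordinators_java-1.0.0-py3-none-any.whl or apache-airflow-coordinators-java-1.0.0.tar.gz (illustrative names; the real version comes from the workspace build).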
24 changes: 24 additions & 0 deletions Dockerfile.ci
@@ -1222,6 +1222,30 @@ function environment_initialization() {
export AIRFLOW__SCHEDULER__GO_WORKER=True
fi

if [[ ${JAVA_SDK=} == "true" ]]; then
echo
echo "${COLOR_BLUE}Setting up Java SDK${COLOR_RESET}"
echo

# Install Java and the Java language provider
bash /opt/airflow/scripts/in_container/java_sdk_setup.sh

# Set JAVA_HOME and PATH before Gradle build so gradlew can find java
export JAVA_HOME=/files/openjdk
export PATH=/files/openjdk/bin:${PATH}

# Build both Java SDK bundles (stub + pure Java)
export JAVA_SDK_SRC_DIR=/opt/airflow/java-sdk
export BUNDLES_OUTPUT_DIR=/files/java-sdk-bundles
bash /opt/airflow/scripts/in_container/java_sdk_build.sh

# Source the generated environment configuration
# shellcheck disable=SC1091
source /files/java-sdk-bundles/java_sdk_env.sh

echo "${COLOR_BLUE}Java SDK setup complete.${COLOR_RESET}"
fi

RUN_TESTS=${RUN_TESTS:="false"}
CI=${CI:="false"}

85 changes: 85 additions & 0 deletions airflow-e2e-tests/docker/java-sdk.yml
@@ -0,0 +1,85 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Docker Compose override for Java SDK e2e tests.
#
# Overrides the dag-processor and worker services to:
# 1. Mount a pre-downloaded JDK, the Python stub bundle, and the pure Java bundle.
# 2. Set JAVA_HOME and PATH so the Java coordinator can find the JDK.
#
# The apache-airflow-coordinators-java distribution is expected to be
# pre-installed in the Docker image (baked in by `breeze prod-image build`
# via the workspace `uv sync --all-packages`).
#
# Environment variables expected (set by conftest.py):
# JAVA_OPENJDK_PATH – host path to the pre-downloaded OpenJDK directory
# JAVA_STUB_BUNDLE_PATH – host path to the stub DAG bundle root (dags/ + jar-bundles/)
# JAVA_BUNDLES_PATH – host path to the pure Java DAG bundle (JARs only)
---
services:
airflow-dag-processor:
user: "0:0"
environment: &java-env  # anchor reused by the worker services below
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
JAVA_HOME: /files/openjdk
PATH: /files/openjdk/bin:/home/airflow/.local/bin:/usr/python/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
volumes: &java-volumes  # anchor reused by the worker services below
- ${JAVA_OPENJDK_PATH}:/files/openjdk
- ${JAVA_STUB_BUNDLE_PATH}:/files/python-stub-java-tasks
- ${JAVA_BUNDLES_PATH}:/files/pure-java

airflow-worker:
user: "0:0"
environment:
<<: *java-env
volumes: *java-volumes

# Dedicated Celery worker that only consumes from "java-queue".
# Since this is a new service (not an override of the base docker-compose),
# it needs the full service definition including the common Airflow config
# that the base compose provides via x-airflow-common.
# AIRFLOW__SDK__COORDINATORS and AIRFLOW__SDK__QUEUE_TO_COORDINATOR are picked
# up from the .env file via env_file (set by conftest.py).
airflow-java-worker:
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:|version|}
env_file:
- ${ENV_FILE_PATH:-.env}
command: celery worker --queues java-queue
user: "0:0"
environment:
<<: *java-env
# Mirrors x-airflow-common-env from the base docker-compose.yaml
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
AIRFLOW__API_AUTH__JWT_SECRET: ${AIRFLOW__API_AUTH__JWT_SECRET:-airflow_jwt_secret}
AIRFLOW__API_AUTH__JWT_ISSUER: ${AIRFLOW__API_AUTH__JWT_ISSUER:-airflow}
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
volumes:
- ${JAVA_OPENJDK_PATH}:/files/openjdk
- ${JAVA_STUB_BUNDLE_PATH}:/files/python-stub-java-tasks
- ${JAVA_BUNDLES_PATH}:/files/pure-java
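Reviewer note: a minimal sketch of how a task lands on this dedicated worker. Any operator carrying queue="java-queue" is consumed only by the worker started with celery worker --queues java-queue, and the AIRFLOW__SDK__QUEUE_TO_COORDINATOR mapping ({"java-queue": "java"}) then hands execution to the Java coordinator. The DAG below is illustrative, not the actual stub_dag.py shipped in this PR:

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

with DAG(dag_id="java_sdk_stub_example"):
    BashOperator(
        task_id="stub_task",
        bash_command="echo placeholder",  # the real payload is resolved from the jar bundle
        queue="java-queue",  # routes the task to the java-only Celery worker
    )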
168 changes: 164 additions & 4 deletions airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -18,6 +18,9 @@

import json
import os
import platform
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from shutil import copyfile, copytree
@@ -27,13 +30,24 @@
from testcontainers.compose import DockerCompose

from airflow_e2e_tests.constants import (
AIRFLOW_ROOT_PATH,
AWS_INIT_PATH,
DOCKER_COMPOSE_HOST_PORT,
DOCKER_COMPOSE_PATH,
DOCKER_IMAGE,
E2E_DAGS_FOLDER,
E2E_TEST_MODE,
ELASTICSEARCH_PATH,
JAVA_CONTAINER_PURE_BUNDLE_PATH,
JAVA_CONTAINER_STUB_DAG_BUNDLE_PATH,
JAVA_CONTAINER_STUB_JAVA_BUNDLES_FOLDER_PATH,
JAVA_PURE_BUNDLE_NAME,
JAVA_PURE_DAG_ID,
JAVA_SDK_COMPOSE_PATH,
JAVA_SDK_DAGS_FOLDER,
JAVA_SDK_PATH,
JAVA_STUB_BUNDLE_NAME,
JAVA_STUB_DAG_ID,
LOCALSTACK_PATH,
LOGS_FOLDER,
OPENSEARCH_PATH,
@@ -142,8 +156,152 @@ def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)


def spin_up_airflow_environment(tmp_path_factory: pytest.TempPathFactory):
tmp_dir = tmp_path_factory.mktemp("airflow-e2e-tests")
def _download_jdk(dest_dir: Path) -> Path:
"""Download a Linux JDK matching the host/container architecture into dest_dir/openjdk."""
machine = platform.machine()
if machine in ("x86_64", "amd64"):
arch = "x64"
elif machine in ("aarch64", "arm64"):
arch = "aarch64"
else:
raise RuntimeError(f"Unsupported architecture: {machine}")

jdk_url = (
"https://github.com/adoptium/temurin11-binaries/releases/download/"
f"jdk-11.0.30%2B7/OpenJDK11U-jdk_{arch}_linux_hotspot_11.0.30_7.tar.gz"
)

tarball_path = dest_dir / "openjdk-11.tar.gz"
openjdk_dir = dest_dir / "openjdk"
openjdk_dir.mkdir(exist_ok=True)

console.print(f"[yellow]Downloading OpenJDK 11 ({arch}) for containers...")
result = subprocess.run(
["curl", "-fL", "-o", str(tarball_path), jdk_url],
capture_output=True,
text=True,
timeout=300,
check=False,
)
if result.returncode != 0:
raise RuntimeError(f"Failed to download OpenJDK: {result.stderr}")

console.print("[yellow]Extracting OpenJDK...")
result = subprocess.run(
["tar", "-xzf", str(tarball_path), "--strip-components=1", "-C", str(openjdk_dir)],
capture_output=True,
text=True,
timeout=120,
check=False,
)
if result.returncode != 0:
raise RuntimeError(f"Failed to extract OpenJDK: {result.stderr}")

tarball_path.unlink()
console.print("[green]OpenJDK 11 downloaded and extracted successfully")
return openjdk_dir


def _build_java_sdk_bundles(tmp_dir: Path) -> Path:
"""Build both Java SDK bundles (stub + pure Java) using the shared build script.

Returns the bundles output directory containing both bundle subdirectories.
The caller's system Java is used for the Gradle build (not the downloaded Linux JDK).
"""
build_script = AIRFLOW_ROOT_PATH / "scripts" / "in_container" / "java_sdk_build.sh"
bundles_dir = tmp_dir / "java-sdk-bundles"

build_env = {
**os.environ,
"JAVA_SDK_SRC_DIR": str(JAVA_SDK_PATH),
"BUNDLES_OUTPUT_DIR": str(bundles_dir),
"JAVA_STUB_BUNDLE_NAME": JAVA_STUB_BUNDLE_NAME,
"JAVA_PURE_BUNDLE_NAME": JAVA_PURE_BUNDLE_NAME,
"JAVA_STUB_DAG_ID": JAVA_STUB_DAG_ID,
"JAVA_PURE_DAG_ID": JAVA_PURE_DAG_ID,
"JAVA_SDK_DAGS_DIR": str(JAVA_SDK_DAGS_FOLDER),
# The build script defaults GRADLE_USER_HOME to /files/.gradle for breeze
# in-container caching; on the host we use the standard ~/.gradle so the
# Gradle wrapper can write its lock files.
"GRADLE_USER_HOME": os.environ.get("GRADLE_USER_HOME", str(Path.home() / ".gradle")),
}

console.print("[yellow]Building Java SDK bundles via shared build script...")
result = subprocess.run(
["bash", str(build_script)],
env=build_env,
capture_output=True,
text=True,
timeout=600,
check=False,
)
if result.returncode != 0:
console.print(f"[red]Java SDK build failed:\n{result.stdout}\n{result.stderr}")
raise RuntimeError("Failed to build Java SDK bundles")
console.print(result.stdout)

return bundles_dir


def _setup_java_sdk_integration(dot_env_file, tmp_dir):
"""Set up Java SDK integration: download JDK, build bundles, write env and compose override."""
# Download the Linux JDK on the host so containers get it via bind mount
openjdk_dir = _download_jdk(tmp_dir)

# Build both bundles (stub + pure Java) using the shared script
bundles_dir = _build_java_sdk_bundles(tmp_dir)
stub_bundle_root = bundles_dir / JAVA_STUB_BUNDLE_NAME
pure_bundle_dir = bundles_dir / JAVA_PURE_BUNDLE_NAME

# Copy the docker-compose override
copyfile(JAVA_SDK_COMPOSE_PATH, tmp_dir / "java-sdk.yml")

# Set host environment variables consumed by docker-compose variable substitution
os.environ["JAVA_OPENJDK_PATH"] = str(openjdk_dir)
os.environ["JAVA_STUB_BUNDLE_PATH"] = str(stub_bundle_root)
os.environ["JAVA_BUNDLES_PATH"] = str(pure_bundle_dir)

# Write .env file with bundle config for both DAG bundles
bundle_config = json.dumps(
[
{
"name": JAVA_STUB_BUNDLE_NAME,
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": JAVA_CONTAINER_STUB_DAG_BUNDLE_PATH, "refresh_interval": 20},
},
{
"name": JAVA_PURE_BUNDLE_NAME,
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": JAVA_CONTAINER_PURE_BUNDLE_PATH, "refresh_interval": 20},
},
]
)
coordinators = json.dumps(
[
{
"name": "java",
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {"bundles_folder": JAVA_CONTAINER_STUB_JAVA_BUNDLES_FOLDER_PATH},
}
]
)
queue_to_coordinator = json.dumps({"java-queue": "java"})

dot_env_file.write_text(
f"AIRFLOW_UID={os.getuid()}\n"
"AIRFLOW__CORE__LOAD_EXAMPLES=false\n"
"AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG\n"
f"AIRFLOW__SDK__COORDINATORS={coordinators}\n"
f"AIRFLOW__SDK__QUEUE_TO_COORDINATOR={queue_to_coordinator}\n"
f"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST={bundle_config}\n"
)
os.environ["ENV_FILE_PATH"] = str(dot_env_file)


def spin_up_airflow_environment():
# We intentionally use an explicitly created temp directory instead of pytest's tmp_path
# fixture: we want the directory to persist after the test run for debugging purposes,
# while pytest's tmp_path is deleted automatically once the test run finishes.
tmp_dir = Path(tempfile.mkdtemp(prefix="airflow-e2e-tests-"))
console.print(f"[yellow]Temp directory (persists after test run): {tmp_dir}")

console.print(f"[yellow]Using docker compose file: {DOCKER_COMPOSE_PATH}")
copyfile(DOCKER_COMPOSE_PATH, tmp_dir / "docker-compose.yaml")
@@ -180,6 +338,9 @@ def spin_up_airflow_environment(tmp_path_factory: pytest.TempPathFactory):
elif E2E_TEST_MODE == "xcom_object_storage":
compose_file_names.append("localstack.yml")
_setup_xcom_object_storage_integration(dot_env_file, tmp_dir)
elif E2E_TEST_MODE == "java_sdk":
compose_file_names.append("java-sdk.yml")
_setup_java_sdk_integration(dot_env_file, tmp_dir)

#
# Please Do not use this Fernet key in any deployments! Please generate your own key.
@@ -224,8 +385,7 @@ def _print_logs(compose_instance: DockerCompose):


def pytest_sessionstart(session: pytest.Session):
tmp_path_factory = session.config._tmp_path_factory
spin_up_airflow_environment(tmp_path_factory)
spin_up_airflow_environment()

console.print("[green]Airflow environment is up and running!")

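Reviewer note: for reference, the .env file written by _setup_java_sdk_integration renders to roughly the following (UID and host-specific values vary per run):

AIRFLOW_UID=1000
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG
AIRFLOW__SDK__COORDINATORS=[{"name": "java", "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", "kwargs": {"bundles_folder": "/files/python-stub-java-tasks/jar-bundles"}}]
AIRFLOW__SDK__QUEUE_TO_COORDINATOR={"java-queue": "java"}
AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST=[{"name": "python-stub-java-tasks", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/files/python-stub-java-tasks/dags", "refresh_interval": 20}}, {"name": "pure-java", "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", "kwargs": {"path": "/files/pure-java", "refresh_interval": 20}}]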
21 changes: 21 additions & 0 deletions airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -47,3 +47,24 @@

# s3 bucket name for XComObjectStorageBackend tests. This bucket will be created in the `init-aws.sh` script that is run as part of the LocalStack container initialization.
XCOM_BUCKET = "test-xcom-objectstorage-backend"

# Java SDK e2e test paths
JAVA_SDK_PATH = AIRFLOW_ROOT_PATH / "java-sdk"
JAVA_SDK_COMPOSE_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / "java-sdk.yml"
JAVA_SDK_DAGS_FOLDER = (
AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "tests" / "airflow_e2e_tests" / "java_sdk_tests" / "dags"
)
# Java SDK bundle names (used in DAG bundle config and docker-compose env vars)
JAVA_STUB_BUNDLE_NAME = "python-stub-java-tasks"
JAVA_PURE_BUNDLE_NAME = "pure-java"

# Java SDK DAG IDs (sed-replaced from the original "java_example" in JavaExample.java / stub_dag.py)
JAVA_STUB_DAG_ID = "java_sdk_stub_example"
JAVA_PURE_DAG_ID = "java_sdk_pure_java_example"

# In-container paths for Java SDK volumes (mounted by java-sdk.yml)
JAVA_CONTAINER_OPENJDK_PATH = "/files/openjdk"
JAVA_CONTAINER_STUB_BUNDLE_PATH = f"/files/{JAVA_STUB_BUNDLE_NAME}"
JAVA_CONTAINER_STUB_DAG_BUNDLE_PATH = f"/files/{JAVA_STUB_BUNDLE_NAME}/dags"
JAVA_CONTAINER_STUB_JAVA_BUNDLES_FOLDER_PATH = f"/files/{JAVA_STUB_BUNDLE_NAME}/jar-bundles"
JAVA_CONTAINER_PURE_BUNDLE_PATH = f"/files/{JAVA_PURE_BUNDLE_NAME}"
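Reviewer note: putting these constants together with the java-sdk.yml mounts, each container sees the following layout (per the compose comments: the stub bundle root carries dags/ plus jar-bundles/, the pure bundle holds JARs only):

/files/openjdk                             <- ${JAVA_OPENJDK_PATH}, the pre-downloaded Temurin JDK
/files/python-stub-java-tasks/dags         <- stub Python DAGs (JAVA_CONTAINER_STUB_DAG_BUNDLE_PATH)
/files/python-stub-java-tasks/jar-bundles  <- jars consumed by the Java coordinator (JAVA_CONTAINER_STUB_JAVA_BUNDLES_FOLDER_PATH)
/files/pure-java                           <- pure Java DAG bundle (JAVA_CONTAINER_PURE_BUNDLE_PATH)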