diff --git a/skills/docs/dev/TESTING.md b/skills/docs/dev/TESTING.md new file mode 100644 index 00000000000..ed108e9ccac --- /dev/null +++ b/skills/docs/dev/TESTING.md @@ -0,0 +1,35 @@ +# Testing + +The commands below assume you are in the `skills/` directory. + +## Setup + +Set up a local dev environment: + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -e ".[dev]" +``` + +## Fast Tests + +Run the fast tests: + +```bash +pytest -m "not slow" +``` + +These are generally lightweight skill validation tests, such as verifying skill frontmatter. + +## Integration Tests + +Run the integration tests (`-s` turns off pytest's output capture): + +```bash +pytest -m slow -s +``` + +These tests deterministically fill in the template project from `udf-gen-test/templates/scala/` (plus the `udf-convert-to-cuda/templates/cuda/` add-on for `cuda` tests) with fixture implementations, then compile and run the Spark tests and benchmark scripts locally. + +They therefore require a JDK, Maven with access to a Maven repository, a GPU environment, and (for `cuda` tests) CMake and the CUDA toolkit. diff --git a/skills/pyproject.toml b/skills/pyproject.toml new file mode 100644 index 00000000000..8501cdad3d1 --- /dev/null +++ b/skills/pyproject.toml @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0 + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "aether-agent" +version = "0.1.0" +description = "Convert Spark UDFs into GPU implementations" +authors = [ + {name = "Rishi Chandra", email = "rishic@nvidia.com"} +] +readme = "README.md" +requires-python = ">=3.10" +classifiers = [ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent", +] + +[project.optional-dependencies] +dev = [ + "pytest==8.4.1", + "PyYAML==6.0.3", + "isort==6.0.1", + "black==25.1.0", + "ruff==0.12.8", +] + +[tool.setuptools] +packages = [] + +[tool.pyright] +typeCheckingMode = "standard" + +[tool.pytest.ini_options] +markers = ["slow: integration tests"] diff --git a/skills/tests/__init__.py b/skills/tests/__init__.py new file mode 100644 index 00000000000..030f4598fd0 --- /dev/null +++ b/skills/tests/__init__.py @@ -0,0 +1,6 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Tests for the skills templates. +""" diff --git a/skills/tests/fixtures.py b/skills/tests/fixtures.py new file mode 100644 index 00000000000..3db312348c6 --- /dev/null +++ b/skills/tests/fixtures.py @@ -0,0 +1,224 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Source fixtures for the JVM template integration tests. 
def _read_resource(name: str) -> str:
    """Return the UTF-8 text of the file *name* in the ``resources`` directory next to this module."""
    resources_dir = Path(__file__).parent.joinpath("resources")
    with open(resources_dir / name, encoding="utf-8") as handle:
        return handle.read()
resultDF.collect().sortBy(_.getAs[Int]("id")) + assert(results(0).getAs[Int]("result") === 246) + assert(results(1).getAs[Int]("result") === 0) + assert(results(2).getAs[Int]("result") === -10) + assert(results(3).isNullAt(results(3).fieldIndex("result"))) + }""" + +_SCALA_REGISTER_CALL = "spark.udf.register({name}, new com.udf.{cls}())" +_JAVA_REGISTER_CALL = "spark.udf.register({name}, new com.udf.{cls}(), org.apache.spark.sql.types.IntegerType)" + + +_REGISTER_METHOD = """\ + def {method}(spark: SparkSession, udfName: String): Unit = {{ + {register_call} + }}""" + + +SCALA_REGISTER_UDF = _REGISTER_METHOD.format( + method="registerUDF", + register_call=_SCALA_REGISTER_CALL.format(name="udfName", cls=CPU_UDF_NAME), +) +JAVA_REGISTER_UDF = _REGISTER_METHOD.format( + method="registerUDF", + register_call=_JAVA_REGISTER_CALL.format(name="udfName", cls=CPU_UDF_NAME), +) + +SCALA_REGISTER_RAPIDS_UDF = _REGISTER_METHOD.format( + method="registerRapidsUDF", + register_call=_SCALA_REGISTER_CALL.format(name="udfName", cls=RAPIDS_UDF_NAME), +) +JAVA_REGISTER_RAPIDS_UDF = _REGISTER_METHOD.format( + method="registerRapidsUDF", + register_call=_JAVA_REGISTER_CALL.format(name="udfName", cls=RAPIDS_UDF_NAME), +) +NATIVE_REGISTER_RAPIDS_UDF = _REGISTER_METHOD.format( + method="registerRapidsUDF", + register_call=_JAVA_REGISTER_CALL.format(name="udfName", cls=NATIVE_UDF_NAME), +) + +# --------------------------------------------------------------------------- +# BenchUtils methods +# --------------------------------------------------------------------------- + +BENCH_GENERATE = """\ + def generateSyntheticData( + spark: SparkSession, + numRows: Long, + numPartitions: Int + ): DataFrame = { + val baseDF = spark.range(0, numRows, 1, numPartitions) + baseDF.select( + col("id"), + (rand() * 1000).cast(IntegerType).alias("value") + ) + }""" + + +_BENCH_EXECUTE_METHOD = """\ + def {method}(spark: SparkSession, df: DataFrame): DataFrame = {{ + df.createOrReplaceTempView("bench_table") + 
{register} + spark.sql("SELECT *, udf(value) AS result FROM bench_table") + }}""" + + +BENCH_EXECUTE_SCALA_CPU = _BENCH_EXECUTE_METHOD.format( + method="executeCpu", + register=_SCALA_REGISTER_CALL.format(name='"udf"', cls=CPU_UDF_NAME), +) +BENCH_EXECUTE_JAVA_CPU = _BENCH_EXECUTE_METHOD.format( + method="executeCpu", + register=_JAVA_REGISTER_CALL.format(name='"udf"', cls=CPU_UDF_NAME), +) +BENCH_EXECUTE_SCALA_CUDF = _BENCH_EXECUTE_METHOD.format( + method="executeGpu", + register=_SCALA_REGISTER_CALL.format(name='"udf"', cls=RAPIDS_UDF_NAME), +) +BENCH_EXECUTE_JAVA_CUDF = _BENCH_EXECUTE_METHOD.format( + method="executeGpu", + register=_JAVA_REGISTER_CALL.format(name='"udf"', cls=RAPIDS_UDF_NAME), +) +BENCH_EXECUTE_CUDA = _BENCH_EXECUTE_METHOD.format( + method="executeGpu", + register=_JAVA_REGISTER_CALL.format(name='"udf"', cls=NATIVE_UDF_NAME), +) + +BENCH_EXECUTE_SQL = """\ + def executeGpu(spark: SparkSession, df: DataFrame): DataFrame = { + df.createOrReplaceTempView("bench_table") + val sqlSource = scala.io.Source.fromFile("src/main/resources/integer_multiply_by_2.sql") + val sqlContent = try sqlSource.mkString finally sqlSource.close() + val benchSql = sqlContent.replace("test_table", "bench_table") + spark.sql(benchSql) + }""" + +# --------------------------------------------------------------------------- +# MicroBenchRunner methods +# --------------------------------------------------------------------------- + +MICRO_PREPARE_CPU = """\ + def prepareCpuData( + hostColumns: Array[HostColumnVector], + numRows: Int + ): Array[AnyRef] = { + val values = Array.tabulate(numRows) { i => + if (hostColumns(1).isNull(i)) null + else Int.box(hostColumns(1).getInt(i)) + } + Array[AnyRef](values) + }""" + + +_MICRO_EXECUTE_CPU_METHOD = """\ + def executeCpu(data: Array[AnyRef], numRows: Int): Unit = {{ + val values = data(0).asInstanceOf[Array[Integer]] + val udf = new com.udf.{cls}() + var i = 0 + while (i < numRows) {{ + udf.{invoke}(values(i)) + i += 1 + }} + }}""" 
+ +MICRO_EXECUTE_SCALA_CPU = _MICRO_EXECUTE_CPU_METHOD.format( + cls=CPU_UDF_NAME, + invoke="apply", +) +MICRO_EXECUTE_JAVA_CPU = _MICRO_EXECUTE_CPU_METHOD.format( + cls=CPU_UDF_NAME, + invoke="call", +) + + +_MICRO_EXECUTE_GPU_METHOD = """\ + def executeGpu(table: Table, numRows: Int): Unit = {{ + val udf = new com.udf.{cls}() + withResource(udf.evaluateColumnar(numRows, table.getColumn(1))) {{ _ => }} + }}""" + +MICRO_EXECUTE_CUDF = _MICRO_EXECUTE_GPU_METHOD.format(cls=RAPIDS_UDF_NAME) +MICRO_EXECUTE_CUDA = _MICRO_EXECUTE_GPU_METHOD.format(cls=NATIVE_UDF_NAME) + +# --------------------------------------------------------------------------- +# Native source paths +# --------------------------------------------------------------------------- + +CMAKE_SOURCE_FILES = """\ +set(SOURCE_FILES + "src/IntegerMultiplyBy2Jni.cpp" + "src/integer_multiply_by_2.cu" +) +""" + +NATIVE_PLACEHOLDER_FILES = ( + "src/main/java/com/udf/PlaceholderUDFNameNativeRapidsUDF.java", + "native/src/main/cpp/src/PlaceholderUDFNameJni.cpp", + "native/src/main/cpp/src/placeholder_udf_name.cu", + "native/src/main/cpp/src/placeholder_udf_name.hpp", +) + +NATIVE_SOURCE_FILES = { + "native/src/main/cpp/src/IntegerMultiplyBy2Jni.cpp": JNI_SOURCE, + "native/src/main/cpp/src/integer_multiply_by_2.cu": CUDA_SOURCE, + "native/src/main/cpp/src/integer_multiply_by_2.hpp": HEADER_SOURCE, +} diff --git a/skills/tests/resources/IntegerMultiplyBy2Jni.cpp b/skills/tests/resources/IntegerMultiplyBy2Jni.cpp new file mode 100644 index 00000000000..aa7bb147147 --- /dev/null +++ b/skills/tests/resources/IntegerMultiplyBy2Jni.cpp @@ -0,0 +1,63 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +#include "integer_multiply_by_2.hpp" + +#include +#include +#include + +#include + +#include +#include + +namespace { + +constexpr char const* RUNTIME_ERROR_CLASS = "java/lang/RuntimeException"; +constexpr char const* ILLEGAL_ARG_CLASS = "java/lang/IllegalArgumentException"; + +void throw_java_exception(JNIEnv* env, char const* class_name, char const* message) +{ + jclass ex_class = env->FindClass(class_name); + if (ex_class != nullptr) { + env->ThrowNew(ex_class, message); + } +} + +} // namespace + +extern "C" { + +JNIEXPORT jlong JNICALL +Java_com_udf_IntegerMultiplyBy2NativeRapidsUDF_integerMultiplyBy2(JNIEnv* env, + jclass, + jlong input_view) +{ + try { + auto input = reinterpret_cast(input_view); + if (input == nullptr) { + throw_java_exception(env, ILLEGAL_ARG_CLASS, "input column view is null"); + return 0; + } + if (input->type().id() != cudf::type_id::INT32) { + throw_java_exception(env, ILLEGAL_ARG_CLASS, "input must be INT32"); + return 0; + } + + std::unique_ptr result = integer_multiply_by_2(*input); + return reinterpret_cast(result.release()); + } catch (std::bad_alloc const& e) { + auto message = std::string("Unable to allocate native memory: ") + e.what(); + throw_java_exception(env, RUNTIME_ERROR_CLASS, message.c_str()); + } catch (std::invalid_argument const& e) { + throw_java_exception(env, ILLEGAL_ARG_CLASS, e.what()); + } catch (std::exception const& e) { + throw_java_exception(env, RUNTIME_ERROR_CLASS, e.what()); + } + return 0; +} + +} diff --git a/skills/tests/resources/IntegerMultiplyBy2NativeRapidsUDF.java b/skills/tests/resources/IntegerMultiplyBy2NativeRapidsUDF.java new file mode 100644 index 00000000000..2768889b941 --- /dev/null +++ b/skills/tests/resources/IntegerMultiplyBy2NativeRapidsUDF.java @@ -0,0 +1,27 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +package com.udf; + +import ai.rapids.cudf.ColumnVector; +import com.nvidia.spark.RapidsUDF; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.spark.sql.api.java.UDF1; + +public class IntegerMultiplyBy2NativeRapidsUDF extends UDF + implements UDF1, RapidsUDF { + @Override + public Integer call(Integer value) { + return value == null ? null : value * 2; + } + + @Override + public ColumnVector evaluateColumnar(int numRows, ColumnVector... args) { + NativeUDFLoader.ensureLoaded(); + return new ColumnVector(integerMultiplyBy2(args[0].getNativeView())); + } + + private static native long integerMultiplyBy2(long inputView); +} diff --git a/skills/tests/resources/IntegerMultiplyBy2RapidsUDF.java b/skills/tests/resources/IntegerMultiplyBy2RapidsUDF.java new file mode 100644 index 00000000000..b35b43b4c20 --- /dev/null +++ b/skills/tests/resources/IntegerMultiplyBy2RapidsUDF.java @@ -0,0 +1,24 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.udf; + +import org.apache.spark.sql.api.java.UDF1; +import ai.rapids.cudf.*; +import com.nvidia.spark.RapidsUDF; + +public class IntegerMultiplyBy2RapidsUDF implements UDF1, RapidsUDF { + @Override + public Integer call(Integer value) { + return value == null ? null : value * 2; + } + + @Override + public ColumnVector evaluateColumnar(int numRows, ColumnVector... args) { + try (Scalar two = Scalar.fromInt(2)) { + return args[0].mul(two); + } + } +} diff --git a/skills/tests/resources/IntegerMultiplyBy2RapidsUDF.scala b/skills/tests/resources/IntegerMultiplyBy2RapidsUDF.scala new file mode 100644 index 00000000000..bbfff504008 --- /dev/null +++ b/skills/tests/resources/IntegerMultiplyBy2RapidsUDF.scala @@ -0,0 +1,22 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + */ + +package com.udf + +import ai.rapids.cudf._ +import com.nvidia.spark.RapidsUDF +import Arm.withResource + +class IntegerMultiplyBy2RapidsUDF extends Function1[Integer, Integer] with Serializable with RapidsUDF { + override def apply(value: Integer): Integer = { + if (value == null) null else value * 2 + } + + override def evaluateColumnar(numRows: Int, args: ColumnVector*): ColumnVector = { + withResource(Scalar.fromInt(2)) { two => + args.head.mul(two) + } + } +} diff --git a/skills/tests/resources/IntegerMultiplyBy2UDF.java b/skills/tests/resources/IntegerMultiplyBy2UDF.java new file mode 100644 index 00000000000..076a6c4bab4 --- /dev/null +++ b/skills/tests/resources/IntegerMultiplyBy2UDF.java @@ -0,0 +1,15 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.udf; + +import org.apache.spark.sql.api.java.UDF1; + +public class IntegerMultiplyBy2UDF implements UDF1 { + @Override + public Integer call(Integer value) { + return value == null ? null : value * 2; + } +} diff --git a/skills/tests/resources/IntegerMultiplyBy2UDF.scala b/skills/tests/resources/IntegerMultiplyBy2UDF.scala new file mode 100644 index 00000000000..b0f97409986 --- /dev/null +++ b/skills/tests/resources/IntegerMultiplyBy2UDF.scala @@ -0,0 +1,12 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
/**
 * Multiply every element of an INT32 column by 2 on the GPU.
 *
 * The result has the same type and row count as the input and carries a copy
 * of the input's null mask and null count. The kernel also writes the data
 * slots of null rows; their values are unspecified.
 *
 * @param input  Column to read; must be INT32.
 * @param stream CUDA stream used for the allocation, mask copy and kernel launch.
 * @param mr     Device memory resource used for the returned column.
 * @return A newly allocated column holding input * 2.
 * @throws std::invalid_argument if the input is not INT32.
 */
std::unique_ptr<cudf::column> integer_multiply_by_2(
  cudf::column_view const& input,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
{
  if (input.type().id() != cudf::type_id::INT32) {
    throw std::invalid_argument("input must be INT32");
  }

  auto const row_count = input.size();
  auto null_mask       = cudf::copy_bitmask(input, stream, mr);
  auto result          = cudf::make_numeric_column(
    input.type(), row_count, std::move(null_mask), input.null_count(), stream, mr);

  if (row_count > 0) {
    constexpr int threads_per_block = 256;
    // Round up in 64-bit: row_count + threads_per_block - 1 overflows the 32-bit
    // cudf::size_type when row_count is within 255 of its maximum.
    auto const blocks = static_cast<unsigned int>(
      (static_cast<int64_t>(row_count) + threads_per_block - 1) / threads_per_block);
    multiply_by_2_kernel<<<blocks, threads_per_block, 0, stream.value()>>>(
      input.data<int32_t>(), result->mutable_view().data<int32_t>(), row_count);
    CUDF_CHECK_CUDA(stream.value());
  }

  return result;
}
--- /dev/null +++ b/skills/tests/resources/integer_multiply_by_2.hpp @@ -0,0 +1,21 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include + +#include +#include + +#include + +std::unique_ptr integer_multiply_by_2( + cudf::column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); diff --git a/skills/tests/resources/integer_multiply_by_2.sql b/skills/tests/resources/integer_multiply_by_2.sql new file mode 100644 index 00000000000..5f5f2d18607 --- /dev/null +++ b/skills/tests/resources/integer_multiply_by_2.sql @@ -0,0 +1,6 @@ +-- SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 + +SELECT *, + value * 2 AS result +FROM test_table diff --git a/skills/tests/test_frontmatter.py b/skills/tests/test_frontmatter.py new file mode 100644 index 00000000000..e5062b176f0 --- /dev/null +++ b/skills/tests/test_frontmatter.py @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Tests for skill frontmatter headers. 
def _read_frontmatter(path: Path) -> str:
    """Extract the text between the opening and closing ``---`` lines of *path*.

    Args:
        path: File to read as UTF-8.

    Returns:
        The lines strictly between the first line and the next ``---`` line,
        joined with ``"\\n"``.

    Raises:
        ValueError: if the first line is not exactly ``---`` (or the file is
            empty), or if no later line is exactly ``---``.
    """
    text_lines = path.read_text(encoding="utf-8").splitlines()
    has_opening = bool(text_lines) and text_lines[0] == "---"
    if not has_opening:
        raise ValueError(f"{path} must start with YAML frontmatter")

    try:
        # Start at index 1 so the opening delimiter is not matched again.
        closing = text_lines.index("---", 1)
    except ValueError as err:
        raise ValueError(f"{path} must close YAML frontmatter with ---") from err

    body = text_lines[1:closing]
    return "\n".join(body)
def _unit_test_methods(lang: str) -> dict[str, str]:
    """Map each unit-test stub name to its fixture implementation.

    ``lang == "scala"`` selects the Scala ``registerUDF`` body; any other value
    selects the Java one.
    """
    if lang == "scala":
        register_udf = fx.SCALA_REGISTER_UDF
    else:
        register_udf = fx.JAVA_REGISTER_UDF
    return {
        "createTestData": fx.CREATE_TEST_DATA,
        "registerUDF": register_udf,
        "executeUDF": fx.EXECUTE_UDF,
        "assertUDFResults": fx.ASSERT_UDF_RESULTS,
    }


def _comparison_test_methods(lang: str, target: str) -> dict[str, str]:
    """Return the ``registerRapidsUDF`` stub for ``(lang, target)``.

    The ``cuda`` target uses the native fixture for every language. Any
    combination not listed below, including ``sql``, needs no extra method and
    yields an empty dict.
    """
    if target == "cuda":
        return {"registerRapidsUDF": fx.NATIVE_REGISTER_RAPIDS_UDF}
    if target == "cudf" and lang == "scala":
        return {"registerRapidsUDF": fx.SCALA_REGISTER_RAPIDS_UDF}
    if target == "cudf" and lang == "java":
        return {"registerRapidsUDF": fx.JAVA_REGISTER_RAPIDS_UDF}
    return {}


def _bench_utils_methods(lang: str, target: str) -> dict[str, str]:
    """Map each BenchUtils stub name to its fixture implementation.

    ``executeGpu`` falls back to the SQL fixture for every ``(lang, target)``
    pair that is not a recognised cudf or cuda combination.
    """
    if lang == "scala":
        execute_cpu = fx.BENCH_EXECUTE_SCALA_CPU
    else:
        execute_cpu = fx.BENCH_EXECUTE_JAVA_CPU

    if target == "cuda":
        execute_gpu = fx.BENCH_EXECUTE_CUDA
    elif (lang, target) == ("scala", "cudf"):
        execute_gpu = fx.BENCH_EXECUTE_SCALA_CUDF
    elif (lang, target) == ("java", "cudf"):
        execute_gpu = fx.BENCH_EXECUTE_JAVA_CUDF
    else:  # sql
        execute_gpu = fx.BENCH_EXECUTE_SQL

    return {
        "generateSyntheticData": fx.BENCH_GENERATE,
        "executeCpu": execute_cpu,
        "executeGpu": execute_gpu,
    }
def _build_project_dir() -> tuple[str, str]:
    """Copy the template project into a fresh temporary directory.

    The tree at ``TEMPLATES_DIR`` is copied unmodified to ``<tmp_dir>/project``;
    no stubs are filled in and no file is rewritten here.

    Returns:
        ``(tmp_dir, project_dir)``. The caller is responsible for removing
        ``tmp_dir`` when it is done with the project.

    Raises:
        OSError: if the template tree cannot be copied. The temporary
            directory is removed before the error propagates.
    """
    tmp_dir = tempfile.mkdtemp(prefix="test_jvm_")
    project_dir = os.path.join(tmp_dir, "project")
    try:
        shutil.copytree(str(TEMPLATES_DIR), project_dir)
    except BaseException:
        # On failure the caller never receives tmp_dir, so nobody else can clean it up.
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise
    return tmp_dir, project_dir
def _fill_stubs(project_dir: str, lang: str, target: str) -> None:
    """Write the fixture UDF sources and fill in all TODO stubs for ``(lang, target)``.

    Args:
        project_dir: Root directory of a copied template project.
        lang: ``"scala"`` selects the Scala fixtures; any other value selects
            the Java fixtures. It is also used as the ``src/main/<lang>``
            directory name for the UDF sources.
        target: ``"cudf"`` or ``"cuda"``; any other value is handled as ``"sql"``.
    """
    is_scala = lang == "scala"
    ext = ".scala" if is_scala else ".java"
    udf_source = fx.SCALA_UDF_SOURCE if is_scala else fx.JAVA_UDF_SOURCE

    root = Path(project_dir)
    udf_dir = root / "src" / "main" / lang / "com" / "udf"
    bench_dir = root / "src" / "main" / "scala" / "com" / "udf" / "bench"
    test_dir = root / "src" / "test" / "scala" / "com" / "udf"
    udf_dir.mkdir(parents=True, exist_ok=True)

    # The fixtures are read as UTF-8, so write them back as UTF-8 rather than
    # with whatever the locale default happens to be.
    (udf_dir / f"{fx.CPU_UDF_NAME}{ext}").write_text(udf_source, encoding="utf-8")

    # Fill in unit test stubs.
    _replace_stubs(str(test_dir / "UnitTest.scala"), _unit_test_methods(lang))

    if target in ("cudf", "cuda"):
        if target == "cudf":
            rapids_udf_source = (
                fx.SCALA_RAPIDS_UDF_SOURCE if is_scala else fx.JAVA_RAPIDS_UDF_SOURCE
            )
            (udf_dir / f"{fx.RAPIDS_UDF_NAME}{ext}").write_text(
                rapids_udf_source, encoding="utf-8"
            )
        else:
            # Writes the Java wrapper and C++ sources.
            _copy_cuda_templates(project_dir)

        # Both RapidsUDF targets share the comparison test and the micro benchmark.
        _replace_stubs(
            str(test_dir / "CudfComparisonTest.scala"),
            _comparison_test_methods(lang, target),
        )
        _replace_stubs(
            str(bench_dir / "MicroBenchRunner.scala"),
            _micro_bench_methods(lang, target),
        )
    else:  # sql
        resources_dir = root / "src" / "main" / "resources"
        resources_dir.mkdir(parents=True, exist_ok=True)
        (resources_dir / "integer_multiply_by_2.sql").write_text(
            fx.SQL_SOURCE, encoding="utf-8"
        )

        # Point the comparison test at the SQL file / registered name.
        sql_test_path = test_dir / "SqlComparisonTest.scala"
        content = sql_test_path.read_text(encoding="utf-8")
        sql_test_path.write_text(
            content.replace("placeholder_udf_name", "integer_multiply_by_2"),
            encoding="utf-8",
        )

    # BenchUtils stubs are filled for every target.
    _replace_stubs(
        str(bench_dir / "BenchUtils.scala"),
        _bench_utils_methods(lang, target),
    )
@pytest.fixture(scope="class")
def project_with_broken_schema():
    """Project with wrong column name in generateSyntheticData.

    Builds a Scala/cudf project with all stubs filled, then renames the
    ``value`` column alias in ``BenchUtils.scala`` to ``wrong_column``.

    Yields:
        The project directory path. The whole temporary tree is removed on
        teardown.
    """
    tmp_dir, proj = _build_project_dir()
    try:
        _fill_stubs(proj, "scala", "cudf")
        bench_path = Path(
            proj, "src", "main", "scala", "com", "udf", "bench", "BenchUtils.scala"
        )

        source = bench_path.read_text(encoding="utf-8")
        good_alias = '.alias("value")'
        # str.replace is a silent no-op when the marker is absent, which would
        # leave the project unbroken; fail here with a clear message instead.
        assert good_alias in source, f"{bench_path} does not contain {good_alias}"
        bench_path.write_text(
            source.replace(good_alias, '.alias("wrong_column")'), encoding="utf-8"
        )

        yield proj
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)
class TestBench:
    """Test the benchmark pipeline (GenData + BenchRunner).

    Every test unpacks ``project_with_fixtures`` as
    ``(project directory, language, target)`` and adds the
    ``-Pcuda-native-udf`` Maven profile when the target is ``cuda``.
    """

    @staticmethod
    def _generate_data(proj: str, data_dir: str, mvn_args: list[str]) -> None:
        """Run ``run_gen_data.sh --rows 1000 --output-path data_dir`` and assert success."""
        gen_result = run_script(
            os.path.join(proj, "run_gen_data.sh"),
            args=["--rows", "1000", "--output-path", data_dir, *mvn_args],
        )
        assert gen_result.returncode == 0, "run_gen_data.sh failed"

    def test_validate(self, project_with_fixtures):
        """GenData validation: generate a small dataset and run validation."""
        proj, _, target = project_with_fixtures
        result = run_mvn(
            proj,
            "compile",
            "exec:java",
            extra_args=[
                *(["-Pcuda-native-udf"] if target == "cuda" else []),
                "-Dexec.mainClass=com.udf.bench.GenData",
                "-Dexec.classpathScope=compile",
                "-Dexec.args=--rows 1000 --validate --spark-conf spark.master=local[*]",
            ],
        )
        assert result.returncode == 0, "GenData validate failed"

    def test_spark_e2e(self, project_with_fixtures):
        """End-to-end: GenData generates data, SparkBenchRunner benchmarks CPU/GPU."""
        proj, _, target = project_with_fixtures

        data_dir = os.path.join(proj, "data", "bench_input")
        result_path = os.path.join(proj, "results", "bench_result.json")
        mvn_args = ["--mvn-arg", "-Pcuda-native-udf"] if target == "cuda" else []
        try:
            # GenData: generate parquet
            self._generate_data(proj, data_dir, mvn_args)

            # BenchRunner: run both benchmarks
            for mode in ["cpu", "gpu"]:
                # Both modes share result_path, so a bare existence check after
                # the second run would pass on the first run's file. Remember
                # the previous timestamp to prove this run wrote the file.
                previous_mtime = (
                    os.stat(result_path).st_mtime_ns
                    if os.path.isfile(result_path)
                    else None
                )
                bench_result = run_script(
                    os.path.join(proj, "run_spark_benchmark.sh"),
                    args=[
                        "--mode",
                        mode,
                        "--data-path",
                        data_dir,
                        "--result-path",
                        result_path,
                        *mvn_args,
                    ],
                )
                assert (
                    bench_result.returncode == 0
                ), f"run_spark_benchmark.sh --mode {mode} failed"
                assert os.path.isfile(
                    result_path
                ), f"Result file not created: {result_path}"
                assert (
                    os.stat(result_path).st_mtime_ns != previous_mtime
                ), f"run_spark_benchmark.sh --mode {mode} did not write {result_path}"
        finally:
            shutil.rmtree(data_dir, ignore_errors=True)
            if os.path.isfile(result_path):
                os.remove(result_path)

    def test_micro_e2e(self, project_with_fixtures):
        """End-to-end: GenData generates data, MicroBenchRunner benchmarks CPU/GPU."""
        proj, _, target = project_with_fixtures
        if target not in {"cudf", "cuda"}:
            pytest.skip("MicroBenchRunner only applies to RapidsUDF targets")

        data_dir = os.path.join(proj, "data", "micro_input")
        mvn_args = ["--mvn-arg", "-Pcuda-native-udf"] if target == "cuda" else []
        try:
            # GenData: generate parquet
            self._generate_data(proj, data_dir, mvn_args)

            # MicroBenchRunner: run both benchmarks
            bench_result = run_script(
                os.path.join(proj, "run_micro_benchmark.sh"),
                args=["--mode", "all", "--data-path", data_dir, *mvn_args],
            )
            # Format instead of concatenating: if the output was not captured,
            # stdout/stderr are None and "+" would raise TypeError, hiding the
            # real failure.
            assert bench_result.returncode == 0, (
                "run_micro_benchmark.sh failed:\n"
                f"{bench_result.stdout or ''}{bench_result.stderr or ''}"
            )
        finally:
            shutil.rmtree(data_dir, ignore_errors=True)
fail when GPU implementation produces wrong results.""" + proj, target = project_with_broken_gpu + suite = ( + "com.udf.SqlComparisonTest" + if target == "sql" + else "com.udf.CudfComparisonTest" + ) + + result = run_mvn( + proj, + "test", + extra_args=[ + *(["-Pcuda-native-udf"] if target == "cuda" else []), + f"-Dsuites={suite}", + ], + ) + assert ( + result.returncode != 0 + ), f"{suite} should have failed with broken GPU implementation" + combined = result.stdout + result.stderr + assert ( # 123 * 2 vs. 123 * 3, since we swapped multiplier + "246" in combined and "369" in combined + ), "Expected to see mismatch in test output" + + if target == "cudf": + # Re-run with leak detection to verify the unclosed Scalar is reported + leak_result = run_mvn( + proj, + "test", + extra_args=[ + f"-Dsuites={suite}", + "-Ddebug.memory.leaks=true", + ], + ) + assert ( + leak_result.returncode != 0 + ), f"{suite} should have failed with broken GPU implementation" + leak_output = leak_result.stdout + leak_result.stderr + assert "A SCALAR WAS LEAKED" in leak_output, "Expected memory leak" + + def test_bench_validate_catches_error(self, project_with_broken_schema): + """GenData --validate should fail when synthetic data has wrong schema.""" + result = run_mvn( + project_with_broken_schema, + "compile", + "exec:java", + extra_args=[ + "-Dexec.mainClass=com.udf.bench.GenData", + "-Dexec.classpathScope=compile", + "-Dexec.args=--rows 1000 --validate --spark-conf spark.master=local[*]", + ], + ) + assert ( + result.returncode != 0 + ), "GenData --validate should have failed with wrong schema" + assert ( + "org.apache.spark.sql.AnalysisException" in result.stderr + ), "Expected to see schema error message" diff --git a/skills/tests/utils.py b/skills/tests/utils.py new file mode 100644 index 00000000000..1697f680571 --- /dev/null +++ b/skills/tests/utils.py @@ -0,0 +1,78 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0

"""
Shared test utilities for JVM skill integration tests.
"""

import re
import subprocess
import sys
from typing import Optional


def _echo_indented(result: subprocess.CompletedProcess, prefix: str = " ") -> None:
    """Write indented output to stdout/stderr so it is visible via pytest -s.

    Args:
        result: Completed process whose captured ``stdout``/``stderr`` text is
            echoed. Empty or ``None`` streams are skipped.
        prefix: String prepended to every echoed line.
    """

    def _indent(text: str) -> str:
        # Leading newline separates the echoed block from whatever was
        # printed before it; keepends=True preserves the original line breaks.
        return "\n" + "".join(prefix + line for line in text.splitlines(keepends=True))

    if result.stdout:
        sys.stdout.write(_indent(result.stdout))
    if result.stderr:
        sys.stderr.write(_indent(result.stderr))


def _run_and_echo(
    cmd: list[str],
    timeout: int,
    cwd: Optional[str] = None,
) -> subprocess.CompletedProcess:
    """Run cmd with captured text output, echo that output, and return the result."""
    result = subprocess.run(
        cmd,
        cwd=cwd,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    _echo_indented(result)
    return result


def run_mvn(
    work_dir: str,
    *goals: str,
    extra_args: Optional[list[str]] = None,
    timeout: int = 300,
) -> subprocess.CompletedProcess:
    """Run Maven in work_dir with the given goals.

    The command line is ``mvn <goals...> -q <extra_args...>``.

    Args:
        work_dir: Directory used as the working directory for Maven.
        *goals: Maven goals/phases, passed in order.
        extra_args: Additional command-line arguments appended after ``-q``.
        timeout: Seconds to wait before the run is aborted.

    Returns:
        The completed process; ``stdout`` and ``stderr`` are captured as text
        and have already been echoed. The exit code is not checked here.

    Raises:
        subprocess.TimeoutExpired: If Maven does not finish within ``timeout``.
    """
    cmd = ["mvn", *goals, "-q"]
    if extra_args:
        cmd.extend(extra_args)
    return _run_and_echo(cmd, timeout, cwd=work_dir)


def run_script(
    script_path: str,
    args: Optional[list[str]] = None,
    timeout: int = 300,
) -> subprocess.CompletedProcess:
    """Run a bash script with the given arguments.

    The script is executed as ``bash <script_path> <args...>`` in the current
    working directory of the test process.

    Args:
        script_path: Path to the script handed to ``bash``.
        args: Arguments appended after the script path.
        timeout: Seconds to wait before the run is aborted.

    Returns:
        The completed process; ``stdout`` and ``stderr`` are captured as text
        and have already been echoed. The exit code is not checked here.

    Raises:
        subprocess.TimeoutExpired: If the script does not finish within ``timeout``.
    """
    cmd = ["bash", script_path]
    if args:
        cmd.extend(args)
    return _run_and_echo(cmd, timeout)


def replace_scala_todo_method(source: str, method_name: str, new_body: str) -> str:
    """
    Replace a TODO method stub in a Scala source file with a real implementation.
    Assumes stubs look like "def foo(...) = ???"

    Only the first match is replaced. The match runs from `` def <method_name>``
    to the nearest following ``???`` (across newlines), so if the named method
    has no ``???`` of its own the match extends to the next stub in the file.

    Args:
        source: Full text of the Scala source.
        method_name: Name of the stubbed method; matched literally.
        new_body: Text that replaces the whole stub. It is inserted verbatim.

    Returns:
        The source with the stub replaced by ``new_body``.

    Raises:
        ValueError: If no stub for ``method_name`` is found.
    """
    pattern = re.compile(
        r" def " + re.escape(method_name) + r"\b.*?\?\?\?",
        re.DOTALL,
    )
    # A callable replacement inserts new_body literally. Passing the string
    # itself would make re treat backslashes in the Scala code (e.g. "\\d",
    # "\n", "\1") as template escapes, corrupting the text or raising re.error.
    result, replaced = pattern.subn(lambda _match: new_body, source, count=1)
    # Use the substitution count, not string equality, so that a replacement
    # identical to the stub is not misreported as "not found".
    if not replaced:
        raise ValueError(f"Could not find TODO method '{method_name}' in source")
    return result