diff --git a/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/MicroBenchRunner.scala b/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/MicroBenchRunner.scala index f1f22cc4469..1666b8d35b5 100644 --- a/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/MicroBenchRunner.scala +++ b/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/MicroBenchRunner.scala @@ -85,20 +85,19 @@ object MicroBenchRunner { def executeCpu(data: Array[AnyRef], numRows: Int): Unit = ??? /** - * TODO: Execute the GPU UDF via evaluateColumnar. + * TODO: Execute the GPU UDF via evaluateColumnar and close its result. * * Example: * {{{ * val udf = new com.udf.PlaceholderRapidsUDFName() - * udf.evaluateColumnar(numRows, - * table.getColumn(0), table.getColumn(1)) + * withResource(udf.evaluateColumnar(numRows, + * table.getColumn(0), table.getColumn(1))) { _ => } * }}} * * @param table the dataset loaded on GPU * @param numRows number of rows in the dataset - * @return result ColumnVector (NOTE: caller must close) */ - def executeGpu(table: Table, numRows: Int): ColumnVector = ??? + def executeGpu(table: Table, numRows: Int): Unit = ??? def main(args: Array[String]): Unit = { val parsed = parseArgs(args) @@ -165,7 +164,7 @@ object MicroBenchRunner { if (runGpu) { try { val times = runBenchmark(warmup, measured, profile = profile) { - withResource(executeGpu(table, numRows)) { _ => } + executeGpu(table, numRows) } val medianMs = times(times.length / 2) / 1e6 val minMs = times(0) / 1e6 diff --git a/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/SparkBenchRunner.scala b/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/SparkBenchRunner.scala index 7583ae10fa7..c1d6a9f372a 100644 --- a/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/SparkBenchRunner.scala +++ b/skills/udf-gen-test/templates/scala/src/main/scala/com/udf/bench/SparkBenchRunner.scala @@ -112,31 +112,26 @@ object SparkBenchRunner { val resultDir = new File(path).getParentFile if (resultDir != null) resultDir.mkdirs() - try { - import java.util.{LinkedHashMap => JLinkedHashMap, Arrays => JArrays} - val report = new JLinkedHashMap[String, AnyRef]() - report.put("mode", mode) - report.put("data_path", dataPath) - report.put("status", status) - report.put("e2e_runtime", java.lang.Double.valueOf(elapsed)) - report.put("cli_args", JArrays.asList(cliArgs: _*)) - errorMessage.foreach { msg => - val error = new JLinkedHashMap[String, String]() - error.put("error_message", msg) - errorLogFile.foreach(f => error.put("error_log_file", f)) - report.put("error", error) - } - - val mapper = new ObjectMapper() - mapper.enable(SerializationFeature.INDENT_OUTPUT) - val printer = new DefaultPrettyPrinter() - printer.indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE) - mapper.writer(printer).writeValue(new File(path), report) - System.err.println(s"Report written to: $path") - } catch { - case e: Exception => - System.err.println(s"Failed to write report: ${e.getMessage}") + import java.util.{LinkedHashMap => JLinkedHashMap, Arrays => JArrays} + val report = new JLinkedHashMap[String, AnyRef]() + report.put("mode", mode) + report.put("data_path", dataPath) + report.put("status", status) + report.put("e2e_runtime", java.lang.Double.valueOf(elapsed)) + report.put("cli_args", JArrays.asList(cliArgs: _*)) + errorMessage.foreach { msg => + val error = new JLinkedHashMap[String, String]() + error.put("error_message", msg) + errorLogFile.foreach(f => error.put("error_log_file", f)) + report.put("error", error) } + + val mapper = new ObjectMapper() + mapper.enable(SerializationFeature.INDENT_OUTPUT) + val printer = new DefaultPrettyPrinter() + printer.indentArraysWith(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE) + mapper.writer(printer).writeValue(new File(path), report) + System.err.println(s"Report written to: $path") } /** Write an exception to an error log file. */ diff --git a/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/CudfComparisonTest.scala b/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/CudfComparisonTest.scala index 26eec32ddc6..f13bfe4c108 100644 --- a/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/CudfComparisonTest.scala +++ b/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/CudfComparisonTest.scala @@ -32,17 +32,18 @@ class CudfComparisonTest extends AnyFunSuite with BeforeAndAfterAll { def registerRapidsUDF(spark: SparkSession, udfName: String): Unit = ??? test("UDF vs RapidsUDF") { - val testDF = UnitTest.createTestData(spark).repartition(1) + // Repartition down to 2 tasks to ensure we exercise multi-row columns. + val testDF = UnitTest.createTestData(spark).repartition(2) // Run CPU UDF UnitTest.registerUDF(spark, "placeholder_udf_name") val cpuResultDF = UnitTest.executeUDF(spark, "placeholder_udf_name", testDF) - UnitTest.verifyUDFResults(cpuResultDF, testDF) + UnitTest.assertUDFResults(cpuResultDF, testDF) // Run RapidsUDF registerRapidsUDF(spark, "placeholder_rapids_udf_name") val gpuResultDF = UnitTest.executeUDF(spark, "placeholder_rapids_udf_name", testDF) - UnitTest.verifyUDFResults(gpuResultDF, testDF) + UnitTest.assertUDFResults(gpuResultDF, testDF) // Compare TestUtils.assertDataFrameEquals(actual = gpuResultDF, expected = cpuResultDF) diff --git a/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/SqlComparisonTest.scala b/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/SqlComparisonTest.scala index 1fa4cd4d78e..a93e06ef8d3 100644 --- a/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/SqlComparisonTest.scala +++ b/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/SqlComparisonTest.scala @@ -30,19 +30,20 @@ class SqlComparisonTest extends AnyFunSuite with BeforeAndAfterAll { } test("UDF vs SQL expression") { - val testDF = UnitTest.createTestData(spark).repartition(1) + // Repartition down to 2 tasks to ensure we exercise multi-row columns. + val testDF = UnitTest.createTestData(spark).repartition(2) // Run CPU UDF UnitTest.registerUDF(spark, "placeholder_udf_name") val udfResultDF = UnitTest.executeUDF(spark, "placeholder_udf_name", testDF) - UnitTest.verifyUDFResults(udfResultDF, testDF) + UnitTest.assertUDFResults(udfResultDF, testDF) // Read and execute SQL expression testDF.createOrReplaceTempView("test_table") val sqlSource = scala.io.Source.fromFile("src/main/resources/placeholder_udf_name.sql") val sqlContent = try sqlSource.mkString finally sqlSource.close() val sqlResultDF = spark.sql(sqlContent) - UnitTest.verifyUDFResults(sqlResultDF, testDF) + UnitTest.assertUDFResults(sqlResultDF, testDF) // Compare results TestUtils.assertDataFrameEquals(actual = sqlResultDF, expected = udfResultDF) diff --git a/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/UnitTest.scala b/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/UnitTest.scala index 278d13e4c63..4e508ef56e1 100644 --- a/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/UnitTest.scala +++ b/skills/udf-gen-test/templates/scala/src/test/scala/com/udf/UnitTest.scala @@ -13,7 +13,8 @@ import org.scalatest.BeforeAndAfterAll object UnitTest extends Assertions { /** - * TODO: Create a test DataFrame with diverse test cases including edge cases. + * TODO: Create a test DataFrame with diverse test cases including edge cases + * (at least 10+ cases). * * Example: * {{{ @@ -24,7 +25,8 @@ object UnitTest extends Assertions { * val testData = Seq( * Row(1, 800), * Row(2, 550), - * Row(3, null) + * Row(3, null), + * // ... * ) * spark.createDataFrame(spark.sparkContext.parallelize(testData), schema) * }}} @@ -55,7 +57,7 @@ object UnitTest extends Assertions { def executeUDF(spark: SparkSession, udfName: String, testDF: DataFrame): DataFrame = ??? /** - * TODO: Verify UDF results using assert statements. + * TODO: Assert the UDF results match expectations. * * Example: * {{{ @@ -65,7 +67,7 @@ object UnitTest extends Assertions { * assert(results(2).getAs[String]("risk_level") === "UNKNOWN") * }}} */ - def verifyUDFResults(resultDF: DataFrame, testDF: DataFrame): Unit = ??? + def assertUDFResults(resultDF: DataFrame, testDF: DataFrame): Unit = ??? } class UnitTest extends AnyFunSuite with BeforeAndAfterAll { @@ -89,11 +91,12 @@ class UnitTest extends AnyFunSuite with BeforeAndAfterAll { } test("UDF produces correct results") { - val testDF = UnitTest.createTestData(spark).repartition(1) + // Repartition down to 2 tasks to ensure we exercise multi-row columns. + val testDF = UnitTest.createTestData(spark).repartition(2) UnitTest.registerUDF(spark, "placeholder_udf_name") val resultDF = UnitTest.executeUDF(spark, "placeholder_udf_name", testDF) - UnitTest.verifyUDFResults(resultDF, testDF) + UnitTest.assertUDFResults(resultDF, testDF) } } diff --git a/skills/udf-judge-conversion/SKILL.md b/skills/udf-judge-conversion/SKILL.md index 8b01cecb745..f1a5c4a7c06 100644 --- a/skills/udf-judge-conversion/SKILL.md +++ b/skills/udf-judge-conversion/SKILL.md @@ -39,7 +39,7 @@ Check that: - Assertions verify schema, row count, deterministic ordering, output values, null propagation, and exception/default behavior where applicable. - The test exercises visible CPU UDF branches. Coverage reports should support this when available. - Assertions reflect the CPU UDF's actual behavior and do not merely assert weak properties such as non-null output. -- Extra unit tests outside the shared `verifyUDFResults` path are mirrored in the comparison test and run against both CPU and GPU/SQL paths. +- Extra unit tests outside the shared `assertUDFResults` path are mirrored in the comparison test and run against both CPU and GPU/SQL paths. ## Comparison Test Checks