diff --git a/.gitignore b/.gitignore
index 449bffa18e5..d39cdcd1484 100644
--- a/.gitignore
+++ b/.gitignore
@@ -136,3 +136,7 @@ override.tf.json
 # Ignore kernel benchmark report
 kernel/kernel-benchmarks/benchmark_report.json
+# Unity Catalog test artifacts
+spark/unitycatalog/etc/
+.scala-build/
+
diff --git a/build.sbt b/build.sbt
index 357d34fb40d..c28bc31a11e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -705,6 +705,76 @@ lazy val contribs = (project in file("contribs"))
     Compile / compile := ((Compile / compile) dependsOn createTargetClassesDir).value
   ).configureUnidoc()
 
+
+val unityCatalogVersion = "0.3.0"
+val sparkUnityCatalogJacksonVersion = "2.15.4" // We are using Spark 4.0's Jackson version 2.15.x to override Unity Catalog 0.3.0's version 2.18.x
+
+lazy val sparkUnityCatalog = (project in file("spark/unitycatalog"))
+  .dependsOn(spark % "compile->compile;test->test;provided->provided")
+  .disablePlugins(ScalafmtPlugin)
+  .settings(
+    name := "delta-spark-unitycatalog",
+    commonSettings,
+    skipReleaseSettings,
+    CrossSparkVersions.sparkDependentSettings(sparkVersion),
+
+    // This is a test-only module - no production sources
+    Compile / sources := Seq.empty,
+
+    // Ensure Java sources are picked up
+    Test / unmanagedSourceDirectories += baseDirectory.value / "src" / "test" / "java",
+
+    Test / javaOptions ++= Seq("-ea"),
+
+    // Don't execute in parallel since we can't have multiple Sparks in the same JVM
+    Test / parallelExecution := false,
+
+    // Force ALL Jackson dependencies to match Spark's Jackson version.
+    // This overrides Jackson from Unity Catalog's transitive dependencies (e.g., Armeria).
+    dependencyOverrides ++= Seq(
+      "com.fasterxml.jackson.core" % "jackson-core" % sparkUnityCatalogJacksonVersion,
+      "com.fasterxml.jackson.core" % "jackson-annotations" % sparkUnityCatalogJacksonVersion,
+      "com.fasterxml.jackson.core" % "jackson-databind" % sparkUnityCatalogJacksonVersion,
+      "com.fasterxml.jackson.module" %% "jackson-module-scala" % sparkUnityCatalogJacksonVersion,
+      "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % sparkUnityCatalogJacksonVersion,
+      "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % sparkUnityCatalogJacksonVersion,
+      "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % sparkUnityCatalogJacksonVersion
+    ),
+
+    libraryDependencies ++= Seq(
+      // Standard test dependencies
+      "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
+
+      // Unity Catalog dependencies - exclude Jackson to use Spark's Jackson 2.15.x
+      "io.unitycatalog" %% "unitycatalog-spark" % unityCatalogVersion % "test" excludeAll(
+        ExclusionRule(organization = "com.fasterxml.jackson.core"),
+        ExclusionRule(organization = "com.fasterxml.jackson.module"),
+        ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
+        ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
+      ),
+      "io.unitycatalog" % "unitycatalog-server" % unityCatalogVersion % "test" excludeAll(
+        ExclusionRule(organization = "com.fasterxml.jackson.core"),
+        ExclusionRule(organization = "com.fasterxml.jackson.module"),
+        ExclusionRule(organization = "com.fasterxml.jackson.datatype"),
+        ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
+      ),
+
+      // Spark test dependencies
+      "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test",
+      "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test",
+      "org.apache.spark" %% "spark-core" % sparkVersion.value % "test",
+
+      // TODO: Let's define a common JUnit version.
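+      // jupiter-interface registers a JUnit 5 test framework with sbt, so the Jupiter tests below run under `sbt test`.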
+ "org.junit.jupiter" % "junit-jupiter" % "5.10.3" % Test, + "org.apache.hadoop" % "hadoop-aws" % hadoopVersion % Test, + "net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test, + "org.assertj" % "assertj-core" % "3.26.3" % Test, + ), + + Test / testOptions += Tests.Argument("-oDF"), + Test / testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a") + ) + lazy val sharing = (project in file("sharing")) .dependsOn(spark % "compile->compile;test->test;provided->provided") .disablePlugins(JavaFormatterPlugin, ScalafmtPlugin) @@ -1515,7 +1585,7 @@ val createTargetClassesDir = taskKey[Unit]("create target classes dir") // Don't use these groups for any other projects lazy val sparkGroup = project - .aggregate(spark, sparkV1, sparkV1Filtered, sparkV2, contribs, storage, storageS3DynamoDB, sharing, hudi) + .aggregate(spark, sparkV1, sparkV1Filtered, sparkV2, contribs, sparkUnityCatalog, storage, storageS3DynamoDB, sharing, hudi) .settings( // crossScalaVersions must be set to Nil on the aggregating project crossScalaVersions := Nil, diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UCManagedTableDMLTest.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UCManagedTableDMLTest.java new file mode 100644 index 00000000000..6ab74f2f5ae --- /dev/null +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UCManagedTableDMLTest.java @@ -0,0 +1,36 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sparkuctest; + +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +public class UCManagedTableDMLTest extends UnityCatalogTestBase { + + @Test + public void testBasicOperation() { + String tableName = fullTableName("testBasicOperation"); + sql("CREATE TABLE %s (id INT, val STRING) USING DELTA LOCATION '%s'", + tableName, path("testBasicOperation")); + + sql("INSERT INTO %s VALUES (1, 'AAA'), (2, 'BBB')", tableName); + + assertEquals("Should have the expected results", + ImmutableList.of(row(1, "AAA"), row(2, "BBB")), + sql("SELECT * FROM %s ORDER BY id ASC", tableName)); + } +} diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogTestBase.java b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogTestBase.java new file mode 100644 index 00000000000..dee07db62bf --- /dev/null +++ b/spark/unitycatalog/src/test/java/io/sparkuctest/UnityCatalogTestBase.java @@ -0,0 +1,110 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sparkuctest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.common.base.Preconditions;
+import io.sparkuctest.extension.UnityCatalogExtension;
+import io.sparkuctest.extension.UnityCatalogExtensionUtil;
+import java.util.List;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class UnityCatalogTestBase {
+
+  @RegisterExtension
+  public static UnityCatalogExtension UC_EXTENSION = UnityCatalogExtensionUtil.initialize();
+
+  private static SparkSession spark;
+
+  @BeforeAll
+  public static void beforeAll() {
+    SparkConf conf = new SparkConf()
+        .setAppName("UnityCatalog Support Tests")
+        .setMaster("local[2]")
+        .set("spark.ui.enabled", "false")
+        .set("spark.sql.shuffle.partitions", "5")
+        // Delta Lake required configurations
+        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog");
+
+    // Configure with Unity Catalog
+    UC_EXTENSION.catalogSparkConf().forEach(conf::set);
+
+    // Build the Spark session.
+    spark = SparkSession.builder().config(conf).getOrCreate();
+  }
+
+  @AfterAll
+  public static void afterAll() {
+    if (spark != null) {
+      spark.stop();
+      spark = null;
+    }
+  }
+
+  protected String fullTableName(String tableName) {
+    return String.format("`%s`.`%s`.`%s`",
+        UC_EXTENSION.catalogName(), UC_EXTENSION.schemaName(), tableName);
+  }
+
+  protected String path(String basename) {
+    Preconditions.checkNotNull(basename, "basename cannot be null");
+    if (basename.startsWith("/")) {
+      return String.format("%s%s", UC_EXTENSION.rootTestingDir(), basename);
+    } else {
+      return String.format("%s/%s", UC_EXTENSION.rootTestingDir(), basename);
+    }
+  }
+
+  public static List<Row> sql(String statement, Object... args) {
+    return spark.sql(String.format(statement, args)).collectAsList();
+  }
+
+  public static Object[] row(Object... args) {
+    return args;
+  }
+
+  public static void assertEquals(String context, List<Object[]> expected, List<Row> actual) {
+    assertThat(expected)
+        .as("%s: number of results should match", context)
+        .hasSameSizeAs(actual);
+
+    for (int row = 0; row < expected.size(); row += 1) {
+      Object[] expectedRow = expected.get(row);
+      Row actualRow = actual.get(row);
+
+      assertEquals(context + ": row " + (row + 1), expectedRow, actualRow);
+    }
+  }
+
+  public static void assertEquals(String context, Object[] expected, Row actual) {
+    assertThat(expected.length)
+        .as("%s: number of columns should match", context)
+        .isEqualTo(actual.size());
+    for (int i = 0; i < expected.length; i += 1) {
+      assertThat(expected[i])
+          .as("%s: element does not match", context)
+          .isEqualTo(actual.get(i));
+    }
+  }
+}
diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/extension/LocalUnityCatalogExtension.java b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/LocalUnityCatalogExtension.java
new file mode 100644
index 00000000000..17159262659
--- /dev/null
+++ b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/LocalUnityCatalogExtension.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sparkuctest.extension;
+
+import io.unitycatalog.client.ApiClient;
+import io.unitycatalog.client.api.CatalogsApi;
+import io.unitycatalog.client.api.SchemasApi;
+import io.unitycatalog.client.model.CreateCatalog;
+import io.unitycatalog.client.model.CreateSchema;
+import io.unitycatalog.server.UnityCatalogServer;
+import io.unitycatalog.server.utils.ServerProperties;
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LocalUnityCatalogExtension implements UnityCatalogExtension {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LocalUnityCatalogExtension.class);
+
+  private File tempDir;
+  private UnityCatalogServer ucServer;
+  private int ucPort;
+  private String ucToken = "mock-token";
+
+  @Override
+  public void beforeAll(ExtensionContext context) throws Exception {
+    tempDir = Files.createTempDirectory("spark-uc-test-").toFile();
+    ucServer = startUnityCatalogServer();
+  }
+
+  @Override
+  public void afterAll(ExtensionContext context) throws Exception {
+    if (ucServer != null) {
+      ucServer.stop();
+    }
+
+    if (tempDir != null) {
+      FileUtils.deleteDirectory(tempDir);
+    }
+  }
+
+  @Override
+  public String catalogName() {
+    return "unity";
+  }
+
+  @Override
+  public String schemaName() {
+    return "default";
+  }
+
+  @Override
+  public String catalogUri() {
+    return "http://localhost:" + ucPort;
+  }
+
+  @Override
+  public String rootTestingDir() {
+    return tempDir.getAbsolutePath();
+  }
+
+  @Override
+  public Map<String, String> catalogSparkConf() {
+    String catalogKey = String.format("spark.sql.catalog.%s", catalogName());
+    return Map.of(
+        catalogKey, "io.unitycatalog.spark.UCSingleCatalog",
+        catalogKey + ".uri", catalogUri(),
+        catalogKey + ".token", ucToken
+    );
+  }
+
+  private UnityCatalogServer startUnityCatalogServer() throws Exception {
+    // Find an available port
+    ucPort = findAvailablePort();
+
+    // Set up server properties
+    Properties serverProps = new Properties();
+    serverProps.setProperty("server.env", "test");
+
+    // Start UC server with configuration
+    ServerProperties initServerProperties = new ServerProperties(serverProps);
+
+    UnityCatalogServer server = UnityCatalogServer.builder()
+        .port(ucPort)
+        .serverProperties(initServerProperties)
+        .build();
+
+    server.start();
+
+    // Poll for server readiness by checking if we can create an API client and query catalogs
+    int maxRetries = 30;
+    int retryDelayMs = 500;
+    boolean serverReady = false;
+    int retries = 0;
+
+    while (!serverReady && retries < maxRetries) {
+      try {
+        ApiClient testClient = new ApiClient();
+        testClient.setScheme("http");
+        testClient.setHost("localhost");
+        testClient.setPort(ucPort);
+        CatalogsApi catalogsApi = new CatalogsApi(testClient);
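+        // A fresh client is built on each attempt, so a failed probe never reuses a stale connection.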
+        catalogsApi.listCatalogs(null, null); // This will throw if server is not ready
+        serverReady = true;
+      } catch (Exception e) {
+        Thread.sleep(retryDelayMs);
+        retries++;
+      }
+    }
+
+    if (!serverReady) {
+      throw new RuntimeException(
+          "Unity Catalog server did not become ready after " + (maxRetries * retryDelayMs) + "ms");
+    }
+
+    // Create the catalog and default schema in the UC server
+    ApiClient client = new ApiClient();
+    client.setScheme("http");
+    client.setHost("localhost");
+    client.setPort(ucPort);
+
+    CatalogsApi catalogsApi = new CatalogsApi(client);
+    SchemasApi schemasApi = new SchemasApi(client);
+
+    // Create catalog
+    catalogsApi.createCatalog(
+        new CreateCatalog()
+            .name(catalogName())
+            .comment("Test catalog for Delta Lake integration")
+    );
+
+    // Create default schema
+    schemasApi.createSchema(
+        new CreateSchema()
+            .name(schemaName())
+            .catalogName(catalogName())
+    );
+
+    LOG.info("Unity Catalog server started and ready at {}", catalogUri());
+    LOG.info("Created catalog '{}' with schema '{}'", catalogName(), schemaName());
+
+    return server;
+  }
+
+  // Note: the chosen port could be taken by another process between closing this socket and
+  // starting the server; that small race is acceptable for a local test fixture.
+  private int findAvailablePort() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+}
diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/extension/RemoteUnityCatalogExtension.java b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/RemoteUnityCatalogExtension.java
new file mode 100644
index 00000000000..5141726acc5
--- /dev/null
+++ b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/RemoteUnityCatalogExtension.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sparkuctest.extension;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import java.util.Map;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class RemoteUnityCatalogExtension implements UnityCatalogExtension {
+
+  private String ucUri;
+  private String catalogName;
+  private String schemaName;
+  private String ucBaseLocation;
+  private String ucToken;
+
+  @Override
+  public void beforeAll(ExtensionContext context) {
+    ucUri = System.getenv(UnityCatalogExtensionUtil.UC_URI);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(ucUri),
+        "Environment variable '%s' cannot be null or empty",
+        UnityCatalogExtensionUtil.UC_URI);
+
+    catalogName = System.getenv(UnityCatalogExtensionUtil.UC_CATALOG_NAME);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(catalogName),
+        "Environment variable '%s' cannot be null or empty",
+        UnityCatalogExtensionUtil.UC_CATALOG_NAME);
+
+    schemaName = System.getenv(UnityCatalogExtensionUtil.UC_SCHEMA_NAME);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(schemaName),
+        "Environment variable '%s' cannot be null or empty",
+        UnityCatalogExtensionUtil.UC_SCHEMA_NAME);
+
+    ucBaseLocation = System.getenv(UnityCatalogExtensionUtil.UC_BASE_LOCATION);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(ucBaseLocation),
+        "Environment variable '%s' cannot be null or empty",
+        UnityCatalogExtensionUtil.UC_BASE_LOCATION);
+
+    ucToken = System.getenv(UnityCatalogExtensionUtil.UC_TOKEN);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(ucToken),
+        "Environment variable '%s' cannot be null or empty",
+        UnityCatalogExtensionUtil.UC_TOKEN);
+  }
+
+  @Override
+  public void afterAll(ExtensionContext context) {
+  }
+
+  @Override
+  public String catalogName() {
+    return catalogName;
+  }
+
+  @Override
+  public String schemaName() {
+    return schemaName;
+  }
+
+  @Override
+  public String catalogUri() {
+    return ucUri;
+  }
+
+  @Override
+  public String rootTestingDir() {
+    return ucBaseLocation;
+  }
+
+  @Override
+  public Map<String, String> catalogSparkConf() {
+    String catalogKey = String.format("spark.sql.catalog.%s", catalogName());
+    return Map.of(
+        // TODO: Maybe we can abstract an extraSparkConf() method to set non-catalog
+        // properties such as this filesystem setting.
+        "spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem",
+        catalogKey, "io.unitycatalog.spark.UCSingleCatalog",
+        catalogKey + ".uri", catalogUri(),
+        catalogKey + ".token", ucToken
+    );
+  }
+}
diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/extension/UnityCatalogExtension.java b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/UnityCatalogExtension.java
new file mode 100644
index 00000000000..a473b1170bd
--- /dev/null
+++ b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/UnityCatalogExtension.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sparkuctest.extension;
+
+import java.util.Map;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+
+public interface UnityCatalogExtension extends BeforeAllCallback, AfterAllCallback {
+
+  String catalogName();
+
+  String schemaName();
+
+  String catalogUri();
+
+  String rootTestingDir();
+
+  Map<String, String> catalogSparkConf();
+}
diff --git a/spark/unitycatalog/src/test/java/io/sparkuctest/extension/UnityCatalogExtensionUtil.java b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/UnityCatalogExtensionUtil.java
new file mode 100644
index 00000000000..3f5ecca5f6c
--- /dev/null
+++ b/spark/unitycatalog/src/test/java/io/sparkuctest/extension/UnityCatalogExtensionUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sparkuctest.extension;
+
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UnityCatalogExtensionUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UnityCatalogExtensionUtil.class);
+
+  public static final String UC_EXTENSION_CLASS = "UC_EXTENSION_CLASS";
+
+  // The Unity Catalog URI endpoint environment variable.
+  public static final String UC_URI = "UC_URI";
+  public static final String UC_CATALOG_NAME = "UC_CATALOG_NAME";
+  public static final String UC_SCHEMA_NAME = "UC_SCHEMA_NAME";
+  public static final String UC_BASE_LOCATION = "UC_BASE_LOCATION";
+
+  // The Unity Catalog token environment variable, for a personal access token (PAT).
+  public static final String UC_TOKEN = "UC_TOKEN";
+
+  public static UnityCatalogExtension initialize() {
+    UnityCatalogExtension extension;
+
+    String implClass = System.getenv(UC_EXTENSION_CLASS);
+    if (!Strings.isNullOrEmpty(implClass)) {
+      LOG.info("The initializing UnityCatalogExtension is: {}", implClass);
+      try {
+        extension = (UnityCatalogExtension) Class.forName(implClass)
+            .getDeclaredConstructor()
+            .newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Cannot initialize UnityCatalogExtension for class name: " + implClass, e);
+      }
+    } else {
+      extension = new LocalUnityCatalogExtension();
+    }
+
+    return extension;
+  }
+}