Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ private void commitToUC(
commitMetadata.getMaxKnownPublishedDeltaVersion(),
false /* isDisown */,
generateMetadataPayloadOpt(commitMetadata).map(MetadataAdapter::new),
commitMetadata.getNewProtocolOpt().map(ProtocolAdapter::new));
commitMetadata.getNewProtocolOpt().map(ProtocolAdapter::new),
commitMetadata.getCommitterProperties());
return null;
} catch (io.delta.storage.commit.CommitFailedException cfe) {
throw storageCFEtoKernelCFE(cfe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package io.delta.kernel.unitycatalog

import java.lang.{Long => JLong}
import java.net.URI
import java.util.Optional
import java.util
import java.util.{Collections, Map, Optional}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Supplier

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -153,7 +155,9 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient {
lastKnownBackfilledVersionOpt: Optional[JLong],
disown: Boolean,
newMetadata: Optional[AbstractMetadata],
newProtocol: Optional[AbstractProtocol]): Unit = {
newProtocol: Optional[AbstractProtocol],
committerProperties: Supplier[util.Map[String, String]] =
() => Collections.emptyMap()): Unit = {
forceThrowInCommitMethod()

if (disown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.coordinatedcommits

import java.lang.{Long => JLong}
import java.net.URI
import java.util.Collections
import java.util.Optional
import java.util.function.Supplier

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import io.delta.storage.commit.{Commit => JCommit, GetCommitsResponse => JGetCommitsResponse}
Expand Down Expand Up @@ -63,7 +65,9 @@ class InMemoryUCClient(
lastKnownBackfilledVersion: Optional[JLong],
disown: Boolean,
newMetadata: Optional[AbstractMetadata],
newProtocol: Optional[AbstractProtocol]): Unit = {
newProtocol: Optional[AbstractProtocol],
committerProperties: Supplier[java.util.Map[String, String]] =
() => Collections.emptyMap()): Unit = {
ucCommitCoordinator.commitToCoordinator(
tableId,
tableUri,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.coordinatedcommits

import java.io.File
import java.net.URI
import java.util.{Optional, UUID}
import java.util.{Collections, Optional, UUID}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -124,7 +124,8 @@ trait UCCommitCoordinatorClientSuiteBase extends CommitCoordinatorClientImplSuit
Optional.of(version),
false,
Optional.empty(),
Optional.empty())
Optional.empty(),
() => Collections.emptyMap())
}

override protected def validateBackfillStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Interface for interacting with the Unity Catalog.
Expand Down Expand Up @@ -76,6 +78,10 @@ public interface UCClient extends AutoCloseable {
* If present, the table's metadata will be updated atomically with the commit.
* @param newProtocol An Optional containing a new protocol version to be applied to the table.
* If present, the table's protocol will be updated atomically with the commit.
* @param committerProperties Additional catalog-specific properties to be passed through to the
* UC server. These opaque key-value pairs may be used by specific UC
* client implementations as needed. The interpretation and usage of these
* properties is implementation-specific.
* @throws IOException if there's an error during the commit process, such as network issues.
* @throws CommitFailedException if the commit fails due to conflicts or other logical errors.
* @throws UCCommitCoordinatorException if there's an error specific to the Unity Catalog
Expand All @@ -88,7 +94,8 @@ void commit(
Optional<Long> lastKnownBackfilledVersion,
boolean disown,
Optional<AbstractMetadata> newMetadata,
Optional<AbstractProtocol> newProtocol
Optional<AbstractProtocol> newProtocol,
Supplier<Map<String, String>> committerProperties
) throws IOException, CommitFailedException, UCCommitCoordinatorException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,8 @@ protected void commitToUC(
lastKnownBackfilledVersion,
disown,
newMetadata,
newProtocol
newProtocol,
Collections::emptyMap /* committerProperties */
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.*;
import java.util.function.Supplier;

/**
* A REST client implementation of [[UCClient]] for interacting with Unity Catalog's commit
Expand Down Expand Up @@ -186,7 +183,8 @@ public void commit(
Optional<Long> lastKnownBackfilledVersion,
boolean disown,
Optional<AbstractMetadata> newMetadata,
Optional<AbstractProtocol> newProtocol
Optional<AbstractProtocol> newProtocol,
Supplier<Map<String, String>> committerProperties
) throws IOException, CommitFailedException, UCCommitCoordinatorException {
// Validate required parameters
Objects.requireNonNull(tableId, "tableId must not be null.");
Expand Down
Loading