Skip to content

Commit 328bc96

Browse files
committed
init
1 parent 9cc309a commit 328bc96

File tree

7 files changed

+30
-14
lines changed

7 files changed

+30
-14
lines changed

kernel/unitycatalog/src/main/java/io/delta/kernel/unitycatalog/UCCatalogManagedCommitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,8 @@ private void commitToUC(
373373
commitMetadata.getMaxKnownPublishedDeltaVersion(),
374374
false /* isDisown */,
375375
generateMetadataPayloadOpt(commitMetadata).map(MetadataAdapter::new),
376-
commitMetadata.getNewProtocolOpt().map(ProtocolAdapter::new));
376+
commitMetadata.getNewProtocolOpt().map(ProtocolAdapter::new),
377+
commitMetadata.getCommitterProperties());
377378
return null;
378379
} catch (io.delta.storage.commit.CommitFailedException cfe) {
379380
throw storageCFEtoKernelCFE(cfe);

kernel/unitycatalog/src/test/scala/io/delta/kernel/unitycatalog/InMemoryUCClient.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package io.delta.kernel.unitycatalog
1818

1919
import java.lang.{Long => JLong}
2020
import java.net.URI
21-
import java.util.Optional
21+
import java.util
22+
import java.util.{Collections, Map, Optional}
2223
import java.util.concurrent.ConcurrentHashMap
24+
import java.util.function.Supplier
2325

2426
import scala.collection.JavaConverters._
2527
import scala.collection.mutable.ArrayBuffer
@@ -153,7 +155,9 @@ class InMemoryUCClient(ucMetastoreId: String) extends UCClient {
153155
lastKnownBackfilledVersionOpt: Optional[JLong],
154156
disown: Boolean,
155157
newMetadata: Optional[AbstractMetadata],
156-
newProtocol: Optional[AbstractProtocol]): Unit = {
158+
newProtocol: Optional[AbstractProtocol],
159+
committerProperties: Supplier[util.Map[String, String]] =
160+
() => Collections.emptyMap()): Unit = {
157161
forceThrowInCommitMethod()
158162

159163
if (disown) {

spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/InMemoryUCClient.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.coordinatedcommits
1818

1919
import java.lang.{Long => JLong}
2020
import java.net.URI
21+
import java.util.Collections
2122
import java.util.Optional
23+
import java.util.function.Supplier
2224

2325
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
2426
import io.delta.storage.commit.{Commit => JCommit, GetCommitsResponse => JGetCommitsResponse}
@@ -63,7 +65,9 @@ class InMemoryUCClient(
6365
lastKnownBackfilledVersion: Optional[JLong],
6466
disown: Boolean,
6567
newMetadata: Optional[AbstractMetadata],
66-
newProtocol: Optional[AbstractProtocol]): Unit = {
68+
newProtocol: Optional[AbstractProtocol],
69+
committerProperties: Supplier[java.util.Map[String, String]] =
70+
() => Collections.emptyMap()): Unit = {
6771
ucCommitCoordinator.commitToCoordinator(
6872
tableId,
6973
tableUri,

spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorClientSuiteBase.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.coordinatedcommits
1818

1919
import java.io.File
2020
import java.net.URI
21-
import java.util.{Optional, UUID}
21+
import java.util.{Collections, Optional, UUID}
2222

2323
import scala.collection.JavaConverters._
2424

@@ -124,7 +124,8 @@ trait UCCommitCoordinatorClientSuiteBase extends CommitCoordinatorClientImplSuit
124124
Optional.of(version),
125125
false,
126126
Optional.empty(),
127-
Optional.empty())
127+
Optional.empty(),
128+
() => Collections.emptyMap())
128129
}
129130

130131
override protected def validateBackfillStrategy(

storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCClient.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424

2525
import java.io.IOException;
2626
import java.net.URI;
27+
import java.util.Map;
2728
import java.util.Optional;
29+
import java.util.function.Supplier;
2830

2931
/**
3032
* Interface for interacting with the Unity Catalog.
@@ -76,6 +78,10 @@ public interface UCClient extends AutoCloseable {
7678
* If present, the table's metadata will be updated atomically with the commit.
7779
* @param newProtocol An Optional containing a new protocol version to be applied to the table.
7880
* If present, the table's protocol will be updated atomically with the commit.
81+
* @param committerProperties Additional catalog-specific properties to be passed through to the
82+
* UC server. These opaque key-value pairs may be used by specific UC
83+
* client implementations as needed. The interpretation and usage of these
84+
* properties is implementation-specific.
7985
* @throws IOException if there's an error during the commit process, such as network issues.
8086
* @throws CommitFailedException if the commit fails due to conflicts or other logical errors.
8187
* @throws UCCommitCoordinatorException if there's an error specific to the Unity Catalog
@@ -88,7 +94,8 @@ void commit(
8894
Optional<Long> lastKnownBackfilledVersion,
8995
boolean disown,
9096
Optional<AbstractMetadata> newMetadata,
91-
Optional<AbstractProtocol> newProtocol
97+
Optional<AbstractProtocol> newProtocol,
98+
Supplier<Map<String, String>> committerProperties
9299
) throws IOException, CommitFailedException, UCCommitCoordinatorException;
93100

94101
/**

storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCCommitCoordinatorClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,8 @@ protected void commitToUC(
683683
lastKnownBackfilledVersion,
684684
disown,
685685
newMetadata,
686-
newProtocol
686+
newProtocol,
687+
Collections::emptyMap /* committerProperties */
687688
);
688689
}
689690

storage/src/main/java/io/delta/storage/commit/uccommitcoordinator/UCTokenBasedRestClient.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,8 @@
4949

5050
import java.io.IOException;
5151
import java.net.URI;
52-
import java.util.ArrayList;
53-
import java.util.Arrays;
54-
import java.util.List;
55-
import java.util.Objects;
56-
import java.util.Optional;
52+
import java.util.*;
53+
import java.util.function.Supplier;
5754

5855
/**
5956
* A REST client implementation of [[UCClient]] for interacting with Unity Catalog's commit
@@ -186,7 +183,8 @@ public void commit(
186183
Optional<Long> lastKnownBackfilledVersion,
187184
boolean disown,
188185
Optional<AbstractMetadata> newMetadata,
189-
Optional<AbstractProtocol> newProtocol
186+
Optional<AbstractProtocol> newProtocol,
187+
Supplier<Map<String, String>> committerProperties
190188
) throws IOException, CommitFailedException, UCCommitCoordinatorException {
191189
// Validate required parameters
192190
Objects.requireNonNull(tableId, "tableId must not be null.");

0 commit comments

Comments
 (0)