Skip to content

Commit 6d5cb0e

Browse files
committed
snapshot manager factory
1 parent 9102a25 commit 6d5cb0e

File tree

2 files changed

+94
-2
lines changed

2 files changed

+94
-2
lines changed

kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.delta.kernel.Snapshot;
2222
import io.delta.kernel.spark.read.SparkScanBuilder;
2323
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
24-
import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
24+
import io.delta.kernel.spark.snapshot.SnapshotManagerFactory;
2525
import io.delta.kernel.spark.unity.UnityCatalogClientFactory;
2626
import io.delta.kernel.spark.utils.SchemaUtils;
2727
import java.util.*;
@@ -143,7 +143,9 @@ private SparkTable(
143143

144144
this.hadoopConf =
145145
SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
146-
this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);
146+
this.snapshotManager =
147+
SnapshotManagerFactory.create(
148+
identifier, tablePath, hadoopConf, this.catalogTable, this.unityCatalogClient);
147149
// Load the initial snapshot through the manager
148150
this.initialSnapshot = snapshotManager.loadLatestSnapshot();
149151
this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(initialSnapshot.getSchema());
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.spark.snapshot;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
import io.delta.kernel.spark.unity.UnityCatalogClientFactory;
21+
import io.delta.kernel.spark.utils.CatalogTableUtils;
22+
import java.util.Optional;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
25+
import org.apache.spark.sql.connector.catalog.Identifier;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
/**
30+
* Factory responsible for instantiating {@link DeltaSnapshotManager} implementations.
31+
*
32+
* <p>The factory centralises the decision of whether a table should use the traditional
33+
* filesystem-based snapshot manager or a catalog-backed implementation. Today all tables rely on
34+
* {@link PathBasedSnapshotManager}. Catalog-managed support will be integrated in a subsequent
35+
* change once the corresponding snapshot manager is implemented.
36+
*/
37+
public final class SnapshotManagerFactory {
38+
39+
private static final Logger LOG = LoggerFactory.getLogger(SnapshotManagerFactory.class);
40+
41+
private SnapshotManagerFactory() {}
42+
43+
/**
44+
* Creates an appropriate {@link DeltaSnapshotManager} based on the provided metadata.
45+
*
46+
* @param identifier Spark identifier for the table being resolved
47+
* @param tablePath canonical filesystem path to the table root
48+
* @param hadoopConf Hadoop configuration pre-populated with user options
49+
* @param catalogTable optional Spark {@link CatalogTable} descriptor when available
50+
* @param unityCatalogClient optional Unity Catalog client handle for catalog-managed tables
51+
* @return a snapshot manager implementation ready to serve table snapshots
52+
*/
53+
public static DeltaSnapshotManager create(
54+
Identifier identifier,
55+
String tablePath,
56+
Configuration hadoopConf,
57+
Optional<CatalogTable> catalogTable,
58+
Optional<UnityCatalogClientFactory.UnityCatalogClient> unityCatalogClient) {
59+
60+
requireNonNull(identifier, "identifier is null");
61+
requireNonNull(tablePath, "tablePath is null");
62+
requireNonNull(hadoopConf, "hadoopConf is null");
63+
requireNonNull(catalogTable, "catalogTable optional is null");
64+
requireNonNull(unityCatalogClient, "unityCatalogClient optional is null");
65+
66+
if (catalogTable.isPresent()
67+
&& CatalogTableUtils.isUnityCatalogManagedTable(catalogTable.get())
68+
&& unityCatalogClient.isPresent()) {
69+
LOG.debug(
70+
"Unity Catalog-managed table '{}' detected. Falling back to PathBasedSnapshotManager "
71+
+ "until catalog-managed support is wired.",
72+
identifier);
73+
}
74+
75+
return new PathBasedSnapshotManager(tablePath, hadoopConf);
76+
}
77+
78+
/**
79+
* Convenience overload for path-based tables without Spark catalog metadata.
80+
*
81+
* @param identifier Spark identifier for the table being resolved
82+
* @param tablePath canonical filesystem path to the table root
83+
* @param hadoopConf Hadoop configuration pre-populated with user options
84+
* @return a {@link PathBasedSnapshotManager} instance
85+
*/
86+
public static DeltaSnapshotManager createForPath(
87+
Identifier identifier, String tablePath, Configuration hadoopConf) {
88+
return create(identifier, tablePath, hadoopConf, Optional.empty(), Optional.empty());
89+
}
90+
}

0 commit comments

Comments
 (0)