Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public void checkVersionExists(long version, boolean mustBeRecreatable, boolean
@Override
public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
CommitRangeBuilder builder =
TableManager.loadCommitRange(tablePath)
.withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion));
TableManager.loadCommitRange(
tablePath, CommitRangeBuilder.CommitBoundary.atVersion(startVersion));

if (endVersion.isPresent()) {
builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ public interface CommitRange {
* <p>The boundary indicates whether the range was defined using a specific version number or a
* timestamp.
*
* @return an {@link Optional} containing the start boundary, or empty if the range was created
* with default start parameters (version 0)
* @return the start boundary for this commit range
*/
Optional<CommitRangeBuilder.CommitBoundary> getQueryStartBoundary();
CommitRangeBuilder.CommitBoundary getQueryStartBoundary();

/**
* Returns the original query boundary used to define the end boundary of this commit range, if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,15 @@
* A builder for creating {@link CommitRange} instances that define a contiguous range of commits in
* a Delta Lake table.
*
* <p>If no start specification is provided, the range defaults to starting at version 0. If no end
* specification is provided, the range defaults to the latest available version.
* <p>The start boundary is required and provided via {@link TableManager#loadCommitRange(String,
* CommitBoundary)}. If no end specification is provided, the range defaults to the latest available
* version.
*
* @since 3.4.0
*/
@Experimental
public interface CommitRangeBuilder {

/**
* Configures the builder to start the commit range at a specific version or timestamp.
*
* <p>If not specified, the commit range will default to starting at version 0.
*
* @param startBoundary the boundary specification for the start of the commit range, must not be
* null
* @return this builder instance configured with the specified start boundary
*/
CommitRangeBuilder withStartBoundary(CommitBoundary startBoundary);

/**
* Configures the builder to end the commit range at a specific version or timestamp.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,17 @@ static CreateTableTransactionBuilder buildCreateTableTransaction(
/**
* Creates a builder for loading a CommitRange at a given path.
*
* <p>The returned builder can be configured with start version or timestamp and an end version or
* timestamp, and with additional metadata to optimize the loading process.
* <p>The returned builder can be configured with an end version or timestamp, and with additional
* metadata to optimize the loading process.
*
* @param path the file system path to the Delta table
* @param startBoundary the boundary specification for the start of the commit range, must not be
* null
* @return a {@link CommitRangeBuilder} that can be used to load a {@link CommitRange} at the
* given path
*/
static CommitRangeBuilder loadCommitRange(String path) {
return new CommitRangeBuilderImpl(path);
static CommitRangeBuilder loadCommitRange(
String path, CommitRangeBuilder.CommitBoundary startBoundary) {
return new CommitRangeBuilderImpl(path, startBoundary);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.delta.kernel.exceptions.UnknownConfigurationException;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
import io.delta.kernel.internal.util.IntervalParserUtils;
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -277,6 +277,37 @@ public class TableConfig<T> {
"needs to be larger than or equal to -1.",
true);

/**
 * IMPORTANT: Kernel recognizes this table property, but does not yet validate, enforce, or
 * implement it.
 *
 * <p>Names the specific columns on which stats are collected for data skipping. When present, it
 * takes precedence over {@link #DATA_SKIPPING_NUM_INDEXED_COLS}, and stats are collected only for
 * columns that exactly match the listed ones. Specifying a nested column collects stats for all
 * leaf fields of that column; specifying a non-existent column is ignored. Updating this config
 * does not trigger stats re-collection. Instead, it redefines the stats schema of the table:
 * future stats collection (e.g., in append and OPTIMIZE) and data skipping both change, and
 * column stats not mentioned by this config are ignored even if they exist.
 *
 * <p>Format: a comma-separated list of case-insensitive column identifiers. An identifier can
 * consist of letters, digits, and underscores. A column name that includes special characters
 * should be enclosed in backticks (`) to escape them.
 *
 * <p>Each identifier can refer to a non-struct column, a leaf field of a struct column, or a
 * struct column itself. Naming a struct column collects statistics for all of its leaf fields.
 *
 * <p>The raw value is wrapped via {@link Optional#ofNullable(Object)}, and the supplied predicate
 * accepts every value, i.e. no validation is performed here.
 */
public static final TableConfig<Optional<String>> DATA_SKIPPING_STATS_COLUMNS =
    new TableConfig<>(
        "delta.dataSkippingStatsColumns",
        null,
        Optional::ofNullable,
        ignored -> true,
        "needs to be a comma-separated list of column identifiers.",
        true);

/**
* Table property that enables modifying the table in accordance with the Delta-Iceberg Writer
* Compatibility V1 ({@code icebergCompatWriterV1}) protocol.
Expand Down Expand Up @@ -398,6 +429,9 @@ public static class UniversalFormats {
addConfig(this, MATERIALIZED_ROW_ID_COLUMN_NAME);
addConfig(this, MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME);
addConfig(this, VARIANT_SHREDDING_ENABLED);

// The below configs do not yet have their behavior correctly implemented in Kernel.
addConfig(this, DATA_SKIPPING_STATS_COLUMNS);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,26 @@ public class CommitRangeBuilderImpl implements CommitRangeBuilder {

public static class Context {
public final String unresolvedPath;
public Optional<CommitBoundary> startBoundaryOpt = Optional.empty();
public final CommitBoundary startBoundary;
public Optional<CommitBoundary> endBoundaryOpt = Optional.empty();
public List<ParsedLogData> logDatas = Collections.emptyList();

public Context(String unresolvedPath) {
public Context(String unresolvedPath, CommitBoundary startBoundary) {
this.unresolvedPath = requireNonNull(unresolvedPath, "unresolvedPath is null");
this.startBoundary = requireNonNull(startBoundary, "startBoundary is null");
}
}

private final Context ctx;

public CommitRangeBuilderImpl(String unresolvedPath) {
ctx = new Context(unresolvedPath);
public CommitRangeBuilderImpl(String unresolvedPath, CommitBoundary startBoundary) {
ctx = new Context(unresolvedPath, startBoundary);
}

///////////////////////////////////////
// Public CommitRangeBuilder Methods //
///////////////////////////////////////

@Override
public CommitRangeBuilderImpl withStartBoundary(CommitBoundary startBoundary) {
ctx.startBoundaryOpt = Optional.of(requireNonNull(startBoundary, "startBoundary is null"));
return this;
}

@Override
public CommitRangeBuilderImpl withEndBoundary(CommitBoundary endBoundary) {
ctx.endBoundaryOpt = Optional.of(requireNonNull(endBoundary, "endBoundary is null"));
Expand All @@ -86,9 +81,10 @@ public CommitRange build(Engine engine) {
////////////////////////////

private void validateInputOnBuild() {
// Validate that start boundary is less than or equal to end boundary if both are provided
if (ctx.startBoundaryOpt.isPresent() && ctx.endBoundaryOpt.isPresent()) {
CommitBoundary startBoundary = ctx.startBoundaryOpt.get();
// Validate that start boundary is less than or equal to end boundary if end boundary is
// provided
if (ctx.endBoundaryOpt.isPresent()) {
CommitBoundary startBoundary = ctx.startBoundary;
CommitBoundary endBoundary = ctx.endBoundaryOpt.get();

// If both are version-based, compare versions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,22 @@ CommitRangeImpl create(Engine engine) {
logger.info("{}: Resolved end-boundary to the latest version {}", tablePath, endVersion);
}
return new CommitRangeImpl(
tablePath, ctx.startBoundaryOpt, ctx.endBoundaryOpt, startVersion, endVersion, deltas);
tablePath, ctx.startBoundary, ctx.endBoundaryOpt, startVersion, endVersion, deltas);
}

private long resolveStartVersion(Engine engine, List<ParsedCatalogCommitData> catalogCommits) {
if (!ctx.startBoundaryOpt.isPresent()) {
// Default to version 0 if no start boundary is provided
return 0L;
}
CommitRangeBuilder.CommitBoundary startBoundary = ctx.startBoundaryOpt.get();

if (startBoundary.isVersion()) {
return startBoundary.getVersion();
if (ctx.startBoundary.isVersion()) {
return ctx.startBoundary.getVersion();
} else {
logger.info(
"{}: Trying to resolve start-boundary timestamp {} to version",
tablePath,
startBoundary.getTimestamp());
ctx.startBoundary.getTimestamp());
return DeltaHistoryManager.getVersionAtOrAfterTimestamp(
engine,
logPath,
startBoundary.getTimestamp(),
(SnapshotImpl) startBoundary.getLatestSnapshot(),
ctx.startBoundary.getTimestamp(),
(SnapshotImpl) ctx.startBoundary.getLatestSnapshot(),
catalogCommits);
}
}
Expand Down Expand Up @@ -143,7 +137,7 @@ private void logResolvedVersions(long startVersion, Optional<Long> endVersionOpt
tablePath,
startVersion,
endVersionOpt,
ctx.startBoundaryOpt,
ctx.startBoundary,
ctx.endBoundaryOpt);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
public class CommitRangeImpl implements CommitRange {

private final Path dataPath;
private final Optional<CommitRangeBuilder.CommitBoundary> startBoundaryOpt;
private final CommitRangeBuilder.CommitBoundary startBoundary;
private final Optional<CommitRangeBuilder.CommitBoundary> endBoundaryOpt;

private final long startVersion;
Expand All @@ -50,7 +50,7 @@ public class CommitRangeImpl implements CommitRange {

public CommitRangeImpl(
Path dataPath,
Optional<CommitRangeBuilder.CommitBoundary> startBoundaryOpt,
CommitRangeBuilder.CommitBoundary startBoundary,
Optional<CommitRangeBuilder.CommitBoundary> endBoundaryOpt,
long startVersion,
long endVersion,
Expand All @@ -59,7 +59,7 @@ public CommitRangeImpl(
checkArgument(
deltas.size() == endVersion - startVersion + 1, "deltaFiles size must match size of range");
this.dataPath = requireNonNull(dataPath, "dataPath cannot be null");
this.startBoundaryOpt = requireNonNull(startBoundaryOpt, "startSpecOpt cannot be null");
this.startBoundary = requireNonNull(startBoundary, "startBoundary cannot be null");
this.endBoundaryOpt = requireNonNull(endBoundaryOpt, "endSpecOpt cannot be null");
this.startVersion = startVersion;
this.endVersion = endVersion;
Expand All @@ -81,8 +81,8 @@ public long getEndVersion() {
}

@Override
public Optional<CommitRangeBuilder.CommitBoundary> getQueryStartBoundary() {
return startBoundaryOpt;
public CommitRangeBuilder.CommitBoundary getQueryStartBoundary() {
return startBoundary;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ SnapshotImpl create(Engine engine) {
}

private SnapshotImpl createSnapshot(Engine engine, SnapshotQueryContext snapshotCtx) {
final Optional<Long> versionToLoad = getTargetVersionToLoad(engine, snapshotCtx);
final Lazy<LogSegment> lazyLogSegment = getLazyLogSegment(engine, snapshotCtx, versionToLoad);
final Optional<Long> timeTravelVersion = getTargetTimeTravelVersion(engine, snapshotCtx);
final Lazy<LogSegment> lazyLogSegment =
getLazyLogSegment(engine, snapshotCtx, timeTravelVersion);
final Lazy<Optional<CRCInfo>> lazyCrcInfo =
createLazyChecksumFileLoaderWithMetrics(
engine, lazyLogSegment, snapshotCtx.getSnapshotMetrics());
Expand Down Expand Up @@ -216,7 +217,7 @@ private SnapshotImpl createSnapshot(Engine engine, SnapshotQueryContext snapshot

return new SnapshotImpl(
tablePath,
versionToLoad.orElseGet(() -> lazyLogSegment.get().getVersion()),
timeTravelVersion.orElseGet(() -> lazyLogSegment.get().getVersion()),
lazyLogSegment,
logReplay,
protocol,
Expand All @@ -238,7 +239,7 @@ private SnapshotQueryContext getSnapshotQueryContext() {
}

private Lazy<LogSegment> getLazyLogSegment(
Engine engine, SnapshotQueryContext snapshotCtx, Optional<Long> versionToLoad) {
Engine engine, SnapshotQueryContext snapshotCtx, Optional<Long> timeTravelVersion) {
return new Lazy<>(
() -> {
final LogSegment logSegment =
Expand All @@ -249,7 +250,7 @@ private Lazy<LogSegment> getLazyLogSegment(
() ->
new SnapshotManager(tablePath)
.getLogSegmentForVersion(
engine, versionToLoad, ctx.logDatas, ctx.maxCatalogVersion));
engine, timeTravelVersion, ctx.logDatas, ctx.maxCatalogVersion));

snapshotCtx.setResolvedVersion(logSegment.getVersion());
snapshotCtx.setCheckpointVersion(logSegment.getCheckpointVersionOpt());
Expand All @@ -258,7 +259,8 @@ private Lazy<LogSegment> getLazyLogSegment(
});
}

private Optional<Long> getTargetVersionToLoad(Engine engine, SnapshotQueryContext snapshotCtx) {
private Optional<Long> getTargetTimeTravelVersion(
Engine engine, SnapshotQueryContext snapshotCtx) {
if (ctx.timestampQueryContextOpt.isPresent()) {
return Optional.of(
resolveTimestampToSnapshotVersion(
Expand All @@ -269,9 +271,6 @@ private Optional<Long> getTargetVersionToLoad(Engine engine, SnapshotQueryContex
ctx.logDatas));
} else if (ctx.versionOpt.isPresent()) {
return ctx.versionOpt;
} else if (ctx.maxCatalogVersion.isPresent()) {
// For latest queries for catalogManaged tables we want to load the maxCatalogVersion
return ctx.maxCatalogVersion;
}
return Optional.empty();
}
Expand Down
Loading
Loading