SparkMicroBatchStream.java
@@ -15,6 +15,7 @@
*/
package io.delta.kernel.spark.read;

+ import io.delta.kernel.CommitActions;
import io.delta.kernel.CommitRange;
import io.delta.kernel.Snapshot;
import io.delta.kernel.data.ColumnarBatch;
@@ -59,6 +60,7 @@
import scala.collection.JavaConverters;
import scala.collection.Seq;

+ // TODO(#5318): Use DeltaErrors error framework for consistent error handling.
public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {

private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);
@@ -93,6 +95,10 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
*/
private boolean isFirstBatch = false;

+ // Cached starting version to ensure idempotent behavior for "latest" starting version.
+ // getStartingVersion() must return the same value across multiple calls.
+ private volatile Optional<Long> cachedStartingVersion = null;
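Note the three-state encoding in this new field: `null` means "not yet computed", `Optional.empty()` means "computed, no starting version", and a present value is the pinned version. The `volatile` write plus the `synchronized` getter below make the lazy initialization thread-safe. A minimal sketch of the same pattern in isolation (a hypothetical class, not part of this PR):

```java
import java.util.Optional;
import java.util.function.Supplier;

// Minimal sketch of the null-vs-Optional.empty() lazy cache used above.
final class LazyOptionalCache<T> {
  private volatile Optional<T> cached = null; // null == not computed yet

  synchronized Optional<T> get(Supplier<Optional<T>> compute) {
    if (cached == null) {
      cached = compute.get(); // may legitimately be Optional.empty()
    }
    return cached;
  }
}
```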

public SparkMicroBatchStream(
DeltaSnapshotManager snapshotManager,
Snapshot snapshotAtSourceInit,
@@ -334,15 +340,20 @@ public void stop() {
*
* <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
*/
- Optional<Long> getStartingVersion() {
+ synchronized Optional<Long> getStartingVersion() {
+ if (cachedStartingVersion != null) {
+ return cachedStartingVersion;
+ }

// TODO(#5319): DeltaSource.scala uses `allowOutOfRange` parameter from
// DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP.
if (options.startingVersion().isDefined()) {
DeltaStartingVersion startingVersion = options.startingVersion().get();
if (startingVersion instanceof StartingVersionLatest$) {
Snapshot latestSnapshot = snapshotManager.loadLatestSnapshot();
// "latest": start reading from the next commit
- return Optional.of(latestSnapshot.getVersion() + 1);
+ cachedStartingVersion = Optional.of(latestSnapshot.getVersion() + 1);
+ return cachedStartingVersion;
} else if (startingVersion instanceof StartingVersion) {
long version = ((StartingVersion) startingVersion).version();
if (!validateProtocolAt(spark, snapshotManager, engine, version)) {
@@ -354,11 +365,13 @@ Optional<Long> getStartingVersion() {
snapshotManager.checkVersionExists(
version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
}
- return Optional.of(version);
+ cachedStartingVersion = Optional.of(version);
+ return cachedStartingVersion;
}
}
// TODO(#5319): Implement startingTimestamp support
- return Optional.empty();
+ cachedStartingVersion = Optional.empty();
+ return cachedStartingVersion;
}
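Spark may call `getStartingVersion()` more than once while planning the first batch, and with `startingVersion = "latest"` a commit arriving between two calls would otherwise shift the answer; the cache pins it. A JUnit-style sketch of the invariant, where `createStream`, `appendCommit`, and `tablePath` are assumed test helpers rather than APIs from this PR:

```java
// Hypothetical test sketch; createStream/appendCommit/tablePath are assumed.
@Test
public void startingVersionLatestIsIdempotent() {
  SparkMicroBatchStream stream = createStream(tablePath, "startingVersion", "latest");
  Optional<Long> first = stream.getStartingVersion();
  appendCommit(tablePath); // a new commit lands between the two calls
  Optional<Long> second = stream.getStartingVersion();
  // Without the cache, `second` would move to the new latest version + 1.
  assertEquals(first, second);
}
```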

/**
@@ -490,6 +503,20 @@ private CloseableIterator<IndexedFile> filterDeltaLogs(
Optional<Long> endVersionOpt =
endOffset.isPresent() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty();

+ // Cap endVersion to the latest available version. The Kernel's getTableChanges requires
+ // endVersion to be an actual existing version or empty.
+ if (endVersionOpt.isPresent()) {
+ long latestVersion = snapshotAtSourceInit.getVersion();
+ if (endVersionOpt.get() > latestVersion) {
+ // This could happen because:
+ // 1. data could be added after snapshotAtSourceInit was captured.
+ // 2. buildOffsetFromIndexedFile bumps the version up by one when we hit the END_INDEX.
+ // TODO(#5318): consider caching the latest version to avoid loading a new snapshot.
+ // TODO(#5318): kernel should ideally relax this constraint.
+ endVersionOpt = Optional.of(snapshotManager.loadLatestSnapshot().getVersion());
+ }
+ }
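A small worked example of why the cap is needed, with assumed values: if the latest version is 10 and the end offset sits at the END_INDEX sentinel of version 10, `buildOffsetFromIndexedFile` bumps the reservoir version to 11, which does not exist yet, so the Kernel call must be capped back to 10:

```java
// Illustrative values only; not code from this PR.
long latestVersion = 10;    // from snapshotAtSourceInit or a reloaded snapshot
long reservoirVersion = 11; // END_INDEX of version 10, bumped up by one
long cappedEnd = Math.min(reservoirVersion, latestVersion); // == 10, an existing version
```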

CommitRange commitRange;
try {
commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
@@ -499,94 +526,55 @@ private CloseableIterator<IndexedFile> filterDeltaLogs(
return Utils.toCloseableIterator(allIndexedFiles.iterator());
}

- // Use getActionsFromRangeUnsafe instead of CommitRange.getActions() because:
- // 1. CommitRange.getActions() requires a snapshot at exactly the startVersion, but when
+ // Use getCommitActionsFromRangeUnsafe instead of CommitRange.getCommitActions() because:
+ // 1. CommitRange.getCommitActions() requires a snapshot at exactly the startVersion, but when
// startingVersion option is used, we may not be able to recreate that exact snapshot
// (e.g., if log files have been cleaned up after checkpointing).
// 2. This matches DSv1 behavior which uses snapshotAtSourceInit's P&M to interpret all
// AddFile actions and performs per-commit protocol validation.
- try (CloseableIterator<ColumnarBatch> actionsIter =
- StreamingHelper.getActionsFromRangeUnsafe(
+ try (CloseableIterator<CommitActions> commitsIter =
+ StreamingHelper.getCommitActionsFromRangeUnsafe(
engine,
(io.delta.kernel.internal.commitrange.CommitRangeImpl) commitRange,
snapshotAtSourceInit.getPath(),
ACTION_SET)) {
- // Each ColumnarBatch belongs to a single commit version,
- // but a single version may span multiple ColumnarBatches.
- long currentVersion = -1;
- long currentIndex = 0;
- List<IndexedFile> currentVersionFiles = new ArrayList<>();

- while (actionsIter.hasNext()) {
- ColumnarBatch batch = actionsIter.next();
- if (batch.getSize() == 0) {
- // TODO(#5318): this shouldn't happen, empty commits will still have a non-empty row
- // with the version set. Make sure the kernel API is explicit about this.
- continue;
- }
- long version = StreamingHelper.getVersion(batch);
- // When version changes, flush the completed version
- if (currentVersion != -1 && version != currentVersion) {
- flushVersion(currentVersion, currentVersionFiles, allIndexedFiles);
- currentVersionFiles.clear();
- currentIndex = 0;
- }

- // Validate the commit before processing files from this batch
- // TODO(#5318): migrate to kernel's commit-level iterator (WIP).
- // The current one-pass algorithm assumes REMOVE actions precede ADD actions
- // in a commit; we should implement a proper two-pass approach once the kernel API is ready.
- validateCommit(batch, version, snapshotAtSourceInit.getPath(), endOffset);
+ while (commitsIter.hasNext()) {
+ try (CommitActions commit = commitsIter.next()) {
+ long version = commit.getVersion();

- currentVersion = version;
- currentIndex =
- extractIndexedFilesFromBatch(batch, version, currentIndex, currentVersionFiles);
- }
+ // First pass: Validate the commit.
+ // Must process the entire commit first for all-or-nothing semantics.
+ validateCommit(commit, version, snapshotAtSourceInit.getPath(), endOffset);

- // Flush the last version
- if (currentVersion != -1) {
- flushVersion(currentVersion, currentVersionFiles, allIndexedFiles);
+ // TODO(#5318): consider caching the commit actions to avoid reading the same commit
+ // twice.
+ // Second pass: Extract AddFile actions with dataChange=true.
+ extractVersionFiles(commit, version, allIndexedFiles);
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to read commit range", e);
+ } catch (RuntimeException e) {
+ throw e; // Rethrow runtime exceptions directly
+ } catch (Exception e) {
+ // CommitActions.close() throws Exception
+ throw new RuntimeException("Failed to process commits", e);
}
// TODO(#5318): implement lazy loading (only load a batch into memory if needed).
return Utils.toCloseableIterator(allIndexedFiles.iterator());
}
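The loop above reads each commit twice: a full validation pass before anything is emitted (all-or-nothing semantics), then an extraction pass. A condensed sketch of that shape, where `validateBatch` and `collectAddFiles` are simplified stand-ins for the real `validateCommit` and `extractVersionFiles`:

```java
// Condensed two-pass sketch. Declares `throws Exception` because
// CommitActions.close() does.
void processCommits(CloseableIterator<CommitActions> commitsIter, List<IndexedFile> output)
    throws Exception {
  while (commitsIter.hasNext()) {
    try (CommitActions commit = commitsIter.next()) {
      long version = commit.getVersion();
      // Pass 1: scan every action; throw before emitting anything from this commit.
      try (CloseableIterator<ColumnarBatch> actions = commit.getActions()) {
        while (actions.hasNext()) {
          validateBatch(actions.next(), version);
        }
      }
      // Pass 2: re-read the same commit and collect AddFiles with dataChange=true.
      try (CloseableIterator<ColumnarBatch> actions = commit.getActions()) {
        while (actions.hasNext()) {
          collectAddFiles(actions.next(), version, output);
        }
      }
    }
  }
}
```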

- /**
- * Flushes a completed version by adding BEGIN/END sentinels around data files.
- *
- * <p>Sentinels are IndexedFiles with null addFile that mark version boundaries. They serve
- * several purposes:
- *
- * <ul>
- * <li>Enable offset tracking at version boundaries (before any files or after all files)
- * <li>Allow streaming to resume at the start or end of a version
- * <li>Handle versions with only metadata/protocol changes (no data files)
- * </ul>
- *
- * <p>This mimics DeltaSource.addBeginAndEndIndexOffsetsForVersion
- */
- private void flushVersion(
- long version, List<IndexedFile> versionFiles, List<IndexedFile> output) {
- // Add BEGIN sentinel
- output.add(new IndexedFile(version, DeltaSourceOffset.BASE_INDEX(), /* addFile= */ null));
- // TODO(#5319): implement getMetadataOrProtocolChangeIndexedFileIterator.
- // Add all data files
- output.addAll(versionFiles);
- // Add END sentinel
- output.add(new IndexedFile(version, DeltaSourceOffset.END_INDEX(), /* addFile= */ null));
- }

/**
* Validates a commit and fails the stream if it's invalid. Mimics
* DeltaSource.validateCommitAndDecideSkipping in Scala.
*
* @param commit the CommitActions representing a single commit
* @param version the commit version
* @param tablePath the path to the Delta table
* @param endOffsetOpt optional end offset for boundary checking
* @throws RuntimeException if the commit is invalid.
*/
private void validateCommit(
- ColumnarBatch batch,
+ CommitActions commit,
long version,
String tablePath,
Optional<DeltaSourceOffset> endOffsetOpt) {
@@ -598,25 +586,52 @@ private void validateCommit(
return;
}
}
- int numRows = batch.getSize();
// TODO(#5319): Implement ignoreChanges & skipChangeCommits & ignoreDeletes (legacy)
// TODO(#5318): validate METADATA actions
- for (int rowId = 0; rowId < numRows; rowId++) {
- // RULE 1: If commit has RemoveFile(dataChange=true), fail this stream.
- Optional<RemoveFile> removeOpt = StreamingHelper.getDataChangeRemove(batch, rowId);
- if (removeOpt.isPresent()) {
- RemoveFile removeFile = removeOpt.get();
- Throwable error =
- DeltaErrors.deltaSourceIgnoreDeleteError(version, removeFile.getPath(), tablePath);
- if (error instanceof RuntimeException) {
- throw (RuntimeException) error;
- } else {
- throw new RuntimeException(error);

+ try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) {
+ while (actionsIter.hasNext()) {
+ ColumnarBatch batch = actionsIter.next();
+ int numRows = batch.getSize();
+ for (int rowId = 0; rowId < numRows; rowId++) {
+ // RULE 1: If commit has RemoveFile(dataChange=true), fail this stream.
+ Optional<RemoveFile> removeOpt = StreamingHelper.getDataChangeRemove(batch, rowId);
+ if (removeOpt.isPresent()) {
+ RemoveFile removeFile = removeOpt.get();
+ throw (RuntimeException)
+ DeltaErrors.deltaSourceIgnoreDeleteError(version, removeFile.getPath(), tablePath);
[Review thread]
Collaborator: Let's add a test case for this.
Collaborator: Also, let's follow the error message framework and add error class/state in https://github.com/delta-io/delta/blob/master/spark/src/main/resources/error/delta-error-classes.json
Author: Please see testGetFileChanges_onRemoveFile_throwError. Re: the error message framework, we are already using it for this error ("DELTA_SOURCE_IGNORE_DELETE"). Going through this file, I did see cases where we are not using the error framework. I left a TODO and will fix that soon.
}
}
}
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to process commit at version " + version, e);
+ }
}

+ /**
+ * Extracts all IndexedFiles from a commit version, wrapped with BEGIN/END sentinels.
+ *
+ * @param commit the CommitActions representing a single commit
+ * @param version the commit version
+ * @param output the list to append IndexedFiles to
+ */
+ private void extractVersionFiles(CommitActions commit, long version, List<IndexedFile> output) {
+ // Add BEGIN sentinel before data files.
+ output.add(new IndexedFile(version, DeltaSourceOffset.BASE_INDEX(), /* addFile= */ null));
+ // TODO(#5319): implement getMetadataOrProtocolChangeIndexedFileIterator.
+ long index = 0;
+ try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) {
[Review thread]
Collaborator: ditto
Author: We are using try-with-resources, which automatically closes resources: https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html

+ while (actionsIter.hasNext()) {
+ ColumnarBatch batch = actionsIter.next();
+ index = extractIndexedFilesFromBatch(batch, version, index, output);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to extract files from commit at version " + version, e);
+ }
+ // Add END sentinel after data files.
+ output.add(new IndexedFile(version, DeltaSourceOffset.END_INDEX(), /* addFile= */ null));
[Review thread]
Collaborator: We could think about how to avoid doing iterator -> list -> iterator and instead assemble the iterator directly (in a separate PR).
Author: The next PR implements proper lazy iterating, which gets rid of this ArrayList.

+ }
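For a concrete picture of the sentinel layout, a commit at version 7 containing two AddFiles would append the following sequence (file indexes assumed to start at 0):

```java
// Illustrative only; addFile1/addFile2 stand for real AddFile actions.
List<IndexedFile> output = new ArrayList<>();
extractVersionFiles(commit, 7, output);
// output now holds, in order:
//   (7, DeltaSourceOffset.BASE_INDEX(), null)  <- BEGIN sentinel
//   (7, 0, addFile1)
//   (7, 1, addFile2)
//   (7, DeltaSourceOffset.END_INDEX(), null)   <- END sentinel
```

A metadata-only commit still contributes its BEGIN/END pair, which is what lets offsets land on versions that added no data files.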

/**
* Extracts IndexedFiles from a batch of actions for a given version and adds them to the output
* list. Assigns an index to each IndexedFile.
StreamingHelper.java
@@ -18,12 +18,12 @@
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.Preconditions.checkState;

+ import io.delta.kernel.CommitActions;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaLogActionUtils;
- import io.delta.kernel.internal.TableChangesUtils;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.commitrange.CommitRangeImpl;
@@ -98,29 +98,29 @@ public static Optional<RemoveFile> getDataChangeRemove(ColumnarBatch batch, int
}

/**
- * Gets actions from a commit range without requiring a snapshot at the exact start version.
+ * Gets commit-level actions from a commit range without requiring a snapshot at the exact start
+ * version.
*
- * <p>This method is "unsafe" because it bypasses the standard {@code CommitRange.getActions()}
- * API which requires a snapshot at the exact start version for protocol validation.
+ * <p>Returns an iterator over {@link CommitActions}, where each CommitActions represents a single
+ * commit.
*
* <p>This is necessary for streaming scenarios where the start version might not have a
* recreatable snapshot (e.g., after log cleanup) or where {@code startingVersion} is used.
+ * <p>This method is "unsafe" because it bypasses the standard {@code
+ * CommitRange.getCommitActions()} API which requires a snapshot at the exact start version for
+ * protocol validation.
*
* @param engine the Delta engine
* @param commitRange the commit range to read actions from
* @param tablePath the path to the Delta table
* @param actionSet the set of actions to read (e.g., ADD, REMOVE)
- * @return an iterator over columnar batches containing the requested actions
+ * @return an iterator over {@link CommitActions}, one per commit version
*/
- public static CloseableIterator<ColumnarBatch> getActionsFromRangeUnsafe(
+ public static CloseableIterator<CommitActions> getCommitActionsFromRangeUnsafe(
Engine engine,
CommitRangeImpl commitRange,
String tablePath,
Set<DeltaLogActionUtils.DeltaAction> actionSet) {
- return TableChangesUtils.flattenCommitsAndAddMetadata(
- engine,
- DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation(
- engine, tablePath, commitRange.getDeltaFiles(), actionSet));
+ return DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation(
+ engine, tablePath, commitRange.getDeltaFiles(), actionSet);
}
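A hedged usage sketch of the helper, mirroring how `SparkMicroBatchStream.filterDeltaLogs` drives it; `engine`, `commitRange`, `tablePath`, and `actionSet` are assumed to be in scope:

```java
// Usage sketch; assumes engine/commitRange/tablePath/actionSet are in scope.
try (CloseableIterator<CommitActions> commits =
    StreamingHelper.getCommitActionsFromRangeUnsafe(
        engine, (CommitRangeImpl) commitRange, tablePath, actionSet)) {
  while (commits.hasNext()) {
    try (CommitActions commit = commits.next()) {
      System.out.println("commit version = " + commit.getVersion());
    }
  }
} catch (Exception e) { // CommitActions.close() declares a checked Exception
  throw new RuntimeException("Failed to iterate commits", e);
}
```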

/** Private constructor to prevent instantiation of this utility class. */