Skip to content

Commit c7cc62e

Browse files
committed
Update SparkMicroBatchStream with lazy evaluation
1 parent c4ffca5 commit c7cc62e

File tree

1 file changed

+52
-51
lines changed

1 file changed

+52
-51
lines changed

kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -496,10 +496,8 @@ CloseableIterator<IndexedFile> getFileChanges(
496496
return result;
497497
}
498498

499-
// TODO(#5318): implement lazy loading (one batch at a time).
500499
private CloseableIterator<IndexedFile> filterDeltaLogs(
501500
long startVersion, Optional<DeltaSourceOffset> endOffset) {
502-
List<IndexedFile> allIndexedFiles = new ArrayList<>();
503501
Optional<Long> endVersionOpt =
504502
endOffset.isPresent() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty();
505503

@@ -523,7 +521,7 @@ private CloseableIterator<IndexedFile> filterDeltaLogs(
523521
} catch (io.delta.kernel.exceptions.CommitRangeNotFoundException e) {
524522
// If the requested version range doesn't exist (e.g., we're asking for version 6 when
525523
// the table only has versions 0-5).
526-
return Utils.toCloseableIterator(allIndexedFiles.iterator());
524+
return Utils.toCloseableIterator(Collections.emptyIterator());
527525
}
528526

529527
// Use getCommitActionsFromRangeUnsafe instead of CommitRange.getCommitActions() because:
@@ -532,35 +530,62 @@ private CloseableIterator<IndexedFile> filterDeltaLogs(
532530
// (e.g., if log files have been cleaned up after checkpointing).
533531
// 2. This matches DSv1 behavior which uses snapshotAtSourceInit's P&M to interpret all
534532
// AddFile actions and performs per-commit protocol validation.
535-
try (CloseableIterator<CommitActions> commitsIter =
533+
CloseableIterator<CommitActions> commitsIter =
536534
StreamingHelper.getCommitActionsFromRangeUnsafe(
537535
engine,
538536
(io.delta.kernel.internal.commitrange.CommitRangeImpl) commitRange,
539537
snapshotAtSourceInit.getPath(),
540-
ACTION_SET)) {
541-
542-
while (commitsIter.hasNext()) {
543-
try (CommitActions commit = commitsIter.next()) {
544-
long version = commit.getVersion();
545-
546-
// First pass: Validate the commit.
547-
// Must process the entire commit first for all-or-nothing semantics.
548-
validateCommit(commit, version, snapshotAtSourceInit.getPath(), endOffset);
538+
ACTION_SET);
539+
540+
return commitsIter.flatMap(
541+
commit -> {
542+
try {
543+
long version = commit.getVersion();
544+
545+
// First pass: Validate the commit.
546+
// Eager execution: must process the entire commit first for all-or-nothing semantics.
547+
validateCommit(commit, version, snapshotAtSourceInit.getPath(), endOffset);
548+
549+
// TODO(#5318): consider caching the commit actions to avoid reading the same commit
550+
// twice.
551+
// Second pass: Extract AddFile actions with dataChange=true.
552+
// Build lazy iterator with sentinel IndexedFiles:
553+
// BEGIN (BASE_INDEX) + actual file actions + END (END_INDEX)
554+
// These sentinel IndexedFiles have null file actions and are used for proper offset
555+
// tracking. The BASE_INDEX sentinel marks the start of a version, allowing the offset
556+
// to reference "before any files in this version". The END_INDEX sentinel marks the
557+
// end of a version, which triggers version advancement in buildOffsetFromIndexedFile
558+
// to skip re-reading the completed version.
559+
// See DeltaSource.addBeginAndEndIndexOffsetsForVersion for the Scala equivalent.
560+
return Utils.singletonCloseableIterator(
561+
new IndexedFile(version, DeltaSourceOffset.BASE_INDEX(), /* addFile= */ null))
562+
.combine(getFilesFromCommit(commit, version))
563+
.combine(
564+
Utils.singletonCloseableIterator(
565+
new IndexedFile(
566+
version, DeltaSourceOffset.END_INDEX(), /* addFile= */ null)));
567+
} catch (Exception e) {
568+
// commit is not a CloseableIterator, we need to close it manually.
569+
Utils.closeCloseables(commit);
570+
throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
571+
}
572+
});
573+
}
549574

550-
// TODO(#5318): consider caching the commit actions to avoid reading the same commit
551-
// twice.
552-
// Second pass: Extract AddFile actions with dataChange=true.
553-
extractVersionFiles(commit, version, allIndexedFiles);
554-
}
555-
}
556-
} catch (RuntimeException e) {
557-
throw e; // Rethrow runtime exceptions directly
558-
} catch (Exception e) {
559-
// CommitActions.close() throws Exception
560-
throw new RuntimeException("Failed to process commits", e);
561-
}
562-
// TODO(#5318): implement lazy loading (only load a batch into memory if needed).
563-
return Utils.toCloseableIterator(allIndexedFiles.iterator());
575+
private CloseableIterator<IndexedFile> getFilesFromCommit(CommitActions commit, long version) {
576+
// Assign each IndexedFile a unique index within the commit. We use a mutable array
577+
// because variables captured by a lambda must be effectively final (never reassigned).
578+
long[] fileIndex = {0};
579+
580+
return commit
581+
.getActions()
582+
.flatMap(
583+
batch -> {
584+
// Processing each batch eagerly because they are already loaded into memory.
585+
List<IndexedFile> files = new ArrayList<>();
586+
fileIndex[0] = extractIndexedFilesFromBatch(batch, version, fileIndex[0], files);
587+
return Utils.toCloseableIterator(files.iterator());
588+
});
564589
}
565590

566591
/**
@@ -608,30 +633,6 @@ private void validateCommit(
608633
}
609634
}
610635

611-
/**
612-
* Extracts all IndexedFiles from a commit version, wrapped with BEGIN/END sentinels.
613-
*
614-
* @param commit the CommitActions representing a single commit
615-
* @param version the commit version
616-
* @param output the list to append IndexedFiles to
617-
*/
618-
private void extractVersionFiles(CommitActions commit, long version, List<IndexedFile> output) {
619-
// Add BEGIN sentinel before data files.
620-
output.add(new IndexedFile(version, DeltaSourceOffset.BASE_INDEX(), /* addFile= */ null));
621-
// TODO(#5319): implement getMetadataOrProtocolChangeIndexedFileIterator.
622-
long index = 0;
623-
try (CloseableIterator<ColumnarBatch> actionsIter = commit.getActions()) {
624-
while (actionsIter.hasNext()) {
625-
ColumnarBatch batch = actionsIter.next();
626-
index = extractIndexedFilesFromBatch(batch, version, index, output);
627-
}
628-
} catch (IOException e) {
629-
throw new RuntimeException("Failed to extract files from commit at version " + version, e);
630-
}
631-
// Add END sentinel after data files.
632-
output.add(new IndexedFile(version, DeltaSourceOffset.END_INDEX(), /* addFile= */ null));
633-
}
634-
635636
/**
636637
* Extracts IndexedFiles from a batch of actions for a given version and adds them to the output
637638
* list. Assigns an index to each IndexedFile.

0 commit comments

Comments
 (0)