diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
index cb77a90487a..0ea49a716cb 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
@@ -48,7 +48,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
+public class SparkMicroBatchStream
+    implements MicroBatchStream, SupportsAdmissionControl, SupportsTriggerAvailableNow {
 
   private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);
 
@@ -74,6 +75,17 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
    */
   private boolean isFirstBatch = false;
 
+  /**
+   * When AvailableNow is used, this offset will be the upper bound that this run of the query
+   * will process up to. We may run multiple micro batches, but the query will stop itself when
+   * it reaches this offset.
+   */
+  private Optional<DeltaSourceOffset> lastOffsetForTriggerAvailableNow = Optional.empty();
+
+  private boolean isLastOffsetForTriggerAvailableNowInitialized = false;
+
+  private boolean isTriggerAvailableNow = false;
+
   public SparkMicroBatchStream(
       DeltaSnapshotManager snapshotManager,
       Snapshot snapshotAtSourceInit,
@@ -104,6 +116,32 @@ public SparkMicroBatchStream(
         (Boolean) spark.sessionState().conf().getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION());
   }
 
+  @Override
+  public void prepareForTriggerAvailableNow() {
+    logger.info("The streaming query reports to use Trigger.AvailableNow.");
+    isTriggerAvailableNow = true;
+  }
+
+  /**
+   * Initialize the internal states for AvailableNow if this method is called for the first time
+   * after prepareForTriggerAvailableNow.
+   */
+  private void initForTriggerAvailableNowIfNeeded(DeltaSourceOffset startOffsetOpt) {
+    if (isTriggerAvailableNow && !isLastOffsetForTriggerAvailableNowInitialized) {
+      isLastOffsetForTriggerAvailableNowInitialized = true;
+      initLastOffsetForTriggerAvailableNow(startOffsetOpt);
+    }
+  }
+
+  private void initLastOffsetForTriggerAvailableNow(DeltaSourceOffset startOffsetOpt) {
+    lastOffsetForTriggerAvailableNow =
+        latestOffsetInternal(startOffsetOpt, ReadLimit.allAvailable());
+
+    lastOffsetForTriggerAvailableNow.ifPresent(
+        lastOffset ->
+            logger.info("lastOffset for Trigger.AvailableNow has set to " + lastOffset.json()));
+  }
+
   ////////////
   // offset //
   ////////////
@@ -150,21 +188,26 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
     Objects.requireNonNull(startOffset, "startOffset should not be null for MicroBatchStream");
     Objects.requireNonNull(limit, "limit should not be null for MicroBatchStream");
 
-    // TODO(#5318): init trigger available now support
-
     DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset);
+    initForTriggerAvailableNowIfNeeded(deltaStartOffset);
+    // Return null when no data is available for this batch.
+    DeltaSourceOffset endOffset = latestOffsetInternal(deltaStartOffset, limit).orElse(null);
+    isFirstBatch = false;
+    return endOffset;
+  }
+
+  private Optional<DeltaSourceOffset> latestOffsetInternal(
+      DeltaSourceOffset deltaStartOffset, ReadLimit limit) {
     Optional<DeltaSource.AdmissionLimits> limits =
         ScalaUtils.toJavaOptional(DeltaSource.AdmissionLimits$.MODULE$.apply(options, limit));
 
     Optional<DeltaSourceOffset> endOffset =
         getNextOffsetFromPreviousOffset(deltaStartOffset, limits, isFirstBatch);
-    isFirstBatch = false;
     if (shouldValidateOffsets && endOffset.isPresent()) {
       DeltaSourceOffset.validateOffsets(deltaStartOffset, endOffset.get());
     }
 
-    // Return null when no data is available for this batch.
-    return endOffset.orElse(null);
+    return endOffset;
   }
 
   @Override
@@ -397,9 +440,16 @@ CloseableIterator getFileChanges(
                 file.getVersion() > fromVersion
                     || (file.getVersion() == fromVersion && file.getIndex() > fromIndex));
 
+    // If endOffset is provided, we are getting a batch on a constructed range so we should use
+    // the endOffset as the limit.
+    // Otherwise, we are looking for a new offset, so we try to use the latestOffset we found for
+    // Trigger.availableNow() as limit. We know endOffset <= lastOffsetForTriggerAvailableNow.
+    Optional<DeltaSourceOffset> lastOffsetForThisScan =
+        endOffset.or(() -> lastOffsetForTriggerAvailableNow);
+
     // Check end boundary (inclusive)
-    if (endOffset.isPresent()) {
-      DeltaSourceOffset bound = endOffset.get();
+    if (lastOffsetForThisScan.isPresent()) {
+      DeltaSourceOffset bound = lastOffsetForThisScan.get();
       result =
           result.takeWhile(
               file ->
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
index db1e782d0f7..e8b08b18716 100644
--- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
@@ -1102,6 +1102,137 @@ private static Stream<Arguments> noNewDataAtLatestVersionParameters() {
         Arguments.of(0L, 1L, BASE_INDEX, "Latest version index=0, no new data"));
   }
 
+  // ================================================================================================
+  // Tests for availableNow parity between DSv1 and DSv2
+  // ================================================================================================
+
+  @ParameterizedTest
+  @MethodSource("availableNowParameters")
+  public void testAvailableNow_SequentialBatchAdvancement(
+      Long startVersion,
+      Long startIndex,
+      ReadLimitConfig limitConfig,
+      int numIterations,
+      String testDescription,
+      @TempDir File tempDir) {
+    String testTablePath = tempDir.getAbsolutePath();
+    String testTableName =
+        "test_availableNow_sequential"
+            + Math.abs(testDescription.hashCode())
+            + "_"
+            + System.nanoTime();
+    createEmptyTestTable(testTablePath, testTableName);
+    insertVersions(
+        testTableName,
+        /* numVersions= */ 5,
+        /* rowsPerVersion= */ 10,
+        /* includeEmptyVersion= */ true);
+
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
+    String tableId = deltaLog.tableId();
+
+    DeltaSourceOffset startOffset =
+        new DeltaSourceOffset(tableId, startVersion, startIndex, /* isInitialSnapshot= */ false);
+    ReadLimit readLimit = limitConfig.toReadLimit();
+
+    // dsv1
+    DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
+    deltaSource.prepareForTriggerAvailableNow();
+    List<DeltaSourceOffset> dsv1Offsets =
+        advanceOffsetSequenceDsv1(deltaSource, startOffset, numIterations, readLimit);
+
+    // dsv2
+    Configuration hadoopConf = new Configuration();
+    PathBasedSnapshotManager snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, hadoopConf);
+    SparkMicroBatchStream stream =
+        new SparkMicroBatchStream(
+            snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
+    stream.prepareForTriggerAvailableNow();
+    List<DeltaSourceOffset> dsv2Offsets =
+        advanceOffsetSequenceDsv2(stream, startOffset, numIterations, readLimit);
+
+    compareOffsetSequence(dsv1Offsets, dsv2Offsets, testDescription);
+  }
+
+  private static Stream<Arguments> availableNowParameters() {
+    long BASE_INDEX = DeltaSourceOffset.BASE_INDEX();
+    long END_INDEX = DeltaSourceOffset.END_INDEX();
+
+    return Stream.of(
+        // No limits
+        Arguments.of(
+            /* startVersion= */ 0L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.noLimit(),
+            /* numIterations= */ 3,
+            "NoLimits1"),
+        Arguments.of(
+            /* startVersion= */ 1L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.noLimit(),
+            /* numIterations= */ 3,
+            "NoLimits2"),
+        Arguments.of(
+            /* startVersion= */ 4L,
+            /* startIndex= */ END_INDEX,
+            ReadLimitConfig.noLimit(),
+            /* numIterations= */ 3,
+            "NoLimits3"),
+
+        // Max files
+        Arguments.of(
+            /* startVersion= */ 0L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxFiles(1),
+            /* numIterations= */ 10,
+            "MaxFiles1"),
+        Arguments.of(
+            /* startVersion= */ 0L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxFiles(1000),
+            /* numIterations= */ 3,
+            "MaxFiles2"),
+        Arguments.of(
+            /* startVersion= */ 1L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxFiles(2),
+            /* numIterations= */ 10,
+            "MaxFiles3"),
+        Arguments.of(
+            /* startVersion= */ 0L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxFiles(0),
+            /* numIterations= */ 3,
+            "MaxFiles4"),
+
+        // Max bytes
+        Arguments.of(
+            /* startVersion= */ 0L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxBytes(1),
+            /* numIterations= */ 100,
+            "MaxBytes1"),
+        Arguments.of(
+            /* startVersion= */ 0L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxBytes(1000000), // ensure larger than total file size
+            /* numIterations= */ 3,
+            "MaxBytes2"),
+        Arguments.of(
+            /* startVersion= */ 1L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxBytes(1000),
+            /* numIterations= */ 100,
+            "MaxBytes3"),
+        Arguments.of(
+            /* startVersion= */ 0L,
+            /* startIndex= */ BASE_INDEX,
+            ReadLimitConfig.maxBytes(0),
+            /* numIterations= */ 3,
+            "MaxBytes4")));
+  }
+
   // ================================================================================================
   // Helper methods
   // ================================================================================================
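
Usage sketch (not part of the patch): the change above lets the DSv2 source participate in `Trigger.AvailableNow()` queries. A minimal driver program that would exercise it is sketched below; the table paths, checkpoint location, and `maxFilesPerTrigger` value are placeholders, and it assumes a standard Spark 3.3+ session with the Delta connector on the classpath.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class AvailableNowSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("availableNowSketch").getOrCreate();

    // Placeholder path; substitute a real Delta table.
    Dataset<Row> source = spark.readStream().format("delta").load("/tmp/delta/source");

    // Trigger.AvailableNow() processes everything available at query start, possibly across
    // several rate-limited micro-batches, then stops on its own.
    StreamingQuery query =
        source
            .writeStream()
            .format("delta")
            .option("checkpointLocation", "/tmp/delta/checkpoint")
            .option("maxFilesPerTrigger", "2") // per-batch admission control still applies
            .trigger(Trigger.AvailableNow())
            .start("/tmp/delta/sink");

    query.awaitTermination();
  }
}
```

With this trigger, Spark invokes `prepareForTriggerAvailableNow()` before planning the first batch; `latestOffset` then captures `lastOffsetForTriggerAvailableNow` as the run's upper bound, and `getFileChanges` falls back to that bound when no explicit `endOffset` is supplied, which is what keeps the query from chasing data committed after the run started.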