@@ -48,7 +48,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
public class SparkMicroBatchStream
implements MicroBatchStream, SupportsAdmissionControl, SupportsTriggerAvailableNow {

private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);

@@ -74,6 +75,17 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
*/
private boolean isFirstBatch = false;

/**
 * When AvailableNow is used, this offset is the upper bound up to which this run of the query
 * will process. We may run multiple micro-batches, but the query will stop itself when it
 * reaches this offset.
*/
private Optional<DeltaSourceOffset> lastOffsetForTriggerAvailableNow = Optional.empty();

private boolean isLastOffsetForTriggerAvailableNowInitialized = false;

private boolean isTriggerAvailableNow = false;

public SparkMicroBatchStream(
DeltaSnapshotManager snapshotManager,
Snapshot snapshotAtSourceInit,
@@ -104,6 +116,32 @@ public SparkMicroBatchStream(
(Boolean) spark.sessionState().conf().getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION());
}

@Override
public void prepareForTriggerAvailableNow() {
logger.info("The streaming query reports to use Trigger.AvailableNow.");
isTriggerAvailableNow = true;
}

/**
 * Initializes the internal state for AvailableNow the first time this method is called after
 * prepareForTriggerAvailableNow.
*/
private void initForTriggerAvailableNowIfNeeded(DeltaSourceOffset startOffsetOpt) {
if (isTriggerAvailableNow && !isLastOffsetForTriggerAvailableNowInitialized) {
isLastOffsetForTriggerAvailableNowInitialized = true;
initLastOffsetForTriggerAvailableNow(startOffsetOpt);
}
}

private void initLastOffsetForTriggerAvailableNow(DeltaSourceOffset startOffsetOpt) {
lastOffsetForTriggerAvailableNow =
latestOffsetInternal(startOffsetOpt, ReadLimit.allAvailable());

lastOffsetForTriggerAvailableNow.ifPresent(
lastOffset ->
logger.info("lastOffset for Trigger.AvailableNow has set to " + lastOffset.json()));
}
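
For context on how these pieces interact, below is a rough sketch of the call sequence an engine is expected to follow for Trigger.AvailableNow, assuming the standard DSv2 SupportsTriggerAvailableNow / SupportsAdmissionControl contract. The driver loop is illustrative only, not Spark's actual MicroBatchExecution, and it omits commits, WAL handling, and batch planning.

  // Illustrative driver loop (simplified; not Spark's MicroBatchExecution).
  // 1. The engine announces AvailableNow before requesting any offsets.
  stream.prepareForTriggerAvailableNow();
  // 2. The first latestOffset call runs initForTriggerAvailableNowIfNeeded, which
  //    fixes lastOffsetForTriggerAvailableNow as the upper bound for this run.
  Offset start = stream.initialOffset();
  Offset end = stream.latestOffset(start, stream.getDefaultReadLimit());
  while (end != null) {
    // ... plan and execute one micro-batch over (start, end] ...
    start = end;
    // 3. Subsequent calls keep advancing, but getFileChanges never scans past the
    //    bound, so the query stops once the pre-computed last offset is reached.
    end = stream.latestOffset(start, stream.getDefaultReadLimit());
  }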

////////////
// offset //
////////////
@@ -150,21 +188,26 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
Objects.requireNonNull(startOffset, "startOffset should not be null for MicroBatchStream");
Objects.requireNonNull(limit, "limit should not be null for MicroBatchStream");

// TODO(#5318): init trigger available now support

DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset);
initForTriggerAvailableNowIfNeeded(deltaStartOffset);
// Return null when no data is available for this batch.
DeltaSourceOffset endOffset = latestOffsetInternal(deltaStartOffset, limit).orElse(null);
isFirstBatch = false;
return endOffset;
}

private Optional<DeltaSourceOffset> latestOffsetInternal(
Reviewer comment (Contributor): can you add method docs explaining what each method does?

DeltaSourceOffset deltaStartOffset, ReadLimit limit) {
Optional<DeltaSource.AdmissionLimits> limits =
ScalaUtils.toJavaOptional(DeltaSource.AdmissionLimits$.MODULE$.apply(options, limit));
Optional<DeltaSourceOffset> endOffset =
getNextOffsetFromPreviousOffset(deltaStartOffset, limits, isFirstBatch);
isFirstBatch = false;

if (shouldValidateOffsets && endOffset.isPresent()) {
DeltaSourceOffset.validateOffsets(deltaStartOffset, endOffset.get());
}

// Return null when no data is available for this batch.
return endOffset.orElse(null);
return endOffset;
}

@Override
@@ -397,9 +440,16 @@ CloseableIterator<IndexedFile> getFileChanges(
file.getVersion() > fromVersion
|| (file.getVersion() == fromVersion && file.getIndex() > fromIndex));

// If endOffset is provided, we are getting a batch on a constructed range so we should use
// the endOffset as the limit.
// Otherwise, we are looking for a new offset, so we use the latest offset we found for
// Trigger.AvailableNow as the limit. We know endOffset <= lastOffsetForTriggerAvailableNow.
Optional<DeltaSourceOffset> lastOffsetForThisScan =
endOffset.or(() -> lastOffsetForTriggerAvailableNow);

// Check end boundary (inclusive)
if (endOffset.isPresent()) {
DeltaSourceOffset bound = endOffset.get();
if (lastOffsetForThisScan.isPresent()) {
DeltaSourceOffset bound = lastOffsetForThisScan.get();
result =
result.takeWhile(
file ->

@@ -1102,6 +1102,137 @@ private static Stream<Arguments> noNewDataAtLatestVersionParameters() {
Arguments.of(0L, 1L, BASE_INDEX, "Latest version index=0, no new data"));
}

// ================================================================================================
// Tests for availableNow parity between DSv1 and DSv2
// ================================================================================================

@ParameterizedTest
@MethodSource("availableNowParameters")
public void testAvailableNow_SequentialBatchAdvancement(
Long startVersion,
Long startIndex,
ReadLimitConfig limitConfig,
int numIterations,
String testDescription,
@TempDir File tempDir) {
Reviewer comment (Contributor): its really hard to understand what is going on in this test. please add more inline comments explaining what you are doing.

String testTablePath = tempDir.getAbsolutePath();
String testTableName =
"test_availableNow_sequential"
+ Math.abs(testDescription.hashCode())
+ "_"
+ System.nanoTime();
createEmptyTestTable(testTablePath, testTableName);
insertVersions(
testTableName,
/* numVersions= */ 5,
/* rowsPerVersion= */ 10,
/* includeEmptyVersion= */ true);

DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
String tableId = deltaLog.tableId();

DeltaSourceOffset startOffset =
new DeltaSourceOffset(tableId, startVersion, startIndex, /* isInitialSnapshot= */ false);
ReadLimit readLimit = limitConfig.toReadLimit();

// dsv1
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
deltaSource.prepareForTriggerAvailableNow();
List<Offset> dsv1Offsets =
advanceOffsetSequenceDsv1(deltaSource, startOffset, numIterations, readLimit);

// dsv2
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
stream.prepareForTriggerAvailableNow();
List<Offset> dsv2Offsets =
advanceOffsetSequenceDsv2(stream, startOffset, numIterations, readLimit);

compareOffsetSequence(dsv1Offsets, dsv2Offsets, testDescription);
}
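
The advanceOffsetSequenceDsv1/Dsv2 and compareOffsetSequence helpers live in the collapsed "Helper methods" section below. As a reading aid, a hypothetical sketch of the DSv2 advancement loop might look like the following; the signature matches the call above, but the body is illustrative and not the actual helper from this PR.

  // Hypothetical sketch only; the real helper is collapsed below.
  private List<Offset> advanceOffsetSequenceDsv2(
      SparkMicroBatchStream stream, DeltaSourceOffset start, int numIterations, ReadLimit limit) {
    List<Offset> offsets = new ArrayList<>();
    Offset current = start;
    for (int i = 0; i < numIterations; i++) {
      Offset next = stream.latestOffset(current, limit);
      if (next == null) {
        break; // no more data available in this AvailableNow run
      }
      offsets.add(next);
      current = next;
    }
    return offsets;
  }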

private static Stream<Arguments> availableNowParameters() {
long BASE_INDEX = DeltaSourceOffset.BASE_INDEX();
long END_INDEX = DeltaSourceOffset.END_INDEX();
Reviewer comment (Contributor): for my knowledge.. what does these 2 mean?


return Stream.of(
// No limits
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.noLimit(),
/* numIterations= */ 3,
"NoLimits1"),
Arguments.of(
/* startVersion= */ 1L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.noLimit(),
/* numIterations= */ 3,
"NoLimits2"),
Arguments.of(
/* startVersion= */ 4L,
/* startIndex= */ END_INDEX,
ReadLimitConfig.noLimit(),
/* numIterations= */ 3,
"NoLimits3"),

// Max files
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(1),
/* numIterations= */ 10,
"MaxFiles1"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(1000),
/* numIterations= */ 3,
"MaxFiles2"),
Arguments.of(
/* startVersion= */ 1L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(2),
/* numIterations= */ 10,
"MaxFiles3"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(0),
/* numIterations= */ 3,
"MaxFiles4"),

// Max bytes
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(1),
/* numIterations= */ 100,
"MaxBytes1"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(1000000), // ensure larger than total file size
/* numIterations= */ 3,
"MaxBytes2"),
Arguments.of(
/* startVersion= */ 1L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(1000),
/* numIterations= */ 100,
"MaxBytes3"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(0),
/* numIterations= */ 3,
"MaxBytes4"));
}

// ================================================================================================
// Helper methods
// ================================================================================================