@@ -48,7 +48,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
public class SparkMicroBatchStream
implements MicroBatchStream, SupportsAdmissionControl, SupportsTriggerAvailableNow {

private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);

@@ -104,6 +105,43 @@ public SparkMicroBatchStream(
(Boolean) spark.sessionState().conf().getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION());
}

/**
 * When Trigger.AvailableNow is used, this offset is the upper bound up to which this run of the
 * query will process. We may run multiple micro-batches, but the query will stop itself once it
 * reaches this offset.
 */
protected Optional<DeltaSourceOffset> lastOffsetForTriggerAvailableNow = Optional.empty();

Collaborator: Let's move these variable definitions after L76.

private boolean isLastOffsetForTriggerAvailableNowInitialized = false;

private boolean isTriggerAvailableNow = false;

@Override
public void prepareForTriggerAvailableNow() {
logger.info("The streaming query reports to use Trigger.AvailableNow.");
isTriggerAvailableNow = true;
}
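
// Editor's sketch (hypothetical usage, not part of this change): Spark invokes
// prepareForTriggerAvailableNow() before planning the first micro-batch whenever the query
// runs with Trigger.AvailableNow, e.g. (exception handling omitted; the path is made up):
//
//   Dataset<Row> df = spark.readStream().format("delta").load("/tmp/delta/events");
//   df.writeStream()
//       .format("noop")
//       .trigger(Trigger.AvailableNow())
//       .start()
//       .awaitTermination(); // the query stops itself at lastOffsetForTriggerAvailableNow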

/**
 * Initializes the internal state for AvailableNow the first time this method is called after
 * prepareForTriggerAvailableNow.
 */
protected void initForTriggerAvailableNowIfNeeded(DeltaSourceOffset startOffsetOpt) {

Collaborator: Can these methods and lastOffsetForTriggerAvailableNow be private?

if (isTriggerAvailableNow && !isLastOffsetForTriggerAvailableNowInitialized) {
isLastOffsetForTriggerAvailableNowInitialized = true;
initLastOffsetForTriggerAvailableNow(startOffsetOpt);
}
}

protected void initLastOffsetForTriggerAvailableNow(DeltaSourceOffset startOffsetOpt) {
lastOffsetForTriggerAvailableNow =
latestOffsetInternal(startOffsetOpt, ReadLimit.allAvailable());

lastOffsetForTriggerAvailableNow.ifPresent(
lastOffset ->
logger.info("lastOffset for Trigger.AvailableNow has set to " + lastOffset.json()));
}

////////////
// offset //
////////////
@@ -150,21 +188,26 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
Objects.requireNonNull(startOffset, "startOffset should not be null for MicroBatchStream");
Objects.requireNonNull(limit, "limit should not be null for MicroBatchStream");

// TODO(#5318): init trigger available now support

DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset);
initForTriggerAvailableNowIfNeeded(deltaStartOffset);
// endOffset is null: no data is available to read for this batch.

Collaborator: nit: This has been changed to "// Return null when no data is available for this batch."

DeltaSourceOffset endOffset = latestOffsetInternal(deltaStartOffset, limit).orElse(null);
isFirstBatch = false;
return endOffset;
}

protected Optional<DeltaSourceOffset> latestOffsetInternal(

Collaborator: Do we really need a new method here?

Author: A new method is better because our logic operates on DeltaSourceOffset; we don't want to retrieve an Offset each time and then convert it to a DeltaSourceOffset.

DeltaSourceOffset deltaStartOffset, ReadLimit limit) {
Optional<DeltaSource.AdmissionLimits> limits =
ScalaUtils.toJavaOptional(DeltaSource.AdmissionLimits$.MODULE$.apply(options, limit));
Optional<DeltaSourceOffset> endOffset =
getNextOffsetFromPreviousOffset(deltaStartOffset, limits, isFirstBatch);
isFirstBatch = false;

if (shouldValidateOffsets && endOffset.isPresent()) {
DeltaSourceOffset.validateOffsets(deltaStartOffset, endOffset.get());
}

// Return null when no data is available for this batch.
return endOffset.orElse(null);
return endOffset;
}

@Override
@@ -397,9 +440,16 @@ CloseableIterator<IndexedFile> getFileChanges(
file.getVersion() > fromVersion
|| (file.getVersion() == fromVersion && file.getIndex() > fromIndex));

// If endOffset is provided, we are getting a batch for an already-constructed range, so we
// should use the endOffset as the limit.
// Otherwise, we are looking for a new offset, so we use the latest offset we found for
// Trigger.AvailableNow() as the limit. We know endOffset <= lastOffsetForTriggerAvailableNow.
Optional<DeltaSourceOffset> lastOffsetForThisScan =
endOffset.or(() -> lastOffsetForTriggerAvailableNow);
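
// Worked example (hypothetical values, for illustration only): while searching for a new
// offset under Trigger.AvailableNow, endOffset is empty but lastOffsetForTriggerAvailableNow
// may hold, say, (version=4, index=END_INDEX); the takeWhile below then drops every file past
// that point, so the scan never reads beyond the bound captured at query start.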

// Check end boundary (inclusive)
if (endOffset.isPresent()) {
DeltaSourceOffset bound = endOffset.get();
if (lastOffsetForThisScan.isPresent()) {
DeltaSourceOffset bound = lastOffsetForThisScan.get();
result =
result.takeWhile(
file ->
@@ -1102,6 +1102,137 @@ private static Stream<Arguments> noNewDataAtLatestVersionParameters() {
Arguments.of(0L, 1L, BASE_INDEX, "Latest version index=0, no new data"));
}

// ================================================================================================
// Tests for availableNow parity between DSv1 and DSv2
// ================================================================================================

@ParameterizedTest
@MethodSource("availableNowParameters")
public void testAvailableNow_SequentialBatchAdvancement(
Long startVersion,
Long startIndex,
ReadLimitConfig limitConfig,
int numIterations,
String testDescription,
@TempDir File tempDir) {

Contributor: It's really hard to understand what is going on in this test. Please add more inline comments explaining what you are doing.

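// Set up: create an empty test table, then insert 5 versions (10 rows per version, including
// an empty version) so both sources see an identical, known commit sequence to read.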
String testTablePath = tempDir.getAbsolutePath();
String testTableName =
"test_availableNow_sequential"
+ Math.abs(testDescription.hashCode())
+ "_"
+ System.nanoTime();
createEmptyTestTable(testTablePath, testTableName);
insertVersions(
testTableName,
/* numVersions= */ 5,
/* rowsPerVersion= */ 10,
/* includeEmptyVersion= */ true);

DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
String tableId = deltaLog.tableId();

DeltaSourceOffset startOffset =
new DeltaSourceOffset(tableId, startVersion, startIndex, /* isInitialSnapshot= */ false);
ReadLimit readLimit = limitConfig.toReadLimit();

// DSv1: prepare the source for Trigger.AvailableNow, then advance the offset sequence
// numIterations times from the given start offset.
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
deltaSource.prepareForTriggerAvailableNow();
List<Offset> dsv1Offsets =
advanceOffsetSequenceDsv1(deltaSource, startOffset, numIterations, readLimit);

// DSv2: run the same offset-advancement sequence through SparkMicroBatchStream; the resulting
// offsets should match DSv1 exactly.
Configuration hadoopConf = new Configuration();
PathBasedSnapshotManager snapshotManager =
new PathBasedSnapshotManager(testTablePath, hadoopConf);
SparkMicroBatchStream stream =
new SparkMicroBatchStream(
snapshotManager, snapshotManager.loadLatestSnapshot(), hadoopConf);
stream.prepareForTriggerAvailableNow();
List<Offset> dsv2Offsets =
advanceOffsetSequenceDsv2(stream, startOffset, numIterations, readLimit);

compareOffsetSequence(dsv1Offsets, dsv2Offsets, testDescription);
}
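
// Editor's sketch (assumed name, signature, and body; the real helpers are outside this
// diff): each advanceOffsetSequence* helper repeatedly asks the source for the next end
// offset, feeding the result back in as the next start offset, and records what comes back.
// The DSv2 variant might look like this (assumes java.util.List/ArrayList imports):
private static List<Offset> advanceOffsetSequenceDsv2Sketch(
    SparkMicroBatchStream stream, DeltaSourceOffset start, int numIterations, ReadLimit limit) {
  List<Offset> offsets = new ArrayList<>();
  Offset current = start;
  for (int i = 0; i < numIterations; i++) {
    // latestOffset returns null when no data is available for this batch.
    Offset next = stream.latestOffset(current, limit);
    offsets.add(next);
    if (next == null) {
      break; // no further progress is possible
    }
    current = next;
  }
  return offsets;
}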

private static Stream<Arguments> availableNowParameters() {
long BASE_INDEX = DeltaSourceOffset.BASE_INDEX();
long END_INDEX = DeltaSourceOffset.END_INDEX();

Contributor: For my knowledge, what do these two mean?

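// Editor's note (assumed semantics, inferred from how these offsets are used in this test):
// BASE_INDEX and END_INDEX are sentinel indexes on DeltaSourceOffset. BASE_INDEX sorts before
// any real file index in a version, so (version, BASE_INDEX) means "start from the beginning
// of this version"; END_INDEX sorts after any real file index, so (version, END_INDEX) means
// "this version has been fully consumed".
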
return Stream.of(
// No limits
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.noLimit(),
/* numIterations= */ 3,
"NoLimits1"),
Arguments.of(
/* startVersion= */ 1L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.noLimit(),
/* numIterations= */ 3,
"NoLimits2"),
Arguments.of(
/* startVersion= */ 4L,
/* startIndex= */ END_INDEX,
ReadLimitConfig.noLimit(),
/* numIterations= */ 3,
"NoLimits3"),

// Max files
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(1),
/* numIterations= */ 3,

Collaborator: Perhaps we should use a larger number of iterations for some of these.

"MaxFiles1"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(1000),
/* numIterations= */ 3,
"MaxFiles2"),
Arguments.of(
/* startVersion= */ 1L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(2),
/* numIterations= */ 3,
"MaxFiles3"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxFiles(0),
/* numIterations= */ 3,
"MaxFiles4"),

// Max bytes
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(1),
/* numIterations= */ 3,
"MaxBytes1"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(1000000), // ensure larger than total file size
/* numIterations= */ 3,
"MaxBytes2"),
Arguments.of(
/* startVersion= */ 1L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(1000),
/* numIterations= */ 3,
"MaxBytes3"),
Arguments.of(
/* startVersion= */ 0L,
/* startIndex= */ BASE_INDEX,
ReadLimitConfig.maxBytes(0),
/* numIterations= */ 3,
"MaxBytes4"));
}

// ================================================================================================
// Helper methods
// ================================================================================================