@@ -22,10 +22,12 @@
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.UnsupportedTableFeatureException;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.spark.snapshot.DeltaSnapshotManager;
import io.delta.kernel.spark.utils.ScalaUtils;
import io.delta.kernel.spark.utils.StreamingHelper;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
@@ -34,8 +36,7 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.*;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.DeltaStartingVersion;
@@ -46,9 +47,8 @@
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class SparkMicroBatchStream implements MicroBatchStream {
public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {

private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);

@@ -59,6 +59,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
private final Engine engine;
private final DeltaSnapshotManager snapshotManager;
private final DeltaOptions options;
private final String tableId;
private final boolean shouldValidateOffsets;
private final SparkSession spark;

public SparkMicroBatchStream(DeltaSnapshotManager snapshotManager, Configuration hadoopConf) {
@@ -80,6 +82,13 @@ public SparkMicroBatchStream(
this.snapshotManager = snapshotManager;
this.engine = DefaultEngine.create(hadoopConf);
this.options = options;

// Initialize snapshot at source init to get table ID, similar to DeltaSource.scala
Snapshot snapshotAtSourceInit = snapshotManager.loadLatestSnapshot();
this.tableId = ((SnapshotImpl) snapshotAtSourceInit).getMetadata().getId();

this.shouldValidateOffsets =
(Boolean) spark.sessionState().conf().getConf(DeltaSQLConf.STREAMING_OFFSET_VALIDATION());
}

////////////
@@ -88,19 +97,91 @@ public SparkMicroBatchStream(

@Override
public Offset initialOffset() {
// TODO(#5318): Implement initialOffset
throw new UnsupportedOperationException("initialOffset is not supported");
}

@Override
public Offset latestOffset() {
throw new UnsupportedOperationException("latestOffset is not supported");
throw new IllegalStateException(
"latestOffset() should not be called - use latestOffset(Offset, ReadLimit) instead");
}

/**
* Get the latest offset with rate limiting (SupportsAdmissionControl).
*
* @param startOffset The starting offset (can be null if initialOffset() returned null)
* @param limit The read limit for rate limiting
* @return The latest offset, or null if no data is available to read.
*/
@Override
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
// For the first batch, initialOffset() should be called before latestOffset().
// If startOffset is null, no data is available to read.
Collaborator: Will there be a case where startOffset is set to null?

Collaborator (author): Spark will call initialOffset() first to obtain this startOffset for batch 0 -- so it could be null when initialOffset() returns null (i.e. the table has no data).

if (startOffset == null) {
return null;
}
// TODO(#5318): add Trigger.AvailableNow support.

DeltaSourceOffset deltaStartOffset = DeltaSourceOffset.apply(tableId, startOffset);
Optional<DeltaSource.AdmissionLimits> limits =
ScalaUtils.toJavaOptional(DeltaSource.AdmissionLimits$.MODULE$.apply(options, limit));
Optional<DeltaSourceOffset> endOffset =
getNextOffsetFromPreviousOffset(deltaStartOffset, limits);

if (shouldValidateOffsets && endOffset.isPresent()) {
DeltaSourceOffset.validateOffsets(deltaStartOffset, endOffset.get());
}

// If endOffset is empty, no data is available to read for this batch, so return null.
return endOffset.orElse(null);
}
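
For context on the startOffset null-handling discussed in the review thread above, here is a rough, hypothetical sketch of how the engine is expected to drive a SupportsAdmissionControl source for one micro-batch. The flow and names are simplified assumptions, not actual Spark scheduler code.

import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;

final class DriverLoopSketch {
  // Runs one (simplified) micro-batch against a source that supports admission control.
  static void runOneBatch(MicroBatchStream stream, Offset lastCommittedOffset) {
    // For batch 0 the engine asks the source for its initial offset; afterwards it
    // resumes from the last committed offset. Either may be null for an empty table.
    Offset start = lastCommittedOffset != null ? lastCommittedOffset : stream.initialOffset();

    // With SupportsAdmissionControl, the rate-limited variant is called each trigger.
    SupportsAdmissionControl admission = (SupportsAdmissionControl) stream;
    ReadLimit limit = admission.getDefaultReadLimit();
    Offset end = admission.latestOffset(start, limit);

    if (end == null) {
      // Nothing to read in this trigger.
      return;
    }
    // The engine would then plan partitions for (start, end], execute the batch,
    // and finally commit(end).
  }
}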

@Override
public Offset deserializeOffset(String json) {
throw new UnsupportedOperationException("deserializeOffset is not supported");
}

@Override
public ReadLimit getDefaultReadLimit() {
return DeltaSource.AdmissionLimits$.MODULE$.toReadLimit(options);
}
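
The ReadLimit returned here is derived from the reader options. For reference, a typical way those options are set on a Delta streaming read is sketched below; the option names are standard Delta streaming options, but the snippet itself is illustrative and not part of this change.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

final class ReadLimitOptionsExample {
  // Illustrative: the reader options that typically feed AdmissionLimits / ReadLimit.
  static Dataset<Row> rateLimitedStream(SparkSession spark, String tablePath) {
    return spark
        .readStream()
        .format("delta")
        .option("maxFilesPerTrigger", "1000") // cap on files admitted per micro-batch
        .option("maxBytesPerTrigger", "1g") // soft cap on bytes per micro-batch
        .load(tablePath);
  }
}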

/**
* Return the next offset when a previous offset exists. Mimics
* DeltaSource.getNextOffsetFromPreviousOffset.
*
* @param previousOffset The previous offset
* @param limits Rate limits for this batch (Optional.empty() for no limits)
* @return The next offset, or the previous offset if no new data is available
*/
private Optional<DeltaSourceOffset> getNextOffsetFromPreviousOffset(
DeltaSourceOffset previousOffset, Optional<DeltaSource.AdmissionLimits> limits) {
// TODO(#5319): Special handling for schema tracking.

CloseableIterator<IndexedFile> changes =
getFileChangesWithRateLimit(
previousOffset.reservoirVersion(),
previousOffset.index(),
previousOffset.isInitialSnapshot(),
limits);

Optional<IndexedFile> lastFileChange = Utils.iteratorLast(changes);

if (!lastFileChange.isPresent()) {
return Optional.of(previousOffset);
}
// TODO(#5318): Check read-incompatible schema changes during stream start
IndexedFile lastFile = lastFileChange.get();
return Optional.of(
DeltaSource.buildOffsetFromIndexedFile(
Collaborator: DeltaSource.buildOffsetFromIndexedFile seems to always return an offset although it returns Option[Offset]. Can we update the signature of DeltaSource.buildOffsetFromIndexedFile? I just want to minimize the interaction with null if it is not really necessary.

Collaborator (author): Done.

tableId,
lastFile.getVersion(),
lastFile.getIndex(),
previousOffset.reservoirVersion(),
previousOffset.isInitialSnapshot()));
}

////////////
/// data ///
////////////
@@ -214,23 +295,24 @@ private static boolean validateProtocolAt(
* @param fromVersion The starting version (exclusive with fromIndex)
* @param fromIndex The starting index within fromVersion (exclusive)
* @param isInitialSnapshot Whether this is the initial snapshot
* @param limits Rate limits to apply (Option.empty for no limits)
* @param limits Rate limits to apply (Optional.empty() for no limits)
* @return An iterator of IndexedFile with rate limiting applied
*/
CloseableIterator<IndexedFile> getFileChangesWithRateLimit(
long fromVersion,
long fromIndex,
boolean isInitialSnapshot,
Option<DeltaSource.AdmissionLimits> limits) {
Optional<DeltaSource.AdmissionLimits> limits) {
// TODO(#5319): getFileChangesForCDC if CDC is enabled.

CloseableIterator<IndexedFile> changes =
getFileChanges(fromVersion, fromIndex, isInitialSnapshot, /*endOffset=*/ Option.empty());
getFileChanges(
fromVersion, fromIndex, isInitialSnapshot, /* endOffset= */ Optional.empty());

// Take each change until we've seen the configured number of addFiles. Some changes don't
// represent file additions; we retain them for offset tracking, but they don't count toward
// the maxFilesPerTrigger conf.
if (limits.isDefined()) {
if (limits.isPresent()) {
DeltaSource.AdmissionLimits admissionLimits = limits.get();
changes = changes.takeWhile(admissionLimits::admit);
}
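
The takeWhile/admit pattern above can be pictured with a small, self-contained stand-in. This is a hypothetical illustration of the admission idea only, not the actual DeltaSource.AdmissionLimits implementation.

import java.util.function.Predicate;

// Hypothetical admission limiter: items matching countsTowardLimit consume capacity,
// other items pass through freely, and everything stops once capacity is exhausted.
final class AdmissionSketch<T> {
  private final Predicate<T> countsTowardLimit;
  private int remaining;

  AdmissionSketch(Predicate<T> countsTowardLimit, int maxAdmitted) {
    this.countsTowardLimit = countsTowardLimit;
    this.remaining = maxAdmitted;
  }

  // Used with takeWhile: returns true while capacity remained when this item arrived.
  boolean admit(T item) {
    if (remaining <= 0) {
      return false;
    }
    if (countsTowardLimit.test(item)) {
      remaining--;
    }
    return true;
  }
}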
@@ -255,7 +337,7 @@ CloseableIterator<IndexedFile> getFileChanges(
long fromVersion,
long fromIndex,
boolean isInitialSnapshot,
Option<DeltaSourceOffset> endOffset) {
Optional<DeltaSourceOffset> endOffset) {

CloseableIterator<IndexedFile> result;

@@ -274,7 +356,7 @@ CloseableIterator<IndexedFile> getFileChanges(
|| (file.getVersion() == fromVersion && file.getIndex() > fromIndex));

// Check end boundary (inclusive)
if (endOffset.isDefined()) {
if (endOffset.isPresent()) {
DeltaSourceOffset bound = endOffset.get();
result =
result.takeWhile(
@@ -289,11 +371,20 @@

// TODO(#5318): implement lazy loading (one batch at a time).
private CloseableIterator<IndexedFile> filterDeltaLogs(
long startVersion, Option<DeltaSourceOffset> endOffset) {
long startVersion, Optional<DeltaSourceOffset> endOffset) {
List<IndexedFile> allIndexedFiles = new ArrayList<>();
Optional<Long> endVersionOpt =
endOffset.isDefined() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty();
CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
endOffset.isPresent() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty();

CommitRange commitRange;
try {
commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
} catch (io.delta.kernel.exceptions.CommitRangeNotFoundException e) {
// If the requested version range doesn't exist (e.g., we're asking for version 6 when
// the table only has versions 0-5), return an empty iterator.
return Utils.toCloseableIterator(allIndexedFiles.iterator());
}

// Required by kernel: perform protocol validation by creating a snapshot at startVersion.
Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
String tablePath = startSnapshot.getPath();
@@ -374,9 +465,12 @@ private void flushVersion(
* @throws RuntimeException if the commit is invalid.
*/
private void validateCommit(
ColumnarBatch batch, long version, String tablePath, Option<DeltaSourceOffset> endOffsetOpt) {
ColumnarBatch batch,
long version,
String tablePath,
Optional<DeltaSourceOffset> endOffsetOpt) {
// If endOffset is at the beginning of this version, exit early.
if (endOffsetOpt.isDefined()) {
if (endOffsetOpt.isPresent()) {
DeltaSourceOffset endOffset = endOffsetOpt.get();
if (endOffset.reservoirVersion() == version
&& endOffset.index() == DeltaSourceOffset.BASE_INDEX()) {
@@ -17,6 +17,8 @@

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import scala.Option;
import scala.Tuple2;
import scala.collection.immutable.Map$;
import scala.collection.mutable.Builder;
@@ -48,4 +50,26 @@ public static Map<String, String> toJavaMap(
}
return CollectionConverters.asJava(scalaMap);
}

/**
* Converts a Java {@link Optional} to a Scala {@link Option}.
*
* @param optional the Java Optional to convert
* @param <T> the type of the value
* @return the corresponding Scala Option
*/
public static <T> Option<T> toScalaOption(Optional<T> optional) {
return optional.map(Option::apply).orElse(Option.empty());
}

/**
* Converts a Scala {@link Option} to a Java {@link Optional}.
*
* @param option the Scala Option to convert
* @param <T> the type of the value
* @return the corresponding Java Optional
*/
public static <T> Optional<T> toJavaOptional(Option<T> option) {
return option.isDefined() ? Optional.of(option.get()) : Optional.empty();
}
}
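
A quick round-trip usage sketch for the two converters above (hypothetical snippet, not part of this change):

import io.delta.kernel.spark.utils.ScalaUtils;
import java.util.Optional;
import scala.Option;

final class ScalaUtilsUsage {
  static void example() {
    Option<String> scalaOpt = ScalaUtils.toScalaOption(Optional.of("v1"));
    Optional<String> roundTripped = ScalaUtils.toJavaOptional(scalaOpt); // Optional[v1]
    Optional<String> empty = ScalaUtils.toJavaOptional(Option.<String>empty()); // Optional.empty
    System.out.println(roundTripped + " / " + empty);
  }
}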
@@ -47,8 +47,8 @@ private static int getFieldIndex(ColumnarBatch batch, String fieldName) {
}

/**
* Get the version from a batch. Assumes all rows in the batch have the same version, so it reads
* from the first row (rowId=0).
* Get the version from a {@link ColumnarBatch} of Delta log actions. Assumes all rows in the
* batch belong to the same commit version, so it reads the version from the first row (rowId=0).
*/
public static long getVersion(ColumnarBatch batch) {
int versionColIdx = getFieldIndex(batch, "version");