@@ -15,7 +15,10 @@
*/
package io.delta.kernel.spark.read;

import static io.delta.kernel.internal.util.Preconditions.checkState;

import io.delta.kernel.internal.actions.AddFile;
import org.apache.spark.sql.delta.sources.AdmittableFile;

/**
* Java version of IndexedFile.scala that uses Kernel's action classes.
@@ -24,7 +27,7 @@
*
* <p>Indexed: refers to the index in DeltaSourceOffset, assigned by the streaming engine.
*/
public class IndexedFile {
public class IndexedFile implements AdmittableFile {
private final long version;
private final long index;
private final AddFile addFile;
@@ -47,6 +50,17 @@ public AddFile getAddFile() {
return addFile;
}

@Override
public boolean hasFileAction() {
return addFile != null;
}

@Override
public long getFileSize() {
checkState(addFile != null, "check hasFileAction() before calling getFileSize()");
return addFile.getSize();
Review thread on this line:
Collaborator: nit: null check?
Author: This matches V1 behavior (throwing an NPE when addFile, removeFile, and cdc are all null).
Collaborator: Can we fix that? Are there any places that try to catch the NPE? If not, we should avoid it for better diagnostics.
Author: Done. Perhaps it's better to throw an IllegalStateException. Thanks!
}
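To illustrate the behavior the thread settles on, a test-style sketch (not part of this change; the constructor shape is assumed from the fields shown above, and assertFalse/assertThrows are JUnit helpers):

// Hypothetical placeholder entry with no AddFile attached (constructor arguments assumed).
IndexedFile placeholder = new IndexedFile(/* version= */ 3L, /* index= */ -1L, /* addFile= */ null);
assertFalse(placeholder.hasFileAction());
// Fails fast with IllegalStateException("check hasFileAction() before calling getFileSize()")
// instead of the NullPointerException the V1 path would raise.
assertThrows(IllegalStateException.class, placeholder::getFileSize);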

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -33,6 +33,7 @@
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.sources.DeltaSource;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import scala.Option;

@@ -101,6 +102,37 @@ public void stop() {
// getFileChanges //
////////////////////

/**
* Get file changes with rate limiting applied. Mimics DeltaSource.getFileChangesWithRateLimit.
*
* @param fromVersion The starting version (exclusive with fromIndex)
* @param fromIndex The starting index within fromVersion (exclusive)
* @param isInitialSnapshot Whether this is the initial snapshot
* @param limits Rate limits to apply (Option.empty for no limits)
* @return An iterator of IndexedFile with rate limiting applied
*/
CloseableIterator<IndexedFile> getFileChangesWithRateLimit(
long fromVersion,
long fromIndex,
boolean isInitialSnapshot,
Option<DeltaSource.AdmissionLimits> limits) {
// TODO(#5319): getFileChangesForCDC if CDC is enabled.

CloseableIterator<IndexedFile> changes =
getFileChanges(fromVersion, fromIndex, isInitialSnapshot, /*endOffset=*/ Option.empty());

// Take each change until we've seen the configured number of addFiles. Some changes don't
// represent file additions; we retain them for offset tracking, but they don't count toward
// the maxFilesPerTrigger conf.
if (limits.isDefined()) {
DeltaSource.AdmissionLimits admissionLimits = limits.get();
changes = changes.takeWhile(admissionLimits::admit);
Review thread on this line:
Collaborator: Does takeWhile automatically close the iterator on exception?
Author: No, getFileChangesWithRateLimit() returns a CloseableIterator; the caller is responsible for catching exceptions and closing the iterator.
Collaborator: In DSv1 we wrap the iterator with "withClose", which automatically closes the iteration if a Throwable is thrown: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala#L315. Can we do the same here?
Author: We don't need withClose here -- withClose makes sure that when the outer iterator closes, the inner iterator is also closed, to prevent a resource leak: https://sourcegraph.prod.databricks-corp.com/delta-io/delta/-/blob/spark/src/main/scala/org/apache/spark/sql/delta/storage/ClosableIterator.scala?L43:9. With the Kernel's new CloseableIterator, this is already taken care of: https://sourcegraph.prod.databricks-corp.com/delta-io/delta/-/blob/kernel/kernel-api/src/main/java/io/delta/kernel/utils/CloseableIterator.java?L117
}

// TODO(#5318): Stop at schema change barriers
return changes;
}
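Following the thread above, a caller-side sketch (not part of this change) of how the returned iterator might be drained; the variable names and limit values are assumptions, and try-with-resources guarantees the CloseableIterator is closed even if admission or iteration throws:

try (CloseableIterator<IndexedFile> admitted =
    stream.getFileChangesWithRateLimit(
        /* fromVersion= */ 0L,
        /* fromIndex= */ DeltaSourceOffset.BASE_INDEX(),
        /* isInitialSnapshot= */ false,
        limits)) {
  while (admitted.hasNext()) {
    IndexedFile file = admitted.next();
    // Consume the admitted file, e.g. to build the next DeltaSourceOffset.
  }
}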

/**
* Get file changes between fromVersion/fromIndex and endOffset. This is the Kernel-based
* implementation of DeltaSource.getFileChanges.
@@ -31,6 +31,7 @@
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.sources.DeltaSource;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.apache.spark.sql.delta.storage.ClosableIterator;
import org.junit.jupiter.api.BeforeEach;
@@ -138,27 +139,17 @@ public void testGetFileChanges(

// Create 5 versions of data (versions 1-5, version 0 is the CREATE TABLE)
// Insert 100 rows per commit to potentially trigger multiple batches
for (int i = 0; i < 5; i++) {
StringBuilder insertValues = new StringBuilder();
for (int j = 0; j < 100; j++) {
if (j > 0) insertValues.append(", ");
int id = i * 100 + j;
insertValues.append(String.format("(%d, 'User%d')", id, id));
}
sql("INSERT INTO %s VALUES %s", testTableName, insertValues.toString());
}
SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
insertVersions(testTableName, /* numVersions= */ 5, /* rowsPerVersion= */ 100);

// dsv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
createDeltaSource(deltaLog, testTablePath);
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);

scala.Option<DeltaSourceOffset> scalaEndOffset = scala.Option.empty();
Option<DeltaSourceOffset> scalaEndOffset = Option.empty();
if (endVersion.isPresent()) {
long offsetIndex = endIndex.orElse(DeltaSourceOffset.END_INDEX());
scalaEndOffset =
scala.Option.apply(
Option.apply(
new DeltaSourceOffset(
deltaLog.tableId(), endVersion.get(), offsetIndex, isInitialSnapshot));
}
@@ -176,6 +167,7 @@ public void testGetFileChanges(
deltaChanges.close();

// dsv2 SparkMicroBatchStream
SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
Option<DeltaSourceOffset> endOffsetOption = scalaEndOffset;
try (CloseableIterator<IndexedFile> kernelChanges =
stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffsetOption)) {
@@ -237,6 +229,88 @@ private static Stream<Arguments> getFileChangesParameters() {
2L, 50L, notInitialSnapshot, Optional.of(2L), Optional.of(40L), "Empty Range"));
}

// ================================================================================================
// Tests for getFileChangesWithRateLimit parity between DSv1 and DSv2
// ================================================================================================

/**
* Test that verifies parity between DSv1 DeltaSource.getFileChangesWithRateLimit and DSv2
* SparkMicroBatchStream.getFileChangesWithRateLimit.
*
* <p>TODO(#5318): test initial snapshot once we fully support it.
*/
@ParameterizedTest
@MethodSource("getFileChangesWithRateLimitParameters")
public void testGetFileChangesWithRateLimit(
Optional<Integer> maxFiles,
Optional<Long> maxBytes,
String testDescription,
@TempDir File tempDir)
throws Exception {
String testTablePath = tempDir.getAbsolutePath();
String testTableName =
"test_rate_limit_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime();
createEmptyTestTable(testTablePath, testTableName);

// Create 5 versions with 10 rows each (versions 1-5)
insertVersions(testTableName, /* numVersions= */ 5, /* rowsPerVersion= */ 10);

// dsv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());

Option<DeltaSource.AdmissionLimits> dsv1Limits =
createAdmissionLimits(deltaSource, maxFiles, maxBytes);

ClosableIterator<org.apache.spark.sql.delta.sources.IndexedFile> deltaChanges =
deltaSource.getFileChangesWithRateLimit(
/*fromVersion=*/ 0L,
/* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(),
/* isInitialSnapshot=*/ false,
Review thread on this line:
Collaborator: Remember to test it with initialSnapshot :)
Author: Definitely.
Author: Added a TODO.
dsv1Limits);
List<org.apache.spark.sql.delta.sources.IndexedFile> deltaFilesList = new ArrayList<>();
while (deltaChanges.hasNext()) {
deltaFilesList.add(deltaChanges.next());
}
deltaChanges.close();

// dsv2 SparkMicroBatchStream
SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
// We need a separate AdmissionLimits object for DSv2 because the method is stateful.
Option<DeltaSource.AdmissionLimits> dsv2Limits =
createAdmissionLimits(deltaSource, maxFiles, maxBytes);

try (CloseableIterator<IndexedFile> kernelChanges =
stream.getFileChangesWithRateLimit(
/*fromVersion=*/ 0L,
/* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(),
/* isInitialSnapshot=*/ false,
dsv2Limits)) {
List<IndexedFile> kernelFilesList = new ArrayList<>();
while (kernelChanges.hasNext()) {
kernelFilesList.add(kernelChanges.next());
}
compareFileChanges(deltaFilesList, kernelFilesList);
}
}

/** Provides test parameters for the parameterized getFileChangesWithRateLimit test. */
private static Stream<Arguments> getFileChangesWithRateLimitParameters() {
Optional<Integer> noMaxFiles = Optional.empty();
Optional<Long> noMaxBytes = Optional.empty();

return Stream.of(
// No rate limits
Arguments.of(noMaxFiles, noMaxBytes, "No limits"),
// MaxFiles only
Arguments.of(Optional.of(5), noMaxBytes, "MaxFiles"),
// MaxBytes only
Arguments.of(noMaxFiles, Optional.of(5000L), "MaxBytes"),
// Both limits
Arguments.of(Optional.of(10), Optional.of(10000L), "MaxFiles and MaxBytes"));
}

private void compareFileChanges(
List<org.apache.spark.sql.delta.sources.IndexedFile> deltaSourceFiles,
List<IndexedFile> kernelFiles) {
@@ -308,12 +382,11 @@ public void testGetFileChanges_EmptyVersions(
long fromVersion = 0L;
long fromIndex = DeltaSourceOffset.BASE_INDEX();
boolean isInitialSnapshot = false;
scala.Option<DeltaSourceOffset> endOffset = scala.Option.empty();
Option<DeltaSourceOffset> endOffset = Option.empty();

// Test DSv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
createDeltaSource(deltaLog, testTablePath);
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);

ClosableIterator<org.apache.spark.sql.delta.sources.IndexedFile> deltaChanges =
deltaSource.getFileChanges(
@@ -386,12 +459,11 @@ public void testGetFileChanges_OnRemoveFile_throwError(
long fromVersion = 0L;
long fromIndex = DeltaSourceOffset.BASE_INDEX();
boolean isInitialSnapshot = false;
scala.Option<DeltaSourceOffset> endOffset = scala.Option.empty();
Option<DeltaSourceOffset> endOffset = Option.empty();

// Test DSv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
createDeltaSource(deltaLog, testTablePath);
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);

UnsupportedOperationException dsv1Exception =
assertThrows(
@@ -535,17 +607,52 @@ private static void sql(String query, Object... args) {
SparkDsv2TestBase.spark.sql(String.format(query, args));
}

/**
* Helper method to insert multiple versions of data into a test table.
*
* @param tableName The name of the table to insert into
* @param numVersions The number of versions (commits) to create
* @param rowsPerVersion The number of rows to insert per version
*/
private void insertVersions(String tableName, int numVersions, int rowsPerVersion) {
for (int i = 0; i < numVersions; i++) {
StringBuilder values = new StringBuilder();
for (int j = 0; j < rowsPerVersion; j++) {
if (j > 0) values.append(", ");
int id = i * rowsPerVersion + j;
values.append(String.format("(%d, 'User%d')", id, id));
}
sql("INSERT INTO %s VALUES %s", tableName, values.toString());
}
}

private Option<DeltaSource.AdmissionLimits> createAdmissionLimits(
DeltaSource deltaSource, Optional<Integer> maxFiles, Optional<Long> maxBytes) {
Option<Object> scalaMaxFiles =
maxFiles.isPresent() ? Option.apply(maxFiles.get()) : Option.empty();
Option<Object> scalaMaxBytes =
maxBytes.isPresent() ? Option.apply(maxBytes.get()) : Option.empty();

if (scalaMaxFiles.isEmpty() && scalaMaxBytes.isEmpty()) {
return Option.empty();
}
return Option.apply(
deltaSource
.new AdmissionLimits(
scalaMaxFiles,
scalaMaxBytes.isDefined() ? (Long) scalaMaxBytes.get() : Long.MAX_VALUE));
}
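Note: AdmissionLimits is declared as an inner class of DeltaSource, which is why the test uses the qualified deltaSource.new AdmissionLimits(...) instantiation form; it binds the new limits object to that enclosing DeltaSource instance. Passing Long.MAX_VALUE when maxBytes is absent effectively leaves the byte limit unbounded.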

/** Helper method to create a DeltaSource instance for testing. */
private org.apache.spark.sql.delta.sources.DeltaSource createDeltaSource(
DeltaLog deltaLog, String tablePath) {
private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) {
DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
scala.collection.immutable.Seq<org.apache.spark.sql.catalyst.expressions.Expression> emptySeq =
scala.collection.JavaConverters.asScalaBuffer(
new java.util.ArrayList<org.apache.spark.sql.catalyst.expressions.Expression>())
.toList();
org.apache.spark.sql.delta.Snapshot snapshot =
deltaLog.update(false, Option.empty(), Option.empty());
return new org.apache.spark.sql.delta.sources.DeltaSource(
return new DeltaSource(
spark,
deltaLog,
/* catalogTableOpt= */ Option.empty(),
@@ -0,0 +1,36 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.sources;

/**
* Interface for files that can be admitted by admission control in Delta streaming sources.
* This abstraction allows both DSv1 and DSv2 IndexedFile implementations to be used with
* the admission control logic.
*/
public interface AdmittableFile {
/**
* Returns true if this file has an associated file action (AddFile, RemoveFile, or CDCFile).
* Placeholder IndexedFiles with no file action will return false.
*/
boolean hasFileAction();

/**
* Returns the size of the file in bytes.
* This method should only be called when hasFileAction() returns true.
*/
long getFileSize();
}
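For context, a minimal sketch of how an admission-control check might consume this interface (hypothetical class and field names; the real logic lives in DeltaSource.AdmissionLimits):

// Hypothetical consumer, shaped like the admit() predicate passed to takeWhile(...) above.
final class SimpleAdmissionLimits {
  private long filesToTake;
  private long bytesToTake;

  SimpleAdmissionLimits(long maxFiles, long maxBytes) {
    this.filesToTake = maxFiles;
    this.bytesToTake = maxBytes;
  }

  // Returns true while the batch still has capacity; placeholder entries with no
  // file action pass through without counting against the limits.
  boolean admit(AdmittableFile file) {
    if (!file.hasFileAction()) {
      return true;
    }
    boolean hasCapacity = filesToTake > 0 && bytesToTake > 0;
    filesToTake -= 1;
    bytesToTake -= file.getFileSize();
    return hasCapacity;
  }
}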