
Commit 3763efa

initial offset
1 parent 2a51205 commit 3763efa

2 files changed: +131, -24 lines


kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java

Lines changed: 41 additions & 3 deletions
@@ -63,6 +63,9 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
   private final boolean shouldValidateOffsets;
   private final SparkSession spark;

+  // Tracks whether this is the initial batch for this stream (no checkpointed offset).
+  private boolean isInitialBatch = false;
+
   public SparkMicroBatchStream(DeltaSnapshotManager snapshotManager, Configuration hadoopConf) {
     this(
         snapshotManager,
@@ -95,10 +98,34 @@ public SparkMicroBatchStream(
   // offset //
   ////////////

+  /**
+   * Returns the initial offset for a streaming query to start reading from (if there's no
+   * checkpointed offset). Returns null if there's no data to read.
+   */
   @Override
   public Offset initialOffset() {
-    // TODO(#5318): Implement initialOffset
-    throw new UnsupportedOperationException("initialOffset is not supported");
+    Optional<Long> startingVersionOpt = getStartingVersion();
+    long version;
+    boolean isInitialSnapshot;
+
+    if (startingVersionOpt.isPresent()) {
+      version = startingVersionOpt.get();
+      isInitialSnapshot = false;
+    } else {
+      // TODO(#5318): Support initial snapshot case (isInitialSnapshot == true)
+      throw new UnsupportedOperationException(
+          "initialOffset with initial snapshot is not supported yet");
+    }
+
+    if (version < 0) {
+      // This shouldn't happen; defensively return null.
+      return null;
+    }
+
+    isInitialBatch = true;
+
+    return DeltaSourceOffset.apply(
+        tableId, version, DeltaSourceOffset.BASE_INDEX(), isInitialSnapshot);
   }

   @Override
@@ -133,6 +160,8 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
       DeltaSourceOffset.validateOffsets(deltaStartOffset, endOffset.get());
     }

+    isInitialBatch = false;
+
     // endOffset is null: no data is available to read for this batch.
     return endOffset.orElse(null);
   }
@@ -153,7 +182,9 @@ public ReadLimit getDefaultReadLimit() {
    *
    * @param previousOffset The previous offset
    * @param limits Rate limits for this batch (Optional.empty() for no limits)
-   * @return The next offset, or the previous offset if no new data is available
+   * @return The next offset, or the previous offset if no new data is available (except on the
+   *     initial batch where we return empty to match DSv1's
+   *     getStartingOffsetFromSpecificDeltaVersion behavior)
    */
   private Optional<DeltaSourceOffset> getNextOffsetFromPreviousOffset(
       DeltaSourceOffset previousOffset, Optional<DeltaSource.AdmissionLimits> limits) {
@@ -169,6 +200,11 @@ private Optional<DeltaSourceOffset> getNextOffsetFromPreviousOffset(
     Optional<IndexedFile> lastFileChange = StreamingHelper.iteratorLast(changes);

     if (!lastFileChange.isPresent()) {
+      // On the initial batch, return empty to match DSv1's
+      // getStartingOffsetFromSpecificDeltaVersion
+      if (isInitialBatch) {
+        return Optional.empty();
+      }
       return Optional.of(previousOffset);
     }
     // TODO(#5318): Check read-incompatible schema changes during stream start
@@ -218,6 +254,8 @@ public void stop() {
    * Extracts whether users provided the option to time travel a relation. If a query restarts from
    * a checkpoint and the checkpoint has recorded the offset, this method should never be called.
    *
+   * <p>Returns Optional.empty() if no starting version is provided.
+   *
    * <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
    */
   Optional<Long> getStartingVersion() {
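
To make the new control flow concrete, here is a minimal, self-contained sketch (hypothetical class and method names, not the real Kernel or Spark types) of the initial-batch handling introduced above: initialOffset() marks the stream as being on its first, un-checkpointed batch; an empty initial batch then resolves to no offset at all (matching DSv1's getStartingOffsetFromSpecificDeltaVersion), while later empty batches fall back to the previous offset.

import java.util.Optional;

// Hypothetical simplification of the isInitialBatch logic; offsets are plain longs here.
final class InitialBatchSketch {
  private boolean isInitialBatch = false;

  // Called only when there is no checkpointed offset, mirroring initialOffset().
  long initialOffset(long startingVersion) {
    isInitialBatch = true;
    return startingVersion;
  }

  // Mirrors latestOffset() + getNextOffsetFromPreviousOffset(): return the last file change if
  // present, the previous offset on later empty batches, or empty on an empty initial batch.
  Optional<Long> latestOffset(long previousOffset, Optional<Long> lastFileChange) {
    Optional<Long> end =
        lastFileChange.isPresent()
            ? lastFileChange
            : (isInitialBatch ? Optional.<Long>empty() : Optional.of(previousOffset));
    isInitialBatch = false; // only the very first batch gets the empty-result behavior
    return end;
  }
}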

kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java

Lines changed: 90 additions & 21 deletions
@@ -32,10 +32,12 @@
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.connector.read.streaming.ReadLimit;
 import org.apache.spark.sql.delta.DeltaLog;
 import org.apache.spark.sql.delta.DeltaOptions;
+import org.apache.spark.sql.delta.Snapshot;
 import org.apache.spark.sql.delta.sources.DeltaSource;
 import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
 import org.apache.spark.sql.delta.sources.ReadMaxBytes;
@@ -46,7 +48,9 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import scala.Option;
+import scala.collection.JavaConverters;
 import scala.collection.immutable.Map$;
+import scala.collection.immutable.Seq;

 public class SparkMicroBatchStreamTest extends SparkDsv2TestBase {

@@ -100,7 +104,8 @@ public void testInitialOffset_throwsUnsupportedOperationException(@TempDir File
     SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
     UnsupportedOperationException exception =
         assertThrows(UnsupportedOperationException.class, () -> microBatchStream.initialOffset());
-    assertEquals("initialOffset is not supported", exception.getMessage());
+    assertEquals(
+        "initialOffset with initial snapshot is not supported yet", exception.getMessage());
   }

   @Test
@@ -129,6 +134,71 @@ public void testStop_throwsUnsupportedOperationException(@TempDir File tempDir)
     assertEquals("stop is not supported", exception.getMessage());
   }

+  // ================================================================================================
+  // Tests for initialOffset parity between DSv1 and DSv2
+  // ================================================================================================
+
+  @ParameterizedTest
+  @MethodSource("initialOffsetParameters")
+  public void testInitialOffset_FirstBatchParity(
+      String startingVersion,
+      ReadLimitConfig limitConfig,
+      String testDescription,
+      @TempDir File tempDir)
+      throws Exception {
+    String testTablePath = tempDir.getAbsolutePath();
+    String testTableName = "test_initial_" + System.nanoTime();
+    createEmptyTestTable(testTablePath, testTableName);
+    insertVersions(
+        testTableName,
+        /* numVersions= */ 5,
+        /* rowsPerVersion= */ 10,
+        /* includeEmptyVersion= */ false);
+
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
+    ReadLimit readLimit = limitConfig.toReadLimit();
+    DeltaOptions options;
+    if (startingVersion == null) {
+      options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
+    } else {
+      scala.collection.immutable.Map<String, String> scalaMap =
+          Map$.MODULE$.<String, String>empty().updated("startingVersion", startingVersion);
+      options = new DeltaOptions(scalaMap, spark.sessionState().conf());
+    }
+
+    // DSv1
+    DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath, options);
+    // DSv1 sources don't have an initialOffset() method.
+    // Batch 0 is called with startOffset=null.
+    Offset dsv1Offset = deltaSource.latestOffset(/* startOffset= */ null, readLimit);
+
+    // DSv2
+    Configuration hadoopConf = new Configuration();
+    PathBasedSnapshotManager snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, hadoopConf);
+    SparkMicroBatchStream stream =
+        new SparkMicroBatchStream(snapshotManager, hadoopConf, spark, options);
+    Offset initialOffset = stream.initialOffset();
+    Offset dsv2Offset = stream.latestOffset(initialOffset, readLimit);
+
+    compareOffsets(dsv1Offset, dsv2Offset, testDescription);
+  }
+
+  /** Provides test parameters for the initialOffset parity test. */
+  private static Stream<Arguments> initialOffsetParameters() {
+    return Stream.of(
+        Arguments.of("0", ReadLimitConfig.noLimit(), "NoLimit1"),
+        Arguments.of("1", ReadLimitConfig.noLimit(), "NoLimit2"),
+        Arguments.of("3", ReadLimitConfig.noLimit(), "NoLimit3"),
+        Arguments.of("latest", ReadLimitConfig.noLimit(), "LatestNoLimit"),
+        Arguments.of("latest", ReadLimitConfig.maxFiles(1000), "LatestMaxFiles"),
+        Arguments.of("latest", ReadLimitConfig.maxBytes(1000), "LatestMaxBytes"),
+        Arguments.of("0", ReadLimitConfig.maxFiles(5), "MaxFiles1"),
+        Arguments.of("1", ReadLimitConfig.maxFiles(10), "MaxFiles2"),
+        Arguments.of("0", ReadLimitConfig.maxBytes(1000), "MaxBytes1"),
+        Arguments.of("1", ReadLimitConfig.maxBytes(2000), "MaxBytes2"));
+  }
+
   // ================================================================================================
   // Tests for getFileChanges parity between DSv1 and DSv2
   // ================================================================================================
@@ -1095,26 +1165,6 @@ private Optional<DeltaSource.AdmissionLimits> createAdmissionLimits(
     return Optional.of(new DeltaSource.AdmissionLimits(options, scalaMaxFiles, scalaMaxBytes));
   }

-  /** Helper method to create a DeltaSource instance for testing. */
-  private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) {
-    DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
-    scala.collection.immutable.Seq<org.apache.spark.sql.catalyst.expressions.Expression> emptySeq =
-        scala.collection.JavaConverters.asScalaBuffer(
-                new java.util.ArrayList<org.apache.spark.sql.catalyst.expressions.Expression>())
-            .toList();
-    org.apache.spark.sql.delta.Snapshot snapshot =
-        deltaLog.update(false, scala.Option.empty(), scala.Option.empty());
-    return new DeltaSource(
-        spark,
-        deltaLog,
-        /* catalogTableOpt= */ scala.Option.empty(),
-        options,
-        /* snapshotAtSourceInit= */ snapshot,
-        /* metadataPath= */ tablePath + "/_checkpoint",
-        /* metadataTrackingLog= */ scala.Option.empty(),
-        /* filters= */ emptySeq);
-  }
-
   /** Helper method to format a DSv1 IndexedFile for debugging. */
   private String formatIndexedFile(org.apache.spark.sql.delta.sources.IndexedFile file) {
     return String.format(
@@ -1173,4 +1223,23 @@ private void compareOffsetSequence(
           String.format("%s (iteration %d)", testDescription, i));
     }
   }
+
+  private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) {
+    DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
+    return createDeltaSource(deltaLog, tablePath, options);
+  }
+
+  private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath, DeltaOptions options) {
+    Seq<Expression> emptySeq = JavaConverters.asScalaBuffer(new ArrayList<Expression>()).toList();
+    Snapshot snapshot = deltaLog.update(false, Option.empty(), Option.empty());
+    return new DeltaSource(
+        spark,
+        deltaLog,
+        /* catalogTableOpt= */ Option.empty(),
+        options,
+        /* snapshotAtSourceInit= */ snapshot,
+        /* metadataPath= */ tablePath + "/_checkpoint",
+        /* metadataTrackingLog= */ Option.empty(),
+        /* filters= */ emptySeq);
+  }
 }
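
For readers who have not used it before, the new parity test relies on JUnit 5's parameterized-test support: @ParameterizedTest runs the method once per Arguments row returned by the static factory named in @MethodSource. A minimal, unrelated toy sketch of that pattern (assuming only the standard JUnit Jupiter test dependencies):

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class MethodSourceSketchTest {
  @ParameterizedTest
  @MethodSource("cases")
  void addsUp(int a, int b, int expected) {
    // Runs once per Arguments row supplied by cases() below.
    assertEquals(expected, a + b);
  }

  private static Stream<Arguments> cases() {
    return Stream.of(Arguments.of(1, 2, 3), Arguments.of(2, 2, 4), Arguments.of(-1, 1, 0));
  }
}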
