diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
index 9a597fd8b00..f1d934915c4 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
@@ -47,6 +47,7 @@
 import org.apache.spark.sql.delta.sources.DeltaSQLConf;
 import org.apache.spark.sql.delta.sources.DeltaSource;
 import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
+import org.apache.spark.sql.delta.sources.DeltaSourceOffset$;
 import org.apache.spark.sql.execution.datasources.FilePartition;
 import org.apache.spark.sql.execution.datasources.FilePartition$;
 import org.apache.spark.sql.execution.datasources.PartitionedFile;
@@ -193,7 +194,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
 
   @Override
   public Offset deserializeOffset(String json) {
-    throw new UnsupportedOperationException("deserializeOffset is not supported");
+    return DeltaSourceOffset$.MODULE$.apply(tableId, json);
   }
 
   @Override
@@ -312,12 +313,12 @@ public PartitionReaderFactory createReaderFactory() {
 
   @Override
   public void commit(Offset end) {
-    throw new UnsupportedOperationException("commit is not supported");
+    // TODO(#5319): update metadata tracking log.
   }
 
   @Override
   public void stop() {
-    throw new UnsupportedOperationException("stop is not supported");
+    // TODO(#5318): unpersist any cached initial snapshot.
   }
 
   ///////////////////////
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
index 203f6133774..af057d46f1c 100644
--- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
@@ -50,6 +50,7 @@
 import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
 import org.apache.spark.sql.delta.sources.ReadMaxBytes;
 import org.apache.spark.sql.delta.storage.ClosableIterator;
+import org.apache.spark.sql.delta.util.JsonUtils;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -97,29 +98,90 @@ public void testInitialOffset_throwsUnsupportedOperationException(@TempDir File
   }
 
   @Test
-  public void testDeserializeOffset_throwsUnsupportedOperationException(@TempDir File tempDir) {
-    SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
-    UnsupportedOperationException exception =
-        assertThrows(
-            UnsupportedOperationException.class, () -> microBatchStream.deserializeOffset("{}"));
-    assertEquals("deserializeOffset is not supported", exception.getMessage());
+  public void testDeserializeOffset_ValidJson(@TempDir File tempDir) throws Exception {
+    String tablePath = tempDir.getAbsolutePath();
+    SparkMicroBatchStream stream = createTestStream(tempDir);
+
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
+    String tableId = deltaLog.tableId();
+    DeltaSourceOffset expected = new DeltaSourceOffset(tableId, 5L, 10L, false);
+    String json = org.apache.spark.sql.delta.util.JsonUtils.mapper().writeValueAsString(expected);
+
+    Offset result = stream.deserializeOffset(json);
+    DeltaSourceOffset actual = (DeltaSourceOffset) result;
+
+    assertEquals(expected.reservoirId(), actual.reservoirId());
+    assertEquals(expected.reservoirVersion(), actual.reservoirVersion());
+    assertEquals(expected.index(), actual.index());
+    assertEquals(expected.isInitialSnapshot(), actual.isInitialSnapshot());
   }
 
   @Test
-  public void testCommit_throwsUnsupportedOperationException(@TempDir File tempDir) {
-    SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
-    Offset end = null;
-    UnsupportedOperationException exception =
-        assertThrows(UnsupportedOperationException.class, () -> microBatchStream.commit(end));
-    assertEquals("commit is not supported", exception.getMessage());
+  public void testDeserializeOffset_MismatchedTableId(@TempDir File tempDir) throws Exception {
+    SparkMicroBatchStream stream = createTestStream(tempDir);
+
+    // Create offset with wrong tableId
+    String wrongTableId = "wrong-table-id";
+    DeltaSourceOffset offset =
+        new DeltaSourceOffset(
+            wrongTableId,
+            /* reservoirVersion= */ 1L,
+            /* index= */ 0L,
+            /* isInitialSnapshot= */ false);
+    String json = JsonUtils.mapper().writeValueAsString(offset);
+    RuntimeException exception =
+        assertThrows(RuntimeException.class, () -> stream.deserializeOffset(json));
+
+    assertTrue(
+        exception
+            .getMessage()
+            .contains("streaming query was reading from an unexpected Delta table"));
   }
 
   @Test
-  public void testStop_throwsUnsupportedOperationException(@TempDir File tempDir) {
-    SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
-    UnsupportedOperationException exception =
-        assertThrows(UnsupportedOperationException.class, () -> microBatchStream.stop());
-    assertEquals("stop is not supported", exception.getMessage());
+  public void testDeserializeOffset_InvalidJson(@TempDir File tempDir) {
+    SparkMicroBatchStream stream = createTestStream(tempDir);
+    String invalidJson = "{this is not valid json}";
+    assertThrows(RuntimeException.class, () -> stream.deserializeOffset(invalidJson));
+  }
+
+  @Test
+  public void testDeserializeOffset_WithInitialSnapshot(@TempDir File tempDir) throws Exception {
+    String tablePath = tempDir.getAbsolutePath();
+    SparkMicroBatchStream stream = createTestStream(tempDir);
+
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
+    String tableId = deltaLog.tableId();
+    long baseIndex = DeltaSourceOffset.BASE_INDEX();
+    DeltaSourceOffset expected =
+        new DeltaSourceOffset(
+            tableId, /* reservoirVersion= */ 0L, baseIndex, /* isInitialSnapshot= */ true);
+    String json = org.apache.spark.sql.delta.util.JsonUtils.mapper().writeValueAsString(expected);
+
+    Offset result = stream.deserializeOffset(json);
+    DeltaSourceOffset actual = (DeltaSourceOffset) result;
+
+    assertTrue(actual.isInitialSnapshot());
+    assertEquals(0L, actual.reservoirVersion());
+    assertEquals(baseIndex, actual.index());
+  }
+
+  @Test
+  public void testCommit_NoOp(@TempDir File tempDir) throws Exception {
+    String tablePath = tempDir.getAbsolutePath();
+    SparkMicroBatchStream stream = createTestStream(tempDir);
+
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
+    String tableId = deltaLog.tableId();
+    DeltaSourceOffset offset = new DeltaSourceOffset(tableId, 1L, 0L, false);
+
+    assertDoesNotThrow(() -> stream.commit(offset));
+  }
+
+  @Test
+  public void testStop_NoOp(@TempDir File tempDir) {
+    SparkMicroBatchStream stream = createTestStream(tempDir);
+    assertDoesNotThrow(() -> stream.stop());
   }
 
   // ================================================================================================
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala
index 4658df75455..2bcc621d63c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala
@@ -140,6 +140,20 @@ object DeltaSourceOffset extends Logging {
     )
   }
 
+  /**
+   * Validate and parse a DeltaSourceOffset from its JSON serialized format
+   * @param reservoirId Table id
+   * @param json Raw JSON string
+   */
+  def apply(reservoirId: String, json: String): DeltaSourceOffset = {
+    val o = JsonUtils.mapper.readValue[DeltaSourceOffset](json)
+    if (o.reservoirId != reservoirId) {
+      throw DeltaErrors.differentDeltaTableReadByStreamingSource(
+        newTableId = reservoirId, oldTableId = o.reservoirId)
+    }
+    o
+  }
+
   /**
    * Validate and parse a DeltaSourceOffset from its serialized format
    * @param reservoirId Table id
@@ -148,13 +162,7 @@
   def apply(reservoirId: String, offset: OffsetV2): DeltaSourceOffset = {
     offset match {
       case o: DeltaSourceOffset => o
-      case s =>
-        val o = JsonUtils.mapper.readValue[DeltaSourceOffset](s.json)
-        if (o.reservoirId != reservoirId) {
-          throw DeltaErrors.differentDeltaTableReadByStreamingSource(
-            newTableId = reservoirId, oldTableId = o.reservoirId)
-        }
-        o
+      case s => apply(reservoirId, s.json)
     }
   }
 