@@ -47,6 +47,7 @@
import org.apache.spark.sql.delta.sources.DeltaSQLConf;
import org.apache.spark.sql.delta.sources.DeltaSource;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset$;
import org.apache.spark.sql.execution.datasources.FilePartition;
import org.apache.spark.sql.execution.datasources.FilePartition$;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
@@ -193,7 +194,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) {

@Override
public Offset deserializeOffset(String json) {
throw new UnsupportedOperationException("deserializeOffset is not supported");
return DeltaSourceOffset$.MODULE$.apply(tableId, json);
}

@Override
@@ -312,12 +313,12 @@ public PartitionReaderFactory createReaderFactory() {

@Override
public void commit(Offset end) {
throw new UnsupportedOperationException("commit is not supported");
// TODO(#5319): update metadata tracking log.
}

@Override
public void stop() {
throw new UnsupportedOperationException("stop is not supported");
// TODO(#5318): unpersist any cached initial snapshot.
}

///////////////////////
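For context, a rough sketch (not part of this change) of how Spark's micro-batch engine exercises the methods above on query restart and shutdown; `stream`, `offsetLogJson`, and `batchEnd` are hypothetical names used only for illustration:

// On restart, the engine rebuilds the last checkpointed offset from its JSON form.
Offset restored = stream.deserializeOffset(offsetLogJson);
// ... the next micro-batch is planned and executed up to some end offset ...
// After the batch is durably recorded, the engine acknowledges it and later stops the stream.
stream.commit(batchEnd);
stream.stop();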
@@ -50,6 +50,7 @@
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.apache.spark.sql.delta.sources.ReadMaxBytes;
import org.apache.spark.sql.delta.storage.ClosableIterator;
import org.apache.spark.sql.delta.util.JsonUtils;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -97,29 +98,90 @@ public void testInitialOffset_throwsUnsupportedOperationException(@TempDir File
}

@Test
public void testDeserializeOffset_throwsUnsupportedOperationException(@TempDir File tempDir) {
SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
UnsupportedOperationException exception =
assertThrows(
UnsupportedOperationException.class, () -> microBatchStream.deserializeOffset("{}"));
assertEquals("deserializeOffset is not supported", exception.getMessage());
public void testDeserializeOffset_ValidJson(@TempDir File tempDir) throws Exception {
String tablePath = tempDir.getAbsolutePath();
SparkMicroBatchStream stream = createTestStream(tempDir);

DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
String tableId = deltaLog.tableId();
DeltaSourceOffset expected =
new DeltaSourceOffset(
tableId, /* reservoirVersion= */ 5L, /* index= */ 10L, /* isInitialSnapshot= */ false);
String json = JsonUtils.mapper().writeValueAsString(expected);

Offset result = stream.deserializeOffset(json);
DeltaSourceOffset actual = (DeltaSourceOffset) result;

assertEquals(expected.reservoirId(), actual.reservoirId());
assertEquals(expected.reservoirVersion(), actual.reservoirVersion());
assertEquals(expected.index(), actual.index());
assertEquals(expected.isInitialSnapshot(), actual.isInitialSnapshot());
}

@Test
public void testCommit_throwsUnsupportedOperationException(@TempDir File tempDir) {
SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
Offset end = null;
UnsupportedOperationException exception =
assertThrows(UnsupportedOperationException.class, () -> microBatchStream.commit(end));
assertEquals("commit is not supported", exception.getMessage());
public void testDeserializeOffset_MismatchedTableId(@TempDir File tempDir) throws Exception {
SparkMicroBatchStream stream = createTestStream(tempDir);

// Create offset with wrong tableId
String wrongTableId = "wrong-table-id";
DeltaSourceOffset offset =
new DeltaSourceOffset(
wrongTableId,
/* reservoirVersion= */ 1L,
/* index= */ 0L,
/* isInitialSnapshot= */ false);
String json = JsonUtils.mapper().writeValueAsString(offset);
RuntimeException exception =
assertThrows(RuntimeException.class, () -> stream.deserializeOffset(json));

assertTrue(
exception
.getMessage()
.contains("streaming query was reading from an unexpected Delta table"));
}

@Test
public void testStop_throwsUnsupportedOperationException(@TempDir File tempDir) {
SparkMicroBatchStream microBatchStream = createTestStream(tempDir);
UnsupportedOperationException exception =
assertThrows(UnsupportedOperationException.class, () -> microBatchStream.stop());
assertEquals("stop is not supported", exception.getMessage());
public void testDeserializeOffset_InvalidJson(@TempDir File tempDir) {
SparkMicroBatchStream stream = createTestStream(tempDir);
String invalidJson = "{this is not valid json}";
assertThrows(RuntimeException.class, () -> stream.deserializeOffset(invalidJson));
}

@Test
public void testDeserializeOffset_WithInitialSnapshot(@TempDir File tempDir) throws Exception {
String tablePath = tempDir.getAbsolutePath();
SparkMicroBatchStream stream = createTestStream(tempDir);

DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
String tableId = deltaLog.tableId();
long baseIndex = DeltaSourceOffset.BASE_INDEX();
DeltaSourceOffset expected =
new DeltaSourceOffset(
tableId, /* reservoirVersion= */ 0L, baseIndex, /* isInitialSnapshot= */ true);
String json = JsonUtils.mapper().writeValueAsString(expected);

Offset result = stream.deserializeOffset(json);
DeltaSourceOffset actual = (DeltaSourceOffset) result;

assertTrue(actual.isInitialSnapshot());
assertEquals(0L, actual.reservoirVersion());
assertEquals(baseIndex, actual.index());
}

@Test
public void testCommit_NoOp(@TempDir File tempDir) throws Exception {
String tablePath = tempDir.getAbsolutePath();
SparkMicroBatchStream stream = createTestStream(tempDir);

DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(tablePath));
String tableId = deltaLog.tableId();
DeltaSourceOffset offset =
new DeltaSourceOffset(
tableId, /* reservoirVersion= */ 1L, /* index= */ 0L, /* isInitialSnapshot= */ false);

assertDoesNotThrow(() -> stream.commit(offset));
}

@Test
public void testStop_NoOp(@TempDir File tempDir) {
SparkMicroBatchStream stream = createTestStream(tempDir);
assertDoesNotThrow(() -> stream.stop());
}

// ================================================================================================
@@ -140,6 +140,20 @@ object DeltaSourceOffset extends Logging {
)
}

/**
* Validate and parse a DeltaSourceOffset from its JSON serialized format. Throws if the parsed
* offset was written for a different table than `reservoirId`.
* @param reservoirId Table id
* @param json Raw JSON string of the serialized offset
*/
def apply(reservoirId: String, json: String): DeltaSourceOffset = {
val o = JsonUtils.mapper.readValue[DeltaSourceOffset](json)
if (o.reservoirId != reservoirId) {
throw DeltaErrors.differentDeltaTableReadByStreamingSource(
newTableId = reservoirId, oldTableId = o.reservoirId)
}
o
}

/**
* Validate and parse a DeltaSourceOffset from its serialized format
* @param reservoirId Table id
@@ -148,13 +162,7 @@
def apply(reservoirId: String, offset: OffsetV2): DeltaSourceOffset = {
offset match {
case o: DeltaSourceOffset => o
case s =>
val o = JsonUtils.mapper.readValue[DeltaSourceOffset](s.json)
if (o.reservoirId != reservoirId) {
throw DeltaErrors.differentDeltaTableReadByStreamingSource(
newTableId = reservoirId, oldTableId = o.reservoirId)
}
o
case s => apply(reservoirId, s.json)
}
}
