From 7a2cec7c0fcaacfe6fccf7fb2d618e014ce4cae1 Mon Sep 17 00:00:00 2001 From: Stephane Geneix Date: Tue, 21 Jan 2025 12:55:47 -1000 Subject: [PATCH] refactor --- .../read/cdc/CdcPartitionReaderMongoTest.kt | 195 +++++++------- .../read/cdc/CdcPartitionReaderMySQLTest.kt | 144 ++++++----- .../cdc/CdcPartitionReaderPostgresTest.kt | 128 +++++----- .../cdc/AbstractCdcPartitionReaderTest.kt | 238 +++++++++--------- 4 files changed, 386 insertions(+), 319 deletions(-) diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt index e7b2c0b989fd..d37acf278ab5 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt @@ -14,6 +14,7 @@ import com.mongodb.client.MongoDatabase import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Updates +import io.airbyte.cdk.command.OpaqueStateValue import io.airbyte.cdk.read.Stream import io.airbyte.cdk.util.Jsons import io.debezium.connector.mongodb.MongoDbConnector @@ -80,101 +81,119 @@ class CdcPartitionReaderMongoTest : fn(it.getCollection(stream.name)) } - override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? { - val resumeToken: String = - recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText() ?: return null - return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken)) - } + override fun createDebeziumOperations(): DebeziumOperations { + return object : AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { + override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? { + val resumeToken: String = + recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText() + ?: return null + return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken)) + } - override fun position(sourceRecord: SourceRecord): BsonTimestamp? { - val offset: Map = sourceRecord.sourceOffset() - val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null - return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64)) - } + override fun position(sourceRecord: SourceRecord): BsonTimestamp? { + val offset: Map = sourceRecord.sourceOffset() + val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null + return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64)) + } - override fun MongoDbReplicaSet.currentPosition(): BsonTimestamp = - ResumeTokens.getTimestamp(currentResumeToken()) - - override fun MongoDbReplicaSet.syntheticInput(): DebeziumInput { - val resumeToken: BsonDocument = currentResumeToken() - val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) - val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value - val key: ArrayNode = - Jsons.arrayNode().apply { - add(stream.namespace) - add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) + override fun deserialize( + opaqueStateValue: OpaqueStateValue, + streams: List + ): DebeziumInput { + return super.deserialize(opaqueStateValue, streams).let { + DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) + } } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ord", timestamp.inc) - put("sec", timestamp.time) - put("resume_token", resumeTokenString) + + override fun deserialize( + key: DebeziumRecordKey, + value: DebeziumRecordValue, + stream: Stream, + ): DeserializedRecord { + val id: Int = key.element("id").asInt() + val record: Record = + if (value.operation == "d") { + Delete(id) + } else { + val v: Int? = + value.after + .takeIf { it.isTextual } + ?.asText() + ?.let { Jsons.readTree(it)["v"] } + ?.asInt() + if (v == null) { + // In case a mongodb document was updated and then deleted, the update + // change + // event will not have any information ({after: null}) + // We are going to treat it as a Delete. + Delete(id) + } else if (value.operation == "u") { + Update(id, v) + } else { + Insert(id, v) + } + } + return DeserializedRecord( + data = Jsons.valueToTree(record), + changes = emptyMap(), + ) } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } - private fun MongoDbReplicaSet.currentResumeToken(): BsonDocument = - withMongoDatabase { mongoDatabase: MongoDatabase -> - val pipeline = listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) - mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { - it.tryNext() - it.resumeToken!! + override fun position(offset: DebeziumOffset): BsonTimestamp { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt()) } - } - override fun MongoDbReplicaSet.debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MongoDbConnector::class.java) - .withDebeziumName(stream.namespace!!) - .withHeartbeats(heartbeat) - .with("capture.scope", "database") - .with("capture.target", stream.namespace!!) - .with("mongodb.connection.string", connectionString) - .with("snapshot.mode", "no_data") - .with( - "collection.include.list", - DebeziumPropertiesBuilder.joinIncludeList( - listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) - ) - ) - .with("database.include.list", stream.namespace!!) - .withOffset() - .buildMap() - - override fun deserialize( - key: DebeziumRecordKey, - value: DebeziumRecordValue, - stream: Stream, - ): DeserializedRecord { - val id: Int = key.element("id").asInt() - val record: Record = - if (value.operation == "d") { - Delete(id) - } else { - val v: Int? = - value.after - .takeIf { it.isTextual } - ?.asText() - ?.let { Jsons.readTree(it)["v"] } - ?.asInt() - if (v == null) { - // In case a mongodb document was updated and then deleted, the update change - // event will not have any information ({after: null}) - // We are going to treat it as a Delete. - Delete(id) - } else if (value.operation == "u") { - Update(id, v) - } else { - Insert(id, v) - } + override fun synthesize(): DebeziumInput { + val resumeToken: BsonDocument = currentResumeToken() + val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken) + val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value + val key: ArrayNode = + Jsons.arrayNode().apply { + add(stream.namespace) + add(Jsons.objectNode().apply { put("server_id", stream.namespace) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ord", timestamp.inc) + put("sec", timestamp.time) + put("resume_token", resumeTokenString) + } + val offset = DebeziumOffset(mapOf(key to value)) + val state = DebeziumState(offset, schemaHistory = null) + val syntheticProperties: Map = debeziumProperties() + return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - return DeserializedRecord( - data = Jsons.valueToTree(record), - changes = emptyMap(), - ) + + fun currentResumeToken(): BsonDocument = + container.withMongoDatabase { mongoDatabase: MongoDatabase -> + val pipeline = + listOf(Aggregates.match(Filters.`in`("ns.coll", stream.name))) + mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use { + it.tryNext() + it.resumeToken!! + } + } + + fun debeziumProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MongoDbConnector::class.java) + .withDebeziumName(stream.namespace!!) + .withHeartbeats(heartbeat) + .with("capture.scope", "database") + .with("capture.target", stream.namespace!!) + .with("mongodb.connection.string", container.connectionString) + .with("snapshot.mode", "no_data") + .with( + "collection.include.list", + DebeziumPropertiesBuilder.joinIncludeList( + listOf(Pattern.quote("${stream.namespace!!}.${stream.name}")) + ) + ) + .with("database.include.list", stream.namespace!!) + .withOffset() + .buildMap() + } } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt index 5932b5b62f59..263f48198555 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt @@ -6,6 +6,9 @@ package io.airbyte.cdk.read.cdc import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.command.OpaqueStateValue +import io.airbyte.cdk.read.Stream +import io.airbyte.cdk.read.cdc.* import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons import io.debezium.connector.mysql.MySqlConnector @@ -69,72 +72,91 @@ class CdcPartitionReaderMySQLTest : connection.createStatement().use { fn(it) } } - override fun position(recordValue: DebeziumRecordValue): Position? { - val file: String = - recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null - val pos: Long = - recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null - return Position(file, pos) - } + override fun createDebeziumOperations(): DebeziumOperations = + object : AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { + override fun position(offset: DebeziumOffset): Position { + val offsetAsJson = offset.wrapped.values.first() + val retVal = Position(offsetAsJson["file"].asText(), offsetAsJson["pos"].asLong()) + return retVal + } - override fun position(sourceRecord: SourceRecord): Position? { - val offset: Map = sourceRecord.sourceOffset() - val file: String = offset["file"]?.toString() ?: return null - val pos: Long = offset["pos"] as? Long ?: return null - return Position(file, pos) - } + override fun position(recordValue: DebeziumRecordValue): Position? { + val file: String = + recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null + val pos: Long = + recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong() + ?: return null + return Position(file, pos) + } - override fun MySQLContainer<*>.currentPosition(): Position = - withStatement { statement: Statement -> - statement.executeQuery("SHOW MASTER STATUS").use { - it.next() - Position(it.getString("File"), it.getLong("Position")) + override fun position(sourceRecord: SourceRecord): Position? { + val offset: Map = sourceRecord.sourceOffset() + val file: String = offset["file"]?.toString() ?: return null + val pos: Long = offset["pos"] as? Long ?: return null + return Position(file, pos) } - } - override fun MySQLContainer<*>.syntheticInput(): DebeziumInput { - val position: Position = currentPosition() - val timestamp: Instant = Instant.now() - val key: ArrayNode = - Jsons.arrayNode().apply { - add(databaseName) - add(Jsons.objectNode().apply { put("server", databaseName) }) + override fun synthesize(): DebeziumInput { + val position: Position = currentPosition() + val timestamp: Instant = Instant.now() + val key: ArrayNode = + Jsons.arrayNode().apply { + add(container.databaseName) + add(Jsons.objectNode().apply { put("server", container.databaseName) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ts_sec", timestamp.epochSecond) + put("file", position.file) + put("pos", position.pos) + } + val offset = DebeziumOffset(mapOf(key to value)) + val state = DebeziumState(offset, schemaHistory = null) + val syntheticProperties: Map = + DebeziumPropertiesBuilder() + .with(debeziumProperties()) + .with("snapshot.mode", "recovery") + .withStreams(listOf()) + .buildMap() + return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ts_sec", timestamp.epochSecond) - put("file", position.file) - put("pos", position.pos) + + override fun deserialize( + opaqueStateValue: OpaqueStateValue, + streams: List + ): DebeziumInput { + return super.deserialize(opaqueStateValue, streams).let { + DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) + } } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = - DebeziumPropertiesBuilder() - .with(debeziumProperties()) - .with("snapshot.mode", "recovery") - .withStreams(listOf()) - .buildMap() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } - override fun MySQLContainer<*>.debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(MySqlConnector::class.java) - .withDebeziumName(databaseName) - .withHeartbeats(heartbeat) - .with("include.schema.changes", "false") - .with("connect.keep.alive.interval.ms", "1000") - .withDatabase("hostname", host) - .withDatabase("port", firstMappedPort.toString()) - .withDatabase("user", username) - .withDatabase("password", password) - .withDatabase("dbname", databaseName) - .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) - .withDatabase("include.list", databaseName) - .withOffset() - .withSchemaHistory() - .with("snapshot.mode", "when_needed") - .withStreams(listOf(stream)) - .buildMap() + fun currentPosition(): Position = + container.withStatement { statement: Statement -> + statement.executeQuery("SHOW MASTER STATUS").use { + it.next() + Position(it.getString("File"), it.getLong("Position")) + } + } + + fun debeziumProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(MySqlConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("include.schema.changes", "false") + .with("connect.keep.alive.interval.ms", "1000") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withDatabase("server.id", Random.Default.nextInt(5400..6400).toString()) + .withDatabase("include.list", container.databaseName) + .withOffset() + .withSchemaHistory() + .with("snapshot.mode", "when_needed") + .withStreams(listOf(stream)) + .buildMap() + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt index a27fdfa964c8..97a31616b6b6 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt @@ -6,6 +6,8 @@ package io.airbyte.cdk.read.cdc import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.command.OpaqueStateValue +import io.airbyte.cdk.read.Stream import io.airbyte.cdk.testcontainers.TestContainerFactory import io.airbyte.cdk.util.Jsons import io.debezium.connector.postgresql.PostgresConnector @@ -72,69 +74,81 @@ class CdcPartitionReaderPostgresTest : connection.createStatement().use { fn(it) } } - override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? { - val lsn: Long = - recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null - return LogSequenceNumber.valueOf(lsn) - } + override fun createDebeziumOperations(): DebeziumOperations { + return object : + AbstractCdcPartitionReaderDebeziumOperationsForTest(stream) { + override fun position(offset: DebeziumOffset): LogSequenceNumber { + val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode + return LogSequenceNumber.valueOf(offsetValue["lsn"].asLong()) + } - override fun position(sourceRecord: SourceRecord): LogSequenceNumber? { - val offset: Map = sourceRecord.sourceOffset() - val lsn: Long = offset["lsn"] as? Long ?: return null - return LogSequenceNumber.valueOf(lsn) - } + override fun position(recordValue: DebeziumRecordValue): LogSequenceNumber? { + val lsn: Long = + recordValue.source["lsn"]?.takeIf { it.isIntegralNumber }?.asLong() + ?: return null + return LogSequenceNumber.valueOf(lsn) + } - override fun PostgreSQLContainer<*>.currentPosition(): LogSequenceNumber = - withStatement { statement: Statement -> - statement.executeQuery("SELECT pg_current_wal_lsn()").use { - it.next() - LogSequenceNumber.valueOf(it.getString(1)) + override fun position(sourceRecord: SourceRecord): LogSequenceNumber? { + val offset: Map = sourceRecord.sourceOffset() + val lsn: Long = offset["lsn"] as? Long ?: return null + return LogSequenceNumber.valueOf(lsn) } - } - override fun PostgreSQLContainer<*>.syntheticInput(): DebeziumInput { - val (position: LogSequenceNumber, txID: Long) = - withStatement { statement: Statement -> - statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { - it.next() - LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) + override fun deserialize( + opaqueStateValue: OpaqueStateValue, + streams: List + ): DebeziumInput { + return super.deserialize(opaqueStateValue, streams).let { + DebeziumInput(debeziumProperties(), it.state, it.isSynthetic) } } - val timestamp: Instant = Instant.now() - val key: ArrayNode = - Jsons.arrayNode().apply { - add(databaseName) - add(Jsons.objectNode().apply { put("server", databaseName) }) - } - val value: ObjectNode = - Jsons.objectNode().apply { - put("ts_usec", timestamp.toEpochMilli() * 1000L) - put("lsn", position.asLong()) - put("txId", txID) + + override fun synthesize(): DebeziumInput { + val (position: LogSequenceNumber, txID: Long) = + container.withStatement { statement: Statement -> + statement.executeQuery("SELECT pg_current_wal_lsn(), txid_current()").use { + it.next() + LogSequenceNumber.valueOf(it.getString(1)) to it.getLong(2) + } + } + val timestamp: Instant = Instant.now() + val key: ArrayNode = + Jsons.arrayNode().apply { + add(container.databaseName) + add(Jsons.objectNode().apply { put("server", container.databaseName) }) + } + val value: ObjectNode = + Jsons.objectNode().apply { + put("ts_usec", timestamp.toEpochMilli() * 1000L) + put("lsn", position.asLong()) + put("txId", txID) + } + val offset = DebeziumOffset(mapOf(key to value)) + val state = DebeziumState(offset, schemaHistory = null) + val syntheticProperties: Map = debeziumProperties() + return DebeziumInput(syntheticProperties, state, isSynthetic = true) } - val offset = DebeziumOffset(mapOf(key to value)) - val state = DebeziumState(offset, schemaHistory = null) - val syntheticProperties: Map = debeziumProperties() - return DebeziumInput(syntheticProperties, state, isSynthetic = true) - } - override fun PostgreSQLContainer<*>.debeziumProperties(): Map = - DebeziumPropertiesBuilder() - .withDefault() - .withConnector(PostgresConnector::class.java) - .withDebeziumName(databaseName) - .withHeartbeats(heartbeat) - .with("plugin.name", "pgoutput") - .with("slot.name", SLOT_NAME) - .with("publication.name", PUBLICATION_NAME) - .with("publication.autocreate.mode", "disabled") - .with("flush.lsn.source", "false") - .withDatabase("hostname", host) - .withDatabase("port", firstMappedPort.toString()) - .withDatabase("user", username) - .withDatabase("password", password) - .withDatabase("dbname", databaseName) - .withOffset() - .withStreams(listOf(stream)) - .buildMap() + fun debeziumProperties(): Map = + DebeziumPropertiesBuilder() + .withDefault() + .withConnector(PostgresConnector::class.java) + .withDebeziumName(container.databaseName) + .withHeartbeats(heartbeat) + .with("plugin.name", "pgoutput") + .with("slot.name", SLOT_NAME) + .with("publication.name", PUBLICATION_NAME) + .with("publication.autocreate.mode", "disabled") + .with("flush.lsn.source", "false") + .withDatabase("hostname", container.host) + .withDatabase("port", container.firstMappedPort.toString()) + .withDatabase("user", container.username) + .withDatabase("password", container.password) + .withDatabase("dbname", container.databaseName) + .withOffset() + .withStreams(listOf(stream)) + .buildMap() + } + } } diff --git a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt index ab6d058559ae..01bea3ef4f8e 100644 --- a/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt +++ b/airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt @@ -46,7 +46,7 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab namespace: String?, val heartbeat: Duration = Duration.ofMillis(100), val timeout: Duration = Duration.ofSeconds(10), -) : CdcPartitionReaderDebeziumOperations { +) { val stream = Stream( @@ -66,9 +66,9 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab abstract fun C.update135() abstract fun C.delete24() - abstract fun C.currentPosition(): T - abstract fun C.syntheticInput(): DebeziumInput - abstract fun C.debeziumProperties(): Map + abstract fun createDebeziumOperations(): DebeziumOperations + val container: C by lazy { createContainer() } + val debeziumOperations by lazy { createDebeziumOperations() } @Test /** @@ -80,57 +80,56 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab * [syntheticInput] and [debeziumProperties], and exercises all [PartitionReader] methods. */ fun integrationTest() { - createContainer().use { container: C -> - container.createStream() - val p0: T = container.currentPosition() - val r0: ReadResult = read(container.syntheticInput(), p0) - Assertions.assertEquals(emptyList(), r0.records) - Assertions.assertNotEquals( - CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, - r0.closeReason, - ) - - container.insert12345() - val insert = - listOf( - Insert(1, 1), - Insert(2, 2), - Insert(3, 3), - Insert(4, 4), - Insert(5, 5), - ) - container.update135() - val update = - listOf( - Update(1, 6), - Update(3, 7), - Update(5, 8), - ) - val p1: T = container.currentPosition() - container.delete24() - val delete = - listOf( - Delete(2), - Delete(4), - ) - val p2: T = container.currentPosition() - - val input = DebeziumInput(container.debeziumProperties(), r0.state, isSynthetic = false) - val r1: ReadResult = read(input, p1) - Assertions.assertEquals(insert + update, r1.records.take(insert.size + update.size)) - Assertions.assertNotNull(r1.closeReason) + container.createStream() + val p0: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) + val r0: ReadResult = read(debeziumOperations.synthesize(), p0) + Assertions.assertEquals(emptyList(), r0.records) + Assertions.assertNotEquals( + CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, + r0.closeReason, + ) - val r2: ReadResult = read(input, p2) - Assertions.assertEquals( - insert + update + delete, - r2.records.take(insert.size + update.size + delete.size), + container.insert12345() + val insert = + listOf( + Insert(1, 1), + Insert(2, 2), + Insert(3, 3), + Insert(4, 4), + Insert(5, 5), ) - Assertions.assertNotNull(r2.closeReason) - Assertions.assertNotEquals( - CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, - r2.closeReason + container.update135() + val update = + listOf( + Update(1, 6), + Update(3, 7), + Update(5, 8), ) - } + val p1: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) + container.delete24() + val delete = + listOf( + Delete(2), + Delete(4), + ) + val p2: T = debeziumOperations.position(debeziumOperations.synthesize().state.offset) + + val input: DebeziumInput = + debeziumOperations.deserialize(debeziumOperations.serialize(r0.state), listOf(stream)) + val r1: ReadResult = read(input, p1) + Assertions.assertEquals(insert + update, r1.records.take(insert.size + update.size)) + Assertions.assertNotNull(r1.closeReason) + + val r2: ReadResult = read(input, p2) + Assertions.assertEquals( + insert + update + delete, + r2.records.take(insert.size + update.size + delete.size), + ) + Assertions.assertNotNull(r2.closeReason) + Assertions.assertNotEquals( + CdcPartitionReader.CloseReason.RECORD_REACHED_TARGET_POSITION, + r2.closeReason + ) } private fun read( @@ -161,7 +160,7 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab CdcPartitionReader( ConcurrencyResource(1), streamRecordConsumers, - this, + debeziumOperations, upperBound, input, ) @@ -197,7 +196,7 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab Assertions.assertEquals(0, reader.numEventValuesWithoutPosition.get()) return ReadResult( outputConsumer.records().map { Jsons.treeToValue(it.data, Record::class.java) }, - deserialize(checkpoint.opaqueStateValue), + debeziumOperations.deserialize(checkpoint.opaqueStateValue, listOf(stream)).state, reader.closeReasonReference.get(), ) } @@ -221,68 +220,81 @@ abstract class AbstractCdcPartitionReaderTest, C : AutoCloseab data class Update(override val id: Int, val v: Int) : Record data class Delete(override val id: Int) : Record - override fun deserialize( - key: DebeziumRecordKey, - value: DebeziumRecordValue, - stream: Stream, - ): DeserializedRecord { - val id: Int = key.element("id").asInt() - val after: Int? = value.after["v"]?.asInt() - val record: Record = - if (after == null) { - Delete(id) - } else if (value.before["v"] == null) { - Insert(id, after) - } else { - Update(id, after) - } - return DeserializedRecord( - data = Jsons.valueToTree(record) as ObjectNode, - changes = emptyMap(), - ) - } - - override fun findStreamNamespace(key: DebeziumRecordKey, value: DebeziumRecordValue): String? = - stream.id.namespace + abstract inner class AbstractCdcPartitionReaderDebeziumOperationsForTest>( + val stream: Stream + ) : DebeziumOperations { + override fun deserialize( + key: DebeziumRecordKey, + value: DebeziumRecordValue, + stream: Stream, + ): DeserializedRecord { + val id: Int = key.element("id").asInt() + val after: Int? = value.after["v"]?.asInt() + val record: Record = + if (after == null) { + Delete(id) + } else if (value.before["v"] == null) { + Insert(id, after) + } else { + Update(id, after) + } + return DeserializedRecord( + data = Jsons.valueToTree(record) as ObjectNode, + changes = emptyMap(), + ) + } - override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? = - stream.id.name + override fun findStreamNamespace( + key: DebeziumRecordKey, + value: DebeziumRecordValue + ): String? = stream.id.namespace - override fun serialize(debeziumState: DebeziumState): OpaqueStateValue = - Jsons.valueToTree( - mapOf( - "offset" to - debeziumState.offset.wrapped - .map { - Jsons.writeValueAsString(it.key) to Jsons.writeValueAsString(it.value) - } - .toMap(), - "schemaHistory" to - debeziumState.schemaHistory?.wrapped?.map { - DocumentWriter.defaultWriter().write(it.document()) - }, - ), - ) + override fun findStreamName(key: DebeziumRecordKey, value: DebeziumRecordValue): String? = + stream.id.name - private fun deserialize(opaqueStateValue: OpaqueStateValue): DebeziumState { - val offsetNode: ObjectNode = opaqueStateValue["offset"] as ObjectNode - val offset = - DebeziumOffset( - offsetNode - .fields() - .asSequence() - .map { Jsons.readTree(it.key) to Jsons.readTree(it.value.asText()) } - .toMap(), + override fun serialize(debeziumState: DebeziumState): OpaqueStateValue = + Jsons.valueToTree( + mapOf( + "offset" to + debeziumState.offset.wrapped + .map { + Jsons.writeValueAsString(it.key) to + Jsons.writeValueAsString(it.value) + } + .toMap(), + "schemaHistory" to + debeziumState.schemaHistory?.wrapped?.map { + DocumentWriter.defaultWriter().write(it.document()) + }, + ), ) - val historyNode: ArrayNode = - opaqueStateValue["schemaHistory"] as? ArrayNode - ?: return DebeziumState(offset, schemaHistory = null) - val schemaHistory = - DebeziumSchemaHistory( - historyNode.elements().asSequence().toList().map { - HistoryRecord(DocumentReader.defaultReader().read(it.asText())) - }, - ) - return DebeziumState(offset, schemaHistory) + + override fun deserialize( + opaqueStateValue: OpaqueStateValue, + streams: List + ): DebeziumInput { + val offsetNode: ObjectNode = opaqueStateValue["offset"] as ObjectNode + val offset = + DebeziumOffset( + offsetNode + .fields() + .asSequence() + .map { Jsons.readTree(it.key) to Jsons.readTree(it.value.asText()) } + .toMap(), + ) + val historyNode: ArrayNode? = opaqueStateValue["schemaHistory"] as? ArrayNode + val schemaHistory: DebeziumSchemaHistory? = + if (historyNode != null) { + DebeziumSchemaHistory( + historyNode.elements().asSequence().toList().map { + HistoryRecord(DocumentReader.defaultReader().read(it.asText())) + }, + ) + } else { + null + } + val deserializedStateValue = DebeziumState(offset, schemaHistory) + return DebeziumInput(emptyMap(), deserializedStateValue, false) + } } }