Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Jan 21, 2025
1 parent 6bcb38f commit 7a2cec7
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import com.mongodb.client.model.Updates
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.util.Jsons
import io.debezium.connector.mongodb.MongoDbConnector
Expand Down Expand Up @@ -80,101 +81,119 @@ class CdcPartitionReaderMongoTest :
fn(it.getCollection(stream.name))
}

override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? {
val resumeToken: String =
recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText() ?: return null
return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken))
}
override fun createDebeziumOperations(): DebeziumOperations<BsonTimestamp> {
return object : AbstractCdcPartitionReaderDebeziumOperationsForTest<BsonTimestamp>(stream) {
override fun position(recordValue: DebeziumRecordValue): BsonTimestamp? {
val resumeToken: String =
recordValue.source["resume_token"]?.takeIf { it.isTextual }?.asText()
?: return null
return ResumeTokens.getTimestamp(ResumeTokens.fromData(resumeToken))
}

override fun position(sourceRecord: SourceRecord): BsonTimestamp? {
val offset: Map<String, *> = sourceRecord.sourceOffset()
val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null
return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64))
}
override fun position(sourceRecord: SourceRecord): BsonTimestamp? {
val offset: Map<String, *> = sourceRecord.sourceOffset()
val resumeTokenBase64: String = offset["resume_token"] as? String ?: return null
return ResumeTokens.getTimestamp(ResumeTokens.fromBase64(resumeTokenBase64))
}

override fun MongoDbReplicaSet.currentPosition(): BsonTimestamp =
ResumeTokens.getTimestamp(currentResumeToken())

override fun MongoDbReplicaSet.syntheticInput(): DebeziumInput {
val resumeToken: BsonDocument = currentResumeToken()
val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken)
val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value
val key: ArrayNode =
Jsons.arrayNode().apply {
add(stream.namespace)
add(Jsons.objectNode().apply { put("server_id", stream.namespace) })
override fun deserialize(
opaqueStateValue: OpaqueStateValue,
streams: List<Stream>
): DebeziumInput {
return super.deserialize(opaqueStateValue, streams).let {
DebeziumInput(debeziumProperties(), it.state, it.isSynthetic)
}
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ord", timestamp.inc)
put("sec", timestamp.time)
put("resume_token", resumeTokenString)

override fun deserialize(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
): DeserializedRecord {
val id: Int = key.element("id").asInt()
val record: Record =
if (value.operation == "d") {
Delete(id)
} else {
val v: Int? =
value.after
.takeIf { it.isTextual }
?.asText()
?.let { Jsons.readTree(it)["v"] }
?.asInt()
if (v == null) {
// In case a mongodb document was updated and then deleted, the update
// change
// event will not have any information ({after: null})
// We are going to treat it as a Delete.
Delete(id)
} else if (value.operation == "u") {
Update(id, v)
} else {
Insert(id, v)
}
}
return DeserializedRecord(
data = Jsons.valueToTree(record),
changes = emptyMap(),
)
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> = debeziumProperties()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}

private fun MongoDbReplicaSet.currentResumeToken(): BsonDocument =
withMongoDatabase { mongoDatabase: MongoDatabase ->
val pipeline = listOf<Bson>(Aggregates.match(Filters.`in`("ns.coll", stream.name)))
mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use {
it.tryNext()
it.resumeToken!!
override fun position(offset: DebeziumOffset): BsonTimestamp {
val offsetValue: ObjectNode = offset.wrapped.values.first() as ObjectNode
return BsonTimestamp(offsetValue["sec"].asInt(), offsetValue["ord"].asInt())
}
}

override fun MongoDbReplicaSet.debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MongoDbConnector::class.java)
.withDebeziumName(stream.namespace!!)
.withHeartbeats(heartbeat)
.with("capture.scope", "database")
.with("capture.target", stream.namespace!!)
.with("mongodb.connection.string", connectionString)
.with("snapshot.mode", "no_data")
.with(
"collection.include.list",
DebeziumPropertiesBuilder.joinIncludeList(
listOf(Pattern.quote("${stream.namespace!!}.${stream.name}"))
)
)
.with("database.include.list", stream.namespace!!)
.withOffset()
.buildMap()

override fun deserialize(
key: DebeziumRecordKey,
value: DebeziumRecordValue,
stream: Stream,
): DeserializedRecord {
val id: Int = key.element("id").asInt()
val record: Record =
if (value.operation == "d") {
Delete(id)
} else {
val v: Int? =
value.after
.takeIf { it.isTextual }
?.asText()
?.let { Jsons.readTree(it)["v"] }
?.asInt()
if (v == null) {
// In case a mongodb document was updated and then deleted, the update change
// event will not have any information ({after: null})
// We are going to treat it as a Delete.
Delete(id)
} else if (value.operation == "u") {
Update(id, v)
} else {
Insert(id, v)
}
override fun synthesize(): DebeziumInput {
val resumeToken: BsonDocument = currentResumeToken()
val timestamp: BsonTimestamp = ResumeTokens.getTimestamp(resumeToken)
val resumeTokenString: String = ResumeTokens.getData(resumeToken).asString().value
val key: ArrayNode =
Jsons.arrayNode().apply {
add(stream.namespace)
add(Jsons.objectNode().apply { put("server_id", stream.namespace) })
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ord", timestamp.inc)
put("sec", timestamp.time)
put("resume_token", resumeTokenString)
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> = debeziumProperties()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}
return DeserializedRecord(
data = Jsons.valueToTree(record),
changes = emptyMap(),
)

fun currentResumeToken(): BsonDocument =
container.withMongoDatabase { mongoDatabase: MongoDatabase ->
val pipeline =
listOf<Bson>(Aggregates.match(Filters.`in`("ns.coll", stream.name)))
mongoDatabase.watch(pipeline, BsonDocument::class.java).cursor().use {
it.tryNext()
it.resumeToken!!
}
}

fun debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MongoDbConnector::class.java)
.withDebeziumName(stream.namespace!!)
.withHeartbeats(heartbeat)
.with("capture.scope", "database")
.with("capture.target", stream.namespace!!)
.with("mongodb.connection.string", container.connectionString)
.with("snapshot.mode", "no_data")
.with(
"collection.include.list",
DebeziumPropertiesBuilder.joinIncludeList(
listOf(Pattern.quote("${stream.namespace!!}.${stream.name}"))
)
)
.with("database.include.list", stream.namespace!!)
.withOffset()
.buildMap()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package io.airbyte.cdk.read.cdc

import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.read.Stream
import io.airbyte.cdk.read.cdc.*
import io.airbyte.cdk.testcontainers.TestContainerFactory
import io.airbyte.cdk.util.Jsons
import io.debezium.connector.mysql.MySqlConnector
Expand Down Expand Up @@ -69,72 +72,91 @@ class CdcPartitionReaderMySQLTest :
connection.createStatement().use { fn(it) }
}

override fun position(recordValue: DebeziumRecordValue): Position? {
val file: String =
recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null
val pos: Long =
recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong() ?: return null
return Position(file, pos)
}
override fun createDebeziumOperations(): DebeziumOperations<Position> =
object : AbstractCdcPartitionReaderDebeziumOperationsForTest<Position>(stream) {
override fun position(offset: DebeziumOffset): Position {
val offsetAsJson = offset.wrapped.values.first()
val retVal = Position(offsetAsJson["file"].asText(), offsetAsJson["pos"].asLong())
return retVal
}

override fun position(sourceRecord: SourceRecord): Position? {
val offset: Map<String, *> = sourceRecord.sourceOffset()
val file: String = offset["file"]?.toString() ?: return null
val pos: Long = offset["pos"] as? Long ?: return null
return Position(file, pos)
}
override fun position(recordValue: DebeziumRecordValue): Position? {
val file: String =
recordValue.source["file"]?.takeIf { it.isTextual }?.asText() ?: return null
val pos: Long =
recordValue.source["pos"]?.takeIf { it.isIntegralNumber }?.asLong()
?: return null
return Position(file, pos)
}

override fun MySQLContainer<*>.currentPosition(): Position =
withStatement { statement: Statement ->
statement.executeQuery("SHOW MASTER STATUS").use {
it.next()
Position(it.getString("File"), it.getLong("Position"))
override fun position(sourceRecord: SourceRecord): Position? {
val offset: Map<String, *> = sourceRecord.sourceOffset()
val file: String = offset["file"]?.toString() ?: return null
val pos: Long = offset["pos"] as? Long ?: return null
return Position(file, pos)
}
}

override fun MySQLContainer<*>.syntheticInput(): DebeziumInput {
val position: Position = currentPosition()
val timestamp: Instant = Instant.now()
val key: ArrayNode =
Jsons.arrayNode().apply {
add(databaseName)
add(Jsons.objectNode().apply { put("server", databaseName) })
override fun synthesize(): DebeziumInput {
val position: Position = currentPosition()
val timestamp: Instant = Instant.now()
val key: ArrayNode =
Jsons.arrayNode().apply {
add(container.databaseName)
add(Jsons.objectNode().apply { put("server", container.databaseName) })
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ts_sec", timestamp.epochSecond)
put("file", position.file)
put("pos", position.pos)
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> =
DebeziumPropertiesBuilder()
.with(debeziumProperties())
.with("snapshot.mode", "recovery")
.withStreams(listOf())
.buildMap()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}
val value: ObjectNode =
Jsons.objectNode().apply {
put("ts_sec", timestamp.epochSecond)
put("file", position.file)
put("pos", position.pos)

override fun deserialize(
opaqueStateValue: OpaqueStateValue,
streams: List<Stream>
): DebeziumInput {
return super.deserialize(opaqueStateValue, streams).let {
DebeziumInput(debeziumProperties(), it.state, it.isSynthetic)
}
}
val offset = DebeziumOffset(mapOf(key to value))
val state = DebeziumState(offset, schemaHistory = null)
val syntheticProperties: Map<String, String> =
DebeziumPropertiesBuilder()
.with(debeziumProperties())
.with("snapshot.mode", "recovery")
.withStreams(listOf())
.buildMap()
return DebeziumInput(syntheticProperties, state, isSynthetic = true)
}

override fun MySQLContainer<*>.debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MySqlConnector::class.java)
.withDebeziumName(databaseName)
.withHeartbeats(heartbeat)
.with("include.schema.changes", "false")
.with("connect.keep.alive.interval.ms", "1000")
.withDatabase("hostname", host)
.withDatabase("port", firstMappedPort.toString())
.withDatabase("user", username)
.withDatabase("password", password)
.withDatabase("dbname", databaseName)
.withDatabase("server.id", Random.Default.nextInt(5400..6400).toString())
.withDatabase("include.list", databaseName)
.withOffset()
.withSchemaHistory()
.with("snapshot.mode", "when_needed")
.withStreams(listOf(stream))
.buildMap()
fun currentPosition(): Position =
container.withStatement { statement: Statement ->
statement.executeQuery("SHOW MASTER STATUS").use {
it.next()
Position(it.getString("File"), it.getLong("Position"))
}
}

fun debeziumProperties(): Map<String, String> =
DebeziumPropertiesBuilder()
.withDefault()
.withConnector(MySqlConnector::class.java)
.withDebeziumName(container.databaseName)
.withHeartbeats(heartbeat)
.with("include.schema.changes", "false")
.with("connect.keep.alive.interval.ms", "1000")
.withDatabase("hostname", container.host)
.withDatabase("port", container.firstMappedPort.toString())
.withDatabase("user", container.username)
.withDatabase("password", container.password)
.withDatabase("dbname", container.databaseName)
.withDatabase("server.id", Random.Default.nextInt(5400..6400).toString())
.withDatabase("include.list", container.databaseName)
.withOffset()
.withSchemaHistory()
.with("snapshot.mode", "when_needed")
.withStreams(listOf(stream))
.buildMap()
}
}
Loading

0 comments on commit 7a2cec7

Please sign in to comment.