[DBZ-PGYB] Add transformer PGCompatible to emit events with standard structure (#168)

This PR adds a transformer `PGCompatible` under the package
`io.debezium.connector.postgresql.transforms.yugabytedb`. It converts the
structure of the emitted events to match the one emitted by the standard
Debezium connector for Postgres.
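
Without the transformer, an event produced under replica identity `CHANGE` wraps every column value in a `{value, set}` struct, so a column `age` with value `25` arrives as:

```json
"age": {
  "value": 25,
  "set": true
}
```

The transformer unwraps this to plain `"age": 25` and emits `null` for columns the event did not update.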

**Example:**
Consider the following schema for a table `test`: `(id INT PRIMARY KEY,
name TEXT, age INT)`
- If a record is inserted with values `(1, 'John Doe', 25)`, then after
applying the transformer, the `payload` of the record would look like:
```json
"payload": {
  "id": 1,
  "name": "John Doe",
  "age": 25
}
```
- If the same record is then updated, changing `age` to 30 with
`UPDATE test SET age = 30 WHERE id = 1;`, the `payload` would look like:

```json
"payload": {
  "id": 1,
  "name": null,
  "age": 30
}
```

> **NOTE:** The above example assumes that the replica identity of the
table is `CHANGE`, which is why the `UPDATE` event does not contain values
for the fields that were not updated. For more information on replica
identity, see [YugabyteDB
docs](https://docs.yugabyte.com/preview/explore/change-data-capture/using-logical-replication/key-concepts/#replica-identity).
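
Usage follows the standard Kafka Connect SMT pattern: register the transformer's fully qualified class name in the connector configuration. A minimal sketch (the `pgcompatible` alias is arbitrary and the remaining connector properties are assumed):

```json
"transforms": "pgcompatible",
"transforms.pgcompatible.type": "io.debezium.connector.postgresql.transforms.yugabytedb.PGCompatible"
```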
vaibhav-yb authored Jan 2, 2025
1 parent 2066314 commit 276ea4e
Showing 4 changed files with 300 additions and 17 deletions.
PGCompatible.java
@@ -0,0 +1,125 @@
package io.debezium.connector.postgresql.transforms.yugabytedb;

import java.util.Map;
import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Custom extractor for YugabyteDB that converts the output of yboutput/CHANGE to the
 * standard structure emitted by the Debezium connector for Postgres. It transforms records
 * from the format {@code fieldName:{value:"someValue",set:true}} to {@code fieldName:"someValue"}
 * and sets the columns which are not updated in the given change event to null.
 *
 * @param <R> the subtype of {@link ConnectRecord} on which this transformation operates
 *
 * @author Vaibhav Kushwaha ([email protected])
 */
public class PGCompatible<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PGCompatible.class);

    @Override
    public R apply(final R record) {
        // Pass through null records and records whose value is not a Struct.
        if (record == null || (record.value() != null && !(record.value() instanceof Struct))) {
            return record;
        }

        Pair<Schema, Struct> p = getUpdatedValueAndSchema(record.keySchema(), (Struct) record.key());
        Schema updatedSchemaForKey = p.getFirst();
        Struct updatedValueForKey = p.getSecond();

        Schema updatedSchemaForValue = null;
        Struct updatedValueForValue = null;
        if (record.value() != null) {
            Pair<Schema, Struct> val = getUpdatedValueAndSchema(record.valueSchema(), (Struct) record.value());
            updatedSchemaForValue = val.getFirst();
            updatedValueForValue = val.getSecond();
        }

        return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchemaForKey, updatedValueForKey,
                updatedSchemaForValue, updatedValueForValue, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    // Returns true if the given field is a yboutput wrapper struct of the form {value, set}.
    private boolean isValueSetStruct(Field field) {
        return field.schema().fields().size() == 2
                && Objects.equals(field.schema().fields().get(0).name(), "value")
                && Objects.equals(field.schema().fields().get(1).name(), "set");
    }

    // Rebuilds the schema, replacing every {value, set} wrapper with the schema of its inner
    // "value" field and recursing into any other nested structs.
    private Schema makeUpdatedSchema(Schema schema) {
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

        if (schema.isOptional()) {
            builder.optional();
        } else {
            builder.required();
        }

        for (Field field : schema.fields()) {
            if (field.schema().type() == Type.STRUCT) {
                if (isValueSetStruct(field)) {
                    builder.field(field.name(), field.schema().field("value").schema());
                } else {
                    builder.field(field.name(), makeUpdatedSchema(field.schema()));
                }
            } else {
                builder.field(field.name(), field.schema());
            }
        }

        return builder.build();
    }

    // Unwraps each {value, set} struct to its plain value. Columns that were not updated in the
    // change event arrive as null wrappers and therefore stay null in the rebuilt struct.
    private Struct makeUpdatedValue(Schema updatedSchema, Struct value) {
        final Struct updatedValue = new Struct(updatedSchema);

        for (Field field : value.schema().fields()) {
            LOGGER.debug("Considering value {}", field.name());
            if (field.schema().type() == Type.STRUCT) {
                LOGGER.debug("Value is a struct");
                Struct fieldValue = (Struct) value.get(field);
                if (isValueSetStruct(field) && fieldValue != null) {
                    updatedValue.put(field.name(), fieldValue.get("value"));
                } else if (fieldValue != null) {
                    updatedValue.put(field.name(), makeUpdatedValue(updatedSchema.field(field.name()).schema(), fieldValue));
                }
            } else {
                updatedValue.put(field.name(), value.get(field));
            }
        }

        return updatedValue;
    }

    public Pair<Schema, Struct> getUpdatedValueAndSchema(Schema schema, Struct value) {
        Schema updatedSchema = makeUpdatedSchema(schema);
        Struct updatedValue = makeUpdatedValue(updatedSchema, value);

        LOGGER.trace("Updated schema as json: {}", io.debezium.data.SchemaUtil.asString(updatedValue.schema()));

        return new Pair<>(updatedSchema, updatedValue);
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

SchemaUtil.java
@@ -0,0 +1,21 @@
package io.debezium.connector.postgresql.transforms.yugabytedb;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Map;

/**
 * Helper to copy the basic attributes (name, version, doc, and parameters) of a schema onto a
 * {@link SchemaBuilder}.
 */
public class SchemaUtil {
    public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
        builder.name(source.name());
        builder.version(source.version());
        builder.doc(source.doc());

        final Map<String, String> params = source.parameters();
        if (params != null) {
            builder.parameters(params);
        }

        return builder;
    }
}
@@ -104,20 +104,3 @@ private Pair<Schema, Struct> getUpdatedValueAndSchema(Struct obj) {
    }
}

class SchemaUtil {

    public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
        builder.name(source.name());
        builder.version(source.version());
        builder.doc(source.doc());

        final Map<String, String> params = source.parameters();
        if (params != null) {
            builder.parameters(params);
        }

        return builder;
    }

}

PGCompatibleTest.java
@@ -0,0 +1,154 @@
package io.debezium.connector.postgresql.transforms.yugabytedb;

import io.debezium.data.Envelope;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.ADD_HEADERS;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.HANDLE_DELETES;

/**
* Tests for {@link PGCompatible}
*
* @author Vaibhav Kushwaha ([email protected])
*/
public class PGCompatibleTest {
    final Schema idSchema = SchemaBuilder.struct()
            .field("value", Schema.INT64_SCHEMA)
            .field("set", Schema.BOOLEAN_SCHEMA);

    final Schema nameSchema = SchemaBuilder.struct()
            .field("value", Schema.OPTIONAL_STRING_SCHEMA)
            .field("set", Schema.BOOLEAN_SCHEMA)
            .optional();

    final Schema keySchema = SchemaBuilder.struct()
            .field("id", idSchema)
            .build();

    final Schema valueSchema = SchemaBuilder.struct()
            .field("id", idSchema)
            .field("name", nameSchema)
            .field("location", nameSchema).optional()
            .build();

    final Schema sourceSchema = SchemaBuilder.struct()
            .field("lsn", Schema.INT32_SCHEMA)
            .field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA)
            .field("op", Schema.STRING_SCHEMA)
            .build();

    final Envelope envelope = Envelope.defineSchema()
            .withName("dummy.Envelope")
            .withRecord(valueSchema)
            .withSource(sourceSchema)
            .build();

    private Struct createIdStruct() {
        final Struct id = new Struct(idSchema);
        id.put("value", 1L);
        id.put("set", true);

        return id;
    }

    private Struct createNameStruct() {
        final Struct name = new Struct(nameSchema);
        name.put("value", "yb");
        name.put("set", true);
        return name;
    }

    private Struct createLocationStruct() {
        // The location column is not set in the change event, so its wrapper carries a null value.
        final Struct location = new Struct(nameSchema);
        location.put("value", null);
        location.put("set", false);
        return location;
    }

    private Struct createValue() {
        final Struct value = new Struct(valueSchema);
        value.put("id", createIdStruct());
        value.put("name", createNameStruct());
        value.put("location", createLocationStruct());

        return value;
    }

    @Test
    public void testSingleLevelStruct() {
        try (final PGCompatible<SourceRecord> transform = new PGCompatible<>()) {
            final Pair<Schema, Struct> unwrapped = transform.getUpdatedValueAndSchema(valueSchema, createValue());
            Assert.assertEquals(1, (long) unwrapped.getSecond().getInt64("id"));
            Assert.assertEquals("yb", unwrapped.getSecond().getString("name"));
            Assert.assertNull(unwrapped.getSecond().getString("location"));
        }
    }

    private Struct createPayload() {
        final Struct source = new Struct(sourceSchema);
        source.put("lsn", 1234);
        source.put("ts_ms", 12836);
        source.put("op", "c");
        return envelope.create(createValue(), source, Instant.now());
    }

    @Test
    public void testPayload() {
        try (final PGCompatible<SourceRecord> transform = new PGCompatible<>()) {
            Struct payload = createPayload();
            final Pair<Schema, Struct> unwrapped = transform.getUpdatedValueAndSchema(payload.schema(), payload);
            Schema valueSchema = unwrapped.getFirst();

            Assert.assertSame(valueSchema.type(), Schema.Type.STRUCT);
            Assert.assertEquals(6, valueSchema.fields().size());
            Assert.assertSame(valueSchema.field("op").schema().type(), Schema.Type.STRING);

            Schema afterSchema = valueSchema.field("after").schema();
            Assert.assertSame(afterSchema.type(), Schema.Type.STRUCT);
            Assert.assertEquals(3, afterSchema.fields().size());
            Assert.assertSame(afterSchema.field("id").schema().type(), Schema.Type.INT64);
            Assert.assertSame(afterSchema.field("name").schema().type(), Schema.Type.STRING);
            Assert.assertSame(afterSchema.field("location").schema().type(), Schema.Type.STRING);

            Struct after = unwrapped.getSecond().getStruct("after");
            Assert.assertEquals(1, (long) after.getInt64("id"));
            Assert.assertEquals("yb", after.getString("name"));
        }
    }

    private SourceRecord createCreateRecord() {
        final Struct key = new Struct(keySchema);
        key.put("id", createIdStruct());

        final Struct payload = createPayload();
        return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", keySchema, key, envelope.schema(), payload);
    }

    @Test
    public void testHandleCreateRewrite() {
        try (final PGCompatible<SourceRecord> transform = new PGCompatible<>()) {
            final Map<String, String> props = new HashMap<>();
            props.put(HANDLE_DELETES.name(), "rewrite");
            props.put(ADD_HEADERS.name(), "op");
            transform.configure(props);

            final SourceRecord createRecord = createCreateRecord();
            final SourceRecord unwrapped = transform.apply(createRecord);
            Struct after = ((Struct) unwrapped.value()).getStruct("after");
            Assert.assertEquals(1, (long) after.getInt64("id"));
            Assert.assertEquals("yb", after.getString("name"));

            Assert.assertEquals("c", ((Struct) unwrapped.value()).getString("op"));
        }
    }
}
