Commit dfd7740: introduced incremental schema evolution
1 parent dd97de2

9 files changed (+1128, -29 lines)
New file: SchemaEvolution.java (+22 lines)

/*
 * Copyright 2023 Celonis SE
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;

public interface SchemaEvolution {
  Schema evolve(Schema currentSchema, Schema recordSchema) throws SchemaEvolutionException;
}
New file: SchemaEvolutionException.java (+22 lines)

/* Apache License 2.0 header, identical to SchemaEvolution.java above */
package com.celonis.kafka.connect.schema;

public final class SchemaEvolutionException extends RuntimeException {
  public SchemaEvolutionException(String message) {
    super(message);
  }
}
New file: SchemaUtils.java (+36 lines)

/* Apache License 2.0 header, identical to SchemaEvolution.java above */
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Optional;

public class SchemaUtils {
  public static SchemaBuilder withMetadata(SchemaBuilder builder, Schema schema) {
    Optional.ofNullable(schema.parameters())
        .filter(params -> !params.isEmpty())
        .ifPresent(builder::parameters);

    Optional.ofNullable(schema.name()).ifPresent(builder::name);
    Optional.ofNullable(schema.doc()).ifPresent(builder::doc);
    Optional.ofNullable(schema.defaultValue()).ifPresent(builder::defaultValue);
    Optional.ofNullable(schema.version()).ifPresent(builder::version);

    return builder;
  }
}
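A short usage sketch may help here; the schema name and field below are assumptions for illustration, not part of the commit. withMetadata lets a caller rebuild a schema on a fresh builder without losing its name, doc, version, default value, or parameters:

import com.celonis.kafka.connect.schema.SchemaUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaUtilsExample {           // hypothetical demo class, not in the commit
  public static void main(String[] args) {
    Schema original = SchemaBuilder.struct()
        .name("com.example.Order")          // assumed example name
        .doc("An example order event")      // assumed example doc
        .version(1)
        .field("id", Schema.STRING_SCHEMA)
        .build();

    // Copy name/doc/version/parameters onto a fresh builder, then rebuild.
    Schema rebuilt = SchemaUtils.withMetadata(SchemaBuilder.struct(), original)
        .field("id", Schema.STRING_SCHEMA)
        .build();

    System.out.println(rebuilt.name());     // com.example.Order
    System.out.println(rebuilt.version());  // 1
  }
}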
New file: StructSchemaAlignment.java (+76 lines)

/* Apache License 2.0 header, identical to SchemaEvolution.java above */
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;

public final class StructSchemaAlignment {
  /**
   * Aligns an object to the schema accumulated by incrementally calling SchemaEvolution#evolve.
   *
   * <p>NOTE: this is needed to avoid unnecessarily frequent flushes of output Parquet files,
   * caused by JSON schema inference producing different record schemas for what is effectively
   * the same data (see `EmsOutputRecordSink#put`).
   *
   * @param evolvedSchema the schema this value should align to. Must be a superset of the
   *     supplied struct's schema.
   * @param value the current SinkRecord input struct
   * @return a struct with the evolved schema
   */
  public static Struct alignTo(Schema evolvedSchema, Struct value) {
    return (Struct) align(evolvedSchema, value);
  }

  private static Object align(Schema evolvedSchema, Object value) {
    switch (evolvedSchema.type()) {
      case ARRAY:
        final var collection = (Collection<?>) value;
        return collection.stream()
            .map(item -> align(evolvedSchema.valueSchema(), item))
            .collect(Collectors.toList());

      case MAP:
        final var map = (Map<?, ?>) value;
        return map.entrySet().stream()
            .collect(
                Collectors.toMap(
                    entry -> align(evolvedSchema.keySchema(), entry.getKey()),
                    entry -> align(evolvedSchema.valueSchema(), entry.getValue())));

      case STRUCT:
        final var structValue = (Struct) value;
        if (structValue.schema() == evolvedSchema) return structValue;

        // Copy only the fields present in the input struct; fields added by
        // evolution are left unset (null) in the aligned struct.
        final var newStruct = new Struct(evolvedSchema);
        for (final var evolvedField : evolvedSchema.fields()) {
          if (structValue.schema().field(evolvedField.name()) != null) {
            newStruct.put(
                evolvedField.name(),
                align(evolvedField.schema(), structValue.get(evolvedField.name())));
          }
        }
        return newStruct;

      default:
        return value;
    }
  }
}
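To make the alignment concrete, here is a hedged sketch; the schemas and field names are invented for the example. A struct captured under an earlier, narrower schema is rewritten against the evolved superset schema, and the field it never carried stays unset:

import com.celonis.kafka.connect.schema.StructSchemaAlignment;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class AlignmentExample {             // hypothetical demo class, not in the commit
  public static void main(String[] args) {
    // A record arrives with a narrower schema than the evolved one.
    Schema v1 = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA).build();
    Schema evolved = SchemaBuilder.struct()
        .field("id", Schema.STRING_SCHEMA)
        .field("amount", Schema.OPTIONAL_FLOAT64_SCHEMA)
        .build();

    Struct record = new Struct(v1).put("id", "A-1");

    // Rewrite the struct against the evolved superset schema;
    // the field it never had stays unset (null).
    Struct aligned = StructSchemaAlignment.alignTo(evolved, record);
    System.out.println(aligned.schema() == evolved);  // true
    System.out.println(aligned.get("amount"));        // null
  }
}

Note that fields introduced by evolution end up null in the aligned struct, so they should be optional (or carry defaults) if Struct#validate runs downstream.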
New file: StructSchemaEvolution.java (+127 lines)

/* Apache License 2.0 header, identical to SchemaEvolution.java above */
package com.celonis.kafka.connect.schema;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.Objects;

/**
 * StructSchemaEvolution recursively merges the existing schema with the schema of each new
 * record, evolving the accumulated schema as records arrive from the connector.
 */
public class StructSchemaEvolution implements SchemaEvolution {

  /**
   * Merges two top-level Kafka Connect Struct schemas.
   *
   * @param currentSchema existing schema, must be of type Struct
   * @param recordSchema schema of the new record, must be of type Struct
   * @return the schema resulting from merging the existing and new schemas recursively
   */
  @Override
  public Schema evolve(Schema currentSchema, Schema recordSchema) throws SchemaEvolutionException {
    if (currentSchema == recordSchema) return currentSchema;

    // RecordTransformer ensures that the top-level schemas are of type Struct.
    return mergeSchemas(null, currentSchema, recordSchema);
  }

  /**
   * Merges two Kafka Connect schemas.
   *
   * @param fieldName current field name, `null` when the recursion starts
   * @param currentSchema existing schema (accepted types: MAP, ARRAY, STRUCT)
   * @param recordSchema schema of the new record (accepted types: MAP, ARRAY, STRUCT)
   * @return the schema resulting from merging the existing and new schemas recursively
   */
  private Schema mergeSchemas(String fieldName, Schema currentSchema, Schema recordSchema) {
    // Validate types before merging.
    validateSchemasTypes(fieldName, currentSchema, recordSchema);

    switch (currentSchema.type()) {
      case STRUCT:
        return mergeStructs(currentSchema, recordSchema);
      case ARRAY:
        return SchemaBuilder.array(
                mergeSchemas(fieldName, currentSchema.valueSchema(), recordSchema.valueSchema()))
            .build();
      case MAP:
        var keySchema =
            mergeSchemas(fieldName, currentSchema.keySchema(), recordSchema.keySchema());
        var valueSchema =
            mergeSchemas(fieldName, currentSchema.valueSchema(), recordSchema.valueSchema());
        return SchemaBuilder.map(keySchema, valueSchema).build();
      default:
        return currentSchema;
    }
  }

  private Schema mergeStructs(Schema currentSchema, Schema recordSchema)
      throws SchemaEvolutionException {
    SchemaBuilder result = SchemaUtils.withMetadata(SchemaBuilder.struct(), currentSchema);

    // Current schema fields first: keep fields absent from the record schema
    // unchanged, and recursively merge the ones present in both.
    currentSchema
        .fields()
        .forEach(
            currentSchemaField -> {
              final var recordSchemaField = recordSchema.field(currentSchemaField.name());
              if (recordSchemaField == null) {
                result.field(currentSchemaField.name(), currentSchemaField.schema());
              } else {
                result.field(
                    currentSchemaField.name(),
                    mergeSchemas(
                        currentSchemaField.name(),
                        currentSchemaField.schema(),
                        recordSchemaField.schema()));
              }
            });

    // Then add fields that appear only in the record schema, as they are.
    recordSchema.fields().stream()
        .filter(rf -> currentSchema.field(rf.name()) == null)
        .forEach(rf -> result.field(rf.name(), rf.schema()));

    return result.build();
  }

  private void validateSchemasTypes(String fieldName, Schema currentSchema, Schema recordSchema) {
    if (bothPrimitives(currentSchema, recordSchema) && !sameLogicalType(currentSchema, recordSchema)
        || !currentSchema.type().equals(recordSchema.type())) {

      throw new SchemaEvolutionException(
          String.format(
              "New schema has field '%s' with a different type! "
                  + "previous type: %s, current type: %s",
              fieldName, currentSchema, recordSchema));
    }
  }

  private boolean bothPrimitives(Schema s1, Schema s2) {
    return s1.type().isPrimitive() && s2.type().isPrimitive();
  }

  private boolean sameLogicalType(Schema s1, Schema s2) {
    return Objects.equals(s1.type(), s2.type())
        && Objects.equals(s1.name(), s2.name())
        && Objects.equals(s1.version(), s2.version())
        && Objects.equals(s1.parameters(), s2.parameters());
  }
}
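Taken together, the merge rules are easiest to see with a small example. The following sketch (field names are invented, not from the commit) shows a merge producing the union of fields, and a primitive type change being rejected:

import com.celonis.kafka.connect.schema.SchemaEvolution;
import com.celonis.kafka.connect.schema.SchemaEvolutionException;
import com.celonis.kafka.connect.schema.StructSchemaEvolution;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class EvolutionExample {             // hypothetical demo class, not in the commit
  public static void main(String[] args) {
    SchemaEvolution evolution = new StructSchemaEvolution();

    Schema first = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA).build();
    Schema second = SchemaBuilder.struct()
        .field("id", Schema.STRING_SCHEMA)
        .field("amount", Schema.FLOAT64_SCHEMA)
        .build();

    // Merging yields the union of fields: id, amount.
    Schema merged = evolution.evolve(first, second);
    System.out.println(merged.fields());

    // A primitive type change for an existing field is rejected.
    Schema conflicting = SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA).build();
    try {
      evolution.evolve(merged, conflicting);
    } catch (SchemaEvolutionException e) {
      System.out.println(e.getMessage());   // "... field 'id' with a different type ..."
    }
  }
}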

Diff for: connector/src/main/scala/com/celonis/kafka/connect/transform/RecordTransformer.scala (+44 -29)

@@ -25,12 +25,14 @@ import com.celonis.kafka.connect.ems.errors.FailedObfuscationException
 import com.celonis.kafka.connect.ems.model._
 import com.celonis.kafka.connect.ems.obfuscation.ObfuscationUtils._
 import com.celonis.kafka.connect.ems.storage.PrimaryKeysValidator
+import com.celonis.kafka.connect.schema.{StructSchemaAlignment, StructSchemaEvolution}
 import com.celonis.kafka.connect.transform.conversion.ConnectConversion
 import com.celonis.kafka.connect.transform.fields.EmbeddedKafkaMetadata
 import com.celonis.kafka.connect.transform.fields.FieldInserter
 import com.celonis.kafka.connect.transform.flatten.Flattener
 import com.typesafe.scalalogging.StrictLogging
 import org.apache.avro.generic.GenericRecord
+import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
 import org.apache.kafka.connect.sink.SinkRecord

 /** The main business transformation.
@@ -45,9 +47,13 @@ final class RecordTransformer(
   obfuscation: Option[ObfuscationConfig],
   inserter: FieldInserter,
 ) extends StrictLogging {
+
+  private var targetSchema: Schema = SchemaBuilder.struct().build();
+  private val schemaEvolution = new StructSchemaEvolution();
+
   def transform(sinkRecord: SinkRecord): IO[GenericRecord] = {
     val (convertedValue, convertedSchema) = preConversion.convert(sinkRecord.value(), Option(sinkRecord.valueSchema()))
-    val flattenedValue = flattener.flatten(convertedValue, convertedSchema)
+    val flattenedValue                     = flattener.flatten(convertedValue, convertedSchema)

     for {
       transformedValue <- IO(
@@ -56,7 +62,8 @@
           EmbeddedKafkaMetadata(sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset(), sinkRecord.timestamp()),
         ),
       )
-      v <- IO.fromEither(DataConverter.apply(transformedValue))
+      schemaAlignedValue = evolveSchemaAndAlignValue(transformedValue)
+      v                 <- IO.fromEither(DataConverter.apply(schemaAlignedValue))
       _ <- IO(logger.debug("[{}] EmsSinkTask:put obfuscation={}", sinkName, obfuscation))
       value <- obfuscation.fold(IO.pure(v)) { o =>
         IO.fromEither(v.obfuscate(o).leftMap(FailedObfuscationException))
@@ -68,34 +75,42 @@
       _ <- IO.fromEither(pksValidator.validate(value, metadata))
     } yield value
   }
+
+  private def evolveSchemaAndAlignValue(value: Any): Any =
+    value match {
+      case struct: Struct =>
+        targetSchema = schemaEvolution.evolve(targetSchema, struct.schema())
+        StructSchemaAlignment.alignTo(targetSchema, struct)
+      case _ => value
+    }
 }

The remainder of this hunk removes and re-adds the RecordTransformer companion object with changed indentation only; its content is unchanged:

 object RecordTransformer {
   def fromConfig(
     sinkName: String,
     preConversionConfig: PreConversionConfig,
     flattenerConfig: Option[FlattenerConfig],
     primaryKeys: List[String],
     obfuscation: Option[ObfuscationConfig],
     allowNullsAsPks: Boolean,
     inserter: FieldInserter): RecordTransformer =
     new RecordTransformer(
       sinkName,
       ConnectConversion.fromConfig(preConversionConfig),
       Flattener.fromConfig(flattenerConfig),
       new PrimaryKeysValidator(primaryKeys, allowNullsAsPks),
       obfuscation,
       inserter,
     )

   def fromConfig(config: EmsSinkConfig): RecordTransformer =
     fromConfig(
       config.sinkName,
       config.preConversionConfig,
       config.flattenerConfig,
       config.primaryKeys,
       config.obfuscation,
       config.allowNullsAsPks,
       FieldInserter.embeddedKafkaMetadata(config.embedKafkaMetadata, config.orderField.name),
     )
 }

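The Scala diff above gives RecordTransformer a mutable targetSchema and routes every struct through an evolve-then-align step. As a cross-language illustration only, here is a minimal Java sketch of that per-record loop; the demo class and record contents are invented:

import com.celonis.kafka.connect.schema.SchemaEvolution;
import com.celonis.kafka.connect.schema.StructSchemaAlignment;
import com.celonis.kafka.connect.schema.StructSchemaEvolution;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.List;

public class EvolveThenAlignSketch {        // hypothetical demo class, not in the commit
  private Schema targetSchema = SchemaBuilder.struct().build();
  private final SchemaEvolution evolution = new StructSchemaEvolution();

  // Mirrors RecordTransformer#evolveSchemaAndAlignValue: grow the accumulated
  // schema, then rewrite the struct against it so equal data keeps equal schemas.
  Struct evolveAndAlign(Struct struct) {
    targetSchema = evolution.evolve(targetSchema, struct.schema());
    return StructSchemaAlignment.alignTo(targetSchema, struct);
  }

  public static void main(String[] args) {
    var sketch = new EvolveThenAlignSketch();
    Schema s = SchemaBuilder.struct().field("id", Schema.STRING_SCHEMA).build();
    for (Struct in : List.of(new Struct(s).put("id", "A-1"), new Struct(s).put("id", "A-2"))) {
      Struct out = sketch.evolveAndAlign(in);
      System.out.println(out.schema() == sketch.targetSchema); // same accumulated schema
    }
  }
}

Because every struct is realigned to the single accumulated schema, consecutive records share schema identity, which is what lets the sink avoid flushing a new Parquet file for each inferred schema variant.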