Skip to content

Commit

Permalink
DP-2782: Backport evolution bugfix to Connector 1 (DP-2767) (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicmart authored Feb 13, 2024
1 parent 101b6c0 commit cb95933
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ private Schema mergeStructs(Schema currentSchema, Schema recordSchema)
throws SchemaEvolutionException {
SchemaBuilder result = SchemaUtils.withMetadata(SchemaBuilder.struct(), currentSchema);

if (currentSchema.isOptional() || recordSchema.isOptional()) result.optional();

// First currentSchemaFields
currentSchema.fields().stream()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,28 @@ class StructSchemaEvolutionTest extends AnyFunSuite with Matchers with Inside {

schemaEv.evolve(currentSchema, recordSchema) shouldEqual expectedResult
}

test("testSchemaEvolutionPreservesFieldOptionality") {
val currentSchema =
SchemaBuilder.struct.field("a_struct",
SchemaBuilder.struct.field("optional_string", Schema.OPTIONAL_STRING_SCHEMA).build,
).field("an_empty_struct", SchemaBuilder.struct.optional.schema).build
val recordSchema = SchemaBuilder.struct.field(
"a_struct",
SchemaBuilder.struct.optional.field("optional_string", Schema.OPTIONAL_STRING_SCHEMA).field("an_int",
Schema.INT64_SCHEMA,
).field("a_string_2", Schema.STRING_SCHEMA).build,
).field("an_empty_struct", SchemaBuilder.struct.schema).build
val expectedResult = SchemaBuilder.struct.field(
"a_struct",
SchemaBuilder.struct.optional.field("optional_string", Schema.OPTIONAL_STRING_SCHEMA).field("an_int",
Schema.INT64_SCHEMA,
).field("a_string_2", Schema.STRING_SCHEMA).build,
).field("an_empty_struct", SchemaBuilder.struct.optional.schema).build
val result = schemaEv.evolve(currentSchema, recordSchema)
result shouldEqual expectedResult
}

test("SchemaEvolutionWithAdditionalStructRecordField") {
val currentSchema =
SchemaBuilder.struct()
Expand Down

0 comments on commit cb95933

Please sign in to comment.