diff --git a/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java b/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
index acfb018ad64..6b898988a4f 100644
--- a/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
+++ b/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
@@ -18,6 +18,7 @@
 import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
 import io.confluent.kafka.schemaregistry.utils.ExceptionUtils;
@@ -28,7 +29,10 @@
 import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
 import io.confluent.kafka.serializers.NonRecordContainer;
 import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.NetworkException;
@@ -199,6 +203,16 @@ public byte[] serialize(
           value,
           schema);
     }
+
+    @Override
+    protected DatumWriter getDatumWriter(
+        Object value, org.apache.avro.Schema schema, boolean useLogicalTypes, boolean allowNull) {
+      GenericData data = AvroSchemaUtils.getThreadLocalGenericData();
+      if (data == null) {
+        data = AvroSchemaUtils.getGenericData(useLogicalTypes);
+      }
+      return new GenericDatumWriter<>(schema, data);
+    }
   }
 
   static class Deserializer extends AbstractKafkaAvroDeserializer {
diff --git a/avro-data/src/test/java/io/confluent/connect/avro/AvroDataTest.java b/avro-data/src/test/java/io/confluent/connect/avro/AvroDataTest.java
index d3e417ba4e4..b37269cafaf 100644
--- a/avro-data/src/test/java/io/confluent/connect/avro/AvroDataTest.java
+++ b/avro-data/src/test/java/io/confluent/connect/avro/AvroDataTest.java
@@ -999,7 +999,7 @@ public void testFromConnectOptionalWithInvalidDefault() {
     arraySchema.addProp("connect.default", arrayNode);
     org.apache.avro.Schema expectedAvroSchema = org.apache.avro.SchemaBuilder.builder()
         .record("ConnectDefault").namespace("io.confluent.connect.avro").fields()
-        .name("array").type(arraySchema).noDefault() // no default
+        .name("array").type(arraySchema).withDefault(Arrays.asList("a", "b", "c"))
         .endRecord();
 
     assertEquals(expectedAvroSchema, avroSchema);
diff --git a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java
index e5dff758ce9..bfcd5630310 100644
--- a/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java
+++ b/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java
@@ -211,10 +211,15 @@ private void writeDatum(ByteArrayOutputStream out, Object value, Schema rawSchem
     DatumWriter writer;
     writer = datumWriterCache.get(rawSchema,
-        () -> (DatumWriter) AvroSchemaUtils.getDatumWriter(
+        () -> (DatumWriter) getDatumWriter(
            value, rawSchema, avroUseLogicalTypeConverters, avroReflectionAllowNull)
     );
     writer.write(value, encoder);
     encoder.flush();
   }
+
+  protected DatumWriter getDatumWriter(
+      Object value, Schema schema, boolean useLogicalTypes, boolean allowNull) {
+    return AvroSchemaUtils.getDatumWriter(value, schema, useLogicalTypes, allowNull);
+  }
 }
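Note: writeDatum() now resolves its writer through the new protected getDatumWriter() hook (still cached per raw schema), so subclasses can swap the writer without re-implementing writeDatum(). A minimal sketch of a custom serializer built on the hook — the class name is hypothetical; AvroSchemaUtils.getGenericData(boolean) is the same public helper the AvroConverter override above calls:

    import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.io.DatumWriter;

    // Hypothetical subclass: always attach logical-type conversions,
    // regardless of the avro.use.logical.type.converters config flag.
    public class LogicalTypeSerializer extends AbstractKafkaAvroSerializer {
      @Override
      protected DatumWriter getDatumWriter(
          Object value, org.apache.avro.Schema schema, boolean useLogicalTypes, boolean allowNull) {
        GenericData data = AvroSchemaUtils.getGenericData(true);
        return new GenericDatumWriter<>(schema, data);
      }
    }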
diff --git a/avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java b/avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java
index b01d31cbed9..6cd18ab530d 100644
--- a/avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java
+++ b/avro-serializer/src/test/java/io/confluent/kafka/serializers/KafkaAvroSerializerTest.java
@@ -281,7 +281,7 @@ private IndexedRecord createInvalidAvroRecord() {
       + " \"name\": \"test\",\n"
       + " \"items\": {\n"
       + "\"type\": \"record\",\n"
-      + "\"namespace\": \"example.avro\",\n"
+      + "\"namespace\": \"io.confluent.kafka.example\",\n"
       + "\"name\": \"User\",\n"
       + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}}");
 
@@ -291,7 +291,7 @@ private IndexedRecord createInvalidAvroRecord() {
       + " \"name\": \"test\",\n"
       + " \"values\": {\n"
       + "\"type\": \"record\",\n"
-      + "\"namespace\": \"example.avro\",\n"
+      + "\"namespace\": \"io.confluent.kafka.example\",\n"
       + "\"name\": \"User\",\n"
       + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}}");
 
@@ -912,6 +912,7 @@ public void testKafkaAvroSerializerWithCyclicReference() throws IOException, Res
         new SchemaReference("io.confluent.kafka.example.User", "user", -1)
     )));
   }
 
+  @Test
   public void testKafkaAvroSerializerWithArraySpecific() throws IOException, RestClientException {
     Map serializerConfigs = ImmutableMap.of(
@@ -958,7 +959,7 @@ public void testKafkaAvroSerializerWithMapSpecific() throws IOException, RestCli
         true
     );
     Map data = new HashMap<>();
-    data.put(new Utf8("one"), createUserRecordUtf8());
+    data.put(new Utf8("one"), createSpecificAvroRecord());
     schemaRegistry.register(topic + "-value", new AvroSchema(mapSchema));
     avroSerializer.configure(serializerConfigs, false);
     avroDeserializer.configure(deserializerConfigs, false);
diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java
index ceb61862701..9a6e9c0cbb1 100644
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java
+++ b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java
@@ -56,6 +56,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import org.apache.avro.NameValidator;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericContainer;
@@ -224,8 +225,12 @@ public ParsedSchema copy(Map<SchemaEntity, Set<String>> tagsToAdd,
   }
 
   protected Schema.Parser getParser() {
-    Schema.Parser parser = new Schema.Parser();
-    parser.setValidateDefaults(isNew());
+    boolean isNew = isNew();
+    NameValidator nameValidator = isNew
+        ? NameValidator.STRICT_VALIDATOR
+        : NameValidator.NO_VALIDATION;
+    Schema.Parser parser = new Schema.Parser(nameValidator);
+    parser.setValidateDefaults(isNew);
     return parser;
   }
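Note: getParser() now drives name checking through Avro 1.12's NameValidator — strict when a schema is being newly registered (isNew()), disabled when re-reading schemas already in storage, so historical schemas with lax names still load. A standalone sketch of the difference, using only the two constants and the Schema.Parser(NameValidator) constructor seen in the diff:

    import org.apache.avro.NameValidator;
    import org.apache.avro.Schema;

    public class NameValidationDemo {
      public static void main(String[] args) {
        String schemaString = "{\"type\":\"record\",\"name\":\"myrecord\","
            + "\"namespace\":\"a-bad.namespace\","
            + "\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}";
        // Parses: the hyphenated namespace is accepted with validation disabled.
        Schema lenient = new Schema.Parser(NameValidator.NO_VALIDATION).parse(schemaString);
        // Throws SchemaParseException: "a-bad" is not a valid name component.
        Schema strict = new Schema.Parser(NameValidator.STRICT_VALIDATOR).parse(schemaString);
      }
    }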
diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchemaUtils.java b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchemaUtils.java
index 9f26b392140..53a9296f40f 100644
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchemaUtils.java
+++ b/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchemaUtils.java
@@ -170,19 +170,23 @@ public static SpecificData getSpecificData() {
   }
 
   public static void addLogicalTypeConversion(GenericData avroData) {
+    avroData.addLogicalTypeConversion(new Conversions.BigDecimalConversion());
     avroData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    avroData.addLogicalTypeConversion(new Conversions.DurationConversion());
     avroData.addLogicalTypeConversion(new Conversions.UUIDConversion());
     avroData.addLogicalTypeConversion(new TimeConversions.DateConversion());
     avroData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
     avroData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
     avroData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
     avroData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+    avroData.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());
     avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
     avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
+    avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion());
   }
 
   private static final EncoderFactory encoderFactory = EncoderFactory.get();
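Note: the conversion set now covers the logical types available through Avro 1.12 — big-decimal, duration, and the nanosecond timestamp/local-timestamp variants — on top of the existing decimal, UUID, date, and time conversions. A short sketch of the effect (a fragment; `data` is a fresh GenericData, not the shared singleton):

    GenericData data = new GenericData();
    AvroSchemaUtils.addLogicalTypeConversion(data);
    // A GenericDatumWriter/GenericDatumReader built on `data` now maps
    // timestamp-nanos to java.time.Instant, decimal/big-decimal to
    // java.math.BigDecimal, uuid to java.util.UUID, and so on, instead of
    // requiring callers to hand over raw longs, ints, and byte buffers.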
diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Config.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Config.java
index e6319dc74e0..72cdb693d13 100644
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Config.java
+++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/Config.java
@@ -33,6 +33,7 @@ public class Config {
   private String alias;
   private Boolean normalize;
   private Boolean validateFields;
+  private Boolean validateNames;
   private Boolean validateRules;
   private String compatibilityLevel;
   private String compatibilityPolicy;
@@ -46,6 +47,7 @@ public class Config {
   public Config(@JsonProperty("alias") String alias,
                 @JsonProperty("normalize") Boolean normalize,
                 @JsonProperty("validateFields") Boolean validateFields,
+                @JsonProperty("validateNames") Boolean validateNames,
                 @JsonProperty("validateRules") Boolean validateRules,
                 @JsonProperty("compatibilityLevel") String compatibilityLevel,
                 @JsonProperty("compatibilityPolicy") String compatibilityPolicy,
@@ -57,6 +59,7 @@ public Config(@JsonProperty("alias") String alias,
     this.alias = alias;
     this.normalize = normalize;
     this.validateFields = validateFields;
+    this.validateNames = validateNames;
     this.validateRules = validateRules;
     this.compatibilityLevel = compatibilityLevel;
     this.compatibilityPolicy = compatibilityPolicy;
@@ -100,6 +103,7 @@ public Config(ConfigUpdateRequest request) {
     this.alias = request.getAlias();
     this.normalize = request.isNormalize();
     this.validateFields = request.isValidateFields();
+    this.validateNames = request.isValidateNames();
     this.validateRules = request.isValidateRules();
     this.compatibilityLevel = request.getCompatibilityLevel();
     this.compatibilityPolicy = request.getCompatibilityPolicy();
@@ -140,6 +144,16 @@ public void setValidateFields(Boolean validateFields) {
     this.validateFields = validateFields;
   }
 
+  @JsonProperty("validateNames")
+  public Boolean isValidateNames() {
+    return validateNames;
+  }
+
+  @JsonProperty("validateNames")
+  public void setValidateNames(Boolean validateNames) {
+    this.validateNames = validateNames;
+  }
+
   @JsonProperty("validateRules")
   public Boolean isValidateRules() {
     return validateRules;
@@ -244,6 +258,7 @@ public boolean equals(Object o) {
     return Objects.equals(alias, config.alias)
         && Objects.equals(normalize, config.normalize)
         && Objects.equals(validateFields, config.validateFields)
+        && Objects.equals(validateNames, config.validateNames)
         && Objects.equals(validateRules, config.validateRules)
         && Objects.equals(compatibilityLevel, config.compatibilityLevel)
         && Objects.equals(compatibilityPolicy, config.compatibilityPolicy)
@@ -256,7 +271,7 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(alias, normalize, validateFields, validateRules,
+    return Objects.hash(alias, normalize, validateFields, validateNames, validateRules,
         compatibilityLevel, compatibilityPolicy, compatibilityGroup,
         defaultMetadata, overrideMetadata, defaultRuleSet, overrideRuleSet);
   }
@@ -267,6 +282,7 @@ public String toString() {
       + "alias='" + alias + '\''
       + ", normalize=" + normalize
       + ", validateFields=" + validateFields
+      + ", validateNames=" + validateNames
       + ", validateRules=" + validateRules
       + ", compatibilityLevel='" + compatibilityLevel + '\''
       + ", compatibilityPolicy='" + compatibilityPolicy + '\''
diff --git a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java
index 64d83336575..d6733ac6ff1 100644
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java
+++ b/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java
@@ -39,6 +39,7 @@ public class ConfigUpdateRequest {
   private Optional<String> alias;
   private Optional<Boolean> normalize;
   private Optional<Boolean> validateFields;
+  private Optional<Boolean> validateNames;
   private Optional<Boolean> validateRules;
   private Optional<String> compatibilityLevel;
   private Optional<String> compatibilityPolicy;
@@ -55,6 +56,7 @@ public ConfigUpdateRequest(Config config) {
     setAlias(config.getAlias());
     setNormalize(config.isNormalize());
     setValidateFields(config.isValidateFields());
+    setValidateNames(config.isValidateNames());
     setValidateRules(config.isValidateRules());
     setCompatibilityLevel(config.getCompatibilityLevel());
     setCompatibilityPolicy(config.getCompatibilityPolicy());
@@ -129,6 +131,26 @@ public void setValidateFields(Boolean validateFields) {
     this.validateFields = validateFields != null ? Optional.of(validateFields) : null;
   }
 
+  @JsonProperty("validateNames")
+  public Optional<Boolean> isOptionalValidateNames() {
+    return validateNames;
+  }
+
+  @JsonIgnore
+  public Boolean isValidateNames() {
+    return validateNames != null ? validateNames.orElse(null) : null;
+  }
+
+  @JsonProperty("validateNames")
+  public void setValidateNames(Optional<Boolean> validateNames) {
+    this.validateNames = validateNames;
+  }
+
+  @JsonIgnore
+  public void setValidateNames(Boolean validateNames) {
+    this.validateNames = validateNames != null ? Optional.of(validateNames) : null;
+  }
+
   @JsonProperty("validateRules")
   public Optional<Boolean> isOptionalValidateRules() {
     return validateRules;
@@ -311,6 +333,7 @@ public boolean equals(Object o) {
     return Objects.equals(alias, that.alias)
         && Objects.equals(normalize, that.normalize)
         && Objects.equals(validateFields, that.validateFields)
+        && Objects.equals(validateNames, that.validateNames)
         && Objects.equals(validateRules, that.validateRules)
         && Objects.equals(compatibilityLevel, that.compatibilityLevel)
         && Objects.equals(compatibilityPolicy, that.compatibilityPolicy)
@@ -323,7 +346,7 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(alias, normalize, validateFields, validateRules,
+    return Objects.hash(alias, normalize, validateFields, validateNames, validateRules,
         compatibilityLevel, compatibilityPolicy, compatibilityGroup,
         defaultMetadata, overrideMetadata, defaultRuleSet, overrideRuleSet);
   }
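Note: like the other settings in this request, validateNames is held as Optional<Boolean> so the payload can distinguish "not specified" from an explicit value. A sketch of the three states — the exact JSON shapes are an assumption inferred from the @JsonProperty/@JsonIgnore pairs above, not verified against the serializer config:

    ConfigUpdateRequest request = new ConfigUpdateRequest();
    request.setValidateNames(Boolean.FALSE);     // Optional.of(false): "validateNames": false
    request.setValidateNames((Boolean) null);    // null: field omitted, setting left untouched
    request.setValidateNames(Optional.empty());  // explicit empty: "validateNames": null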
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java
index aa8da87fe32..2b9304a24c4 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/rest/SchemaRegistryConfig.java
@@ -191,6 +191,9 @@ public class SchemaRegistryConfig extends RestConfig {
   public static final String SCHEMA_VALIDATE_FIELDS_CONFIG = "schema.validate.fields";
   public static final boolean SCHEMA_VALIDATE_FIELDS_DEFAULT = false;
 
+  public static final String SCHEMA_VALIDATE_NAMES_CONFIG = "schema.validate.names";
+  public static final boolean SCHEMA_VALIDATE_NAMES_DEFAULT = true;
+
   /**
    * schema.cache.size
    */
@@ -407,6 +410,8 @@
       + "enabled or not. If enabled, it checks whether any top level fields conflict with the "
       + "reserved fields in metadata. It also checks for the presence of any field names "
       + "beginning with $$";
+  protected static final String VALIDATE_NAMES_DOC = "Determines whether name validation is "
+      + "enabled or not. If enabled, it validates both namespaces and names in Avro.";
   protected static final String SCHEMA_CACHE_SIZE_DOC = "The maximum size of the schema cache.";
 
   protected static final String SCHEMA_CACHE_EXPIRY_SECS_DOC =
@@ -633,6 +638,9 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
     .define(SCHEMA_VALIDATE_FIELDS_CONFIG, ConfigDef.Type.BOOLEAN, SCHEMA_VALIDATE_FIELDS_DEFAULT,
         ConfigDef.Importance.LOW, VALIDATE_FIELDS_DOC
     )
+    .define(SCHEMA_VALIDATE_NAMES_CONFIG, ConfigDef.Type.BOOLEAN, SCHEMA_VALIDATE_NAMES_DEFAULT,
+        ConfigDef.Importance.LOW, VALIDATE_NAMES_DOC
+    )
     .define(SCHEMA_CACHE_SIZE_CONFIG, ConfigDef.Type.INT, SCHEMA_CACHE_SIZE_DEFAULT,
        ConfigDef.Importance.LOW, SCHEMA_CACHE_SIZE_DOC
     )
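Note: schema.validate.names defaults to true, so Avro name/namespace checking stays strict unless explicitly relaxed, either server-wide in the registry properties or per subject through the config resource. A sketch of the client-side route, assuming a RestService instance (restService) pointed at the registry — the subject name is hypothetical; the updateConfig(request, subject) call is the same one the new REST test below uses:

    ConfigUpdateRequest update = new ConfigUpdateRequest();
    update.setValidateNames(false);
    restService.updateConfig(update, "lenient-subject"); // subject-level override
    restService.updateConfig(update, null);              // or change the global default
    // Server-wide alternative in the registry properties file:
    //   schema.validate.names=false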
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/AbstractSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/AbstractSchemaRegistry.java
index 985060ee942..7e5e98e86e5 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/AbstractSchemaRegistry.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/AbstractSchemaRegistry.java
@@ -117,6 +117,7 @@ public abstract class AbstractSchemaRegistry implements SchemaRegistry,
   protected final LoadingCache oldSchemaCache;
   protected final CompatibilityLevel defaultCompatibilityLevel;
   protected final boolean defaultValidateFields;
+  protected final boolean defaultValidateNames;
   protected final Mode defaultMode;
   protected final int schemaSearchDefaultLimit;
   protected final int schemaSearchMaxLimit;
@@ -170,6 +171,8 @@ protected AbstractSchemaRegistry(SchemaRegistryConfig config, MetricsContainer m
     this.defaultCompatibilityLevel = config.compatibilityType();
     this.defaultValidateFields =
         config.getBoolean(SchemaRegistryConfig.SCHEMA_VALIDATE_FIELDS_CONFIG);
+    this.defaultValidateNames =
+        config.getBoolean(SchemaRegistryConfig.SCHEMA_VALIDATE_NAMES_CONFIG);
     this.defaultMode = Mode.READWRITE;
     this.schemaSearchDefaultLimit =
         config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
@@ -508,10 +511,14 @@ protected void logSchemaOp(Schema schema, String operation) {
         tenant(), schema.getId(), schema.getSubject(), operation);
   }
 
-  private boolean isSchemaFieldValidationEnabled(Config config) {
+  protected boolean isSchemaFieldValidationEnabled(Config config) {
     return config.isValidateFields() != null ? config.isValidateFields() : defaultValidateFields;
   }
 
+  protected boolean isSchemaNameValidationEnabled(Config config) {
+    return config.isValidateNames() != null ? config.isValidateNames() : defaultValidateNames;
+  }
+
   private ParsedSchema maybeValidateAndNormalizeSchema(ParsedSchema parsedSchema,
                                                        Schema schema,
                                                        Config config,
@@ -1262,7 +1269,8 @@ public List isCompatible(String subject,
     }
 
     Config config = getConfigInScope(subject);
-    ParsedSchema parsedSchema = canonicalizeSchema(newSchema, config, true, normalize);
+    boolean doValidation = isSchemaNameValidationEnabled(config);
+    ParsedSchema parsedSchema = canonicalizeSchema(newSchema, config, doValidation, normalize);
     if (parsedSchema == null) {
       log.error("Empty schema");
       throw new InvalidSchemaException("Empty schema");
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/ConfigValue.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/ConfigValue.java
index a6622650367..49a7a98af92 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/ConfigValue.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/ConfigValue.java
@@ -34,6 +34,7 @@ public class ConfigValue extends SubjectValue {
   private String alias;
   private Boolean normalize;
   private Boolean validateFields;
+  private Boolean validateNames;
   private Boolean validateRules;
   private CompatibilityLevel compatibilityLevel;
   private CompatibilityPolicy compatibilityPolicy;
@@ -47,6 +48,7 @@ public ConfigValue(@JsonProperty("subject") String subject,
                      @JsonProperty("alias") String alias,
                      @JsonProperty("normalize") Boolean normalize,
                      @JsonProperty("validateFields") Boolean validateFields,
+                     @JsonProperty("validateNames") Boolean validateNames,
                      @JsonProperty("validateRules") Boolean validateRules,
                      @JsonProperty("compatibilityLevel") CompatibilityLevel compatibilityLevel,
                      @JsonProperty("compatibilityPolicy") CompatibilityPolicy compatibilityPolicy,
@@ -59,6 +61,7 @@ public ConfigValue(@JsonProperty("subject") String subject,
     this.alias = alias;
     this.normalize = normalize;
     this.validateFields = validateFields;
+    this.validateNames = validateNames;
     this.validateRules = validateRules;
     this.compatibilityLevel = compatibilityLevel;
     this.compatibilityPolicy = compatibilityPolicy;
@@ -73,6 +76,8 @@ public ConfigValue(String subject, Config configEntity) {
     super(subject);
     this.alias = configEntity.getAlias();
     this.normalize = configEntity.isNormalize();
+    this.validateFields = configEntity.isValidateFields();
+    this.validateNames = configEntity.isValidateNames();
     this.validateRules = configEntity.isValidateRules();
     this.compatibilityLevel = CompatibilityLevel.forName(configEntity.getCompatibilityLevel());
     this.compatibilityPolicy = CompatibilityPolicy.forName(configEntity.getCompatibilityPolicy());
@@ -96,6 +101,7 @@ public ConfigValue(String subject, Config configEntity, RuleSetHandler ruleSetHa
     this.alias = configEntity.getAlias();
     this.normalize = configEntity.isNormalize();
     this.validateFields = configEntity.isValidateFields();
+    this.validateNames = configEntity.isValidateNames();
     this.validateRules = configEntity.isValidateRules();
     this.compatibilityLevel = CompatibilityLevel.forName(configEntity.getCompatibilityLevel());
     this.compatibilityPolicy = CompatibilityPolicy.forName(configEntity.getCompatibilityPolicy());
@@ -145,6 +151,16 @@ public void setValidateFields(Boolean validateFields) {
     this.validateFields = validateFields;
   }
 
+  @JsonProperty("validateNames")
+  public Boolean isValidateNames() {
+    return validateNames;
+  }
+
+  @JsonProperty("validateNames")
+  public void setValidateNames(Boolean validateNames) {
+    this.validateNames = validateNames;
+  }
+
   @JsonProperty("validateRules")
   public Boolean isValidateRules() {
     return validateRules;
@@ -240,6 +256,7 @@ public boolean equals(Object o) {
     return Objects.equals(alias, that.alias)
         && Objects.equals(normalize, that.normalize)
         && Objects.equals(validateFields, that.validateFields)
+        && Objects.equals(validateNames, that.validateNames)
         && Objects.equals(validateRules, that.validateRules)
         && compatibilityLevel == that.compatibilityLevel
         && Objects.equals(compatibilityPolicy, that.compatibilityPolicy)
@@ -252,7 +269,8 @@
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), alias, normalize, validateFields, validateRules,
+    return Objects.hash(super.hashCode(), alias, normalize,
+        validateFields, validateNames, validateRules,
         compatibilityLevel, compatibilityPolicy, compatibilityGroup,
         defaultMetadata, overrideMetadata,
         defaultRuleSet, overrideRuleSet);
@@ -264,6 +282,7 @@ public String toString() {
       + "alias='" + alias + '\''
       + ", normalize=" + normalize
       + ", validateFields=" + validateFields
+      + ", validateNames=" + validateNames
       + ", validateRules=" + validateRules
       + ", compatibilityLevel=" + compatibilityLevel
       + ", compatibilityPolicy='" + compatibilityPolicy + '\''
@@ -285,6 +304,7 @@ public Config toConfigEntity() {
         alias,
         normalize,
         validateFields,
+        validateNames,
         validateRules,
         compatibilityLevel != null ? compatibilityLevel.name : null,
         compatibilityPolicy != null ? compatibilityPolicy.name : null,
@@ -333,6 +353,8 @@ public static ConfigValue update(
             ? newConfig.isNormalize() : oldConfig.isNormalize(),
         newConfig.isOptionalValidateFields() != null
             ? newConfig.isValidateFields() : oldConfig.isValidateFields(),
+        newConfig.isOptionalValidateNames() != null
+            ? newConfig.isValidateNames() : oldConfig.isValidateNames(),
         newConfig.isOptionalValidateRules() != null
             ? newConfig.isValidateRules() : oldConfig.isValidateRules(),
         newConfig.getOptionalCompatibilityLevel() != null
diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
index 1671e6ee7f9..d031ebb6cc9 100644
--- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
+++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java
@@ -449,7 +449,8 @@ public Schema register(String subject,
     }
 
     int schemaId = schema.getId();
-    ParsedSchema parsedSchema = canonicalizeSchema(schema, config, schemaId < 0, normalize);
+    boolean doValidation = schemaId < 0 && isSchemaNameValidationEnabled(config);
+    ParsedSchema parsedSchema = canonicalizeSchema(schema, config, doValidation, normalize);
 
     if (parsedSchema != null) {
       // see if the schema to be registered already exists
diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java
index b9a82336571..5c6e2564f91 100644
--- a/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java
+++ b/core/src/test/java/io/confluent/kafka/schemaregistry/rest/RestApiTest.java
@@ -72,7 +72,6 @@
 import java.util.Objects;
 import java.util.Properties;
 import org.apache.avro.Schema.Parser;
-import org.apache.avro.SchemaParseException;
 import org.junit.jupiter.api.Test;
 
 public class RestApiTest extends ClusterTestHarness {
@@ -282,7 +281,7 @@ public void testRegisterInvalidSchemaBadType() throws Exception {
     try {
       new Parser().parse(badSchemaString);
       fail("Parsing invalid schema string should fail with SchemaParseException");
-    } catch (SchemaParseException spe) {
+    } catch (Exception spe) {
       expectedErrorMessage = spe.getMessage();
     }
 
@@ -2687,13 +2686,13 @@ public void testRegisterDropsRuleSet() throws Exception {
   public void testRegisterSchemaWithReservedFields() throws RestClientException, IOException {
     String subject0 = "testSubject0";
     ParsedSchema schema1 = AvroUtils.parseSchema("{\"type\":\"record\","
-            + "\"name\":\"myrecord\","
-            + "\"fields\":"
-            + "[{\"type\":\"string\",\"name\":"
-            + "\"f" + "\"},"
-            + "{\"type\":\"string\",\"name\":"
-            + "\"g\" , \"default\":\"d\"}"
-            + "]}");
+        + "\"name\":\"myrecord\","
+        + "\"fields\":"
+        + "[{\"type\":\"string\",\"name\":"
+        + "\"f" + "\"},"
+        + "{\"type\":\"string\",\"name\":"
+        + "\"g\" , \"default\":\"d\"}"
+        + "]}");
     RegisterSchemaRequest request1 = new RegisterSchemaRequest(Objects.requireNonNull(schema1));
     request1.setMetadata(new Metadata(Collections.emptyMap(),
         Collections.singletonMap(ParsedSchema.RESERVED, "f"),
@@ -2784,12 +2783,12 @@ public void testRegisterSchemaWithReservedFields() throws RestClientException, I
     // remove reserved fields for subject0
     schema1 = AvroUtils.parseSchema("{\"type\":\"record\","
-            + "\"name\":\"myrecord\","
-            + "\"fields\":"
-            + "["
-            + "{\"type\":\"string\",\"name\":"
-            + "\"g\" , \"default\":\"d\"}"
-            + "]}");
+        + "\"name\":\"myrecord\","
+        + "\"fields\":"
+        + "["
+        + "{\"type\":\"string\",\"name\":"
+        + "\"g\" , \"default\":\"d\"}"
+        + "]}");
     RegisterSchemaRequest request2 = new RegisterSchemaRequest(Objects.requireNonNull(schema1));
     request2.setMetadata(new Metadata(Collections.emptyMap(),
         Collections.singletonMap(ParsedSchema.RESERVED, "g"),
@@ -2801,6 +2800,41 @@
     );
   }
 
+  @Test
+  public void testRegisterSchemaWithInvalidNamespace() throws RestClientException, IOException {
+    String subject0 = "testSubject0";
+    ParsedSchema schema1 = AvroUtils.parseSchema("{\"type\":\"record\","
+        + "\"name\":\"myrecord\","
+        + "\"namespace\":\"a-bad.namespace\","
+        + "\"fields\":"
+        + "[{\"type\":\"string\",\"name\":"
+        + "\"f" + "\"},"
+        + "{\"type\":\"string\",\"name\":"
+        + "\"g\" , \"default\":\"d\"}"
+        + "]}");
+    RegisterSchemaRequest request1 = new RegisterSchemaRequest(Objects.requireNonNull(schema1));
+    assertThrows(
+        RestClientException.class,
+        () -> restApp.restClient.registerSchema(request1, subject0, false),
+        "Fail registering subject0 because of global validateNames"
+    );
+
+    // global validateNames = false
+    ConfigUpdateRequest configUpdateRequest = new ConfigUpdateRequest();
+    configUpdateRequest.setCompatibilityLevel(BACKWARD.name());
+    configUpdateRequest.setValidateNames(false);
+    assertEquals(
+        configUpdateRequest,
+        restApp.restClient.updateConfig(configUpdateRequest, null),
+        "Updating config should succeed"
+    );
+    assertEquals(
+        1,
+        restApp.restClient.registerSchema(request1, subject0, false).getId(),
+        "Should register despite the invalid namespace"
+    );
+  }
+
   @Test
   public void testInvalidSchema() {
     assertThrows(InvalidSchemaException.class, () ->
diff --git a/pom.xml b/pom.xml
index 2ba40f4284a..2840aade1e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@
     0.11.1
     1.9.10
     4.13.1
+    1.12.1
     4.9.2
     2.9.3
     0.5.1