---
@@ -18,6 +18,7 @@

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import io.confluent.kafka.schemaregistry.utils.ExceptionUtils;
@@ -28,7 +29,10 @@
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.NonRecordContainer;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.NetworkException;
@@ -199,6 +203,16 @@ public byte[] serialize(
value,
schema);
}

@Override
protected DatumWriter<?> getDatumWriter(
Object value, org.apache.avro.Schema schema, boolean useLogicalTypes, boolean allowNull) {
GenericData data = AvroSchemaUtils.getThreadLocalGenericData();
if (data == null) {
data = AvroSchemaUtils.getGenericData(useLogicalTypes);
}
return new GenericDatumWriter<>(schema, data);
}
}

static class Deserializer extends AbstractKafkaAvroDeserializer {
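A note on the override above: it checks for a GenericData instance staged for the current thread before falling back to the shared, logical-type-aware instance, so a caller can customize conversions for a single serialization call without mutating global state. A minimal standalone sketch of that precedence, using only names that appear in this diff (the enclosing serializer class is elided):

```java
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;

final class DatumWriters {
  // Thread-local GenericData wins when present; otherwise use the shared
  // instance keyed on whether logical-type conversions are wanted.
  static DatumWriter<?> writerFor(Schema schema, boolean useLogicalTypes) {
    GenericData data = AvroSchemaUtils.getThreadLocalGenericData();
    if (data == null) {
      data = AvroSchemaUtils.getGenericData(useLogicalTypes);
    }
    return new GenericDatumWriter<>(schema, data);
  }
}
```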
--- a/avro-data/src/test/java/io/confluent/connect/avro/AvroDataTest.java
@@ -524,7 +524,7 @@
.field("boolean", Schema.BOOLEAN_SCHEMA)
.field("string", Schema.STRING_SCHEMA)
.field("bytes", Schema.BYTES_SCHEMA)
.field("array", SchemaBuilder.array(Schema.STRING_SCHEMA).build())

[SonarQube check failure, AvroDataTest.java#L527: Define a constant instead of duplicating this literal "array" 27 times.]
.field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build())
.field("mapNonStringKeys",
SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build())
@@ -999,7 +999,7 @@
arraySchema.addProp("connect.default", arrayNode);
org.apache.avro.Schema expectedAvroSchema = org.apache.avro.SchemaBuilder.builder()
.record("ConnectDefault").namespace("io.confluent.connect.avro").fields()
.name("array").type(arraySchema).noDefault() // no default
.name("array").type(arraySchema).withDefault(Arrays.asList("a", "b", "c"))
.endRecord();

assertEquals(expectedAvroSchema, avroSchema);
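The assertion change above reflects that a Connect `connect.default` property on an array field now surfaces as a real Avro field default rather than being dropped. A standalone sketch of the builder call the test now expects (assumed illustrative code, not part of the PR):

```java
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

final class ArrayDefaultSketch {
  static Schema recordWithArrayDefault() {
    Schema arraySchema = SchemaBuilder.array().items().stringType();
    return SchemaBuilder.builder()
        .record("ConnectDefault").namespace("io.confluent.connect.avro").fields()
        // withDefault(...) embeds the default in the field declaration, so a
        // reader can substitute ["a", "b", "c"] when the field is missing;
        // noDefault() would leave readers with no fallback.
        .name("array").type(arraySchema).withDefault(Arrays.asList("a", "b", "c"))
        .endRecord();
  }
}
```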
---
@@ -211,10 +211,15 @@ private void writeDatum(ByteArrayOutputStream out, Object value, Schema rawSchema

DatumWriter<Object> writer;
writer = datumWriterCache.get(rawSchema,
() -> (DatumWriter<Object>) AvroSchemaUtils.getDatumWriter(
() -> (DatumWriter<Object>) getDatumWriter(
value, rawSchema, avroUseLogicalTypeConverters, avroReflectionAllowNull)
);
writer.write(value, encoder);
encoder.flush();
}

protected DatumWriter<?> getDatumWriter(
Object value, Schema schema, boolean useLogicalTypes, boolean allowNull) {
return AvroSchemaUtils.getDatumWriter(value, schema, useLogicalTypes, allowNull);
}
}
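The new protected method turns datum-writer creation into an overridable hook while `writeDatum` keeps ownership of caching and encoding; the override in the first file of this diff is one use of it. As a purely hypothetical illustration (this subclass is not in the PR, and it assumes the enclosing class above is `AbstractKafkaAvroSerializer`), the same hook could swap in reflection-based writing:

```java
import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;

class ReflectingSerializer extends AbstractKafkaAvroSerializer {
  @Override
  protected DatumWriter<?> getDatumWriter(
      Object value, Schema schema, boolean useLogicalTypes, boolean allowNull) {
    // allowNull mirrors the avroReflectionAllowNull flag used by writeDatum.
    ReflectData data = allowNull ? ReflectData.AllowNull.get() : ReflectData.get();
    return new ReflectDatumWriter<>(schema, data);
  }
}
```

Note that `writeDatum` caches writers per raw schema, so an override should return writers that are safe to reuse across values of that schema.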
---
@@ -281,7 +281,7 @@ private IndexedRecord createInvalidAvroRecord() {
+ " \"name\": \"test\",\n"
+ " \"items\": {\n"
+ "\"type\": \"record\",\n"
+ "\"namespace\": \"example.avro\",\n"
+ "\"namespace\": \"io.confluent.kafka.example\",\n"
+ "\"name\": \"User\",\n"
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}}");

@@ -291,7 +291,7 @@ private IndexedRecord createInvalidAvroRecord() {
+ " \"name\": \"test\",\n"
+ " \"values\": {\n"
+ "\"type\": \"record\",\n"
+ "\"namespace\": \"example.avro\",\n"
+ "\"namespace\": \"io.confluent.kafka.example\",\n"
+ "\"name\": \"User\",\n"
+ "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}}");

@@ -912,6 +912,7 @@ public void testKafkaAvroSerializerWithCyclicReference() throws IOException, RestClientException {
new SchemaReference("io.confluent.kafka.example.User", "user", -1)
)));
}

@Test
public void testKafkaAvroSerializerWithArraySpecific() throws IOException, RestClientException {
Map serializerConfigs = ImmutableMap.of(
@@ -958,7 +959,7 @@ public void testKafkaAvroSerializerWithMapSpecific() throws IOException, RestClientException {
true
);
Map<Utf8, IndexedRecord> data = new HashMap<>();
data.put(new Utf8("one"), createUserRecordUtf8());
data.put(new Utf8("one"), createSpecificAvroRecord());
schemaRegistry.register(topic + "-value", new AvroSchema(mapSchema));
avroSerializer.configure(serializerConfigs, false);
avroDeserializer.configure(deserializerConfigs, false);
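One detail in the map test above: the expected map is keyed with `Utf8`, not `String`. Avro's generic decoding materializes string map keys as `org.apache.avro.util.Utf8`, so equality-based assertions have to key the expected data the same way. A tiny standalone illustration (assumed, not from the PR):

```java
import org.apache.avro.util.Utf8;

final class Utf8KeySketch {
  public static void main(String[] args) {
    Utf8 key = new Utf8("one");
    System.out.println(key.toString().equals("one")); // true: same characters
    System.out.println(key.equals("one"));            // false: Utf8 is not String
  }
}
```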
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/avro/AvroSchema.java
@@ -56,6 +56,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericContainer;
@@ -224,8 +225,12 @@
}

protected Schema.Parser getParser() {
Schema.Parser parser = new Schema.Parser();
parser.setValidateDefaults(isNew());
boolean isNew = isNew();

[SonarQube warning, AvroSchema.java#L228: Rename "isNew" which hides the field declared at line 84.]
NameValidator nameValidator = isNew
? NameValidator.STRICT_VALIDATOR
: NameValidator.NO_VALIDATION;
Schema.Parser parser = new Schema.Parser(nameValidator);
parser.setValidateDefaults(isNew);
return parser;
}

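Switching from the no-arg parser to `new Schema.Parser(nameValidator)` is what makes name validation toggleable at parse time: `STRICT_VALIDATOR` enforces Avro's classic name rules, while `NO_VALIDATION` lets legacy names through. A standalone sketch of the difference (the example schema is assumed, not from the PR):

```java
import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

final class NameValidationSketch {
  public static void main(String[] args) {
    // "1User" breaks the strict rule that names must not start with a digit.
    String json = "{\"type\":\"record\",\"name\":\"1User\",\"fields\":[]}";

    Schema lenient = new Schema.Parser(NameValidator.NO_VALIDATION).parse(json);
    System.out.println("lenient parse ok: " + lenient.getName());

    try {
      new Schema.Parser(NameValidator.STRICT_VALIDATOR).parse(json);
    } catch (SchemaParseException e) {
      System.out.println("strict parse rejected: " + e.getMessage());
    }
  }
}
```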
---
@@ -170,19 +170,24 @@ public static SpecificData getSpecificData() {
}

public static void addLogicalTypeConversion(GenericData avroData) {
avroData.addLogicalTypeConversion(new Conversions.BigDecimalConversion());
avroData.addLogicalTypeConversion(new Conversions.DecimalConversion());
avroData.addLogicalTypeConversion(new Conversions.DurationConversion());
avroData.addLogicalTypeConversion(new Conversions.UUIDConversion());

avroData.addLogicalTypeConversion(new TimeConversions.DateConversion());

avroData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
avroData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
avroData.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());

avroData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
avroData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
avroData.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());

avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
avroData.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion());
}

private static final EncoderFactory encoderFactory = EncoderFactory.get();
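Each `addLogicalTypeConversion` call teaches a `GenericData` instance to map one logical type onto a Java class; the additions above extend coverage to big decimals, durations, and the nanosecond and local-timestamp families. A small assumed sketch of what registration buys, using the UUID conversion:

```java
import java.util.UUID;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

final class ConversionSketch {
  public static void main(String[] args) {
    GenericData data = new GenericData();
    data.addLogicalTypeConversion(new Conversions.UUIDConversion());

    Schema uuidSchema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
    // Once registered, the conversion is discoverable by logical type, which
    // is how datum writers pick it up during serialization.
    CharSequence encoded = data.getConversionFor(LogicalTypes.uuid())
        .toCharSequence(UUID.randomUUID(), uuidSchema, LogicalTypes.uuid());
    System.out.println(encoded);
  }
}
```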
---
@@ -33,6 +33,7 @@ public class Config {
private String alias;
private Boolean normalize;
private Boolean validateFields;
private Boolean validateNames;
private Boolean validateRules;
private String compatibilityLevel;
private String compatibilityPolicy;
@@ -46,6 +47,7 @@
public Config(@JsonProperty("alias") String alias,
@JsonProperty("normalize") Boolean normalize,
@JsonProperty("validateFields") Boolean validateFields,
@JsonProperty("validateNames") Boolean validateNames,
@JsonProperty("validateRules") Boolean validateRules,
@JsonProperty("compatibilityLevel") String compatibilityLevel,
@JsonProperty("compatibilityPolicy") String compatibilityPolicy,
@@ -57,6 +59,7 @@ public Config(@JsonProperty("alias") String alias,
this.alias = alias;
this.normalize = normalize;
this.validateFields = validateFields;
this.validateNames = validateNames;
this.validateRules = validateRules;
this.compatibilityLevel = compatibilityLevel;
this.compatibilityPolicy = compatibilityPolicy;
@@ -100,6 +103,7 @@ public Config(ConfigUpdateRequest request) {
this.alias = request.getAlias();
this.normalize = request.isNormalize();
this.validateFields = request.isValidateFields();
this.validateNames = request.isValidateNames();
this.validateRules = request.isValidateRules();
this.compatibilityLevel = request.getCompatibilityLevel();
this.compatibilityPolicy = request.getCompatibilityPolicy();
@@ -140,6 +144,16 @@ public void setValidateFields(Boolean validateFields) {
this.validateFields = validateFields;
}

@JsonProperty("validateNames")
public Boolean isValidateNames() {
return validateNames;
}

@JsonProperty("validateNames")
public void setValidateNames(Boolean validateNames) {
this.validateNames = validateNames;
}

@JsonProperty("validateRules")
public Boolean isValidateRules() {
return validateRules;
@@ -244,6 +258,7 @@ public boolean equals(Object o) {
return Objects.equals(alias, config.alias)
&& Objects.equals(normalize, config.normalize)
&& Objects.equals(validateFields, config.validateFields)
&& Objects.equals(validateNames, config.validateNames)
&& Objects.equals(validateRules, config.validateRules)
&& Objects.equals(compatibilityLevel, config.compatibilityLevel)
&& Objects.equals(compatibilityPolicy, config.compatibilityPolicy)
@@ -256,7 +271,7 @@

@Override
public int hashCode() {
return Objects.hash(alias, normalize, validateFields, validateRules,
return Objects.hash(alias, normalize, validateFields, validateNames, validateRules,
compatibilityLevel, compatibilityPolicy, compatibilityGroup,
defaultMetadata, overrideMetadata, defaultRuleSet, overrideRuleSet);
}
@@ -267,6 +282,7 @@ public String toString() {
+ "alias='" + alias + '\''
+ ", normalize=" + normalize
+ ", validateFields=" + validateFields
+ ", validateNames=" + validateNames
+ ", validateRules=" + validateRules
+ ", compatibilityLevel='" + compatibilityLevel + '\''
+ ", compatibilityPolicy='" + compatibilityPolicy + '\''
--- a/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/ConfigUpdateRequest.java
@@ -39,6 +39,7 @@
private Optional<String> alias;
private Optional<Boolean> normalize;
private Optional<Boolean> validateFields;
private Optional<Boolean> validateNames;
private Optional<Boolean> validateRules;
private Optional<String> compatibilityLevel;
private Optional<String> compatibilityPolicy;
@@ -55,6 +56,7 @@
setAlias(config.getAlias());
setNormalize(config.isNormalize());
setValidateFields(config.isValidateFields());
setValidateNames(config.isValidateNames());
setValidateRules(config.isValidateRules());
setCompatibilityLevel(config.getCompatibilityLevel());
setCompatibilityPolicy(config.getCompatibilityPolicy());
@@ -129,6 +131,26 @@
this.validateFields = validateFields != null ? Optional.of(validateFields) : null;
}

@JsonProperty("validateNames")
public Optional<Boolean> isOptionalValidateNames() {
return validateNames;
}

@JsonIgnore
public Boolean isValidateNames() {
return validateNames != null ? validateNames.orElse(null) : null;

[SonarQube warning, ConfigUpdateRequest.java#L141: Ensure this "Optional" could never be null and remove this null-check.]
}

@JsonProperty("validateNames")
public void setValidateNames(Optional<Boolean> validateNames) {
this.validateNames = validateNames;
}

@JsonIgnore
public void setValidateNames(Boolean validateNames) {
this.validateNames = validateNames != null ? Optional.of(validateNames) : null;
}

@JsonProperty("validateRules")
public Optional<Boolean> isOptionalValidateRules() {
return validateRules;
@@ -311,6 +333,7 @@
return Objects.equals(alias, that.alias)
&& Objects.equals(normalize, that.normalize)
&& Objects.equals(validateFields, that.validateFields)
&& Objects.equals(validateNames, that.validateNames)
&& Objects.equals(validateRules, that.validateRules)
&& Objects.equals(compatibilityLevel, that.compatibilityLevel)
&& Objects.equals(compatibilityPolicy, that.compatibilityPolicy)
@@ -323,7 +346,7 @@

@Override
public int hashCode() {
return Objects.hash(alias, normalize, validateFields, validateRules,
return Objects.hash(alias, normalize, validateFields, validateNames, validateRules,
compatibilityLevel, compatibilityPolicy, compatibilityGroup,
defaultMetadata, overrideMetadata, defaultRuleSet, overrideRuleSet);
}
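The request object keeps the new flag tri-state: a `null` Optional means "not part of this update", while `Optional.of(false)` explicitly disables; that is why the convenience setter maps `null` to `null` rather than to `Optional.empty()`. A hedged usage sketch (assumes the class's existing no-arg constructor and Jackson wiring):

```java
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;

final class ValidateNamesRequestSketch {
  static ConfigUpdateRequest disableNameValidation() {
    ConfigUpdateRequest request = new ConfigUpdateRequest();
    // Serialized as {"validateNames": false}. Leaving the field unset omits
    // the property entirely, so the subject keeps inheriting the server-wide
    // default instead of being pinned to a value.
    request.setValidateNames(Boolean.FALSE);
    return request;
  }
}
```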
---
@@ -191,6 +191,9 @@ public class SchemaRegistryConfig extends RestConfig {
public static final String SCHEMA_VALIDATE_FIELDS_CONFIG = "schema.validate.fields";
public static final boolean SCHEMA_VALIDATE_FIELDS_DEFAULT = false;

public static final String SCHEMA_VALIDATE_NAMES_CONFIG = "schema.validate.names";
public static final boolean SCHEMA_VALIDATE_NAMES_DEFAULT = true;

/**
* <code>schema.cache.size</code>
*/
@@ -407,6 +410,8 @@ public class SchemaRegistryConfig extends RestConfig {
+ "enabled or not. If enabled, it checks whether any top level fields conflict with the "
+ "reserved fields in metadata. It also checks for the presence of any field names "
+ "beginning with $$";
protected static final String VALIDATE_NAMES_DOC = "Determines whether name validation is "
+ "enabled or not. If enabled, it validates both namespaces and names in Avro.";
protected static final String SCHEMA_CACHE_SIZE_DOC =
"The maximum size of the schema cache.";
protected static final String SCHEMA_CACHE_EXPIRY_SECS_DOC =
@@ -633,6 +638,9 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
.define(SCHEMA_VALIDATE_FIELDS_CONFIG, ConfigDef.Type.BOOLEAN, SCHEMA_VALIDATE_FIELDS_DEFAULT,
ConfigDef.Importance.LOW, VALIDATE_FIELDS_DOC
)
.define(SCHEMA_VALIDATE_NAMES_CONFIG, ConfigDef.Type.BOOLEAN, SCHEMA_VALIDATE_NAMES_DEFAULT,
ConfigDef.Importance.LOW, VALIDATE_NAMES_DOC
)
.define(SCHEMA_CACHE_SIZE_CONFIG, ConfigDef.Type.INT, SCHEMA_CACHE_SIZE_DEFAULT,
ConfigDef.Importance.LOW, SCHEMA_CACHE_SIZE_DOC
)
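Operationally the new setting is a plain boolean server property, `schema.validate.names`, defaulting to `true`. A sketch of disabling it at startup (assumed wiring; the constructor follows the usual `RestConfig` pattern):

```java
import java.util.Properties;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;

final class LenientNamesConfigSketch {
  static SchemaRegistryConfig build() throws Exception {
    Properties props = new Properties();
    // Skips strict Avro name/namespace validation registry-wide; individual
    // subjects can still override via their own config.
    props.put(SchemaRegistryConfig.SCHEMA_VALIDATE_NAMES_CONFIG, "false");
    return new SchemaRegistryConfig(props);
  }
}
```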
---
@@ -117,6 +117,7 @@ public abstract class AbstractSchemaRegistry implements SchemaRegistry,
protected final LoadingCache<RawSchema, ParsedSchema> oldSchemaCache;
protected final CompatibilityLevel defaultCompatibilityLevel;
protected final boolean defaultValidateFields;
protected final boolean defaultValidateNames;
protected final Mode defaultMode;
protected final int schemaSearchDefaultLimit;
protected final int schemaSearchMaxLimit;
@@ -170,6 +171,8 @@ protected AbstractSchemaRegistry(SchemaRegistryConfig config, MetricsContainer m
this.defaultCompatibilityLevel = config.compatibilityType();
this.defaultValidateFields =
config.getBoolean(SchemaRegistryConfig.SCHEMA_VALIDATE_FIELDS_CONFIG);
this.defaultValidateNames =
config.getBoolean(SchemaRegistryConfig.SCHEMA_VALIDATE_NAMES_CONFIG);
this.defaultMode = Mode.READWRITE;
this.schemaSearchDefaultLimit =
config.getInt(SchemaRegistryConfig.SCHEMA_SEARCH_DEFAULT_LIMIT_CONFIG);
@@ -508,10 +511,14 @@ protected void logSchemaOp(Schema schema, String operation) {
tenant(), schema.getId(), schema.getSubject(), operation);
}

private boolean isSchemaFieldValidationEnabled(Config config) {
protected boolean isSchemaFieldValidationEnabled(Config config) {
return config.isValidateFields() != null ? config.isValidateFields() : defaultValidateFields;
}

protected boolean isSchemaNameValidationEnabled(Config config) {
return config.isValidateNames() != null ? config.isValidateNames() : defaultValidateNames;
}

private ParsedSchema maybeValidateAndNormalizeSchema(ParsedSchema parsedSchema,
Schema schema,
Config config,
@@ -1262,7 +1269,8 @@ public List<String> isCompatible(String subject,
}

Config config = getConfigInScope(subject);
ParsedSchema parsedSchema = canonicalizeSchema(newSchema, config, true, normalize);
boolean doValidation = isSchemaNameValidationEnabled(config);
ParsedSchema parsedSchema = canonicalizeSchema(newSchema, config, doValidation, normalize);
if (parsedSchema == null) {
log.error("Empty schema");
throw new InvalidSchemaException("Empty schema");
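Net effect of the `isCompatible` change: canonicalization during a compatibility check now validates names only when the subject's effective setting asks for it, instead of unconditionally. Sketched in isolation (method and field names come from the diff; the wrapper method itself is hypothetical and would live inside `AbstractSchemaRegistry`):

```java
// Hypothetical helper inside AbstractSchemaRegistry, for illustration only.
private ParsedSchema parseForCompatibility(Schema newSchema, String subject, boolean normalize)
    throws InvalidSchemaException {
  Config config = getConfigInScope(subject);
  // Per-subject override wins; otherwise schema.validate.names decides.
  boolean doValidation = isSchemaNameValidationEnabled(config);
  // With doValidation == false, a schema whose names would fail strict Avro
  // validation still parses and proceeds to the compatibility comparison.
  return canonicalizeSchema(newSchema, config, doValidation, normalize);
}
```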