Skip to content

Commit efe6029

Browse files
authoredJul 10, 2021
KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message (apache#9541)
The following error message `org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type INT64: class java.lang.Long for field: "moderate_time"` can be confusing because java.lang.Long is acceptable type for schema INT64. In fact, in this case `org.apache.kafka.connect.data.Timestamp` is used but this info is not logged. Reviewers: Randall Hauch <rhauch@gmail.com>, Chris Egerton <chrise@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
1 parent d24094b commit efe6029

File tree

2 files changed

+31
-13
lines changed

2 files changed

+31
-13
lines changed
 

‎connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,6 @@ public static void validateValue(String name, Schema schema, Object value) {
222222
}
223223

224224
List<Class<?>> expectedClasses = expectedClassesFor(schema);
225-
226-
if (expectedClasses == null)
227-
throw new DataException("Invalid Java object for schema type " + schema.type()
228-
+ ": " + value.getClass()
229-
+ " for field: \"" + name + "\"");
230-
231225
boolean foundMatch = false;
232226
for (Class<?> expectedClass : expectedClasses) {
233227
if (expectedClass.isInstance(value)) {
@@ -236,10 +230,17 @@ public static void validateValue(String name, Schema schema, Object value) {
236230
}
237231
}
238232

239-
if (!foundMatch)
240-
throw new DataException("Invalid Java object for schema type " + schema.type()
241-
+ ": " + value.getClass()
242-
+ " for field: \"" + name + "\"");
233+
if (!foundMatch) {
234+
StringBuilder exceptionMessage = new StringBuilder("Invalid Java object for schema");
235+
if (schema.name() != null) {
236+
exceptionMessage.append(" \"").append(schema.name()).append("\"");
237+
}
238+
exceptionMessage.append(" with type ").append(schema.type()).append(": ").append(value.getClass());
239+
if (name != null) {
240+
exceptionMessage.append(" for field: \"").append(name).append("\"");
241+
}
242+
throw new DataException(exceptionMessage.toString());
243+
}
243244

244245
switch (schema.type()) {
245246
case STRUCT:
@@ -266,7 +267,7 @@ public static void validateValue(String name, Schema schema, Object value) {
266267
private static List<Class<?>> expectedClassesFor(Schema schema) {
267268
List<Class<?>> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
268269
if (expectedClasses == null)
269-
expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
270+
expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), Collections.emptyList());
270271
return expectedClasses;
271272
}
272273

‎connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -311,13 +311,30 @@ public void testValidateFieldWithInvalidValueType() {
311311

312312
Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
313313
fakeSchema, new Object()));
314-
assertEquals("Invalid Java object for schema type null: class java.lang.Object for field: \"field\"",
314+
assertEquals("Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"",
315315
e.getMessage());
316316

317317
e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
318318
Schema.INT8_SCHEMA, new Object()));
319-
assertEquals("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\"",
319+
assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"",
320320
e.getMessage());
321+
322+
e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object()));
323+
assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object", e.getMessage());
324+
}
325+
326+
@Test
327+
public void testValidateFieldWithInvalidValueMismatchTimestamp() {
328+
String fieldName = "field";
329+
long longValue = 1000L;
330+
331+
// Does not throw
332+
ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);
333+
334+
Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
335+
Timestamp.SCHEMA, longValue));
336+
assertEquals("Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " +
337+
"with type INT64: class java.lang.Long for field: \"field\"", e.getMessage());
321338
}
322339

323340
@Test

0 commit comments

Comments
 (0)
Please sign in to comment.