From 67b931376aae541267d366b63a78655716bc2581 Mon Sep 17 00:00:00 2001 From: Mark Tranter Date: Sun, 4 Sep 2016 14:32:11 +0100 Subject: [PATCH] Handle nullable string cases --- .../kafka/connect/cassandra/package.scala | 3 ++- .../kafka/connect/cassandra/SchemaSpec.scala | 25 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala b/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala index 30c684e..5d0bad6 100644 --- a/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala +++ b/src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala @@ -86,7 +86,8 @@ package object cassandra { def convert(schema: Schema, result: Struct, col: String): AnyRef = schema.field(col).schema match { case x if x.`type`() == Schema.STRING_SCHEMA.`type`() => - s"'${result.get(col).toString}'" + val fieldValue = result.get(col) + if(fieldValue != null) s"'${fieldValue.toString}'" else "null" case x if x.name() == Timestamp.LOGICAL_NAME => val time = Timestamp.fromLogical(x, result.get(col).asInstanceOf[JDate]) s"$time" diff --git a/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala b/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala index 93ce833..c02d66b 100644 --- a/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala +++ b/src/test/scala/com/tuplejump/kafka/connect/cassandra/SchemaSpec.scala @@ -64,6 +64,31 @@ class SchemaSpec extends AbstractFlatSpec { query.cql should be("INSERT INTO keyspacex.tablex(available,name,age) VALUES(false,'user',15)") } + it should "convert a struct schema with multiple fields including nullable string field" in { + val topic = "test_kfk" + val sc = sinkConfig(topic, "keyspacex", "tablex", List("available", "name", "type", "age")) + + val nullableStringSchema = SchemaBuilder.string().optional().build() + + val schema = SchemaBuilder.struct.name("record").version(1) + .field("available", Schema.BOOLEAN_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("type", nullableStringSchema) + .field("age", Schema.INT32_SCHEMA).build + + val value = new Struct(schema).put("name", "user").put("available", false).put("age", 15).put("type", null) + val record = new SinkRecord("test_kfk", 1, SchemaBuilder.struct.build, "key", schema, value, 0) + + schema.asColumnNames should be (sc.schema.columnNames) + + sc.schema.route.topic should be (record.topic) + sc.schema is record should be (true) + + sc.query.cql should be ("INSERT INTO keyspacex.tablex(available,name,type,age) VALUES(?,?,?,?)") + val query = record.as(sc.schema.namespace) + query.cql should be("INSERT INTO keyspacex.tablex(available,name,type,age) VALUES(false,'user',null,15)") + } + it should "convert cassandra column defs to a source schema" in { val colDef = Map( "id" -> DataType.cint(),