diff --git a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala index 65274b477..4fd56f4bd 100644 --- a/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala +++ b/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala @@ -133,9 +133,13 @@ class DataFrameValueWriter(writeUnknownTypes: Boolean = false) extends Filtering generator.writeBeginArray() if (value != null) { value.foreach { v => - val result = write(schema, v, generator) - if (!result.isSuccesful()) { - return handleUnknown(value, generator) + if (v == null) { + generator.writeNull() + } else { + val result = write(schema, v, generator) + if (!result.isSuccesful()) { + return handleUnknown(value, generator) + } } } } @@ -159,9 +163,13 @@ class DataFrameValueWriter(writeUnknownTypes: Boolean = false) extends Filtering for ((k, v) <- value) { if (shouldKeep(generator.getParentPath(), k.toString())) { generator.writeFieldName(k.toString) - val result = write(schema.valueType, v, generator) - if (!result.isSuccesful()) { - return handleUnknown(v, generator) + if (v == null) { + generator.writeNull() + } else { + val result = write(schema.valueType, v, generator) + if (!result.isSuccesful()) { + return handleUnknown(v, generator) + } } } } diff --git a/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/DataFrameValueWriterTest.scala b/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/DataFrameValueWriterTest.scala index 2ddc0ae4f..a3925256c 100644 --- a/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/DataFrameValueWriterTest.scala +++ b/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/DataFrameValueWriterTest.scala @@ -156,4 +156,31 @@ class DataFrameValueWriterTest { } } + @Test + def testNullStructInArray(): Unit = { + val schema = StructType(Seq(StructField("s", ArrayType(StructType(Seq(StructField("a", StringType))))))) + val row = Row(Array(null)) + assertEquals("""{"s":[null]}""", serialize(row, schema)) + } + + @Test + def testNullStructInMap(): Unit = { + val schema = StructType(Seq(StructField("s", MapType(StringType, StructType(Seq(StructField("b", StringType))))))) + val row = Row(Map("a" -> null)) + assertEquals("""{"s":{"a":null}}""", serialize(row, schema)) + } + + @Test + def testNullNestedArray(): Unit = { + val schema = StructType(Seq(StructField("s", ArrayType(ArrayType(StringType))))) + val row = Row(Array(null)) + assertEquals("""{"s":[null]}""", serialize(row, schema)) + } + + @Test + def testNullNestedMap(): Unit = { + val schema = StructType(Seq(StructField("s", ArrayType(MapType(StringType, StringType))))) + val row = Row(Array(null)) + assertEquals("""{"s":[null]}""", serialize(row, schema)) + } } \ No newline at end of file