diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionUtils.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionUtils.java
index 2bad67d426af6..38bdcbec2069d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionUtils.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionUtils.java
@@ -24,7 +24,6 @@
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
 
-import org.apache.spark.sql.catalyst.expressions.SharedFactory;
 import org.apache.spark.sql.catalyst.json.CreateJacksonParser;
 import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.unsafe.types.UTF8String;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala
index 7ff2bfe51729c..c9d15e1eb2e4d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/json/JsonExpressionEvalUtils.scala
@@ -16,13 +16,16 @@
  */
 package org.apache.spark.sql.catalyst.expressions.json
 
-import java.io.{ByteArrayOutputStream, CharArrayWriter}
+import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}
+
+import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
+import com.fasterxml.jackson.core.json.JsonReadFeature
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow, SharedFactory}
+import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
 import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonGenerator, JacksonParser, JsonInferSchema, JSONOptions}
 import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, FailureSafeParser, MapData, PermissiveMode}
@@ -32,6 +35,79 @@ import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, St
 import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
 import org.apache.spark.util.Utils
 
+private[this] sealed trait PathInstruction
+private[this] object PathInstruction {
+  private[expressions] case object Subscript extends PathInstruction
+  private[expressions] case object Wildcard extends PathInstruction
+  private[expressions] case object Key extends PathInstruction
+  private[expressions] case class Index(index: Long) extends PathInstruction
+  private[expressions] case class Named(name: String) extends PathInstruction
+}
+
+private[this] sealed trait WriteStyle
+private[this] object WriteStyle {
+  private[expressions] case object RawStyle extends WriteStyle
+  private[expressions] case object QuotedStyle extends WriteStyle
+  private[expressions] case object FlattenStyle extends WriteStyle
+}
+
+private[this] object JsonPathParser extends RegexParsers {
+  import PathInstruction._
+
+  def root: Parser[Char] = '$'
+
+  def long: Parser[Long] = "\\d+".r ^? {
+    case x => x.toLong
+  }
+
+  // parse `[*]` and `[123]` subscripts
+  def subscript: Parser[List[PathInstruction]] =
+    for {
+      operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']'
+    } yield {
+      Subscript :: operand :: Nil
+    }
+
+  // parse `.name` or `['name']` child expressions
+  def named: Parser[List[PathInstruction]] =
+    for {
+      name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\']+".r <~ "']"
+    } yield {
+      Key :: Named(name) :: Nil
+    }
+
+  // child wildcards: `..`, `.*` or `['*']`
+  def wildcard: Parser[List[PathInstruction]] =
+    (".*" | "['*']") ^^^ List(Wildcard)
+
+  def node: Parser[List[PathInstruction]] =
+    wildcard |
+      named |
+      subscript
+
+  val expression: Parser[List[PathInstruction]] = {
+    phrase(root ~> rep(node) ^^ (x => x.flatten))
+  }
+
+  def parse(str: String): Option[List[PathInstruction]] = {
+    this.parseAll(expression, str) match {
+      case Success(result, _) =>
+        Some(result)
+
+      case _ =>
+        None
+    }
+  }
+}
+
+private[this] object SharedFactory {
+  val jsonFactory: JsonFactory = new JsonFactoryBuilder()
+    // The two options below enabled for Hive compatibility
+    .enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS)
+    .enable(JsonReadFeature.ALLOW_SINGLE_QUOTES)
+    .build()
+}
+
 case class JsonToStructsEvaluator(
     options: Map[String, String],
     nullableSchema: DataType,
@@ -278,3 +354,225 @@ case class JsonTupleEvaluator(foldableFieldNames: Array[Option[String]]) {
     }
   }
 }
+
+/**
+ * The expression `GetJsonObject` will utilize it to support codegen.
+ */
+case class GetJsonObjectEvaluator(cachedPath: UTF8String) {
+  import com.fasterxml.jackson.core.JsonToken._
+  import PathInstruction._
+  import SharedFactory._
+  import WriteStyle._
+
+  def this() = this(null)
+
+  @transient
+  private lazy val parsedPath: Option[List[PathInstruction]] = parsePath(cachedPath)
+
+  @transient
+  private var jsonStr: UTF8String = _
+
+  @transient
+  private var pathStr: UTF8String = _
+
+  def setJson(arg: UTF8String): Unit = {
+    jsonStr = arg
+  }
+
+  def setPath(arg: UTF8String): Unit = {
+    pathStr = arg
+  }
+
+  def evaluate(): Any = {
+    if (jsonStr == null) return null
+
+    val parsed = if (cachedPath != null) {
+      parsedPath
+    } else {
+      parsePath(pathStr)
+    }
+
+    if (parsed.isDefined) {
+      try {
+        /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
+          detect character encoding which could fail for some malformed strings */
+        Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser =>
+          val output = new ByteArrayOutputStream()
+          val matched = Utils.tryWithResource(
+            jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
+            parser.nextToken()
+            evaluatePath(parser, generator, RawStyle, parsed.get)
+          }
+          if (matched) {
+            UTF8String.fromBytes(output.toByteArray)
+          } else {
+            null
+          }
+        }
+      } catch {
+        case _: JsonProcessingException => null
+      }
+    } else {
+      null
+    }
+  }
+
+  private def parsePath(path: UTF8String): Option[List[PathInstruction]] = {
+    if (path != null) {
+      JsonPathParser.parse(path.toString)
+    } else {
+      None
+    }
+  }
+
+  // advance to the desired array index, assumes to start at the START_ARRAY token
+  private def arrayIndex(p: JsonParser, f: () => Boolean): Long => Boolean = {
+    case _ if p.getCurrentToken == END_ARRAY =>
+      // terminate, nothing has been written
+      false
+
+    case 0 =>
+      // we've reached the desired index
+      val dirty = f()
+
+      while (p.nextToken() != END_ARRAY) {
+        // advance the token stream to the end of the array
+        p.skipChildren()
+      }
+
+      dirty
+
+    case i if i > 0 =>
+      // skip this token and evaluate the next
+      p.skipChildren()
+      p.nextToken()
+      arrayIndex(p, f)(i - 1)
+  }
+
+  /**
+   * Evaluate a list of JsonPath instructions, returning a bool that indicates if any leaf nodes
+   * have been written to the generator
+   */
+  private def evaluatePath(
+      p: JsonParser,
+      g: JsonGenerator,
+      style: WriteStyle,
+      path: List[PathInstruction]): Boolean = {
+    (p.getCurrentToken, path) match {
+      case (VALUE_STRING, Nil) if style == RawStyle =>
+        // there is no array wildcard or slice parent, emit this string without quotes
+        if (p.hasTextCharacters) {
+          g.writeRaw(p.getTextCharacters, p.getTextOffset, p.getTextLength)
+        } else {
+          g.writeRaw(p.getText)
+        }
+        true
+
+      case (START_ARRAY, Nil) if style == FlattenStyle =>
+        // flatten this array into the parent
+        var dirty = false
+        while (p.nextToken() != END_ARRAY) {
+          dirty |= evaluatePath(p, g, style, Nil)
+        }
+        dirty
+
+      case (_, Nil) =>
+        // general case: just copy the child tree verbatim
+        g.copyCurrentStructure(p)
+        true
+
+      case (START_OBJECT, Key :: xs) =>
+        var dirty = false
+        while (p.nextToken() != END_OBJECT) {
+          if (dirty) {
+            // once a match has been found we can skip other fields
+            p.skipChildren()
+          } else {
+            dirty = evaluatePath(p, g, style, xs)
+          }
+        }
+        dirty
+
+      case (START_ARRAY, Subscript :: Wildcard :: Subscript :: Wildcard :: xs) =>
+        // special handling for the non-structure preserving double wildcard behavior in Hive
+        var dirty = false
+        g.writeStartArray()
+        while (p.nextToken() != END_ARRAY) {
+          dirty |= evaluatePath(p, g, FlattenStyle, xs)
+        }
+        g.writeEndArray()
+        dirty
+
+      case (START_ARRAY, Subscript :: Wildcard :: xs) if style != QuotedStyle =>
+        // retain Flatten, otherwise use Quoted... cannot use Raw within an array
+        val nextStyle = style match {
+          case RawStyle => QuotedStyle
+          case FlattenStyle => FlattenStyle
+          case QuotedStyle => throw SparkException.internalError("Unexpected the quoted style.")
+        }
+
+        // temporarily buffer child matches, the emitted json will need to be
+        // modified slightly if there is only a single element written
+        val buffer = new StringWriter()
+
+        var dirty = 0
+        Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
+          flattenGenerator.writeStartArray()
+
+          while (p.nextToken() != END_ARRAY) {
+            // track the number of array elements and only emit an outer array if
+            // we've written more than one element, this matches Hive's behavior
+            dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
+          }
+          flattenGenerator.writeEndArray()
+        }
+
+        val buf = buffer.getBuffer
+        if (dirty > 1) {
+          g.writeRawValue(buf.toString)
+        } else if (dirty == 1) {
+          // remove outer array tokens
+          g.writeRawValue(buf.substring(1, buf.length() - 1))
+        } // else do not write anything
+
+        dirty > 0
+
+      case (START_ARRAY, Subscript :: Wildcard :: xs) =>
+        var dirty = false
+        g.writeStartArray()
+        while (p.nextToken() != END_ARRAY) {
+          // wildcards can have multiple matches, continually update the dirty count
+          dirty |= evaluatePath(p, g, QuotedStyle, xs)
+        }
+        g.writeEndArray()
+
+        dirty
+
+      case (START_ARRAY, Subscript :: Index(idx) :: (xs@Subscript :: Wildcard :: _)) =>
+        p.nextToken()
+        // we're going to have 1 or more results, switch to QuotedStyle
+        arrayIndex(p, () => evaluatePath(p, g, QuotedStyle, xs))(idx)
+
+      case (START_ARRAY, Subscript :: Index(idx) :: xs) =>
+        p.nextToken()
+        arrayIndex(p, () => evaluatePath(p, g, style, xs))(idx)
+
+      case (FIELD_NAME, Named(name) :: xs) if p.currentName == name =>
+        // exact field match
+        if (p.nextToken() != JsonToken.VALUE_NULL) {
+          evaluatePath(p, g, style, xs)
+        } else {
+          false
+        }
+
+      case (FIELD_NAME, Wildcard :: xs) =>
+        // wildcard field match
+        p.nextToken()
+        evaluatePath(p, g, style, xs)
+
+      case _ =>
+        p.skipChildren()
+        false
+    }
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 93eda307ec9c1..e80f543f14eda 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -17,20 +17,12 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.io._
-
-import scala.util.parsing.combinator.RegexParsers
-
-import com.fasterxml.jackson.core._
-import com.fasterxml.jackson.core.json.JsonReadFeature
-
-import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
-import org.apache.spark.sql.catalyst.expressions.json.{JsonExpressionUtils, JsonToStructsEvaluator, JsonTupleEvaluator, SchemaOfJsonEvaluator, StructsToJsonEvaluator}
+import org.apache.spark.sql.catalyst.expressions.json.{GetJsonObjectEvaluator, JsonExpressionUtils, JsonToStructsEvaluator, JsonTupleEvaluator, SchemaOfJsonEvaluator, StructsToJsonEvaluator}
 import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{JSON_TO_STRUCT, RUNTIME_REPLACEABLE, TreePattern}
@@ -39,80 +31,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.types.StringTypeWithCollation
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
-
-private[this] sealed trait PathInstruction
-private[this] object PathInstruction {
-  private[expressions] case object Subscript extends PathInstruction
-  private[expressions] case object Wildcard extends PathInstruction
-  private[expressions] case object Key extends PathInstruction
-  private[expressions] case class Index(index: Long) extends PathInstruction
-  private[expressions] case class Named(name: String) extends PathInstruction
-}
-
-private[this] sealed trait WriteStyle
-private[this] object WriteStyle {
-  private[expressions] case object RawStyle extends WriteStyle
-  private[expressions] case object QuotedStyle extends WriteStyle
-  private[expressions] case object FlattenStyle extends WriteStyle
-}
-
-private[this] object JsonPathParser extends RegexParsers {
-  import PathInstruction._
-
-  def root: Parser[Char] = '$'
-
-  def long: Parser[Long] = "\\d+".r ^? {
-    case x => x.toLong
-  }
-
-  // parse `[*]` and `[123]` subscripts
-  def subscript: Parser[List[PathInstruction]] =
-    for {
-      operand <- '[' ~> ('*' ^^^ Wildcard | long ^^ Index) <~ ']'
-    } yield {
-      Subscript :: operand :: Nil
-    }
-
-  // parse `.name` or `['name']` child expressions
-  def named: Parser[List[PathInstruction]] =
-    for {
-      name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\']+".r <~ "']"
-    } yield {
-      Key :: Named(name) :: Nil
-    }
-
-  // child wildcards: `..`, `.*` or `['*']`
-  def wildcard: Parser[List[PathInstruction]] =
-    (".*" | "['*']") ^^^ List(Wildcard)
-
-  def node: Parser[List[PathInstruction]] =
-    wildcard |
-      named |
-      subscript
-
-  val expression: Parser[List[PathInstruction]] = {
-    phrase(root ~> rep(node) ^^ (x => x.flatten))
-  }
-
-  def parse(str: String): Option[List[PathInstruction]] = {
-    this.parseAll(expression, str) match {
-      case Success(result, _) =>
-        Some(result)
-
-      case _ =>
-        None
-    }
-  }
-}
-
-private[expressions] object SharedFactory {
-  val jsonFactory = new JsonFactoryBuilder()
-    // The two options below enabled for Hive compatibility
-    .enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS)
-    .enable(JsonReadFeature.ALLOW_SINGLE_QUOTES)
-    .build()
-}
 
 /**
  * Extracts json object from a json string based on json path specified, and returns json string
@@ -213,228 +131,6 @@ case class GetJsonObject(json: Expression, path: Expression)
     copy(json = newLeft, path = newRight)
 }
 
-class GetJsonObjectEvaluator(cachedPath: UTF8String) {
-  import com.fasterxml.jackson.core.JsonToken._
-  import PathInstruction._
-  import SharedFactory._
-  import WriteStyle._
-
-  def this() = this(null)
-
-  @transient
-  private lazy val parsedPath: Option[List[PathInstruction]] =
-    parsePath(cachedPath)
-
-  @transient
-  private var jsonStr: UTF8String = null
-
-  @transient
-  private var pathStr: UTF8String = null
-
-  def setJson(arg: UTF8String): Unit = {
-    jsonStr = arg
-  }
-
-  def setPath(arg: UTF8String): Unit = {
-    pathStr = arg
-  }
-
-  def evaluate(): Any = {
-    if (jsonStr == null) {
-      return null
-    }
-
-    val parsed = if (cachedPath != null) {
-      parsedPath
-    } else {
-      parsePath(pathStr)
-    }
-
-    if (parsed.isDefined) {
-      try {
-        /* We know the bytes are UTF-8 encoded. Pass a Reader to avoid having Jackson
-          detect character encoding which could fail for some malformed strings */
-        Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, jsonStr)) { parser =>
-          val output = new ByteArrayOutputStream()
-          val matched = Utils.tryWithResource(
-            jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator =>
-            parser.nextToken()
-            evaluatePath(parser, generator, RawStyle, parsed.get)
-          }
-          if (matched) {
-            UTF8String.fromBytes(output.toByteArray)
-          } else {
-            null
-          }
-        }
-      } catch {
-        case _: JsonProcessingException => null
-      }
-    } else {
-      null
-    }
-  }
-
-  private def parsePath(path: UTF8String): Option[List[PathInstruction]] = {
-    if (path != null) {
-      JsonPathParser.parse(path.toString)
-    } else {
-      None
-    }
-  }
-
-  // advance to the desired array index, assumes to start at the START_ARRAY token
-  private def arrayIndex(p: JsonParser, f: () => Boolean): Long => Boolean = {
-    case _ if p.getCurrentToken == END_ARRAY =>
-      // terminate, nothing has been written
-      false
-
-    case 0 =>
-      // we've reached the desired index
-      val dirty = f()
-
-      while (p.nextToken() != END_ARRAY) {
-        // advance the token stream to the end of the array
-        p.skipChildren()
-      }
-
-      dirty
-
-    case i if i > 0 =>
-      // skip this token and evaluate the next
-      p.skipChildren()
-      p.nextToken()
-      arrayIndex(p, f)(i - 1)
-  }
-
-  /**
-   * Evaluate a list of JsonPath instructions, returning a bool that indicates if any leaf nodes
-   * have been written to the generator
-   */
-  private def evaluatePath(
-      p: JsonParser,
-      g: JsonGenerator,
-      style: WriteStyle,
-      path: List[PathInstruction]): Boolean = {
-    (p.getCurrentToken, path) match {
-      case (VALUE_STRING, Nil) if style == RawStyle =>
-        // there is no array wildcard or slice parent, emit this string without quotes
-        if (p.hasTextCharacters) {
-          g.writeRaw(p.getTextCharacters, p.getTextOffset, p.getTextLength)
-        } else {
-          g.writeRaw(p.getText)
-        }
-        true
-
-      case (START_ARRAY, Nil) if style == FlattenStyle =>
-        // flatten this array into the parent
-        var dirty = false
-        while (p.nextToken() != END_ARRAY) {
-          dirty |= evaluatePath(p, g, style, Nil)
-        }
-        dirty
-
-      case (_, Nil) =>
-        // general case: just copy the child tree verbatim
-        g.copyCurrentStructure(p)
-        true
-
-      case (START_OBJECT, Key :: xs) =>
-        var dirty = false
-        while (p.nextToken() != END_OBJECT) {
-          if (dirty) {
-            // once a match has been found we can skip other fields
-            p.skipChildren()
-          } else {
-            dirty = evaluatePath(p, g, style, xs)
-          }
-        }
-        dirty
-
-      case (START_ARRAY, Subscript :: Wildcard :: Subscript :: Wildcard :: xs) =>
-        // special handling for the non-structure preserving double wildcard behavior in Hive
-        var dirty = false
-        g.writeStartArray()
-        while (p.nextToken() != END_ARRAY) {
-          dirty |= evaluatePath(p, g, FlattenStyle, xs)
-        }
-        g.writeEndArray()
-        dirty
-
-      case (START_ARRAY, Subscript :: Wildcard :: xs) if style != QuotedStyle =>
-        // retain Flatten, otherwise use Quoted... cannot use Raw within an array
-        val nextStyle = style match {
-          case RawStyle => QuotedStyle
-          case FlattenStyle => FlattenStyle
-          case QuotedStyle => throw SparkException.internalError("Unexpected the quoted style.")
-        }
-
-        // temporarily buffer child matches, the emitted json will need to be
-        // modified slightly if there is only a single element written
-        val buffer = new StringWriter()
-
-        var dirty = 0
-        Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
-          flattenGenerator.writeStartArray()
-
-          while (p.nextToken() != END_ARRAY) {
-            // track the number of array elements and only emit an outer array if
-            // we've written more than one element, this matches Hive's behavior
-            dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0)
-          }
-          flattenGenerator.writeEndArray()
-        }
-
-        val buf = buffer.getBuffer
-        if (dirty > 1) {
-          g.writeRawValue(buf.toString)
-        } else if (dirty == 1) {
-          // remove outer array tokens
-          g.writeRawValue(buf.substring(1, buf.length() - 1))
-        } // else do not write anything
-
-        dirty > 0
-
-      case (START_ARRAY, Subscript :: Wildcard :: xs) =>
-        var dirty = false
-        g.writeStartArray()
-        while (p.nextToken() != END_ARRAY) {
-          // wildcards can have multiple matches, continually update the dirty count
-          dirty |= evaluatePath(p, g, QuotedStyle, xs)
-        }
-        g.writeEndArray()
-
-        dirty
-
-      case (START_ARRAY, Subscript :: Index(idx) :: (xs@Subscript :: Wildcard :: _)) =>
-        p.nextToken()
-        // we're going to have 1 or more results, switch to QuotedStyle
-        arrayIndex(p, () => evaluatePath(p, g, QuotedStyle, xs))(idx)
-
-      case (START_ARRAY, Subscript :: Index(idx) :: xs) =>
-        p.nextToken()
-        arrayIndex(p, () => evaluatePath(p, g, style, xs))(idx)
-
-      case (FIELD_NAME, Named(name) :: xs) if p.currentName == name =>
-        // exact field match
-        if (p.nextToken() != JsonToken.VALUE_NULL) {
-          evaluatePath(p, g, style, xs)
-        } else {
-          false
-        }
-
-      case (FIELD_NAME, Wildcard :: xs) =>
-        // wildcard field match
-        p.nextToken()
-        evaluatePath(p, g, style, xs)
-
-      case _ =>
-        p.skipChildren()
-        false
-    }
-  }
-}
-
 // scalastyle:off line.size.limit line.contains.tab
 @ExpressionDescription(
   usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.",
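Reviewer note: the JsonPath grammar moved above is easy to sanity-check in a REPL. Below is a minimal sketch under stated assumptions: JsonPathDemo is a hypothetical stand-in that emits string labels instead of the file-private PathInstruction ADT, and it assumes scala-parser-combinators (which catalyst already depends on) is on the classpath.

import scala.util.parsing.combinator.RegexParsers

// Hypothetical stand-in for the file-private JsonPathParser: same grammar,
// but producing string labels instead of PathInstruction values.
object JsonPathDemo extends RegexParsers {
  def root: Parser[Char] = '$'
  def long: Parser[String] = "\\d+".r
  // `[*]` and `[123]` subscripts
  def subscript: Parser[List[String]] =
    ('[' ~> ("*" ^^^ "Wildcard" | long ^^ (i => s"Index($i)")) <~ ']') ^^ (op => List("Subscript", op))
  // `.name` and `['name']` child steps
  def named: Parser[List[String]] =
    ('.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\']+".r <~ "']") ^^ (n => List("Key", s"Named($n)"))
  // `.*` and `['*']` child wildcards
  def wildcard: Parser[List[String]] = (".*" | "['*']") ^^^ List("Wildcard")
  def node: Parser[List[String]] = wildcard | named | subscript
  val expression: Parser[List[String]] = phrase(root ~> rep(node) ^^ (_.flatten))
}

// parsed: List(Key, Named(store), Key, Named(book), Subscript, Index(0), Key, Named(title))
println(JsonPathDemo.parseAll(JsonPathDemo.expression, "$.store.book[0].title"))

The `wildcard | named | subscript` ordering in `node` matters: `.*` has to be tried before `.name`, since the `named` rule's regex would otherwise happily consume `*` as a field name.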
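Reviewer note: a minimal sketch of how the relocated evaluator is driven. The JSON and paths here are illustrative, and this mimics catalyst-internal test code rather than a public API. The two modes mirror evaluate(): a path supplied at construction is parsed once through the lazy parsedPath, while a path supplied via setPath is re-parsed on every call.

import org.apache.spark.sql.catalyst.expressions.json.GetJsonObjectEvaluator
import org.apache.spark.unsafe.types.UTF8String

val json = UTF8String.fromString("""{"a": [{"b": 1}, {"b": 2}]}""")

// Path known up front (e.g. a foldable path expression): parsed once, lazily.
val cached = GetJsonObjectEvaluator(UTF8String.fromString("$.a[0].b"))
cached.setJson(json)
assert(cached.evaluate() == UTF8String.fromString("1"))

// Path supplied per row: setPath is used and the path is re-parsed each call.
val uncached = new GetJsonObjectEvaluator()
uncached.setJson(json)
uncached.setPath(UTF8String.fromString("$.a[*].b"))
assert(uncached.evaluate() == UTF8String.fromString("[1,2]"))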
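Reviewer note: the buffering branch (the integer `dirty` counter) and the double-wildcard branch are what preserve Hive's observable quirks through get_json_object. A hedged spark-shell sketch, assuming a session with `spark` in scope; the expected values follow from tracing the branches above:

// Single wildcard match: the outer array tokens are stripped (dirty == 1).
spark.sql("""SELECT get_json_object('{"a": [{"b": 1}]}', '$.a[*].b')""").show()
// 1

// Multiple matches keep the array (dirty > 1).
spark.sql("""SELECT get_json_object('{"a": [{"b": 1}, {"b": 2}]}', '$.a[*].b')""").show()
// [1,2]

// Double wildcard flattens nested arrays, the Hive behavior FlattenStyle encodes.
spark.sql("""SELECT get_json_object('{"a": [[1, 2], [3, 4]]}', '$.a[*][*]')""").show()
// [1,2,3,4]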