diff options
author | Wenchen Fan <wenchen@databricks.com> | 2015-11-10 11:21:31 -0800 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-11-10 11:21:31 -0800 |
commit | 53600854c270d4c953fe95fbae528740b5cf6603 (patch) | |
tree | 5bccb351008aff9c6b5182fdf22fbf6f77d69b88 | |
parent | dfcfcbcc0448ebc6f02eba6bf0495832a321c87e (diff) | |
download | spark-53600854c270d4c953fe95fbae528740b5cf6603.tar.gz spark-53600854c270d4c953fe95fbae528740b5cf6603.tar.bz2 spark-53600854c270d4c953fe95fbae528740b5cf6603.zip |
[SPARK-11590][SQL] use native json_tuple in lateral view
Author: Wenchen Fan <wenchen@databricks.com>
Closes #9562 from cloud-fan/json-tuple.
8 files changed, 104 insertions, 40 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 8c9853e628..8cd73236a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -314,7 +314,7 @@ case class GetJsonObject(json: Expression, path: Expression) } case class JsonTuple(children: Seq[Expression]) - extends Expression with CodegenFallback { + extends Generator with CodegenFallback { import SharedFactory._ @@ -324,8 +324,8 @@ case class JsonTuple(children: Seq[Expression]) } // if processing fails this shared value will be returned - @transient private lazy val nullRow: InternalRow = - new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) + @transient private lazy val nullRow: Seq[InternalRow] = + new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil // the json body is the first child @transient private lazy val jsonExpr: Expression = children.head @@ -344,15 +344,8 @@ case class JsonTuple(children: Seq[Expression]) // and count the number of foldable fields, we'll use this later to optimize evaluation @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) - override lazy val dataType: StructType = { - val fields = fieldExpressions.zipWithIndex.map { - case (_, idx) => StructField( - name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize - dataType = StringType, - nullable = true) - } - - StructType(fields) + override def elementTypes: Seq[(DataType, Boolean, String)] = fieldExpressions.zipWithIndex.map { + case (_, idx) => (StringType, true, s"c$idx") } override def prettyName: String = "json_tuple" @@ -367,7 +360,7 @@ case class JsonTuple(children: Seq[Expression]) } } - override def eval(input: InternalRow): InternalRow = { + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { val json = jsonExpr.eval(input).asInstanceOf[UTF8String] if (json == null) { return nullRow @@ -383,7 +376,7 @@ case class JsonTuple(children: Seq[Expression]) } } - private def parseRow(parser: JsonParser, input: InternalRow): InternalRow = { + private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = { // only objects are supported if (parser.nextToken() != JsonToken.START_OBJECT) { return nullRow @@ -433,7 +426,7 @@ case class JsonTuple(children: Seq[Expression]) parser.skipChildren() } - new GenericInternalRow(row) + new GenericInternalRow(row) :: Nil } private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index f33125f463..7b754091f4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -209,8 +209,12 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Literal("f5") :: Nil + private def checkJsonTuple(jt: JsonTuple, expected: InternalRow): Unit = { + assert(jt.eval(null).toSeq.head === expected) + } + test("json_tuple - hive key 1") { - checkEvaluation( + checkJsonTuple( JsonTuple( Literal("""{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") :: jsonTupleQuery), @@ -218,7 +222,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("json_tuple - hive key 2") { - checkEvaluation( + checkJsonTuple( JsonTuple( Literal("""{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") :: jsonTupleQuery), @@ -226,7 +230,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("json_tuple - hive key 2 (mix of foldable fields)") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal("""{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") :: Literal("f1") :: NonFoldableLiteral("f2") :: @@ -238,7 +242,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("json_tuple - hive key 3") { - checkEvaluation( + checkJsonTuple( JsonTuple( Literal("""{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") :: jsonTupleQuery), @@ -247,7 +251,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("json_tuple - hive key 3 (nonfoldable json)") { - checkEvaluation( + checkJsonTuple( JsonTuple( NonFoldableLiteral( """{"f1": "value13", "f4": "value44", @@ -258,7 +262,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("json_tuple - hive key 3 (nonfoldable fields)") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal( """{"f1": "value13", "f4": "value44", | "f3": "value33", "f2": 2, "f5": 5.01}""".stripMargin) :: @@ -273,43 +277,43 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("json_tuple - hive key 4 - null json") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal(null) :: jsonTupleQuery), InternalRow.fromSeq(Seq(null, null, null, null, null))) } test("json_tuple - hive key 5 - null and empty fields") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery), InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null))) } test("json_tuple - hive key 6 - invalid json (array)") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery), InternalRow.fromSeq(Seq(null, null, null, null, null))) } test("json_tuple - invalid json (object start only)") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal("{") :: jsonTupleQuery), InternalRow.fromSeq(Seq(null, null, null, null, null))) } test("json_tuple - invalid json (no object end)") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery), InternalRow.fromSeq(Seq(null, null, null, null, null))) } test("json_tuple - invalid json (invalid json)") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal("\\") :: jsonTupleQuery), InternalRow.fromSeq(Seq(null, null, null, null, null))) } test("json_tuple - preserve newlines") { - checkEvaluation( + checkJsonTuple( JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil), InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc")))) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3b69247dc5..9368435a63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -750,10 +750,14 @@ class DataFrame private[sql]( // will remove intermediate Alias for ExtractValue chain, and we need to alias it again to // make it a NamedExpression. case Column(u: UnresolvedAttribute) => UnresolvedAlias(u) + case Column(expr: NamedExpression) => expr - // Leave an unaliased explode with an empty list of names since the analyzer will generate the - // correct defaults after the nested expression's type has been resolved. + + // Leave an unaliased generator with an empty list of names since the analyzer will generate + // the correct defaults after the nested expression's type has been resolved. case Column(explode: Explode) => MultiAlias(explode, Nil) + case Column(jt: JsonTuple) => MultiAlias(jt, Nil) + case Column(expr: Expression) => Alias(expr, expr.prettyString)() } Project(namedExpressions.toSeq, logicalPlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 22104e4d48..a59d738010 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2308,6 +2308,18 @@ object functions extends LegacyFunctions { def explode(e: Column): Column = withExpr { Explode(e.expr) } /** + * Creates a new row for a json column according to the given field names. + * + * @group collection_funcs + * @since 1.6.0 + */ + @scala.annotation.varargs + def json_tuple(json: Column, fields: String*): Column = withExpr { + require(fields.length > 0, "at least 1 field name should be given.") + JsonTuple(json.expr +: fields.map(Literal.apply)) + } + + /** * Returns length of array or map. * * @group collection_funcs diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index e3531d0d6d..14fd56fc8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -41,23 +41,26 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { test("json_tuple select") { val df: DataFrame = tuples.toDF("key", "jstring") - val expected = Row("1", Row("value1", "value2", "3", null, "5.23")) :: - Row("2", Row("value12", "2", "value3", "4.01", null)) :: - Row("3", Row("value13", "2", "value33", "value44", "5.01")) :: - Row("4", Row(null, null, null, null, null)) :: - Row("5", Row("", null, null, null, null)) :: - Row("6", Row(null, null, null, null, null)) :: + val expected = + Row("1", "value1", "value2", "3", null, "5.23") :: + Row("2", "value12", "2", "value3", "4.01", null) :: + Row("3", "value13", "2", "value33", "value44", "5.01") :: + Row("4", null, null, null, null, null) :: + Row("5", "", null, null, null, null) :: + Row("6", null, null, null, null, null) :: Nil - checkAnswer(df.selectExpr("key", "json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5')"), expected) + checkAnswer( + df.select($"key", functions.json_tuple($"jstring", "f1", "f2", "f3", "f4", "f5")), + expected) } test("json_tuple filter and group") { val df: DataFrame = tuples.toDF("key", "jstring") val expr = df - .selectExpr("json_tuple(jstring, 'f1', 'f2') as jt") - .where($"jt.c0".isNotNull) - .groupBy($"jt.c1") + .select(functions.json_tuple($"jstring", "f1", "f2")) + .where($"c0".isNotNull) + .groupBy($"c1") .count() val expected = Row(null, 1) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 6f8ed413a0..091caab921 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -1821,6 +1821,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } val explode = "(?i)explode".r + val jsonTuple = "(?i)json_tuple".r def nodesToGenerator(nodes: Seq[Node]): (Generator, Seq[String]) = { val function = nodes.head @@ -1833,6 +1834,9 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C case Token("TOK_FUNCTION", Token(explode(), Nil) :: child :: Nil) => (Explode(nodeToExpr(child)), attributes) + case Token("TOK_FUNCTION", Token(jsonTuple(), Nil) :: children) => + (JsonTuple(children.map(nodeToExpr)), attributes) + case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) => val functionInfo: FunctionInfo = Option(FunctionRegistry.getFunctionInfo(functionName.toLowerCase)).getOrElse( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index 528a7398b1..a330362b4e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.hive import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.spark.sql.catalyst.expressions.JsonTuple +import org.apache.spark.sql.catalyst.plans.logical.Generate import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite @@ -183,4 +185,15 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { assertError("select interval '.1111111111' second", "nanosecond 1111111111 outside range") } + + test("use native json_tuple instead of hive's UDTF in LATERAL VIEW") { + val plan = HiveQl.parseSql( + """ + |SELECT * + |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test + |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b + """.stripMargin) + + assert(plan.children.head.asInstanceOf[Generate].generator.isInstanceOf[JsonTuple]) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 9a425d7f6b..3427152b2d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1448,4 +1448,35 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Row("4", "40") :: Nil) } } + + test("SPARK-11590: use native json_tuple in lateral view") { + checkAnswer(sql( + """ + |SELECT a, b + |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test + |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b + """.stripMargin), Row("value1", "12")) + + // we should use `c0`, `c1`... as the name of fields if no alias is provided, to follow hive. + checkAnswer(sql( + """ + |SELECT c0, c1 + |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test + |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt + """.stripMargin), Row("value1", "12")) + + // we can also use `json_tuple` in project list. + checkAnswer(sql( + """ + |SELECT json_tuple(json, 'f1', 'f2') + |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test + """.stripMargin), Row("value1", "12")) + + // we can also mix `json_tuple` with other project expressions. + checkAnswer(sql( + """ + |SELECT json_tuple(json, 'f1', 'f2'), 3.14, str + |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test + """.stripMargin), Row("value1", "12", 3.14, "hello")) + } } |