Author:    Wenchen Fan <wenchen@databricks.com>  2015-11-10 11:21:31 -0800
Committer: Yin Huai <yhuai@databricks.com>       2015-11-10 11:21:31 -0800
commit     53600854c270d4c953fe95fbae528740b5cf6603 (patch)
tree       5bccb351008aff9c6b5182fdf22fbf6f77d69b88 /sql
parent     dfcfcbcc0448ebc6f02eba6bf0495832a321c87e (diff)
[SPARK-11590][SQL] use native json_tuple in lateral view
Author: Wenchen Fan <wenchen@databricks.com>

Closes #9562 from cloud-fan/json-tuple.
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala      | 23
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                                     |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala                                     | 12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala                            | 23
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                                   |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala                              | 13
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala                  | 31
8 files changed, 104 insertions(+), 40 deletions(-)
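
In short, this patch turns JsonTuple into a native Catalyst Generator so that json_tuple in a LATERAL VIEW (or in a plain select) no longer routes through Hive's GenericUDTFJSONTuple. A minimal sketch of the query shape affected, mirroring the new SQLQuerySuite test (the inline JSON and the aliases a, b are illustrative):

    // After this patch, the LATERAL VIEW below is planned with the native
    // JsonTuple generator rather than Hive's GenericUDTFJSONTuple.
    sqlContext.sql(
      """
        |SELECT a, b
        |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
        |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b
      """.stripMargin).show()
    // +------+---+
    // |     a|  b|
    // +------+---+
    // |value1| 12|
    // +------+---+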
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 8c9853e628..8cd73236a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -314,7 +314,7 @@ case class GetJsonObject(json: Expression, path: Expression)
}
case class JsonTuple(children: Seq[Expression])
- extends Expression with CodegenFallback {
+ extends Generator with CodegenFallback {
import SharedFactory._
@@ -324,8 +324,8 @@ case class JsonTuple(children: Seq[Expression])
}
// if processing fails this shared value will be returned
- @transient private lazy val nullRow: InternalRow =
- new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+ @transient private lazy val nullRow: Seq[InternalRow] =
+ new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) :: Nil
// the json body is the first child
@transient private lazy val jsonExpr: Expression = children.head
@@ -344,15 +344,8 @@ case class JsonTuple(children: Seq[Expression])
// and count the number of foldable fields, we'll use this later to optimize evaluation
@transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null)
- override lazy val dataType: StructType = {
- val fields = fieldExpressions.zipWithIndex.map {
- case (_, idx) => StructField(
- name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
- dataType = StringType,
- nullable = true)
- }
-
- StructType(fields)
+ override def elementTypes: Seq[(DataType, Boolean, String)] = fieldExpressions.zipWithIndex.map {
+ case (_, idx) => (StringType, true, s"c$idx")
}
override def prettyName: String = "json_tuple"
@@ -367,7 +360,7 @@ case class JsonTuple(children: Seq[Expression])
}
}
- override def eval(input: InternalRow): InternalRow = {
+ override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
if (json == null) {
return nullRow
@@ -383,7 +376,7 @@ case class JsonTuple(children: Seq[Expression])
}
}
- private def parseRow(parser: JsonParser, input: InternalRow): InternalRow = {
+ private def parseRow(parser: JsonParser, input: InternalRow): Seq[InternalRow] = {
// only objects are supported
if (parser.nextToken() != JsonToken.START_OBJECT) {
return nullRow
@@ -433,7 +426,7 @@ case class JsonTuple(children: Seq[Expression])
parser.skipChildren()
}
- new GenericInternalRow(row)
+ new GenericInternalRow(row) :: Nil
}
private def copyCurrentStructure(generator: JsonGenerator, parser: JsonParser): Unit = {
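
The switch from Expression to Generator changes the evaluation contract: instead of producing one InternalRow typed as a StructType, JsonTuple now declares its output columns through elementTypes and emits a sequence of rows from eval. A simplified sketch of that contract (the real trait in org.apache.spark.sql.catalyst.expressions has additional members; the trait name below is hypothetical):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.DataType

    trait GeneratorContractSketch {
      // one (type, nullable, name) triple per output column, e.g. (StringType, true, "c0")
      def elementTypes: Seq[(DataType, Boolean, String)]
      // a generator may emit zero or more rows per input row;
      // JsonTuple always emits exactly one (or the shared nullRow on failure)
      def eval(input: InternalRow): TraversableOnce[InternalRow]
    }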
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index f33125f463..7b754091f4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -209,8 +209,12 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
Literal("f5") ::
Nil
+ private def checkJsonTuple(jt: JsonTuple, expected: InternalRow): Unit = {
+ assert(jt.eval(null).toSeq.head === expected)
+ }
+
test("json_tuple - hive key 1") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(
Literal("""{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
jsonTupleQuery),
@@ -218,7 +222,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("json_tuple - hive key 2") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(
Literal("""{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
jsonTupleQuery),
@@ -226,7 +230,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("json_tuple - hive key 2 (mix of foldable fields)") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal("""{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
Literal("f1") ::
NonFoldableLiteral("f2") ::
@@ -238,7 +242,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("json_tuple - hive key 3") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(
Literal("""{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") ::
jsonTupleQuery),
@@ -247,7 +251,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("json_tuple - hive key 3 (nonfoldable json)") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(
NonFoldableLiteral(
"""{"f1": "value13", "f4": "value44",
@@ -258,7 +262,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("json_tuple - hive key 3 (nonfoldable fields)") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal(
"""{"f1": "value13", "f4": "value44",
| "f3": "value33", "f2": 2, "f5": 5.01}""".stripMargin) ::
@@ -273,43 +277,43 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("json_tuple - hive key 4 - null json") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal(null) :: jsonTupleQuery),
InternalRow.fromSeq(Seq(null, null, null, null, null)))
}
test("json_tuple - hive key 5 - null and empty fields") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal("""{"f1": "", "f5": null}""") :: jsonTupleQuery),
InternalRow.fromSeq(Seq(UTF8String.fromString(""), null, null, null, null)))
}
test("json_tuple - hive key 6 - invalid json (array)") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal("[invalid JSON string]") :: jsonTupleQuery),
InternalRow.fromSeq(Seq(null, null, null, null, null)))
}
test("json_tuple - invalid json (object start only)") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal("{") :: jsonTupleQuery),
InternalRow.fromSeq(Seq(null, null, null, null, null)))
}
test("json_tuple - invalid json (no object end)") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal("""{"foo": "bar"""") :: jsonTupleQuery),
InternalRow.fromSeq(Seq(null, null, null, null, null)))
}
test("json_tuple - invalid json (invalid json)") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal("\\") :: jsonTupleQuery),
InternalRow.fromSeq(Seq(null, null, null, null, null)))
}
test("json_tuple - preserve newlines") {
- checkEvaluation(
+ checkJsonTuple(
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
}
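
The checkJsonTuple helper replaces checkEvaluation because eval now returns a one-element sequence of rows rather than a single row, so comparing the result directly against an InternalRow would never succeed. A hedged illustration of the shapes involved (field values are UTF8String inside the row):

    val jt = JsonTuple(Literal("""{"a": 1}""") :: Literal("a") :: Nil)
    jt.eval(null)            // a one-element TraversableOnce[InternalRow]
    jt.eval(null).toSeq.head // the single InternalRow that checkJsonTuple compares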
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3b69247dc5..9368435a63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -750,10 +750,14 @@ class DataFrame private[sql](
// will remove intermediate Alias for ExtractValue chain, and we need to alias it again to
// make it a NamedExpression.
case Column(u: UnresolvedAttribute) => UnresolvedAlias(u)
+
case Column(expr: NamedExpression) => expr
- // Leave an unaliased explode with an empty list of names since the analyzer will generate the
- // correct defaults after the nested expression's type has been resolved.
+
+ // Leave an unaliased generator with an empty list of names since the analyzer will generate
+ // the correct defaults after the nested expression's type has been resolved.
case Column(explode: Explode) => MultiAlias(explode, Nil)
+ case Column(jt: JsonTuple) => MultiAlias(jt, Nil)
+
case Column(expr: Expression) => Alias(expr, expr.prettyString)()
}
Project(namedExpressions.toSeq, logicalPlan)
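
With the new MultiAlias case, an unaliased json_tuple in a DataFrame select is expanded by the analyzer into one column per requested field, named c0, c1, ... from the generator's elementTypes. A sketch (df and jstring are illustrative names taken from the test suite):

    // The analyzer fills in the Hive-style default names for the generated columns.
    val jsonDF = df.select(functions.json_tuple($"jstring", "f1", "f2"))
    jsonDF.columns // Array("c0", "c1")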
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 22104e4d48..a59d738010 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2308,6 +2308,18 @@ object functions extends LegacyFunctions {
def explode(e: Column): Column = withExpr { Explode(e.expr) }
/**
+ * Creates a new row for a json column according to the given field names.
+ *
+ * @group collection_funcs
+ * @since 1.6.0
+ */
+ @scala.annotation.varargs
+ def json_tuple(json: Column, fields: String*): Column = withExpr {
+ require(fields.length > 0, "at least 1 field name should be given.")
+ JsonTuple(json.expr +: fields.map(Literal.apply))
+ }
+
+ /**
* Returns length of array or map.
*
* @group collection_funcs
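
Note that the require guard makes the new public function fail fast instead of constructing an invalid JsonTuple. A hedged usage sketch:

    // Minimum valid call: one field name.
    functions.json_tuple($"jstring", "f1")
    // Calling with no field names trips the require above:
    // functions.json_tuple($"jstring")
    //   => java.lang.IllegalArgumentException: requirement failed:
    //      at least 1 field name should be given.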
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index e3531d0d6d..14fd56fc8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -41,23 +41,26 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
test("json_tuple select") {
val df: DataFrame = tuples.toDF("key", "jstring")
- val expected = Row("1", Row("value1", "value2", "3", null, "5.23")) ::
- Row("2", Row("value12", "2", "value3", "4.01", null)) ::
- Row("3", Row("value13", "2", "value33", "value44", "5.01")) ::
- Row("4", Row(null, null, null, null, null)) ::
- Row("5", Row("", null, null, null, null)) ::
- Row("6", Row(null, null, null, null, null)) ::
+ val expected =
+ Row("1", "value1", "value2", "3", null, "5.23") ::
+ Row("2", "value12", "2", "value3", "4.01", null) ::
+ Row("3", "value13", "2", "value33", "value44", "5.01") ::
+ Row("4", null, null, null, null, null) ::
+ Row("5", "", null, null, null, null) ::
+ Row("6", null, null, null, null, null) ::
Nil
- checkAnswer(df.selectExpr("key", "json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5')"), expected)
+ checkAnswer(
+ df.select($"key", functions.json_tuple($"jstring", "f1", "f2", "f3", "f4", "f5")),
+ expected)
}
test("json_tuple filter and group") {
val df: DataFrame = tuples.toDF("key", "jstring")
val expr = df
- .selectExpr("json_tuple(jstring, 'f1', 'f2') as jt")
- .where($"jt.c0".isNotNull)
- .groupBy($"jt.c1")
+ .select(functions.json_tuple($"jstring", "f1", "f2"))
+ .where($"c0".isNotNull)
+ .groupBy($"c1")
.count()
val expected = Row(null, 1) ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 6f8ed413a0..091caab921 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1821,6 +1821,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
val explode = "(?i)explode".r
+ val jsonTuple = "(?i)json_tuple".r
def nodesToGenerator(nodes: Seq[Node]): (Generator, Seq[String]) = {
val function = nodes.head
@@ -1833,6 +1834,9 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case Token("TOK_FUNCTION", Token(explode(), Nil) :: child :: Nil) =>
(Explode(nodeToExpr(child)), attributes)
+ case Token("TOK_FUNCTION", Token(jsonTuple(), Nil) :: children) =>
+ (JsonTuple(children.map(nodeToExpr)), attributes)
+
case Token("TOK_FUNCTION", Token(functionName, Nil) :: children) =>
val functionInfo: FunctionInfo =
Option(FunctionRegistry.getFunctionInfo(functionName.toLowerCase)).getOrElse(
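
Because the pattern is compiled with (?i), the parser intercepts json_tuple in any casing before the fallback case consults Hive's FunctionRegistry. A standalone sketch of that match:

    // Case-insensitive regex extractor, as used in nodesToGenerator above.
    val jsonTuple = "(?i)json_tuple".r
    "JSON_TUPLE" match {
      case jsonTuple() => "handled by the native JsonTuple generator"
      case _           => "falls through to Hive's FunctionRegistry"
    }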
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 528a7398b1..a330362b4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.hive
import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.spark.sql.catalyst.expressions.JsonTuple
+import org.apache.spark.sql.catalyst.plans.logical.Generate
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
@@ -183,4 +185,15 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll {
assertError("select interval '.1111111111' second",
"nanosecond 1111111111 outside range")
}
+
+ test("use native json_tuple instead of hive's UDTF in LATERAL VIEW") {
+ val plan = HiveQl.parseSql(
+ """
+ |SELECT *
+ |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+ |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b
+ """.stripMargin)
+
+ assert(plan.children.head.asInstanceOf[Generate].generator.isInstanceOf[JsonTuple])
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 9a425d7f6b..3427152b2d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1448,4 +1448,35 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
Row("1", "10") :: Row("2", "20") :: Row("3", "30") :: Row("4", "40") :: Nil)
}
}
+
+ test("SPARK-11590: use native json_tuple in lateral view") {
+ checkAnswer(sql(
+ """
+ |SELECT a, b
+ |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+ |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b
+ """.stripMargin), Row("value1", "12"))
+
+ // we should use `c0`, `c1`... as the name of fields if no alias is provided, to follow hive.
+ checkAnswer(sql(
+ """
+ |SELECT c0, c1
+ |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+ |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt
+ """.stripMargin), Row("value1", "12"))
+
+ // we can also use `json_tuple` in project list.
+ checkAnswer(sql(
+ """
+ |SELECT json_tuple(json, 'f1', 'f2')
+ |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test
+ """.stripMargin), Row("value1", "12"))
+
+ // we can also mix `json_tuple` with other project expressions.
+ checkAnswer(sql(
+ """
+ |SELECT json_tuple(json, 'f1', 'f2'), 3.14, str
+ |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
+ """.stripMargin), Row("value1", "12", 3.14, "hello"))
+ }
}