aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2015-11-10 11:21:31 -0800
committerYin Huai <yhuai@databricks.com>2015-11-10 11:21:31 -0800
commit53600854c270d4c953fe95fbae528740b5cf6603 (patch)
tree5bccb351008aff9c6b5182fdf22fbf6f77d69b88 /sql/core/src
parentdfcfcbcc0448ebc6f02eba6bf0495832a321c87e (diff)
downloadspark-53600854c270d4c953fe95fbae528740b5cf6603.tar.gz
spark-53600854c270d4c953fe95fbae528740b5cf6603.tar.bz2
spark-53600854c270d4c953fe95fbae528740b5cf6603.zip
[SPARK-11590][SQL] use native json_tuple in lateral view
Author: Wenchen Fan <wenchen@databricks.com> Closes #9562 from cloud-fan/json-tuple.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/functions.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala23
3 files changed, 31 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3b69247dc5..9368435a63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -750,10 +750,14 @@ class DataFrame private[sql](
// will remove intermediate Alias for ExtractValue chain, and we need to alias it again to
// make it a NamedExpression.
case Column(u: UnresolvedAttribute) => UnresolvedAlias(u)
+
case Column(expr: NamedExpression) => expr
- // Leave an unaliased explode with an empty list of names since the analyzer will generate the
- // correct defaults after the nested expression's type has been resolved.
+
+ // Leave an unaliased generator with an empty list of names since the analyzer will generate
+ // the correct defaults after the nested expression's type has been resolved.
case Column(explode: Explode) => MultiAlias(explode, Nil)
+ case Column(jt: JsonTuple) => MultiAlias(jt, Nil)
+
case Column(expr: Expression) => Alias(expr, expr.prettyString)()
}
Project(namedExpressions.toSeq, logicalPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 22104e4d48..a59d738010 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2308,6 +2308,18 @@ object functions extends LegacyFunctions {
def explode(e: Column): Column = withExpr { Explode(e.expr) }
/**
+ * Creates a new row for a json column according to the given field names.
+ *
+ * @group collection_funcs
+ * @since 1.6.0
+ */
+ @scala.annotation.varargs
+ def json_tuple(json: Column, fields: String*): Column = withExpr {
+ require(fields.length > 0, "at least 1 field name should be given.")
+ JsonTuple(json.expr +: fields.map(Literal.apply))
+ }
+
+ /**
* Returns length of array or map.
*
* @group collection_funcs
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index e3531d0d6d..14fd56fc8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -41,23 +41,26 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
test("json_tuple select") {
val df: DataFrame = tuples.toDF("key", "jstring")
- val expected = Row("1", Row("value1", "value2", "3", null, "5.23")) ::
- Row("2", Row("value12", "2", "value3", "4.01", null)) ::
- Row("3", Row("value13", "2", "value33", "value44", "5.01")) ::
- Row("4", Row(null, null, null, null, null)) ::
- Row("5", Row("", null, null, null, null)) ::
- Row("6", Row(null, null, null, null, null)) ::
+ val expected =
+ Row("1", "value1", "value2", "3", null, "5.23") ::
+ Row("2", "value12", "2", "value3", "4.01", null) ::
+ Row("3", "value13", "2", "value33", "value44", "5.01") ::
+ Row("4", null, null, null, null, null) ::
+ Row("5", "", null, null, null, null) ::
+ Row("6", null, null, null, null, null) ::
Nil
- checkAnswer(df.selectExpr("key", "json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5')"), expected)
+ checkAnswer(
+ df.select($"key", functions.json_tuple($"jstring", "f1", "f2", "f3", "f4", "f5")),
+ expected)
}
test("json_tuple filter and group") {
val df: DataFrame = tuples.toDF("key", "jstring")
val expr = df
- .selectExpr("json_tuple(jstring, 'f1', 'f2') as jt")
- .where($"jt.c0".isNotNull)
- .groupBy($"jt.c1")
+ .select(functions.json_tuple($"jstring", "f1", "f2"))
+ .where($"c0".isNotNull)
+ .groupBy($"c1")
.count()
val expected = Row(null, 1) ::