diff options
author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-08-18 13:24:12 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-08-18 13:24:12 +0800 |
commit | 10204b9d29cd69895f5a606e75510dc64cf2e009 (patch) | |
tree | 3ccdc9e8ba136120e80afe108e397f08b5946f39 /sql/core/src | |
parent | e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b (diff) | |
download | spark-10204b9d29cd69895f5a606e75510dc64cf2e009.tar.gz spark-10204b9d29cd69895f5a606e75510dc64cf2e009.tar.bz2 spark-10204b9d29cd69895f5a606e75510dc64cf2e009.zip |
[SPARK-16995][SQL] TreeNodeException when flat mapping RelationalGroupedDataset created from DataFrame containing a column created with lit/expr
## What changes were proposed in this pull request?
A TreeNodeException is thrown when executing the following minimal example in Spark 2.0.
import spark.implicits._
case class test (x: Int, q: Int)
val d = Seq(1).toDF("x")
d.withColumn("q", lit(0)).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
d.withColumn("q", expr("0")).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
The problem is at `FoldablePropagation`. The rule will do `transformExpressions` on `LogicalPlan`. The query above contains a `MapGroups` which has a parameter `dataAttributes:Seq[Attribute]`. One attributes in `dataAttributes` will be transformed to an `Alias(literal(0), _)` in `FoldablePropagation`. `Alias` is not an `Attribute` and causes the error.
We can't easily detect such type inconsistency during transforming expressions. A direct approach to this problem is to skip doing `FoldablePropagation` on object operators as they should not contain such expressions.
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #14648 from viirya/flat-mapping.
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 88fb1472b6..8ce6ea66b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -878,6 +878,19 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val ds = spark.createDataset(data)(enc) checkDataset(ds, (("a", "b"), "c"), (null, "d")) } + + test("SPARK-16995: flat mapping on Dataset containing a column created with lit/expr") { + val df = Seq("1").toDF("a") + + import df.sparkSession.implicits._ + + checkDataset( + df.withColumn("b", lit(0)).as[ClassData] + .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }) + checkDataset( + df.withColumn("b", expr("0")).as[ClassData] + .groupByKey(_.a).flatMapGroups { case (x, iter) => List[Int]() }) + } } case class Generic[T](id: T, value: Double) |