diff options
author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-08-18 13:24:12 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-08-18 13:24:12 +0800 |
commit | 10204b9d29cd69895f5a606e75510dc64cf2e009 (patch) | |
tree | 3ccdc9e8ba136120e80afe108e397f08b5946f39 /sql/catalyst/src/main/scala/org | |
parent | e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b (diff) | |
download | spark-10204b9d29cd69895f5a606e75510dc64cf2e009.tar.gz spark-10204b9d29cd69895f5a606e75510dc64cf2e009.tar.bz2 spark-10204b9d29cd69895f5a606e75510dc64cf2e009.zip |
[SPARK-16995][SQL] TreeNodeException when flat mapping RelationalGroupedDataset created from DataFrame containing a column created with lit/expr
## What changes were proposed in this pull request?
A TreeNodeException is thrown when executing the following minimal example in Spark 2.0.
import spark.implicits._
case class test (x: Int, q: Int)
val d = Seq(1).toDF("x")
d.withColumn("q", lit(0)).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
d.withColumn("q", expr("0")).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
The problem is at `FoldablePropagation`. The rule will do `transformExpressions` on `LogicalPlan`. The query above contains a `MapGroups` which has a parameter `dataAttributes:Seq[Attribute]`. One attributes in `dataAttributes` will be transformed to an `Alias(literal(0), _)` in `FoldablePropagation`. `Alias` is not an `Attribute` and causes the error.
We can't easily detect such type inconsistency during transforming expressions. A direct approach to this problem is to skip doing `FoldablePropagation` on object operators as they should not contain such expressions.
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #14648 from viirya/flat-mapping.
Diffstat (limited to 'sql/catalyst/src/main/scala/org')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index aa15f4a823..b53c0b5bec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -727,6 +727,19 @@ object FoldablePropagation extends Rule[LogicalPlan] { case j @ Join(_, _, LeftOuter | RightOuter | FullOuter, _) => stop = true j + + // These 3 operators take attributes as constructor parameters, and these attributes + // can't be replaced by alias. + case m: MapGroups => + stop = true + m + case f: FlatMapGroupsInR => + stop = true + f + case c: CoGroup => + stop = true + c + case p: LogicalPlan if !stop => p.transformExpressions { case a: AttributeReference if foldableMap.contains(a) => foldableMap(a) |