author Liang-Chi Hsieh <simonh@tw.ibm.com> 2016-08-18 13:24:12 +0800
committer Wenchen Fan <wenchen@databricks.com> 2016-08-18 13:24:12 +0800
commit 10204b9d29cd69895f5a606e75510dc64cf2e009
tree 3ccdc9e8ba136120e80afe108e397f08b5946f39 /sql/catalyst/src/main/scala/org
parent e6bef7d52f0e19ec771fb0f3e96c7ddbd1a6a19b
[SPARK-16995][SQL] TreeNodeException when flat mapping RelationalGroupedDataset created from DataFrame containing a column created with lit/expr
## What changes were proposed in this pull request?

A TreeNodeException is thrown when executing the following minimal example in Spark 2.0.

    import spark.implicits._
    case class test (x: Int, q: Int)

    val d = Seq(1).toDF("x")
    d.withColumn("q", lit(0)).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show
    d.withColumn("q", expr("0")).as[test].groupByKey(_.x).flatMapGroups{case (x, iter) => List[Int]()}.show

The problem is in `FoldablePropagation`. The rule calls `transformExpressions` on the `LogicalPlan`. The query above contains a `MapGroups`, which has a parameter `dataAttributes: Seq[Attribute]`. `FoldablePropagation` transforms one of the attributes in `dataAttributes` into an `Alias(literal(0), _)`. An `Alias` is not an `Attribute`, which causes the error.

We can't easily detect such a type inconsistency while transforming expressions. A direct approach to this problem is to skip `FoldablePropagation` on object operators, as they should not contain such expressions.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #14648 from viirya/flat-mapping.
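To illustrate the type mismatch described above, here is a minimal sketch that uses toy stand-ins rather than the real Catalyst classes. `ToyPlan`, `ToyProject`, `ToyMapGroups`, `rewriteNaively` and `rewriteSkippingObjectOperators` are hypothetical names introduced only for this example, and the failure is modeled as a `Left` instead of the TreeNodeException that Spark reports. The sketch also does not model the `stop` flag that the patch sets to halt propagation further up the plan.

```scala
// Toy stand-ins for Catalyst expressions; these are NOT the real Spark types.
sealed trait Expression
sealed trait Attribute extends Expression { def name: String }
final case class AttributeReference(name: String) extends Attribute
final case class Literal(value: Int) extends Expression
final case class Alias(child: Expression, name: String) extends Expression

// Toy plan nodes (hypothetical). ToyMapGroups requires attributes,
// much like the dataAttributes parameter of MapGroups.
sealed trait ToyPlan
final case class ToyProject(exprs: Seq[Expression]) extends ToyPlan
final case class ToyMapGroups(dataAttributes: Seq[Attribute]) extends ToyPlan

object FoldableSketch {
  // Untyped rewrite: an attribute may come back as an Alias.
  def propagate(e: Expression, foldableMap: Map[Attribute, Alias]): Expression = e match {
    case a: AttributeReference if foldableMap.contains(a) => foldableMap(a)
    case other => other
  }

  // Rewrites every operator, including the one that needs attributes.
  def rewriteNaively(plan: ToyPlan, foldableMap: Map[Attribute, Alias]): Either[String, ToyPlan] =
    plan match {
      case ToyProject(exprs) =>
        Right(ToyProject(exprs.map(propagate(_, foldableMap))))
      case ToyMapGroups(attrs) =>
        val rewritten = attrs.map(propagate(_, foldableMap))
        val stillAttributes = rewritten.collect { case a: Attribute => a }
        // Rebuilding is only possible if every element is still an Attribute.
        if (stillAttributes.length == rewritten.length) Right(ToyMapGroups(stillAttributes))
        else Left("dataAttributes contains a non-Attribute expression after rewriting")
    }

  // Mirrors the idea of the patch: leave the attribute-taking operator untouched.
  def rewriteSkippingObjectOperators(plan: ToyPlan, foldableMap: Map[Attribute, Alias]): ToyPlan =
    plan match {
      case m: ToyMapGroups => m
      case ToyProject(exprs) => ToyProject(exprs.map(propagate(_, foldableMap)))
    }

  def main(args: Array[String]): Unit = {
    val x: Attribute = AttributeReference("x")
    val q: Attribute = AttributeReference("q")
    val foldableMap: Map[Attribute, Alias] = Map(q -> Alias(Literal(0), "q"))
    val op = ToyMapGroups(Seq(x, q))
    println(rewriteNaively(op, foldableMap))
    println(rewriteSkippingObjectOperators(op, foldableMap))
  }
}
```

In this toy model the naive rewrite cannot rebuild `ToyMapGroups`, because one element of `dataAttributes` has become an `Alias`, while the skipping variant returns the operator unchanged and still rewrites `ToyProject`.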
Diffstat (limited to 'sql/catalyst/src/main/scala/org')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala 13
1 files changed, 13 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa15f4a823..b53c0b5bec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -727,6 +727,19 @@ object FoldablePropagation extends Rule[LogicalPlan] {
         case j @ Join(_, _, LeftOuter | RightOuter | FullOuter, _) =>
           stop = true
           j
+
+        // These 3 operators take attributes as constructor parameters, and these attributes
+        // can't be replaced by alias.
+        case m: MapGroups =>
+          stop = true
+          m
+        case f: FlatMapGroupsInR =>
+          stop = true
+          f
+        case c: CoGroup =>
+          stop = true
+          c
+
         case p: LogicalPlan if !stop => p.transformExpressions {
           case a: AttributeReference if foldableMap.contains(a) =>
             foldableMap(a)