author | Michael Armbrust <michael@databricks.com> | 2015-03-27 11:40:00 -0700
---|---|---
committer | Michael Armbrust <michael@databricks.com> | 2015-03-27 11:40:00 -0700
commit | 5d9c37c23d1edd91e6c5561780006b762cde5f66 (patch) |
tree | 9264eada8a790ed62b71cf805277602e5113199e /sql/core |
parent | aa2b9917489f9bbb02c8acea5ff43335042e2705 (diff) |
[SPARK-6550][SQL] Use analyzed plan in DataFrame
This is based on a bug and test case proposed by viirya. See #5203 for an excellent description of the problem.
TL;DR: The problem occurs because the function `groupBy(String)` calls `resolve`, which returns an `AttributeReference`. However, this `AttributeReference` is based on an analyzed plan that is then thrown away. At execution time, we analyze the plan again, and in the case of self-joins each call to analyze produces a new tree for the left side of the join, rendering the previously returned `AttributeReference` invalid.
As a fix, I propose we keep the analyzed plan instead of the unresolved plan inside of a `DataFrame`.
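The failure mode can be illustrated without Spark at all. The sketch below is hypothetical plain Scala (the object and method names are invented for illustration): `analyze` stands in for Spark's analyzer, which mints a fresh expression ID for each attribute every time it runs over the duplicated side of a self-join, so a reference captured from one analysis pass no longer matches anything in the next.

```scala
// Minimal sketch (no Spark dependency) of the stale-reference bug described
// above. AttrRef mirrors AttributeReference: equality includes the expression
// ID, not just the column name.
object StaleAttributeSketch {
  final case class AttrRef(name: String, exprId: Int)

  private var nextId = 0

  // Stand-in for the analyzer: assigns fresh expression IDs on every run,
  // as Spark does for the duplicated side of a self-join.
  def analyze(columns: Seq[String]): Seq[AttrRef] =
    columns.map { c => nextId += 1; AttrRef(c, nextId) }

  def main(args: Array[String]): Unit = {
    val firstPass  = analyze(Seq("str"))   // plan analyzed inside groupBy("str")
    val captured   = firstPass.head        // the AttributeReference groupBy kept
    val secondPass = analyze(Seq("str"))   // plan re-analyzed at execution time
    // The captured reference no longer resolves against the new tree:
    println(secondPass.contains(captured)) // prints: false
  }
}
```

Keeping the analyzed plan inside the `DataFrame` means resolution and execution see the same tree, so the captured reference stays valid.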
Author: Michael Armbrust <michael@databricks.com>
Closes #5217 from marmbrus/preanalyzer and squashes the following commits:
1f98e2d [Michael Armbrust] revert change
dd4dec1 [Michael Armbrust] Use the analyzed plan in DataFrame
089c52e [Michael Armbrust] WIP
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 |
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4 |
2 files changed, 5 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 4c80359cf0..423ef3912b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -146,7 +146,7 @@ class DataFrame private[sql](
          _: WriteToFile =>
       LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
     case _ =>
-      queryExecution.logical
+      queryExecution.analyzed
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index fbc4065a96..5f03805d70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -113,6 +113,10 @@ class DataFrameSuite extends QueryTest {
     checkAnswer(
       df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
       Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+
+    checkAnswer(
+      df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").count(),
+      Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
   }
 
   test("explode") {