aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-03-27 11:40:00 -0700
committerMichael Armbrust <michael@databricks.com>2015-03-27 11:40:00 -0700
commit5d9c37c23d1edd91e6c5561780006b762cde5f66 (patch)
tree9264eada8a790ed62b71cf805277602e5113199e /sql
parentaa2b9917489f9bbb02c8acea5ff43335042e2705 (diff)
downloadspark-5d9c37c23d1edd91e6c5561780006b762cde5f66.tar.gz
spark-5d9c37c23d1edd91e6c5561780006b762cde5f66.tar.bz2
spark-5d9c37c23d1edd91e6c5561780006b762cde5f66.zip
[SPARK-6550][SQL] Use analyzed plan in DataFrame
This is based on bug and test case proposed by viirya. See #5203 for a excellent description of the problem. TLDR; The problem occurs because the function `groupBy(String)` calls `resolve`, which returns an `AttributeReference`. However, this `AttributeReference` is based on an analyzed plan which is thrown away. At execution time, we once again analyze the plan. However, in the case of self-joins, each call to analyze will produce a new tree for the left side of the join, rendering the previously returned `AttributeReference` invalid. As a fix, I propose we keep the analyzed plan instead of the unresolved plan inside of a `DataFrame`. Author: Michael Armbrust <michael@databricks.com> Closes #5217 from marmbrus/preanalyzer and squashes the following commits: 1f98e2d [Michael Armbrust] revert change dd4dec1 [Michael Armbrust] Use the analyzed plan in DataFrame 089c52e [Michael Armbrust] WIP
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala4
2 files changed, 5 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 4c80359cf0..423ef3912b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -146,7 +146,7 @@ class DataFrame private[sql](
_: WriteToFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
- queryExecution.logical
+ queryExecution.analyzed
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index fbc4065a96..5f03805d70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -113,6 +113,10 @@ class DataFrameSuite extends QueryTest {
checkAnswer(
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+
+ checkAnswer(
+ df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").count(),
+ Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
}
test("explode") {