author    Michael Armbrust <michael@databricks.com>  2015-03-24 14:08:20 -0700
committer Michael Armbrust <michael@databricks.com>  2015-03-24 14:08:20 -0700
commit    cbeaf9ebab31a0bcbca884d4db7a791fd9edbff3 (patch)
tree      ad4201beaf624ae03183bdbe90a31dc79759c6c2
parent    046c1e2aa459147bf592371bb9fb7a65edb182e7 (diff)
[SPARK-6376][SQL] Avoid eliminating subqueries until optimization
Previously it was okay to throw away subqueries after analysis, as we would never try to use that tree for resolution again. However, with eager analysis in `DataFrame`s this can cause errors for queries such as:

```scala
val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count()
```

As a result, in this PR we defer the elimination of subqueries until the optimization phase.

Author: Michael Armbrust <michael@databricks.com>

Closes #5160 from marmbrus/subqueriesInDfs and squashes the following commits:

a9bb262 [Michael Armbrust] Update Optimizer.scala
27d25bf [Michael Armbrust] fix hive tests
9137e03 [Michael Armbrust] add type
81cd597 [Michael Armbrust] Avoid eliminating subqueries until optimization
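For context, the rule being deferred is a one-case tree transform. A minimal sketch (the object name here is ours; it mirrors how `EliminateSubQueries` is written in Catalyst's analysis package at this point in time):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.rules.Rule

// Subquery nodes exist only to scope attribute resolution (e.g. the aliases
// `x` and `y` above); eliminating one just replaces it with its child plan.
object EliminateSubQueriesSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child
  }
}
```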
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala |  4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala |  4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala |  4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 16
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala |  8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala |  7
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala |  4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala |  2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala |  2
9 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c93af79795..13d2ae4c6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -64,9 +64,7 @@ class Analyzer(catalog: Catalog,
UnresolvedHavingClauseAttributes ::
TrimGroupingAliases ::
typeCoercionRules ++
- extendedResolutionRules : _*),
- Batch("Remove SubQueries", fixedPoint,
- EliminateSubQueries)
+ extendedResolutionRules : _*)
)
/**
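With the `Remove SubQueries` batch gone from the analyzer, analyzed plans retain their `Subquery` wrappers, so qualified attributes such as `$"x.str"` can still be resolved against the alias on a later analysis pass. A small, self-contained illustration using the Catalyst test DSL (the attribute names are our assumptions):

```scala
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Subquery}

// The alias wrapper now survives analysis; only the optimizer strips it.
val relation = LocalRelation('a.int, 'b.string)
val aliased = Subquery("x", relation)
assert(EliminateSubQueries(aliased) == relation)
```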
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 51a09ac0e1..7f5f617812 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{TypeTag, typeTag}
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -289,7 +289,7 @@ package object dsl {
InsertIntoTable(
analysis.UnresolvedRelation(Seq(tableName)), Map.empty, logicalPlan, overwrite)
- def analyze = analysis.SimpleAnalyzer(logicalPlan)
+ def analyze: LogicalPlan = EliminateSubQueries(analysis.SimpleAnalyzer(logicalPlan))
}
object plans { // scalastyle:ignore
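Many optimizer tests compare plans produced by this `analyze` helper against hand-built expected plans that never contained subqueries, so the helper now strips them eagerly. A hedged usage sketch of the helper after this change:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// `analyze` now returns a resolved plan with Subquery nodes already removed,
// so golden-plan comparisons in the optimizer suites keep passing unchanged.
val testRelation = LocalRelation('a.int)
val analyzed = testRelation.select('a).analyze
```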
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1a75fcf354..74edaacc4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.optimizer
import scala.collection.immutable.HashSet
+import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.FullOuter
@@ -32,6 +33,9 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan]
object DefaultOptimizer extends Optimizer {
val batches =
+ // SubQueries are only needed for analysis and can be removed before execution.
+ Batch("Remove SubQueries", FixedPoint(100),
+ EliminateSubQueries) ::
Batch("Combine Limits", FixedPoint(100),
CombineLimits) ::
Batch("ConstantFolding", FixedPoint(100),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0f8b144ccc..b01a61d7bf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.Logging
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedGetField, Resolver}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -73,12 +73,16 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
* can do better should override this function.
*/
def sameResult(plan: LogicalPlan): Boolean = {
- plan.getClass == this.getClass &&
- plan.children.size == children.size && {
- logDebug(s"[${cleanArgs.mkString(", ")}] == [${plan.cleanArgs.mkString(", ")}]")
- cleanArgs == plan.cleanArgs
+ val cleanLeft = EliminateSubQueries(this)
+ val cleanRight = EliminateSubQueries(plan)
+
+ cleanLeft.getClass == cleanRight.getClass &&
+ cleanLeft.children.size == cleanRight.children.size && {
+ logDebug(
+ s"[${cleanRight.cleanArgs.mkString(", ")}] == [${cleanLeft.cleanArgs.mkString(", ")}]")
+ cleanRight.cleanArgs == cleanLeft.cleanArgs
} &&
- (plan.children, children).zipped.forall(_ sameResult _)
+ (cleanLeft.children, cleanRight.children).zipped.forall(_ sameResult _)
}
/** Args that have been cleaned such that differences in expression id should not affect equality */
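`sameResult` backs structural plan comparisons (for example, cache lookups), so it now normalizes both sides by stripping subqueries before comparing; a relation is still recognized when it is referenced under an alias. A hedged illustration:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Subquery}

// Alias wrappers no longer defeat the structural comparison.
val relation = LocalRelation('a.int)
assert(Subquery("x", relation).sameResult(relation))
assert(relation.sameResult(Subquery("x", relation)))
```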
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 359aec4a7b..756cd36f05 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -32,9 +32,13 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseInsensitiveCatalog = new SimpleCatalog(false)
val caseSensitiveAnalyzer =
- new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
+ new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true) {
+ override val extendedResolutionRules = EliminateSubQueries :: Nil
+ }
val caseInsensitiveAnalyzer =
- new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
+ new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false) {
+ override val extendedResolutionRules = EliminateSubQueries :: Nil
+ }
val checkAnalysis = new CheckAnalysis
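The suite's expected plans were written without `Subquery` nodes, so the test analyzers re-add the elimination rule through the `extendedResolutionRules` hook rather than rewriting every expectation. The same hook in isolation (a sketch; the catalog choice is arbitrary):

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Any extra Rule[LogicalPlan] placed here runs as part of resolution.
val analyzer =
  new Analyzer(new SimpleCatalog(true), EmptyFunctionRegistry, caseSensitive = true) {
    override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
      EliminateSubQueries :: Nil
  }
```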
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ff441ef26f..c30ed694a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -108,6 +108,13 @@ class DataFrameSuite extends QueryTest {
)
}
+ test("self join with aliases") {
+ val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+ checkAnswer(
+ df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("x.str").count(),
+ Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
+ }
+
test("explode") {
val df = Seq((1, "a b c"), (2, "a b"), (3, "a")).toDF("number", "letters")
val df2 =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index dd0948ad82..e4dee87849 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -34,7 +34,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("equi-join is hash-join") {
val x = testData2.as("x")
val y = testData2.as("y")
- val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.analyzed
+ val join = x.join(y, $"x.a" === $"y.a", "inner").queryExecution.optimizedPlan
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
@@ -109,7 +109,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("multiple-key equi-join is hash-join") {
val x = testData2.as("x")
val y = testData2.as("y")
- val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.analyzed
+ val join = x.join(y, ($"x.a" === $"y.a") && ($"x.b" === $"y.b")).queryExecution.optimizedPlan
val planned = planner.HashJoin(join)
assert(planned.size === 1)
}
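These tests now hand the planner the optimized plan: physical strategies pattern-match on the tree shape, and after this change the analyzed tree still carries `Subquery` wrappers sitting between the `Join` and its children. A schematic illustration at the Catalyst level (the relation is a stand-in for `testData2`):

```scala
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, Subquery}

// Analyzed shape: the aliases x and y wrap each side of the join.
val t = LocalRelation('a.int, 'b.int)
val analyzedShape = Join(Subquery("x", t), Subquery("y", t), Inner, None)

// Optimized shape: Join(t, t, Inner, None) -- the bare Join node the
// HashJoin strategy expects to match on.
val optimizedShape = EliminateSubQueries(analyzedShape)
```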
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index ff2e6ea9ea..e5ad0bf552 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -579,7 +579,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row(3) :: Row(4) :: Nil
)
- table("test_parquet_ctas").queryExecution.analyzed match {
+ table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d891c4e890..8a31bd0309 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -292,7 +292,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
Seq(Row(1, "str1"))
)
- table("test_parquet_ctas").queryExecution.analyzed match {
+ table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(