From 7b25dc7b7e5a098552c0d640eee132b83d42db56 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 3 Mar 2016 00:06:46 -0800
Subject: [SPARK-13466] [SQL] Remove projects that become redundant after column pruning rule

JIRA: https://issues.apache.org/jira/browse/SPARK-13466

## What changes were proposed in this pull request?

With the column pruning rule in the optimizer, some Project operators become redundant. We should remove these redundant Projects.

For an example query:

    val input = LocalRelation('key.int, 'value.string)

    val query =
      Project(Seq($"x.key", $"y.key"),
        Join(
          SubqueryAlias("x", input),
          BroadcastHint(SubqueryAlias("y", input)),
          Inner, None))

After the first run of column pruning, it would look like:

    Project(Seq($"x.key", $"y.key"),
      Join(
        Project(Seq($"x.key"), SubqueryAlias("x", input)),
        Project(Seq($"y.key"),            <-- inserted by the rule
          BroadcastHint(SubqueryAlias("y", input))),
        Inner, None))

The outer Project is now redundant, because its project list is exactly the Join's output. This patch removes it:

    Join(
      Project(Seq($"x.key"), SubqueryAlias("x", input)),
      Project(Seq($"y.key"),
        BroadcastHint(SubqueryAlias("y", input))),
      Inner, None)

## How was this patch tested?

A unit test is added to ColumnPruningSuite.

Author: Liang-Chi Hsieh

Closes #11341 from viirya/remove-redundant-project.

---
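One plausible reason the new sameOutput helper compares attributes with semanticEquals rather than plain == is that two AttributeReferences can refer to the same underlying column (same ExprId) while differing cosmetically, for example in their qualifiers, and == would reject such a pair. The sketch below is standalone and not part of the patch; the withQualifiers call is an assumed Catalyst API of this era, so treat the snippet as illustrative only:

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.Attribute

    // 'key.int builds an AttributeReference with a fresh ExprId.
    val key = 'key.int
    // Same ExprId, different qualifiers (assumed era-specific Catalyst API).
    val qualified = key.withQualifiers(Seq("x"))

    // The helper this patch adds to ColumnPruning.
    def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
      output1.size == output2.size &&
        output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

    assert(key != qualified)                     // cosmetic difference
    assert(sameOutput(Seq(key), Seq(qualified))) // same column semantically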
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  6 +++++-
 .../catalyst/optimizer/ColumnPruningSuite.scala    | 23 +++++++++++++++++++++-
 .../catalyst/optimizer/JoinOptimizationSuite.scala |  9 ++++-----
 .../sql/execution/metric/SQLMetricsSuite.scala     |  8 ++++----
 4 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 83ea302013..059d8ff87b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -312,6 +312,10 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
  * - LeftSemiJoin
  */
 object ColumnPruning extends Rule[LogicalPlan] {
+  def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
+    output1.size == output2.size &&
+      output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // Prunes the unused columns from project list of Project/Aggregate/Window/Expand
     case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
@@ -378,7 +382,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case p @ Project(_, l: LeafNode) => p
 
     // Eliminate no-op Projects
-    case p @ Project(projectList, child) if child.output == p.output => child
+    case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child
 
     // for all other logical plans that inherits the output from it's children
     case p @ Project(_, child) =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 5cab1fc95a..d09601e034 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
-import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types.StringType
@@ -252,6 +252,27 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(Optimize.execute(query), expected)
   }
 
+  test("Remove redundant projects in column pruning rule") {
+    val input = LocalRelation('key.int, 'value.string)
+
+    val query =
+      Project(Seq($"x.key", $"y.key"),
+        Join(
+          SubqueryAlias("x", input),
+          BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
+
+    val optimized = Optimize.execute(query)
+
+    val expected =
+      Join(
+        Project(Seq($"x.key"), SubqueryAlias("x", input)),
+        BroadcastHint(
+          Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        Inner, None).analyze
+
+    comparePlans(optimized, expected)
+  }
+
   implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
 
   private val func = identity[Iterator[OtherTuple]] _
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index 2f382bbda0..e2f8146bee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -105,11 +105,10 @@ class JoinOptimizationSuite extends PlanTest {
     val optimized = Optimize.execute(query)
 
     val expected =
-      Project(Seq($"x.key", $"y.key"),
-        Join(
-          Project(Seq($"x.key"), SubqueryAlias("x", input)),
-          BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
-          Inner, None)).analyze
+      Join(
+        Project(Seq($"x.key"), SubqueryAlias("x", input)),
+        BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
+        Inner, None).analyze
 
     comparePlans(optimized, expected)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 5b4f6f1d24..f754acb761 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -172,7 +172,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df = sqlContext.sql(
         "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df, 1, Map(
-        1L -> ("SortMergeJoin", Map(
+        0L -> ("SortMergeJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
           "number of output rows" -> 4L)))
       )
@@ -190,7 +190,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df = sqlContext.sql(
         "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df, 1, Map(
-        1L -> ("SortMergeOuterJoin", Map(
+        0L -> ("SortMergeOuterJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
           "number of output rows" -> 8L)))
       )
@@ -198,7 +198,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df2 = sqlContext.sql(
         "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
       testSparkPlanMetrics(df2, 1, Map(
-        1L -> ("SortMergeOuterJoin", Map(
+        0L -> ("SortMergeOuterJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
           "number of output rows" -> 8L)))
       )
@@ -298,7 +298,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df = sqlContext.sql(
         "SELECT * FROM testData2 JOIN testDataForJoin")
       testSparkPlanMetrics(df, 1, Map(
-        1L -> ("CartesianProduct", Map(
+        0L -> ("CartesianProduct", Map(
           "number of output rows" -> 12L)))
       )
-- 
cgit v1.2.3
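As a usage sketch, outside the patch proper, the effect can be observed end to end on a broadcast self-join that selects exactly the join's pruned output. Assumptions: a spark-shell of this era built with the change, where sqlContext is predefined; the exact optimized plan shape may vary:

    import org.apache.spark.sql.functions.broadcast
    import sqlContext.implicits._

    val df = sqlContext.range(10).toDF("key")
    val joined = df.as("x")
      .join(broadcast(df.as("y")), $"x.key" === $"y.key")
      .select($"x.key", $"y.key")

    // With this patch, the optimized plan should be a Join whose children are
    // pruning Projects, with no redundant Project left on top of the Join.
    println(joined.queryExecution.optimizedPlan.numberedTreeString)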