author    Liang-Chi Hsieh <viirya@gmail.com>  2016-03-03 00:06:46 -0800
committer Davies Liu <davies.liu@gmail.com>  2016-03-03 00:06:46 -0800
commit    7b25dc7b7e5a098552c0d640eee132b83d42db56 (patch)
tree      205ce3b1300420b6d28ca6fb39d4985cc5ad6bdc
parent    1085bd862a3f69e14e00cec11077e54ab153098b (diff)
[SPARK-13466] [SQL] Remove projects that become redundant after column pruning rule
JIRA: https://issues.apache.org/jira/browse/SPARK-13466

## What changes were proposed in this pull request?

With the column pruning rule in the optimizer, some Project operators become redundant. We should remove these redundant Projects.

For an example query:

    val input = LocalRelation('key.int, 'value.string)

    val query =
      Project(Seq($"x.key", $"y.key"),
        Join(
          SubqueryAlias("x", input),
          BroadcastHint(SubqueryAlias("y", input)),
          Inner, None))

After the first run of column pruning, it would look like:

    Project(Seq($"x.key", $"y.key"),
      Join(
        Project(Seq($"x.key"), SubqueryAlias("x", input)),
        Project(Seq($"y.key"),                  <-- inserted by the rule
          BroadcastHint(SubqueryAlias("y", input))),
        Inner, None))

The outer Project is no longer needed. This patch removes it:

    Join(
      Project(Seq($"x.key"), SubqueryAlias("x", input)),
      Project(Seq($"y.key"),
        BroadcastHint(SubqueryAlias("y", input))),
      Inner, None)

## How was this patch tested?

A unit test is added to ColumnPruningSuite.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #11341 from viirya/remove-redundant-project.
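For context, the heart of the change is the `sameOutput` helper added to ColumnPruning (see the Optimizer.scala hunk below): a Project is a no-op when its output is semantically identical to its child's output, even when the attribute lists are not `==`-equal. The following is a minimal, self-contained sketch of that idea; the `Attr` case class and its `semanticEquals` are simplified stand-ins for Catalyst's Attribute, not Spark's actual API:

    // Stand-in for Catalyst's Attribute: two attributes are "semantically
    // equal" when they refer to the same underlying column (same exprId),
    // even if cosmetic details such as the qualifier differ.
    final case class Attr(name: String, exprId: Long, qualifier: Option[String] = None) {
      def semanticEquals(other: Attr): Boolean = exprId == other.exprId
    }

    object SameOutputSketch {
      // Mirrors the helper added in Optimizer.scala: same length and
      // pairwise semantic equality of the two output lists.
      def sameOutput(output1: Seq[Attr], output2: Seq[Attr]): Boolean =
        output1.size == output2.size &&
          output1.zip(output2).forall { case (a1, a2) => a1.semanticEquals(a2) }

      def main(args: Array[String]): Unit = {
        val xKey = Attr("key", exprId = 1L, qualifier = Some("x"))
        val yKey = Attr("key", exprId = 2L, qualifier = Some("y"))
        // Plain == is too strict: a differing qualifier breaks it even
        // though both sides name the same columns ...
        assert(Seq(xKey, yKey) != Seq(xKey.copy(qualifier = None), yKey))
        // ... while the semantic comparison still recognizes the no-op.
        assert(sameOutput(Seq(xKey, yKey), Seq(xKey.copy(qualifier = None), yKey)))
      }
    }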
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala             |  6
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala    | 23
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala |  9
 sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala             |  8
 4 files changed, 35 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 83ea302013..059d8ff87b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -312,6 +312,10 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
* - LeftSemiJoin
*/
object ColumnPruning extends Rule[LogicalPlan] {
+ def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
+ output1.size == output2.size &&
+ output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
+
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Prunes the unused columns from project list of Project/Aggregate/Window/Expand
case p @ Project(_, p2: Project) if (p2.outputSet -- p.references).nonEmpty =>
@@ -378,7 +382,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
case p @ Project(_, l: LeafNode) => p
// Eliminate no-op Projects
- case p @ Project(projectList, child) if child.output == p.output => child
+ case p @ Project(projectList, child) if sameOutput(child.output, p.output) => child
// for all other logical plans that inherit the output from their children
case p @ Project(_, child) =>
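Why the `==`-to-`sameOutput` change in this hunk matters: attribute equality via `==` also compares cosmetic fields such as the qualifier, so a Project whose output names exactly the columns of its child could still fail the old check and survive as a no-op. Reusing the illustrative stand-ins from the sketch after the commit message:

    // Seq(xKey) == Seq(xKey.copy(qualifier = None))           // false: qualifiers differ
    // sameOutput(Seq(xKey), Seq(xKey.copy(qualifier = None))) // true: same underlying column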
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 5cab1fc95a..d09601e034 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Ascending, Explode, Literal, SortOrder}
-import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.StringType
@@ -252,6 +252,27 @@ class ColumnPruningSuite extends PlanTest {
comparePlans(Optimize.execute(query), expected)
}
+ test("Remove redundant projects in column pruning rule") {
+ val input = LocalRelation('key.int, 'value.string)
+
+ val query =
+ Project(Seq($"x.key", $"y.key"),
+ Join(
+ SubqueryAlias("x", input),
+ BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
+
+ val optimized = Optimize.execute(query)
+
+ val expected =
+ Join(
+ Project(Seq($"x.key"), SubqueryAlias("x", input)),
+ BroadcastHint(
+ Project(Seq($"y.key"), SubqueryAlias("y", input))),
+ Inner, None).analyze
+
+ comparePlans(optimized, expected)
+ }
+
implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
private val func = identity[Iterator[OtherTuple]] _
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index 2f382bbda0..e2f8146bee 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -105,11 +105,10 @@ class JoinOptimizationSuite extends PlanTest {
val optimized = Optimize.execute(query)
val expected =
- Project(Seq($"x.key", $"y.key"),
- Join(
- Project(Seq($"x.key"), SubqueryAlias("x", input)),
- BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
- Inner, None)).analyze
+ Join(
+ Project(Seq($"x.key"), SubqueryAlias("x", input)),
+ BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
+ Inner, None).analyze
comparePlans(optimized, expected)
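The SQLMetricsSuite updates below are a knock-on effect of the same change: with the redundant top-level Project eliminated from these query plans, the join operator becomes the topmost metered node in the physical plan, so the metric lookups shift from node id 1L to 0L.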
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 5b4f6f1d24..f754acb761 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -172,7 +172,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = sqlContext.sql(
"SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df, 1, Map(
- 1L -> ("SortMergeJoin", Map(
+ 0L -> ("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
"number of output rows" -> 4L)))
)
@@ -190,7 +190,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = sqlContext.sql(
"SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df, 1, Map(
- 1L -> ("SortMergeOuterJoin", Map(
+ 0L -> ("SortMergeOuterJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
"number of output rows" -> 8L)))
)
@@ -198,7 +198,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df2 = sqlContext.sql(
"SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df2, 1, Map(
- 1L -> ("SortMergeOuterJoin", Map(
+ 0L -> ("SortMergeOuterJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
"number of output rows" -> 8L)))
)
@@ -298,7 +298,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
val df = sqlContext.sql(
"SELECT * FROM testData2 JOIN testDataForJoin")
testSparkPlanMetrics(df, 1, Map(
- 1L -> ("CartesianProduct", Map(
+ 0L -> ("CartesianProduct", Map(
"number of output rows" -> 12L)))
)
}