aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-02-04 11:08:50 -0800
committerAndrew Or <andrew@databricks.com>2016-02-04 11:08:50 -0800
commit33212cb9a13a6012b4c19ccfc0fb3db75de304da (patch)
tree80761a91514012b05729b24a852e0374929c9140
parent085f510ae554e2739a38ee0bc7210c4ece902f3f (diff)
downloadspark-33212cb9a13a6012b4c19ccfc0fb3db75de304da.tar.gz
spark-33212cb9a13a6012b4c19ccfc0fb3db75de304da.tar.bz2
spark-33212cb9a13a6012b4c19ccfc0fb3db75de304da.zip
[SPARK-13168][SQL] Collapse adjacent repartition operators
Spark SQL should collapse adjacent `Repartition` operators and only keep the last one. Author: Josh Rosen <joshrosen@databricks.com> Closes #11064 from JoshRosen/collapse-repartition.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala16
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala (renamed from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala)4
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala2
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala15
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala4
6 files changed, 33 insertions, 10 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4ecee75048..a1ac930739 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -68,7 +68,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
PushPredicateThroughAggregate,
ColumnPruning,
// Operator combine
- ProjectCollapsing,
+ CollapseRepartition,
+ CollapseProject,
CombineFilters,
CombineLimits,
CombineUnions,
@@ -322,7 +323,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
* Combines two adjacent [[Project]] operators into one and perform alias substitution,
* merging the expressions into one single expression.
*/
-object ProjectCollapsing extends Rule[LogicalPlan] {
+object CollapseProject extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p @ Project(projectList1, Project(projectList2, child)) =>
@@ -391,6 +392,16 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
}
/**
+ * Combines adjacent [[Repartition]] operators by keeping only the last one.
+ */
+object CollapseRepartition extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case r @ Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
+ Repartition(numPartitions, shuffle, child)
+ }
+}
+
+/**
* Simplifies LIKE expressions that do not need full regular expressions to evaluate the condition.
* For example, when the expression is just checking to see if a string starts with a given
* pattern.
@@ -857,6 +868,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
* Splits join condition expressions into three categories based on the attributes required
* to evaluate them.
+ *
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
index 85b6530481..f5fd5ca6be 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ProjectCollapsingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -25,11 +25,11 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
-class ProjectCollapsingSuite extends PlanTest {
+class CollapseProjectSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", FixedPoint(10), EliminateSubQueries) ::
- Batch("ProjectCollapsing", Once, ProjectCollapsing) :: Nil
+ Batch("CollapseProject", Once, CollapseProject) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index f9f3bd55aa..b49ca928b6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -42,7 +42,7 @@ class FilterPushdownSuite extends PlanTest {
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
- ProjectCollapsing) :: Nil
+ CollapseProject) :: Nil
}
val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
index 9b1e16c727..858a0d8fde 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -43,7 +43,7 @@ class JoinOrderSuite extends PlanTest {
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
- ProjectCollapsing) :: Nil
+ CollapseProject) :: Nil
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 8fca5e2167..adaeb513bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -21,8 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, Row, SQLConf}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder}
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
import org.apache.spark.sql.functions._
@@ -223,6 +222,18 @@ class PlannerSuite extends SharedSQLContext {
}
}
+ test("collapse adjacent repartitions") {
+ val doubleRepartitioned = testData.repartition(10).repartition(20).coalesce(5)
+ def countRepartitions(plan: LogicalPlan): Int = plan.collect { case r: Repartition => r }.length
+ assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
+ assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 1)
+ doubleRepartitioned.queryExecution.optimizedPlan match {
+ case r: Repartition =>
+ assert(r.numPartitions === 5)
+ assert(r.shuffle === false)
+ }
+ }
+
// --- Unit tests of EnsureRequirements ---------------------------------------------------------
// When it comes to testing whether EnsureRequirements properly ensures distribution requirements,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 1654594538..fc5725d691 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
+import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -188,7 +188,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
// The `WidenSetOperationTypes` analysis rule may introduce extra `Project`s over
// `Aggregate`s to perform type casting. This rule merges these `Project`s into
// `Aggregate`s.
- ProjectCollapsing,
+ CollapseProject,
// Used to handle other auxiliary `Project`s added by analyzer (e.g.
// `ResolveAggregateFunctions` rule)