aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorDongjoon Hyun <dongjoon@apache.org>2016-07-08 16:44:53 +0800
committerWenchen Fan <wenchen@databricks.com>2016-07-08 16:44:53 +0800
commitdff73bfa5e08c4c065584cfa9655a7839d28ad49 (patch)
treea32f2729fe6b5885c1b5a01286fb5ef3cfa50e6c /sql/catalyst
parent5bce4580939c27876f11cd75f0dc2190fb9fa908 (diff)
downloadspark-dff73bfa5e08c4c065584cfa9655a7839d28ad49.tar.gz
spark-dff73bfa5e08c4c065584cfa9655a7839d28ad49.tar.bz2
spark-dff73bfa5e08c4c065584cfa9655a7839d28ad49.zip
[SPARK-16052][SQL] Improve `CollapseRepartition` optimizer for Repartition/RepartitionBy
## What changes were proposed in this pull request? This PR improves `CollapseRepartition` to optimize the adjacent combinations of **Repartition** and **RepartitionBy**. Also, this PR adds a testsuite for this optimizer. **Target Scenario** ```scala scala> val dsView1 = spark.range(8).repartition(8, $"id") scala> dsView1.createOrReplaceTempView("dsView1") scala> sql("select id from dsView1 distribute by id").explain(true) ``` **Before** ```scala scala> sql("select id from dsView1 distribute by id").explain(true) == Parsed Logical Plan == 'RepartitionByExpression ['id] +- 'Project ['id] +- 'UnresolvedRelation `dsView1` == Analyzed Logical Plan == id: bigint RepartitionByExpression [id#0L] +- Project [id#0L] +- SubqueryAlias dsview1 +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Optimized Logical Plan == RepartitionByExpression [id#0L] +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Physical Plan == Exchange hashpartitioning(id#0L, 200) +- Exchange hashpartitioning(id#0L, 8) +- *Range (0, 8, splits=8) ``` **After** ```scala scala> sql("select id from dsView1 distribute by id").explain(true) == Parsed Logical Plan == 'RepartitionByExpression ['id] +- 'Project ['id] +- 'UnresolvedRelation `dsView1` == Analyzed Logical Plan == id: bigint RepartitionByExpression [id#0L] +- Project [id#0L] +- SubqueryAlias dsview1 +- RepartitionByExpression [id#0L], 8 +- Range (0, 8, splits=8) == Optimized Logical Plan == RepartitionByExpression [id#0L] +- Range (0, 8, splits=8) == Physical Plan == Exchange hashpartitioning(id#0L, 200) +- *Range (0, 8, splits=8) ``` ## How was this patch tested? Pass the Jenkins tests (including a new testsuite). Author: Dongjoon Hyun <dongjoon@apache.org> Closes #13765 from dongjoon-hyun/SPARK-16052.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala7
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala17
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala78
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala6
4 files changed, 102 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 84c9cc8c8e..5181dcc786 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -370,8 +370,11 @@ package object dsl {
case plan => SubqueryAlias(alias, plan)
}
- def distribute(exprs: Expression*): LogicalPlan =
- RepartitionByExpression(exprs, logicalPlan)
+ def repartition(num: Integer): LogicalPlan =
+ Repartition(num, shuffle = true, logicalPlan)
+
+ def distribute(exprs: Expression*)(n: Int = -1): LogicalPlan =
+ RepartitionByExpression(exprs, logicalPlan, numPartitions = if (n < 0) None else Some(n))
def analyze: LogicalPlan =
EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 03d15eabdd..368e9a5396 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -556,12 +556,27 @@ object CollapseProject extends Rule[LogicalPlan] {
}
/**
- * Combines adjacent [[Repartition]] operators by keeping only the last one.
+ * Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations
+ * by keeping only the one.
+ * 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]].
+ * 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]].
+ * 3. For a combination of [[Repartition]] and [[RepartitionByExpression]], collapse as a single
+ * [[RepartitionByExpression]] with the expression and last number of partition.
*/
object CollapseRepartition extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ // Case 1
case Repartition(numPartitions, shuffle, Repartition(_, _, child)) =>
Repartition(numPartitions, shuffle, child)
+ // Case 2
+ case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) =>
+ RepartitionByExpression(exprs, child, numPartitions)
+ // Case 3
+ case Repartition(numPartitions, _, r: RepartitionByExpression) =>
+ r.copy(numPartitions = Some(numPartitions))
+ // Case 3
+ case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) =>
+ RepartitionByExpression(exprs, child, numPartitions)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
new file mode 100644
index 0000000000..8952c72fe4
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseRepartitionSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class CollapseRepartitionSuite extends PlanTest {
+ object Optimize extends RuleExecutor[LogicalPlan] {
+ val batches =
+ Batch("CollapseRepartition", FixedPoint(10),
+ CollapseRepartition) :: Nil
+ }
+
+ val testRelation = LocalRelation('a.int, 'b.int)
+
+ test("collapse two adjacent repartitions into one") {
+ val query = testRelation
+ .repartition(10)
+ .repartition(20)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = testRelation.repartition(20).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("collapse repartition and repartitionBy into one") {
+ val query = testRelation
+ .repartition(10)
+ .distribute('a)(20)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = testRelation.distribute('a)(20).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("collapse repartitionBy and repartition into one") {
+ val query = testRelation
+ .distribute('a)(20)
+ .repartition(10)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = testRelation.distribute('a)(10).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+
+ test("collapse two adjacent repartitionBys into one") {
+ val query = testRelation
+ .distribute('b)(10)
+ .distribute('a)(20)
+
+ val optimized = Optimize.execute(query.analyze)
+ val correctAnswer = testRelation.distribute('a)(20).analyze
+
+ comparePlans(optimized, correctAnswer)
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 456948d645..fbe236e196 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -151,9 +151,9 @@ class PlanParserSuite extends PlanTest {
("", basePlan),
(" order by a, b desc", basePlan.orderBy('a.asc, 'b.desc)),
(" sort by a, b desc", basePlan.sortBy('a.asc, 'b.desc)),
- (" distribute by a, b", basePlan.distribute('a, 'b)),
- (" distribute by a sort by b", basePlan.distribute('a).sortBy('b.asc)),
- (" cluster by a, b", basePlan.distribute('a, 'b).sortBy('a.asc, 'b.asc))
+ (" distribute by a, b", basePlan.distribute('a, 'b)()),
+ (" distribute by a sort by b", basePlan.distribute('a)().sortBy('b.asc)),
+ (" cluster by a, b", basePlan.distribute('a, 'b)().sortBy('a.asc, 'b.asc))
)
orderSortDistrClusterClauses.foreach {