diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-07-08 16:44:53 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2016-07-08 16:44:53 +0800 |
commit | dff73bfa5e08c4c065584cfa9655a7839d28ad49 (patch) | |
tree | a32f2729fe6b5885c1b5a01286fb5ef3cfa50e6c /sql/catalyst/src/main/scala | |
parent | 5bce4580939c27876f11cd75f0dc2190fb9fa908 (diff) | |
download | spark-dff73bfa5e08c4c065584cfa9655a7839d28ad49.tar.gz spark-dff73bfa5e08c4c065584cfa9655a7839d28ad49.tar.bz2 spark-dff73bfa5e08c4c065584cfa9655a7839d28ad49.zip |
[SPARK-16052][SQL] Improve `CollapseRepartition` optimizer for Repartition/RepartitionBy
## What changes were proposed in this pull request?
This PR improves `CollapseRepartition` to optimize the adjacent combinations of **Repartition** and **RepartitionBy**. Also, this PR adds a testsuite for this optimizer.
**Target Scenario**
```scala
scala> val dsView1 = spark.range(8).repartition(8, $"id")
scala> dsView1.createOrReplaceTempView("dsView1")
scala> sql("select id from dsView1 distribute by id").explain(true)
```
**Before**
```scala
scala> sql("select id from dsView1 distribute by id").explain(true)
== Parsed Logical Plan ==
'RepartitionByExpression ['id]
+- 'Project ['id]
+- 'UnresolvedRelation `dsView1`
== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [id#0L]
+- Project [id#0L]
+- SubqueryAlias dsview1
+- RepartitionByExpression [id#0L], 8
+- Range (0, 8, splits=8)
== Optimized Logical Plan ==
RepartitionByExpression [id#0L]
+- RepartitionByExpression [id#0L], 8
+- Range (0, 8, splits=8)
== Physical Plan ==
Exchange hashpartitioning(id#0L, 200)
+- Exchange hashpartitioning(id#0L, 8)
+- *Range (0, 8, splits=8)
```
**After**
```scala
scala> sql("select id from dsView1 distribute by id").explain(true)
== Parsed Logical Plan ==
'RepartitionByExpression ['id]
+- 'Project ['id]
+- 'UnresolvedRelation `dsView1`
== Analyzed Logical Plan ==
id: bigint
RepartitionByExpression [id#0L]
+- Project [id#0L]
+- SubqueryAlias dsview1
+- RepartitionByExpression [id#0L], 8
+- Range (0, 8, splits=8)
== Optimized Logical Plan ==
RepartitionByExpression [id#0L]
+- Range (0, 8, splits=8)
== Physical Plan ==
Exchange hashpartitioning(id#0L, 200)
+- *Range (0, 8, splits=8)
```
## How was this patch tested?
Pass the Jenkins tests (including a new testsuite).
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #13765 from dongjoon-hyun/SPARK-16052.
Diffstat (limited to 'sql/catalyst/src/main/scala')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala | 7 | ||||
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 17 |
2 files changed, 21 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 84c9cc8c8e..5181dcc786 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -370,8 +370,11 @@ package object dsl { case plan => SubqueryAlias(alias, plan) } - def distribute(exprs: Expression*): LogicalPlan = - RepartitionByExpression(exprs, logicalPlan) + def repartition(num: Integer): LogicalPlan = + Repartition(num, shuffle = true, logicalPlan) + + def distribute(exprs: Expression*)(n: Int = -1): LogicalPlan = + RepartitionByExpression(exprs, logicalPlan, numPartitions = if (n < 0) None else Some(n)) def analyze: LogicalPlan = EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 03d15eabdd..368e9a5396 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -556,12 +556,27 @@ object CollapseProject extends Rule[LogicalPlan] { } /** - * Combines adjacent [[Repartition]] operators by keeping only the last one. + * Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator combinations + * by keeping only the one. + * 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]]. + * 2. For adjacent [[RepartitionByExpression]]s, collapse into the last [[RepartitionByExpression]]. + * 3. For a combination of [[Repartition]] and [[RepartitionByExpression]], collapse as a single + * [[RepartitionByExpression]] with the expression and last number of partition. */ object CollapseRepartition extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + // Case 1 case Repartition(numPartitions, shuffle, Repartition(_, _, child)) => Repartition(numPartitions, shuffle, child) + // Case 2 + case RepartitionByExpression(exprs, RepartitionByExpression(_, child, _), numPartitions) => + RepartitionByExpression(exprs, child, numPartitions) + // Case 3 + case Repartition(numPartitions, _, r: RepartitionByExpression) => + r.copy(numPartitions = Some(numPartitions)) + // Case 3 + case RepartitionByExpression(exprs, Repartition(_, _, child), numPartitions) => + RepartitionByExpression(exprs, child, numPartitions) } } |