diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2016-06-10 13:06:18 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-06-10 13:06:18 -0700 |
commit | 667d4ea7b35f285954ea7cb719b7c80581e31f4d (patch) | |
tree | 76903b522534838267b6d96a0694aeb66f6231a4 /sql/catalyst | |
parent | fb219029dd1b8d2783c3e202361401048296595c (diff) | |
download | spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.tar.gz spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.tar.bz2 spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.zip |
[SPARK-6320][SQL] Move planLater method into GenericStrategy.
## What changes were proposed in this pull request?
This PR moves `QueryPlanner.planLater()` method into `GenericStrategy` for extra strategies to be able to use `planLater` in its strategy.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #13147 from ueshin/issues/SPARK-6320.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 58 |
1 files changed, 48 insertions, 10 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 8b1a34f79c..5f694f44b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } @@ -47,17 +55,47 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] - /** - * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be - * filled in automatically by the QueryPlanner using the other execution strategies that are - * available. - */ - protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next() - def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... - val iter = strategies.view.flatMap(_(plan)).toIterator - assert(iter.hasNext, s"No plan for $plan") - iter + + // Collect physical plan candidates. + val candidates = strategies.iterator.flatMap(_(plan)) + + // The candidates may contain placeholders marked as [[planLater]], + // so try to replace them by their child plans. + val plans = candidates.flatMap { candidate => + val placeholders = collectPlaceholders(candidate) + + if (placeholders.isEmpty) { + // Take the candidate as is because it does not contain placeholders. + Iterator(candidate) + } else { + // Plan the logical plan marked as [[planLater]] and replace the placeholders. + placeholders.iterator.foldLeft(Iterator(candidate)) { + case (candidatesWithPlaceholders, (placeholder, logicalPlan)) => + // Plan the logical plan for the placeholder. + val childPlans = this.plan(logicalPlan) + + candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => + childPlans.map { childPlan => + // Replace the placeholder by the child plan + candidateWithPlaceholders.transformUp { + case p if p == placeholder => childPlan + } + } + } + } + } + } + + val pruned = prunePlans(plans) + assert(pruned.hasNext, s"No plan for $plan") + pruned } + + /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */ + protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)] + + /** Prunes bad plans to prevent combinatorial explosion. */ + protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan] } |