aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorTakuya UESHIN <ueshin@happy-camper.st>2016-06-10 13:06:18 -0700
committerMichael Armbrust <michael@databricks.com>2016-06-10 13:06:18 -0700
commit667d4ea7b35f285954ea7cb719b7c80581e31f4d (patch)
tree76903b522534838267b6d96a0694aeb66f6231a4 /sql/catalyst
parentfb219029dd1b8d2783c3e202361401048296595c (diff)
downloadspark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.tar.gz
spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.tar.bz2
spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.zip
[SPARK-6320][SQL] Move planLater method into GenericStrategy.
## What changes were proposed in this pull request? This PR moves `QueryPlanner.planLater()` method into `GenericStrategy` for extra strategies to be able to use `planLater` in its strategy. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #13147 from ueshin/issues/SPARK-6320.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala58
1 files changed, 48 insertions, 10 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 8b1a34f79c..5f694f44b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
* empty list should be returned.
*/
abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+
+ /**
+ * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
+ * filled in automatically by the QueryPlanner using the other execution strategies that are
+ * available.
+ */
+ protected def planLater(plan: LogicalPlan): PhysicalPlan
+
def apply(plan: LogicalPlan): Seq[PhysicalPlan]
}
@@ -47,17 +55,47 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
/** A list of execution strategies that can be used by the planner */
def strategies: Seq[GenericStrategy[PhysicalPlan]]
- /**
- * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
- * filled in automatically by the QueryPlanner using the other execution strategies that are
- * available.
- */
- protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next()
-
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
// Obviously a lot to do here still...
- val iter = strategies.view.flatMap(_(plan)).toIterator
- assert(iter.hasNext, s"No plan for $plan")
- iter
+
+ // Collect physical plan candidates.
+ val candidates = strategies.iterator.flatMap(_(plan))
+
+ // The candidates may contain placeholders marked as [[planLater]],
+ // so try to replace them by their child plans.
+ val plans = candidates.flatMap { candidate =>
+ val placeholders = collectPlaceholders(candidate)
+
+ if (placeholders.isEmpty) {
+ // Take the candidate as is because it does not contain placeholders.
+ Iterator(candidate)
+ } else {
+ // Plan the logical plan marked as [[planLater]] and replace the placeholders.
+ placeholders.iterator.foldLeft(Iterator(candidate)) {
+ case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
+ // Plan the logical plan for the placeholder.
+ val childPlans = this.plan(logicalPlan)
+
+ candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
+ childPlans.map { childPlan =>
+ // Replace the placeholder by the child plan
+ candidateWithPlaceholders.transformUp {
+ case p if p == placeholder => childPlan
+ }
+ }
+ }
+ }
+ }
+ }
+
+ val pruned = prunePlans(plans)
+ assert(pruned.hasNext, s"No plan for $plan")
+ pruned
}
+
+ /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */
+ protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]
+
+ /** Prunes bad plans to prevent combinatorial explosion. */
+ protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan]
}