diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2016-06-10 13:06:18 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-06-10 13:06:18 -0700 |
commit | 667d4ea7b35f285954ea7cb719b7c80581e31f4d (patch) | |
tree | 76903b522534838267b6d96a0694aeb66f6231a4 | |
parent | fb219029dd1b8d2783c3e202361401048296595c (diff) | |
download | spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.tar.gz spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.tar.bz2 spark-667d4ea7b35f285954ea7cb719b7c80581e31f4d.zip |
[SPARK-6320][SQL] Move planLater method into GenericStrategy.
## What changes were proposed in this pull request?
This PR moves `QueryPlanner.planLater()` method into `GenericStrategy` for extra strategies to be able to use `planLater` in its strategy.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #13147 from ueshin/issues/SPARK-6320.
6 files changed, 151 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 8b1a34f79c..5f694f44b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } @@ -47,17 +55,47 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] - /** - * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be - * filled in automatically by the QueryPlanner using the other execution strategies that are - * available. - */ - protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next() - def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... - val iter = strategies.view.flatMap(_(plan)).toIterator - assert(iter.hasNext, s"No plan for $plan") - iter + + // Collect physical plan candidates. + val candidates = strategies.iterator.flatMap(_(plan)) + + // The candidates may contain placeholders marked as [[planLater]], + // so try to replace them by their child plans. + val plans = candidates.flatMap { candidate => + val placeholders = collectPlaceholders(candidate) + + if (placeholders.isEmpty) { + // Take the candidate as is because it does not contain placeholders. + Iterator(candidate) + } else { + // Plan the logical plan marked as [[planLater]] and replace the placeholders. + placeholders.iterator.foldLeft(Iterator(candidate)) { + case (candidatesWithPlaceholders, (placeholder, logicalPlan)) => + // Plan the logical plan for the placeholder. + val childPlans = this.plan(logicalPlan) + + candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => + childPlans.map { childPlan => + // Replace the placeholder by the child plan + candidateWithPlaceholders.transformUp { + case p if p == placeholder => childPlan + } + } + } + } + } + } + + val pruned = prunePlans(plans) + assert(pruned.hasNext, s"No plan for $plan") + pruned } + + /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */ + protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)] + + /** Prunes bad plans to prevent combinatorial explosion. */ + protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index a2d45026e0..e6dc50a40e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -75,6 +75,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) + // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, + // but we will implement to choose the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index de832ec70b..73e2ffdf00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkContext import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} import org.apache.spark.sql.internal.SQLConf @@ -42,6 +43,18 @@ class SparkPlanner( InMemoryScans :: BasicOperators :: Nil) + override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = { + plan.collect { + case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan + } + } + + override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = { + // TODO: We will need to prune bad plans when we improve plan space exploration + // to prevent combinatorial explosion. + plans + } + /** * Used to build table scan operators where complex projection and filtering are done using * separate physical operators. This function returns the given scan operator with Project and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index a36fe78a49..d1261dd6ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder @@ -35,6 +37,27 @@ import org.apache.spark.sql.execution.streaming.MemoryPlan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.ContinuousQuery +/** + * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting + * with the query planner and is not designed to be stable across spark releases. Developers + * writing libraries should instead consider using the stable APIs provided in + * [[org.apache.spark.sql.sources]] + */ +@DeveloperApi +abstract class SparkStrategy extends GenericStrategy[SparkPlan] { + + override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan) +} + +private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode { + + override def output: Seq[Attribute] = plan.output + + protected override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException() + } +} + private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 97e35bb104..28d8bc3de6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -18,7 +18,7 @@ package org.apache.spark import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} /** * Allows the execution of relational queries, including those expressed in SQL using Spark. @@ -40,7 +40,7 @@ package object sql { * [[org.apache.spark.sql.sources]] */ @DeveloperApi - type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan] + type Strategy = SparkStrategy type DataFrame = Dataset[Row] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala new file mode 100644 index 0000000000..aecfd30621 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, ReturnAnswer, Union} +import org.apache.spark.sql.test.SharedSQLContext + +class SparkPlannerSuite extends SharedSQLContext { + import testImplicits._ + + test("Ensure to go down only the first branch, not any other possible branches") { + + case object NeverPlanned extends LeafNode { + override def output: Seq[Attribute] = Nil + } + + var planned = 0 + object TestStrategy extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ReturnAnswer(child) => + planned += 1 + planLater(child) :: planLater(NeverPlanned) :: Nil + case Union(children) => + planned += 1 + UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil + case LocalRelation(output, data) => + planned += 1 + LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil + case NeverPlanned => + fail("QueryPlanner should not go down to this branch.") + case _ => Nil + } + } + + try { + spark.experimental.extraStrategies = TestStrategy :: Nil + + val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS()) + + assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f")) + assert(planned === 4) + } finally { + spark.experimental.extraStrategies = Nil + } + } +} |