author    Takuya UESHIN <ueshin@happy-camper.st>    2016-06-10 13:06:18 -0700
committer Michael Armbrust <michael@databricks.com> 2016-06-10 13:06:18 -0700
commit    667d4ea7b35f285954ea7cb719b7c80581e31f4d
tree      76903b522534838267b6d96a0694aeb66f6231a4
parent    fb219029dd1b8d2783c3e202361401048296595c
[SPARK-6320][SQL] Move planLater method into GenericStrategy.
## What changes were proposed in this pull request?

This PR moves the `QueryPlanner.planLater()` method into `GenericStrategy` so that extra strategies can call `planLater` from their own planning logic.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13147 from ueshin/issues/SPARK-6320.
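A minimal sketch (not part of this commit) of the use case it unblocks: an extra strategy registered through `spark.experimental.extraStrategies` can now call `planLater` to hand a subtree back to the planner. `MyFilterExec` is a hypothetical physical operator.

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan

object MyFilterStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Filter(condition, child) =>
      // Emit a placeholder for `child`; the planner substitutes a real plan later.
      MyFilterExec(condition, planLater(child)) :: Nil  // MyFilterExec is hypothetical
    case _ => Nil
  }
}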
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 58
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/package.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala | 63
6 files changed, 151 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 8b1a34f79c..5f694f44b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
* empty list should be returned.
*/
abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+
+ /**
+ * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
+ * filled in automatically by the QueryPlanner using the other execution strategies that are
+ * available.
+ */
+ protected def planLater(plan: LogicalPlan): PhysicalPlan
+
def apply(plan: LogicalPlan): Seq[PhysicalPlan]
}
@@ -47,17 +55,47 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
/** A list of execution strategies that can be used by the planner */
def strategies: Seq[GenericStrategy[PhysicalPlan]]
- /**
- * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
- * filled in automatically by the QueryPlanner using the other execution strategies that are
- * available.
- */
- protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next()
-
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
// Obviously a lot to do here still...
- val iter = strategies.view.flatMap(_(plan)).toIterator
- assert(iter.hasNext, s"No plan for $plan")
- iter
+
+ // Collect physical plan candidates.
+ val candidates = strategies.iterator.flatMap(_(plan))
+
+ // The candidates may contain placeholders marked as [[planLater]],
+ // so try to replace them by their child plans.
+ val plans = candidates.flatMap { candidate =>
+ val placeholders = collectPlaceholders(candidate)
+
+ if (placeholders.isEmpty) {
+ // Take the candidate as is because it does not contain placeholders.
+ Iterator(candidate)
+ } else {
+ // Plan the logical plan marked as [[planLater]] and replace the placeholders.
+ placeholders.iterator.foldLeft(Iterator(candidate)) {
+ case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
+ // Plan the logical plan for the placeholder.
+ val childPlans = this.plan(logicalPlan)
+
+ candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
+ childPlans.map { childPlan =>
+ // Replace the placeholder by the child plan
+ candidateWithPlaceholders.transformUp {
+ case p if p == placeholder => childPlan
+ }
+ }
+ }
+ }
+ }
+ }
+
+ val pruned = prunePlans(plans)
+ assert(pruned.hasNext, s"No plan for $plan")
+ pruned
}
+
+ /** Collects placeholders marked as [[planLater]] by a strategy, paired with their [[LogicalPlan]]s. */
+ protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]
+
+ /** Prunes bad plans to prevent combinatorial explosion. */
+ protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan]
}
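To see why the foldLeft above enumerates every combination of child plans, here is a self-contained sketch (plain Scala, not Spark code) of the same expansion: each placeholder token is replaced by each of its candidate substitutions in turn.

def expand(candidate: String, placeholders: Seq[(String, Seq[String])]): Iterator[String] =
  placeholders.foldLeft(Iterator(candidate)) {
    case (acc, (token, substitutions)) =>
      // For every partially expanded candidate, branch once per substitution.
      acc.flatMap(c => substitutions.iterator.map(s => c.replace(token, s)))
  }

// expand("Join(<A>, <B>)", Seq("<A>" -> Seq("scanA1", "scanA2"), "<B>" -> Seq("scanB"))).toList
// => List("Join(scanA1, scanB)", "Join(scanA2, scanB)")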
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a2d45026e0..e6dc50a40e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -75,6 +75,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
lazy val sparkPlan: SparkPlan = {
SparkSession.setActiveSession(sparkSession)
+ // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
+ // but we will implement choosing the best plan later.
planner.plan(ReturnAnswer(optimizedPlan)).next()
}
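Spelled out with the names from the diff (the explicit types are editorial): plan() now returns an iterator of fully substituted candidates, and QueryExecution simply takes the first one until plan selection is implemented.

val candidates: Iterator[SparkPlan] = planner.plan(ReturnAnswer(optimizedPlan))
val chosen: SparkPlan = candidates.next()  // first candidate, for now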
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index de832ec70b..73e2ffdf00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
import org.apache.spark.sql.internal.SQLConf
@@ -42,6 +43,18 @@ class SparkPlanner(
InMemoryScans ::
BasicOperators :: Nil)
+ override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
+ plan.collect {
+ case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
+ }
+ }
+
+ override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = {
+ // TODO: We will need to prune bad plans when we improve plan space exploration
+ // to prevent combinatorial explosion.
+ plans
+ }
+
/**
* Used to build table scan operators where complex projection and filtering are done using
* separate physical operators. This function returns the given scan operator with Project and
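prunePlans is deliberately a pass-through for now (see the TODO above). A planner exploring a larger plan space could later bound it; a hypothetical sketch, where maxCandidatePlans is an invented limit, not a real config value:

override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] =
  plans.take(maxCandidatePlans)  // hypothetical cap to keep exploration bounded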
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a36fe78a49..d1261dd6ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -35,6 +37,27 @@ import org.apache.spark.sql.execution.streaming.MemoryPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ContinuousQuery
+/**
+ * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting
+ * with the query planner and is not designed to be stable across spark releases. Developers
+ * writing libraries should instead consider using the stable APIs provided in
+ * [[org.apache.spark.sql.sources]]
+ */
+@DeveloperApi
+abstract class SparkStrategy extends GenericStrategy[SparkPlan] {
+
+ override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan)
+}
+
+private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
+
+ override def output: Seq[Attribute] = plan.output
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException()
+ }
+}
+
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
self: SparkPlanner =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 97e35bb104..28d8bc3de6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -18,7 +18,7 @@
package org.apache.spark
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
/**
* Allows the execution of relational queries, including those expressed in SQL using Spark.
@@ -40,7 +40,7 @@ package object sql {
* [[org.apache.spark.sql.sources]]
*/
@DeveloperApi
- type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan]
+ type Strategy = SparkStrategy
type DataFrame = Dataset[Row]
}
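Because the alias now points at SparkStrategy, user code written against Strategy keeps compiling unchanged and additionally inherits planLater; a minimal compile-level sketch:

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object NoopStrategy extends Strategy {
  // Same shape as before this change; planLater is now also in scope here.
  def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}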
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
new file mode 100644
index 0000000000..aecfd30621
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, ReturnAnswer, Union}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SparkPlannerSuite extends SharedSQLContext {
+ import testImplicits._
+
+ test("Ensure to go down only the first branch, not any other possible branches") {
+
+ case object NeverPlanned extends LeafNode {
+ override def output: Seq[Attribute] = Nil
+ }
+
+ var planned = 0
+ object TestStrategy extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case ReturnAnswer(child) =>
+ planned += 1
+ planLater(child) :: planLater(NeverPlanned) :: Nil
+ case Union(children) =>
+ planned += 1
+ UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil
+ case LocalRelation(output, data) =>
+ planned += 1
+ LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil
+ case NeverPlanned =>
+ fail("QueryPlanner should not go down to this branch.")
+ case _ => Nil
+ }
+ }
+
+ try {
+ spark.experimental.extraStrategies = TestStrategy :: Nil
+
+ val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS())
+
+ assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f"))
+ assert(planned === 4)
+ } finally {
+ spark.experimental.extraStrategies = Nil
+ }
+ }
+}