author     Edoardo Vacchi <uncommonnonsense@gmail.com>  2015-09-14 14:56:04 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-09-14 14:56:04 -0700
commit     64f04154e3078ec7340da97e3c2b07cf24e89098 (patch)
tree       62bb84c05f09ca6360238e4a79568cd698c0278d
parent     7e32387ae6303fd1cd32389d47df87170b841c67 (diff)
download   spark-64f04154e3078ec7340da97e3c2b07cf24e89098.tar.gz
           spark-64f04154e3078ec7340da97e3c2b07cf24e89098.tar.bz2
           spark-64f04154e3078ec7340da97e3c2b07cf24e89098.zip
[SPARK-6981] [SQL] Factor out SparkPlanner and QueryExecution from SQLContext
Alternative to PR #6122; in this case the refactored-out classes are replaced by
inner classes with the same name, for backwards binary compatibility.

* process in a lighter-weight, backwards-compatible way

Author: Edoardo Vacchi <uncommonnonsense@gmail.com>

Closes #6356 from evacchi/sqlctx-refactoring-lite.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                 |   4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                | 138
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala  |  85
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala    |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala    |  92
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala |   2
6 files changed, 195 insertions(+), 128 deletions(-)
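
The compatibility pattern the commit describes is: move the implementation into standalone classes under org.apache.spark.sql.execution, and keep deprecated inner classes of the same name inside SQLContext that simply extend them, so existing references to SQLContext#SparkPlanner or SQLContext#QueryExecution keep resolving. A minimal, self-contained sketch of that pattern (Context and StandalonePlanner below are illustrative names, not Spark's):

    object CompatSketch {
      // New standalone class, analogous to org.apache.spark.sql.execution.SparkPlanner.
      class StandalonePlanner(val ctx: Context) {
        def plan(query: String): String = s"plan for '$query' on context '${ctx.name}'"
      }

      // Analogue of SQLContext: internals now delegate to the standalone class,
      // while the old inner class survives only as a deprecated thin subclass.
      class Context(val name: String) {
        @deprecated("use StandalonePlanner", "1.6.0")
        class Planner extends StandalonePlanner(Context.this)

        val planner: StandalonePlanner = new StandalonePlanner(this)
      }

      def main(args: Array[String]): Unit = {
        val ctx = new Context("demo")
        println(ctx.planner.plan("SELECT 1"))        // new code path
        println((new ctx.Planner).plan("SELECT 1"))  // legacy inner-class path still compiles
      }
    }
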
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 1a687b2374..3e61123c14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, SQLExecution}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -114,7 +114,7 @@ private[sql] object DataFrame {
@Experimental
class DataFrame private[sql](
@transient val sqlContext: SQLContext,
- @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends Serializable {
+ @DeveloperApi @transient val queryExecution: QueryExecution) extends Serializable {
// Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
// you wrap it with `withNewExecutionId` if this actions doesn't call other action.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4e8414af50..e3fdd782e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -38,6 +38,10 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
+import org.apache.spark.sql.execution.{Filter, _}
+import org.apache.spark.sql.{execution => sparkexecution}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.sources._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
@@ -188,9 +192,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
- protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
+ protected[sql] def executeSql(sql: String):
+ org.apache.spark.sql.execution.QueryExecution = executePlan(parseSql(sql))
- protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
+ protected[sql] def executePlan(plan: LogicalPlan) =
+ new sparkexecution.QueryExecution(this, plan)
@transient
protected[sql] val tlSession = new ThreadLocal[SQLSession]() {
@@ -781,77 +787,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}.toArray
}
- protected[sql] class SparkPlanner extends SparkStrategies {
- val sparkContext: SparkContext = self.sparkContext
-
- val sqlContext: SQLContext = self
-
- def codegenEnabled: Boolean = self.conf.codegenEnabled
-
- def unsafeEnabled: Boolean = self.conf.unsafeEnabled
-
- def numPartitions: Int = self.conf.numShufflePartitions
-
- def strategies: Seq[Strategy] =
- experimental.extraStrategies ++ (
- DataSourceStrategy ::
- DDLStrategy ::
- TakeOrderedAndProject ::
- HashAggregation ::
- Aggregation ::
- LeftSemiJoin ::
- EquiJoinSelection ::
- InMemoryScans ::
- BasicOperators ::
- CartesianProduct ::
- BroadcastNestedLoopJoin :: Nil)
-
- /**
- * Used to build table scan operators where complex projection and filtering are done using
- * separate physical operators. This function returns the given scan operator with Project and
- * Filter nodes added only when needed. For example, a Project operator is only used when the
- * final desired output requires complex expressions to be evaluated or when columns can be
- * further eliminated out after filtering has been done.
- *
- * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
- * away by the filter pushdown optimization.
- *
- * The required attributes for both filtering and expression evaluation are passed to the
- * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
- */
- def pruneFilterProject(
- projectList: Seq[NamedExpression],
- filterPredicates: Seq[Expression],
- prunePushedDownFilters: Seq[Expression] => Seq[Expression],
- scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
-
- val projectSet = AttributeSet(projectList.flatMap(_.references))
- val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
- val filterCondition =
- prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
-
- // Right now we still use a projection even if the only evaluation is applying an alias
- // to a column. Since this is a no-op, it could be avoided. However, using this
- // optimization with the current implementation would change the output schema.
- // TODO: Decouple final output schema from expression evaluation so this copy can be
- // avoided safely.
-
- if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
- filterSet.subsetOf(projectSet)) {
- // When it is possible to just use column pruning to get the right projection and
- // when the columns of this projection are enough to evaluate all filter conditions,
- // just do a scan followed by a filter, with no extra project.
- val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
- filterCondition.map(Filter(_, scan)).getOrElse(scan)
- } else {
- val scan = scanBuilder((projectSet ++ filterSet).toSeq)
- Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
- }
- }
- }
+ @deprecated("use org.apache.spark.sql.SparkPlanner", "1.6.0")
+ protected[sql] class SparkPlanner extends sparkexecution.SparkPlanner(this)
@transient
- protected[sql] val planner = new SparkPlanner
+ protected[sql] val planner: sparkexecution.SparkPlanner = new sparkexecution.SparkPlanner(this)
@transient
protected[sql] lazy val emptyResult = sparkContext.parallelize(Seq.empty[InternalRow], 1)
@@ -898,59 +838,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] lazy val conf: SQLConf = new SQLConf
}
- /**
- * :: DeveloperApi ::
- * The primary workflow for executing relational queries using Spark. Designed to allow easy
- * access to the intermediate phases of query execution for developers.
- */
- @DeveloperApi
- protected[sql] class QueryExecution(val logical: LogicalPlan) {
- def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
-
- lazy val analyzed: LogicalPlan = analyzer.execute(logical)
- lazy val withCachedData: LogicalPlan = {
- assertAnalyzed()
- cacheManager.useCachedData(analyzed)
- }
- lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
-
- // TODO: Don't just pick the first one...
- lazy val sparkPlan: SparkPlan = {
- SparkPlan.currentContext.set(self)
- planner.plan(optimizedPlan).next()
- }
- // executedPlan should not be used to initialize any SparkPlan. It should be
- // only used for execution.
- lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
-
- /** Internal version of the RDD. Avoids copies and has no schema */
- lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
-
- protected def stringOrError[A](f: => A): String =
- try f.toString catch { case e: Throwable => e.toString }
-
- def simpleString: String =
- s"""== Physical Plan ==
- |${stringOrError(executedPlan)}
- """.stripMargin.trim
-
- override def toString: String = {
- def output =
- analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
-
- s"""== Parsed Logical Plan ==
- |${stringOrError(logical)}
- |== Analyzed Logical Plan ==
- |${stringOrError(output)}
- |${stringOrError(analyzed)}
- |== Optimized Logical Plan ==
- |${stringOrError(optimizedPlan)}
- |== Physical Plan ==
- |${stringOrError(executedPlan)}
- |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
- """.stripMargin.trim
- }
- }
+ @deprecated("use org.apache.spark.sql.QueryExecution", "1.6.0")
+ protected[sql] class QueryExecution(logical: LogicalPlan)
+ extends sparkexecution.QueryExecution(this, logical)
/**
* Parses the data type in our internal string representation. The data type string should
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
new file mode 100644
index 0000000000..7bb4133a29
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.{Experimental, DeveloperApi}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.{InternalRow, optimizer}
+import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * :: DeveloperApi ::
+ * The primary workflow for executing relational queries using Spark. Designed to allow easy
+ * access to the intermediate phases of query execution for developers.
+ */
+@DeveloperApi
+class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
+ val analyzer = sqlContext.analyzer
+ val optimizer = sqlContext.optimizer
+ val planner = sqlContext.planner
+ val cacheManager = sqlContext.cacheManager
+ val prepareForExecution = sqlContext.prepareForExecution
+
+ def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
+
+ lazy val analyzed: LogicalPlan = analyzer.execute(logical)
+ lazy val withCachedData: LogicalPlan = {
+ assertAnalyzed()
+ cacheManager.useCachedData(analyzed)
+ }
+ lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
+
+ // TODO: Don't just pick the first one...
+ lazy val sparkPlan: SparkPlan = {
+ SparkPlan.currentContext.set(sqlContext)
+ planner.plan(optimizedPlan).next()
+ }
+ // executedPlan should not be used to initialize any SparkPlan. It should be
+ // only used for execution.
+ lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
+
+ /** Internal version of the RDD. Avoids copies and has no schema */
+ lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+
+ protected def stringOrError[A](f: => A): String =
+ try f.toString catch { case e: Throwable => e.toString }
+
+ def simpleString: String =
+ s"""== Physical Plan ==
+ |${stringOrError(executedPlan)}
+ """.stripMargin.trim
+
+
+ override def toString: String = {
+ def output =
+ analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+
+ s"""== Parsed Logical Plan ==
+ |${stringOrError(logical)}
+ |== Analyzed Logical Plan ==
+ |${stringOrError(output)}
+ |${stringOrError(analyzed)}
+ |== Optimized Logical Plan ==
+ |${stringOrError(optimizedPlan)}
+ |== Physical Plan ==
+ |${stringOrError(executedPlan)}
+ |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
+ """.stripMargin.trim
+ }
+}
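
Because DataFrame now exposes queryExecution with the standalone type, the intermediate planning phases are reachable without going through the SQLContext inner class. A minimal sketch of inspecting them with the Spark 1.5/1.6-era API, assuming an already-constructed SQLContext named sqlContext (for example in spark-shell, where it is predefined):

    // Walk the lazy phases that QueryExecution materializes on demand.
    val df = sqlContext.sql("SELECT 1 AS id")
    val qe: org.apache.spark.sql.execution.QueryExecution = df.queryExecution

    println(qe.logical)        // parsed logical plan
    println(qe.analyzed)       // after the analyzer
    println(qe.optimizedPlan)  // after the optimizer
    println(qe.executedPlan)   // physical plan after the preparation rules
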
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index cee58218a8..1422e15549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -37,7 +37,7 @@ private[sql] object SQLExecution {
* we can connect them with an execution.
*/
def withNewExecutionId[T](
- sqlContext: SQLContext, queryExecution: SQLContext#QueryExecution)(body: => T): T = {
+ sqlContext: SQLContext, queryExecution: QueryExecution)(body: => T): T = {
val sc = sqlContext.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
if (oldExecutionId == null) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
new file mode 100644
index 0000000000..b346f43fae
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+
+@Experimental
+class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
+ val sparkContext: SparkContext = sqlContext.sparkContext
+
+ def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled
+
+ def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled
+
+ def numPartitions: Int = sqlContext.conf.numShufflePartitions
+
+ def strategies: Seq[Strategy] =
+ sqlContext.experimental.extraStrategies ++ (
+ DataSourceStrategy ::
+ DDLStrategy ::
+ TakeOrderedAndProject ::
+ HashAggregation ::
+ Aggregation ::
+ LeftSemiJoin ::
+ EquiJoinSelection ::
+ InMemoryScans ::
+ BasicOperators ::
+ CartesianProduct ::
+ BroadcastNestedLoopJoin :: Nil)
+
+ /**
+ * Used to build table scan operators where complex projection and filtering are done using
+ * separate physical operators. This function returns the given scan operator with Project and
+ * Filter nodes added only when needed. For example, a Project operator is only used when the
+ * final desired output requires complex expressions to be evaluated or when columns can be
+ * further eliminated out after filtering has been done.
+ *
+ * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized
+ * away by the filter pushdown optimization.
+ *
+ * The required attributes for both filtering and expression evaluation are passed to the
+ * provided `scanBuilder` function so that it can avoid unnecessary column materialization.
+ */
+ def pruneFilterProject(
+ projectList: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ prunePushedDownFilters: Seq[Expression] => Seq[Expression],
+ scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {
+
+ val projectSet = AttributeSet(projectList.flatMap(_.references))
+ val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+ val filterCondition =
+ prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And)
+
+ // Right now we still use a projection even if the only evaluation is applying an alias
+ // to a column. Since this is a no-op, it could be avoided. However, using this
+ // optimization with the current implementation would change the output schema.
+ // TODO: Decouple final output schema from expression evaluation so this copy can be
+ // avoided safely.
+
+ if (AttributeSet(projectList.map(_.toAttribute)) == projectSet &&
+ filterSet.subsetOf(projectSet)) {
+ // When it is possible to just use column pruning to get the right projection and
+ // when the columns of this projection are enough to evaluate all filter conditions,
+ // just do a scan followed by a filter, with no extra project.
+ val scan = scanBuilder(projectList.asInstanceOf[Seq[Attribute]])
+ filterCondition.map(Filter(_, scan)).getOrElse(scan)
+ } else {
+ val scan = scanBuilder((projectSet ++ filterSet).toSeq)
+ Project(projectList, filterCondition.map(Filter(_, scan)).getOrElse(scan))
+ }
+ }
+}
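
The extracted SparkPlanner still consults sqlContext.experimental.extraStrategies ahead of the built-in strategies, so the extension point is unchanged by the move. A hedged sketch of plugging in a custom strategy; MyStrategy is a hypothetical name, and returning Nil simply defers to the built-in strategies:

    import org.apache.spark.sql.{SQLContext, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical strategy: matches nothing, so planning falls through
    // to the built-in strategies listed in SparkPlanner.strategies.
    object MyStrategy extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }

    // Registered via the existing ExperimentalMethods hook on SQLContext.
    def register(sqlContext: SQLContext): Unit = {
      sqlContext.experimental.extraStrategies = MyStrategy :: Nil
    }
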
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4572d5efc9..5e40d77689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{SQLContext, Strategy, execution}
private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
- self: SQLContext#SparkPlanner =>
+ self: SparkPlanner =>
object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {