diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-02-10 19:40:51 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-10 19:40:51 -0800 |
commit | 45df77b8418873a00d770e435358bf603765595f (patch) | |
tree | 80611475433ff81eeb9d9b2590fd4658d6aed4e6 /sql/core | |
parent | ea60284095cad43aa7ac98256576375d0e91a52a (diff) | |
download | spark-45df77b8418873a00d770e435358bf603765595f.tar.gz spark-45df77b8418873a00d770e435358bf603765595f.tar.bz2 spark-45df77b8418873a00d770e435358bf603765595f.zip |
[SPARK-5709] [SQL] Add EXPLAIN support in DataFrame API for debugging purpose
Author: Cheng Hao <hao.cheng@intel.com>
Closes #4496 from chenghao-intel/df_explain and squashes the following commits:
552aa58 [Cheng Hao] Add explain support for DF
Diffstat (limited to 'sql/core')
4 files changed, 29 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 1011bf0bb5..b0e95908ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -600,6 +600,14 @@ trait Column extends DataFrame { def desc: Column = exprToColumn(SortOrder(expr, Descending), computable = false) def asc: Column = exprToColumn(SortOrder(expr, Ascending), computable = false) + + override def explain(extended: Boolean): Unit = { + if (extended) { + println(expr) + } else { + println(expr.prettyString) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index ca8d552c5f..17900c5ee3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -124,6 +124,12 @@ trait DataFrame extends RDDApi[Row] { /** Prints the schema to the console in a nice tree format. */ def printSchema(): Unit + /** Prints the plans (logical and physical) to the console for debugging purpose. */ + def explain(extended: Boolean): Unit + + /** Only prints the physical plan to the console for debugging purpose. */ + def explain(): Unit = explain(false) + /** * Returns true if the `collect` and `take` methods can be run locally * (without any Spark executors). diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 0134b038f3..9638ce0865 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -30,12 +30,11 @@ import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection} -import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, ResolvedStar, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.sideBySide -import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython} +import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython} import org.apache.spark.sql.json.JsonRDD import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{NumericType, StructType} @@ -115,6 +114,14 @@ private[sql] class DataFrameImpl protected[sql]( override def printSchema(): Unit = println(schema.treeString) + override def explain(extended: Boolean): Unit = { + ExplainCommand( + logicalPlan, + extended = extended).queryExecution.executedPlan.executeCollect().map { + r => println(r.getString(0)) + } + } + override def isLocal: Boolean = { logicalPlan.isInstanceOf[LocalRelation] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 335757087d..2b1726ad4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext} import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions.{Row, Attribute} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import scala.collection.mutable.ArrayBuffer @@ -116,7 +117,9 @@ case class SetCommand( @DeveloperApi case class ExplainCommand( logicalPlan: LogicalPlan, - override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand { + override val output: Seq[Attribute] = + Seq(AttributeReference("plan", StringType, nullable = false)()), + extended: Boolean = false) extends RunnableCommand { // Run through the optimizer to generate the physical plan. override def run(sqlContext: SQLContext) = try { |