From 45df77b8418873a00d770e435358bf603765595f Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Tue, 10 Feb 2015 19:40:51 -0800 Subject: [SPARK-5709] [SQL] Add EXPLAIN support in DataFrame API for debugging purpose Author: Cheng Hao Closes #4496 from chenghao-intel/df_explain and squashes the following commits: 552aa58 [Cheng Hao] Add explain support for DF --- sql/core/src/main/scala/org/apache/spark/sql/Column.scala | 8 ++++++++ .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 6 ++++++ .../src/main/scala/org/apache/spark/sql/DataFrameImpl.scala | 13 ++++++++++--- .../scala/org/apache/spark/sql/execution/commands.scala | 7 +++++-- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 8 +++----- 5 files changed, 32 insertions(+), 10 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 1011bf0bb5..b0e95908ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -600,6 +600,14 @@ trait Column extends DataFrame { def desc: Column = exprToColumn(SortOrder(expr, Descending), computable = false) def asc: Column = exprToColumn(SortOrder(expr, Ascending), computable = false) + + override def explain(extended: Boolean): Unit = { + if (extended) { + println(expr) + } else { + println(expr.prettyString) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index ca8d552c5f..17900c5ee3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -124,6 +124,12 @@ trait DataFrame extends RDDApi[Row] { /** Prints the schema to the console in a nice tree format. */ def printSchema(): Unit + /** Prints the plans (logical and physical) to the console for debugging purpose. */ + def explain(extended: Boolean): Unit + + /** Only prints the physical plan to the console for debugging purpose. */ + def explain(): Unit = explain(false) + /** * Returns true if the `collect` and `take` methods can be run locally * (without any Spark executors). diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 0134b038f3..9638ce0865 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -30,12 +30,11 @@ import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection} -import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, ResolvedStar, UnresolvedRelation} +import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.sideBySide -import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython} +import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython} import org.apache.spark.sql.json.JsonRDD import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{NumericType, StructType} @@ -115,6 +114,14 @@ private[sql] class DataFrameImpl protected[sql]( override def printSchema(): Unit = println(schema.treeString) + override def explain(extended: Boolean): Unit = { + ExplainCommand( + logicalPlan, + extended = extended).queryExecution.executedPlan.executeCollect().map { + r => println(r.getString(0)) + } + } + override def isLocal: Boolean = { logicalPlan.isInstanceOf[LocalRelation] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 335757087d..2b1726ad4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext} import org.apache.spark.sql.catalyst.errors.TreeNodeException -import org.apache.spark.sql.catalyst.expressions.{Row, Attribute} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import scala.collection.mutable.ArrayBuffer @@ -116,7 +117,9 @@ case class SetCommand( @DeveloperApi case class ExplainCommand( logicalPlan: LogicalPlan, - override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand { + override val output: Seq[Attribute] = + Seq(AttributeReference("plan", StringType, nullable = false)()), + extended: Boolean = false) extends RunnableCommand { // Run through the optimizer to generate the physical plan. override def run(sqlContext: SQLContext) = try { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 8618301ba8..f3c9e63652 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -466,23 +466,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C // Just fake explain for any of the native commands. case Token("TOK_EXPLAIN", explainArgs) if noExplainCommands.contains(explainArgs.head.getText) => - ExplainCommand(NoRelation, Seq(AttributeReference("plan", StringType, nullable = false)())) + ExplainCommand(NoRelation) case Token("TOK_EXPLAIN", explainArgs) if "TOK_CREATETABLE" == explainArgs.head.getText => val Some(crtTbl) :: _ :: extended :: Nil = getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs) ExplainCommand( nodeToPlan(crtTbl), - Seq(AttributeReference("plan", StringType,nullable = false)()), - extended != None) + extended = extended.isDefined) case Token("TOK_EXPLAIN", explainArgs) => // Ignore FORMATTED if present. val Some(query) :: _ :: extended :: Nil = getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs) ExplainCommand( nodeToPlan(query), - Seq(AttributeReference("plan", StringType, nullable = false)()), - extended != None) + extended = extended.isDefined) case Token("TOK_DESCTABLE", describeArgs) => // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL -- cgit v1.2.3