From de80b1ba4d3c4b1b3316d482d62e4668b996f6ac Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 10 Feb 2015 13:14:01 -0800 Subject: [SQL] Add toString to DataFrame/Column Author: Michael Armbrust Closes #4436 from marmbrus/dfToString and squashes the following commits: 8a3c35f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into dfToString b72a81b [Michael Armbrust] add toString --- .../scala/org/apache/spark/sql/DataFrame.scala | 8 ++++++ .../scala/org/apache/spark/sql/DataFrameImpl.scala | 10 +++----- .../org/apache/spark/sql/IncomputableColumn.scala | 2 ++ .../apache/spark/sql/execution/debug/package.scala | 11 +++++++- .../org/apache/spark/sql/DataFrameSuite.scala | 29 ++++++++++++++++++++++ 5 files changed, 53 insertions(+), 7 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6abfb7853c..04e0d09947 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils +import scala.util.control.NonFatal + private[sql] object DataFrame { def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { @@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] { */ def toDataFrame: DataFrame = this + override def toString = + try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch { + case NonFatal(e) => + s"Invalid tree; ${e.getMessage}:\n$queryExecution" + } + /** * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 73393295ab..1ee16ad516 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql]( override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) override def select(cols: Column*): DataFrame = { - val exprs = cols.zipWithIndex.map { - case (Column(expr: NamedExpression), _) => - expr - case (Column(expr: Expression), _) => - Alias(expr, expr.toString)() + val namedExpressions = cols.map { + case Column(expr: NamedExpression) => expr + case Column(expr: Expression) => Alias(expr, expr.prettyString)() } - Project(exprs.toSeq, logicalPlan) + Project(namedExpressions.toSeq, logicalPlan) } override def select(col: String, cols: String*): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 0600dcc226..ce0557b881 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten throw new UnsupportedOperationException("Cannot run this method on an UncomputableColumn") } + override def toString = expr.prettyString + override def isComputable: Boolean = false override val sqlContext: SQLContext = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 5cc67cdd13..acef49aabf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext._ -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row} import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.types._ @@ -37,6 +37,15 @@ import org.apache.spark.sql.types._ */ package object debug { + /** + * Augments [[SQLContext]] with debug methods. + */ + implicit class DebugSQLContext(sqlContext: SQLContext) { + def debug() = { + sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") + } + } + /** * :: DeveloperApi :: * Augments [[DataFrame]]s with debug methods. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 5aa3db720c..02623f73c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.TestData._ + import scala.language.postfixOps import org.apache.spark.sql.Dsl._ @@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest { TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) } + test("dataframe toString") { + assert(testData.toString === "[key: int, value: string]") + assert(testData("key").toString === "[key: int]") + } + + test("incomputable toString") { + assert($"test".toString === "test") + } + + test("invalid plan toString, debug mode") { + val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true") + + // Turn on debug mode so we can see invalid query plans. + import org.apache.spark.sql.execution.debug._ + TestSQLContext.debug() + + val badPlan = testData.select('badColumn) + + assert(badPlan.toString contains badPlan.queryExecution.toString, + "toString on bad query plans should include the query execution but was:\n" + + badPlan.toString) + + // Set the flag back to original value before this test. + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) + } + test("table scan") { checkAnswer( testData, -- cgit v1.2.3