diff options
author | Michael Armbrust <michael@databricks.com> | 2015-02-10 13:14:01 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-10 13:14:01 -0800 |
commit | de80b1ba4d3c4b1b3316d482d62e4668b996f6ac (patch) | |
tree | 340a893e186501534326e74f83f448fe265088e4 /sql/core | |
parent | c49a4049845c91b225e70fd630cdf6ddc055faf8 (diff) | |
download | spark-de80b1ba4d3c4b1b3316d482d62e4668b996f6ac.tar.gz spark-de80b1ba4d3c4b1b3316d482d62e4668b996f6ac.tar.bz2 spark-de80b1ba4d3c4b1b3316d482d62e4668b996f6ac.zip |
[SQL] Add toString to DataFrame/Column
Author: Michael Armbrust <michael@databricks.com>
Closes #4436 from marmbrus/dfToString and squashes the following commits:
8a3c35f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into dfToString
b72a81b [Michael Armbrust] add toString
Diffstat (limited to 'sql/core')
5 files changed, 53 insertions, 7 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6abfb7853c..04e0d09947 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils +import scala.util.control.NonFatal + private[sql] object DataFrame { def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { @@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] { */ def toDataFrame: DataFrame = this + override def toString = + try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch { + case NonFatal(e) => + s"Invalid tree; ${e.getMessage}:\n$queryExecution" + } + /** * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 73393295ab..1ee16ad516 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql]( override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) override def select(cols: Column*): DataFrame = { - val exprs = cols.zipWithIndex.map { - case (Column(expr: NamedExpression), _) => - expr - case (Column(expr: Expression), _) => - Alias(expr, expr.toString)() + val namedExpressions = cols.map { + case Column(expr: NamedExpression) => expr + case Column(expr: Expression) => Alias(expr, expr.prettyString)() } - Project(exprs.toSeq, logicalPlan) + Project(namedExpressions.toSeq, logicalPlan) } override def select(col: String, cols: String*): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 0600dcc226..ce0557b881 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten throw new UnsupportedOperationException("Cannot run this method on an UncomputableColumn") } + override def toString = expr.prettyString + override def isComputable: Boolean = false override val sqlContext: SQLContext = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 5cc67cdd13..acef49aabf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext._ -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row} import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.types._ @@ -38,6 +38,15 @@ import org.apache.spark.sql.types._ package object debug { /** + * Augments [[SQLContext]] with debug methods. + */ + implicit class DebugSQLContext(sqlContext: SQLContext) { + def debug() = { + sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") + } + } + + /** * :: DeveloperApi :: * Augments [[DataFrame]]s with debug methods. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 5aa3db720c..02623f73c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.TestData._ + import scala.language.postfixOps import org.apache.spark.sql.Dsl._ @@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest { TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) } + test("dataframe toString") { + assert(testData.toString === "[key: int, value: string]") + assert(testData("key").toString === "[key: int]") + } + + test("incomputable toString") { + assert($"test".toString === "test") + } + + test("invalid plan toString, debug mode") { + val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true") + + // Turn on debug mode so we can see invalid query plans. + import org.apache.spark.sql.execution.debug._ + TestSQLContext.debug() + + val badPlan = testData.select('badColumn) + + assert(badPlan.toString contains badPlan.queryExecution.toString, + "toString on bad query plans should include the query execution but was:\n" + + badPlan.toString) + + // Set the flag back to original value before this test. + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) + } + test("table scan") { checkAnswer( testData, |