author    Michael Armbrust <michael@databricks.com>  2015-02-10 13:14:01 -0800
committer Michael Armbrust <michael@databricks.com>  2015-02-10 13:14:01 -0800
commit    de80b1ba4d3c4b1b3316d482d62e4668b996f6ac (patch)
tree      340a893e186501534326e74f83f448fe265088e4 /sql
parent    c49a4049845c91b225e70fd630cdf6ddc055faf8 (diff)
[SQL] Add toString to DataFrame/Column
Author: Michael Armbrust <michael@databricks.com>

Closes #4436 from marmbrus/dfToString and squashes the following commits:

8a3c35f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into dfToString
b72a81b [Michael Armbrust] add toString
Diffstat (limited to 'sql')
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala       | 12
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala | 20
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                                 |  8
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala                             | 10
 sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala                        |  2
 sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala                   | 11
 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala                            | 29
 7 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index cf14992ef8..c32a4b886e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -67,6 +68,17 @@ abstract class Expression extends TreeNode[Expression] {
def childrenResolved = !children.exists(!_.resolved)
/**
+ * Returns a string representation of this expression that does not have developer-centric
+ * debugging information like the expression id.
+ */
+ def prettyString: String = {
+ transform {
+ case a: AttributeReference => PrettyAttribute(a.name)
+ case u: UnresolvedAttribute => PrettyAttribute(u.name)
+ }.toString
+ }
+
+ /**
* A set of helper functions that return the correct descendant of `scala.math.Numeric[T]` type
* and do any casting necessary of child evaluation.
*/
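
For illustration only (not part of the commit): a minimal sketch of what prettyString changes in practice, assuming a resolved AttributeReference named "key"; the expression id shown is arbitrary.

  // Sketch: prettyString rewrites attributes to PrettyAttribute before
  // calling toString, so the developer-facing expression id disappears.
  val attr = AttributeReference("key", IntegerType, nullable = false)()
  attr.toString     // e.g. "key#3" -- includes the expression id
  attr.prettyString // "key"        -- same expression, debug detail removed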
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index e6ab1fd8d7..7f122e9d55 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -190,6 +190,26 @@ case class AttributeReference(
override def toString: String = s"$name#${exprId.id}$typeSuffix"
}
+/**
+ * A placeholder used when printing expressions without debugging information such as the
+ * expression id or the unresolved indicator.
+ */
+case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[Expression] {
+ type EvaluatedType = Any
+
+ override def toString = name
+
+ override def withNullability(newNullability: Boolean): Attribute = ???
+ override def newInstance(): Attribute = ???
+ override def withQualifiers(newQualifiers: Seq[String]): Attribute = ???
+ override def withName(newName: String): Attribute = ???
+ override def qualifiers: Seq[String] = ???
+ override def exprId: ExprId = ???
+ override def eval(input: Row): EvaluatedType = ???
+ override def nullable: Boolean = ???
+ override def dataType: DataType = ???
+}
+
object VirtualColumn {
val groupingIdName = "grouping__id"
def newGroupingId = AttributeReference(groupingIdName, IntegerType, false)()
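
A hedged note on the class above: PrettyAttribute is a print-only stand-in, so everything except toString is left as ???, and touching those members throws at runtime.

  // Sketch: safe and unsafe uses of PrettyAttribute.
  val p = PrettyAttribute("key")
  p.toString // "key" -- the only supported operation
  // p.dataType, p.eval(input), p.exprId, ... would each throw
  // scala.NotImplementedError, since ??? is their entire body.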
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6abfb7853c..04e0d09947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
+import scala.util.control.NonFatal
+
private[sql] object DataFrame {
def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
@@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] {
*/
def toDataFrame: DataFrame = this
+ override def toString =
+ try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch {
+ case NonFatal(e) =>
+ s"Invalid tree; ${e.getMessage}:\n$queryExecution"
+ }
+
/**
* Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion
* from an RDD of tuples into a [[DataFrame]] with meaningful names. For example:
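
As the new tests below confirm, toString now renders the schema rather than the data. This sketch assumes the suite's testData fixture, a DataFrame with columns key: int and value: string.

  // Sketch: schema-only rendering of a DataFrame.
  testData.toString // "[key: int, value: string]"
  // If analysis fails, the catch branch prints the query execution instead:
  // "Invalid tree; <message>:\n<queryExecution>"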
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 73393295ab..1ee16ad516 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql](
override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)
override def select(cols: Column*): DataFrame = {
- val exprs = cols.zipWithIndex.map {
- case (Column(expr: NamedExpression), _) =>
- expr
- case (Column(expr: Expression), _) =>
- Alias(expr, expr.toString)()
+ val namedExpressions = cols.map {
+ case Column(expr: NamedExpression) => expr
+ case Column(expr: Expression) => Alias(expr, expr.prettyString)()
}
- Project(exprs.toSeq, logicalPlan)
+ Project(namedExpressions.toSeq, logicalPlan)
}
override def select(col: String, cols: String*): DataFrame = {
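
A small sketch of the select change, assuming a DataFrame df with an integer column "key". The exact alias text comes from Expression.toString after the PrettyAttribute rewrite, so "(key + 1)" is an assumption rather than a guaranteed rendering.

  // Sketch: unnamed expressions are now aliased with prettyString.
  df.select($"key" + 1)
  // generated alias: "(key + 1)" instead of something like "(key#3 + 1)"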
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 0600dcc226..ce0557b881 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
throw new UnsupportedOperationException("Cannot run this method on an UncomputableColumn")
}
+ override def toString = expr.prettyString
+
override def isComputable: Boolean = false
override val sqlContext: SQLContext = null
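
Grounded in the test suite below: a free-standing column such as $"test" cannot be computed, but with this override it prints its pretty expression.

  // Sketch: IncomputableColumn delegates toString to the expression.
  $"test".toString // "test"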
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 5cc67cdd13..acef49aabf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext._
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row}
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.types._
@@ -38,6 +38,15 @@ import org.apache.spark.sql.types._
package object debug {
/**
+ * Augments [[SQLContext]] with debug methods.
+ */
+ implicit class DebugSQLContext(sqlContext: SQLContext) {
+ def debug() = {
+ sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+ }
+ }
+
+ /**
* :: DeveloperApi ::
* Augments [[DataFrame]]s with debug methods.
*/
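
A usage sketch for the new implicit, grounded in the test below: calling debug() on a SQLContext turns off eager DataFrame analysis so that invalid plans can still be built and printed.

  // Sketch: enable debug mode before constructing a bad plan.
  import org.apache.spark.sql.execution.debug._
  TestSQLContext.debug() // sets SQLConf.DATAFRAME_EAGER_ANALYSIS to "false"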
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5aa3db720c..02623f73c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import org.apache.spark.sql.TestData._
+
import scala.language.postfixOps
import org.apache.spark.sql.Dsl._
@@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest {
TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
}
+ test("dataframe toString") {
+ assert(testData.toString === "[key: int, value: string]")
+ assert(testData("key").toString === "[key: int]")
+ }
+
+ test("incomputable toString") {
+ assert($"test".toString === "test")
+ }
+
+ test("invalid plan toString, debug mode") {
+ val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis
+ TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+
+ // Turn on debug mode so we can see invalid query plans.
+ import org.apache.spark.sql.execution.debug._
+ TestSQLContext.debug()
+
+ val badPlan = testData.select('badColumn)
+
+ assert(badPlan.toString contains badPlan.queryExecution.toString,
+ "toString on bad query plans should include the query execution but was:\n" +
+ badPlan.toString)
+
+ // Set the flag back to original value before this test.
+ TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+ }
+
test("table scan") {
checkAnswer(
testData,