diff options
Diffstat (limited to 'sql')
7 files changed, 85 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index cf14992ef8..c32a4b886e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.trees.TreeNode @@ -67,6 +68,17 @@ abstract class Expression extends TreeNode[Expression] { def childrenResolved = !children.exists(!_.resolved) /** + * Returns a string representation of this expression that does not have developer centric + * debugging information like the expression id. + */ + def prettyString: String = { + transform { + case a: AttributeReference => PrettyAttribute(a.name) + case u: UnresolvedAttribute => PrettyAttribute(u.name) + }.toString + } + + /** * A set of helper functions that return the correct descendant of `scala.math.Numeric[T]` type * and do any casting necessary of child evaluation. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index e6ab1fd8d7..7f122e9d55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -190,6 +190,26 @@ case class AttributeReference( override def toString: String = s"$name#${exprId.id}$typeSuffix" } +/** + * A place holder used when printing expressions without debugging information such as the + * expression id or the unresolved indicator. + */ +case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[Expression] { + type EvaluatedType = Any + + override def toString = name + + override def withNullability(newNullability: Boolean): Attribute = ??? + override def newInstance(): Attribute = ??? + override def withQualifiers(newQualifiers: Seq[String]): Attribute = ??? + override def withName(newName: String): Attribute = ??? + override def qualifiers: Seq[String] = ??? + override def exprId: ExprId = ??? + override def eval(input: Row): EvaluatedType = ??? + override def nullable: Boolean = ??? + override def dataType: DataType = ??? +} + object VirtualColumn { val groupingIdName = "grouping__id" def newGroupingId = AttributeReference(groupingIdName, IntegerType, false)() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6abfb7853c..04e0d09947 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils +import scala.util.control.NonFatal + private[sql] object DataFrame { def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { @@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] { */ def toDataFrame: DataFrame = this + override def toString = + try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch { + case NonFatal(e) => + s"Invalid tree; ${e.getMessage}:\n$queryExecution" + } + /** * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 73393295ab..1ee16ad516 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql]( override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) override def select(cols: Column*): DataFrame = { - val exprs = cols.zipWithIndex.map { - case (Column(expr: NamedExpression), _) => - expr - case (Column(expr: Expression), _) => - Alias(expr, expr.toString)() + val namedExpressions = cols.map { + case Column(expr: NamedExpression) => expr + case Column(expr: Expression) => Alias(expr, expr.prettyString)() } - Project(exprs.toSeq, logicalPlan) + Project(namedExpressions.toSeq, logicalPlan) } override def select(col: String, cols: String*): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 0600dcc226..ce0557b881 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten throw new UnsupportedOperationException("Cannot run this method on an UncomputableColumn") } + override def toString = expr.prettyString + override def isComputable: Boolean = false override val sqlContext: SQLContext = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 5cc67cdd13..acef49aabf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext._ -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row} import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.types._ @@ -38,6 +38,15 @@ import org.apache.spark.sql.types._ package object debug { /** + * Augments [[SQLContext]] with debug methods. + */ + implicit class DebugSQLContext(sqlContext: SQLContext) { + def debug() = { + sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") + } + } + + /** * :: DeveloperApi :: * Augments [[DataFrame]]s with debug methods. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 5aa3db720c..02623f73c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.TestData._ + import scala.language.postfixOps import org.apache.spark.sql.Dsl._ @@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest { TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) } + test("dataframe toString") { + assert(testData.toString === "[key: int, value: string]") + assert(testData("key").toString === "[key: int]") + } + + test("incomputable toString") { + assert($"test".toString === "test") + } + + test("invalid plan toString, debug mode") { + val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true") + + // Turn on debug mode so we can see invalid query plans. + import org.apache.spark.sql.execution.debug._ + TestSQLContext.debug() + + val badPlan = testData.select('badColumn) + + assert(badPlan.toString contains badPlan.queryExecution.toString, + "toString on bad query plans should include the query execution but was:\n" + + badPlan.toString) + + // Set the flag back to original value before this test. + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) + } + test("table scan") { checkAnswer( testData, |