author	Michael Armbrust <michael@databricks.com>	2014-10-13 13:46:34 -0700
committer	Michael Armbrust <michael@databricks.com>	2014-10-13 13:46:34 -0700
commit	371321cadee7df39258bd374eb59c1e32451d96b (patch)
tree	6b5fae061f90f4acb94ed8e58554ec91cccc88a3
parent	e10d71e7e58bf2ec0f1942cb2f0602396ab866b4 (diff)
[SQL] Add type checking debugging functions
Adds some functions that were very useful when trying to track down the bug from #2656. This change also updates the tree output for query plans: unresolved nodes are marked with a `'` prefix, and nodes that refer to non-existent attributes with a `!` prefix.

Author: Michael Armbrust <michael@databricks.com>

Closes #2657 from marmbrus/debugging and squashes the following commits:

654b926 [Michael Armbrust] Clean-up, add tests
763af15 [Michael Armbrust] Add typeChecking debugging functions
8c69303 [Michael Armbrust] Add inputSet, references to QueryPlan. Improve tree string with a prefix to denote invalid or unresolved nodes.
fbeab54 [Michael Armbrust] Better toString, factories for AttributeSet.
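For illustration only (not part of the patch): the new helpers are reached through the implicits in the execution.debug package, as the new DebuggingSuite below exercises. A minimal sketch, assuming a working TestSQLContext and the testData table from the Spark test harness:

  import org.apache.spark.sql.test.TestSQLContext._
  import org.apache.spark.sql.execution.debug._

  // Hypothetical query, used only to show the entry points.
  val query = sql("SELECT key, value FROM testData")

  // Wraps each operator in the executed plan with a TypeCheck node and
  // verifies every row's runtime types against the declared schema.
  query.typeCheck()

  // With this change, unresolved nodes print with a leading ' and nodes
  // referencing non-existent attributes print with a leading !.
  println(query.queryExecution.logical.treeString)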
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala	23
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala	2
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala	4
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala	23
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala	8
-rw-r--r--	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala	5
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala	85
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala	33
8 files changed, 163 insertions(+), 20 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index c3a08bbdb6..2b4969b7cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -17,19 +17,26 @@
package org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.analysis.Star
+
protected class AttributeEquals(val a: Attribute) {
override def hashCode() = a.exprId.hashCode()
- override def equals(other: Any) = other match {
- case otherReference: AttributeEquals => a.exprId == otherReference.a.exprId
- case otherAttribute => false
+ override def equals(other: Any) = (a, other.asInstanceOf[AttributeEquals].a) match {
+ case (a1: AttributeReference, a2: AttributeReference) => a1.exprId == a2.exprId
+ case (a1, a2) => a1 == a2
}
}
object AttributeSet {
- /** Constructs a new [[AttributeSet]] given a sequence of [[Attribute Attributes]]. */
- def apply(baseSet: Seq[Attribute]) = {
- new AttributeSet(baseSet.map(new AttributeEquals(_)).toSet)
- }
+ def apply(a: Attribute) =
+ new AttributeSet(Set(new AttributeEquals(a)))
+
+ /** Constructs a new [[AttributeSet]] given a sequence of [[Expression Expressions]]. */
+ def apply(baseSet: Seq[Expression]) =
+ new AttributeSet(
+ baseSet
+ .flatMap(_.references)
+ .map(new AttributeEquals(_)).toSet)
}
/**
@@ -103,4 +110,6 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])
// We must force toSeq to not be strict otherwise we end up with a [[Stream]] that captures all
// sorts of things in its closure.
override def toSeq: Seq[Attribute] = baseSet.map(_.a).toArray.toSeq
+
+ override def toString = "{" + baseSet.map(_.a).mkString(", ") + "}"
}
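For reference, a hedged sketch of what the two new AttributeSet factories accept (attribute names here are hypothetical; assumes the catalyst expression classes shown elsewhere in this patch):

  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.types.IntegerType

  val a = AttributeReference("a", IntegerType)()
  val b = AttributeReference("b", IntegerType)()

  // Single-attribute factory.
  val justA = AttributeSet(a)

  // Expression factory: flatMaps over each expression's references, so a
  // compound expression contributes every attribute it mentions.
  val fromExpr = AttributeSet(Add(a, b) :: Nil)

  // Membership is keyed on exprId, not object identity, so a renamed copy
  // of `a` (which keeps the same exprId) is still contained.
  fromExpr.contains(a.withName("renamed"))  // true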
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 204904ecf0..e7e81a21fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -39,6 +39,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
}
new GenericRow(outputArray)
}
+
+ override def toString = s"Row => [${exprArray.mkString(",")}]"
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index e5a958d599..d023db44d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -57,6 +57,8 @@ abstract class NamedExpression extends Expression {
abstract class Attribute extends NamedExpression {
self: Product =>
+ override def references = AttributeSet(this)
+
def withNullability(newNullability: Boolean): Attribute
def withQualifiers(newQualifiers: Seq[String]): Attribute
def withName(newName: String): Attribute
@@ -116,8 +118,6 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
(val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
extends Attribute with trees.LeafNode[Expression] {
- override def references = AttributeSet(this :: Nil)
-
override def equals(other: Any) = other match {
case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
case _ => false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index af9e4d86e9..dcbbb62c0a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -32,6 +32,25 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
def outputSet: AttributeSet = AttributeSet(output)
/**
+ * All Attributes that appear in expressions from this operator. Note that this set does not
+ * include attributes that are implicitly referenced by being passed through to the output tuple.
+ */
+ def references: AttributeSet = AttributeSet(expressions.flatMap(_.references))
+
+ /**
+ * The set of all attributes that are input to this operator by its children.
+ */
+ def inputSet: AttributeSet =
+ AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
+
+ /**
+ * Attributes that are referenced by expressions but not provided by this node's children.
+ * Subclasses should override this method if they produce attributes internally, as it is used by
+ * assertions designed to prevent the construction of invalid plans.
+ */
+ def missingInput: AttributeSet = references -- inputSet
+
+ /**
* Runs [[transform]] with `rule` on all expressions present in this query operator.
* Users should not expect a specific directionality. If a specific directionality is needed,
* transformExpressionsDown or transformExpressionsUp should be used.
@@ -132,4 +151,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
/** Prints out the schema in the tree format */
def printSchema(): Unit = println(schemaString)
+
+ protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
+
+ override def simpleString = statePrefix + super.simpleString
}
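To make the new bookkeeping concrete, a hedged sketch of an intentionally invalid plan (assumes catalyst's LocalRelation and Project logical operators; the names are illustrative):

  import org.apache.spark.sql.catalyst.expressions._
  import org.apache.spark.sql.catalyst.plans.logical._
  import org.apache.spark.sql.catalyst.types.IntegerType

  val a = AttributeReference("a", IntegerType)()
  val b = AttributeReference("b", IntegerType)()

  val relation = LocalRelation(Seq(a))        // produces only `a`
  val badPlan  = Project(b :: Nil, relation)  // but projects `b`

  badPlan.references    // {b#...}
  badPlan.inputSet      // {a#...}
  badPlan.missingInput  // {b#...} -- nonEmpty, so simpleString gains a "!" prefix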
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4f8ad8a7e0..882e9c6110 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -54,12 +54,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
}
/**
- * Returns the set of attributes that this node takes as
- * input from its children.
- */
- lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))
-
- /**
* Returns true if this expression and all its children have been resolved to a specific schema
* and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
* can override this (e.g.
@@ -68,6 +62,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
*/
lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
+ override protected def statePrefix = if (!resolved) "'" else super.statePrefix
+
/**
* Returns true if all its children of this query plan have been resolved.
*/
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index f8e9930ac2..14b03c7445 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -138,11 +138,6 @@ case class Aggregate(
child: LogicalPlan)
extends UnaryNode {
- /** The set of all AttributeReferences required for this aggregation. */
- def references =
- AttributeSet(
- groupingExpressions.flatMap(_.references) ++ aggregateExpressions.flatMap(_.references))
-
override def output = aggregateExpressions.map(_.toAttribute)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index a9535a750b..61be5ed2db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{SchemaRDD, Row}
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.types._
/**
* :: DeveloperApi ::
@@ -56,6 +57,23 @@ package object debug {
case _ =>
}
}
+
+ def typeCheck(): Unit = {
+ val plan = query.queryExecution.executedPlan
+ val visited = new collection.mutable.HashSet[TreeNodeRef]()
+ val debugPlan = plan transform {
+ case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
+ visited += new TreeNodeRef(s)
+ TypeCheck(s)
+ }
+ try {
+ println(s"Results returned: ${debugPlan.execute().count()}")
+ } catch {
+ case e: Exception =>
+ def unwrap(e: Throwable): Throwable = if (e.getCause == null) e else unwrap(e.getCause)
+ println(s"Deepest Error: ${unwrap(e)}")
+ }
+ }
}
private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
@@ -115,4 +133,71 @@ package object debug {
}
}
}
+
+ /**
+ * :: DeveloperApi ::
+ * Helper functions for checking that runtime types match a given schema.
+ */
+ @DeveloperApi
+ object TypeCheck {
+ def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
+ case (null, _) =>
+
+ case (row: Row, StructType(fields)) =>
+ row.zip(fields.map(_.dataType)).foreach { case(d,t) => typeCheck(d,t) }
+ case (s: Seq[_], ArrayType(elemType, _)) =>
+ s.foreach(typeCheck(_, elemType))
+ case (m: Map[_, _], MapType(keyType, valueType, _)) =>
+ m.keys.foreach(typeCheck(_, keyType))
+ m.values.foreach(typeCheck(_, valueType))
+
+ case (_: Long, LongType) =>
+ case (_: Int, IntegerType) =>
+ case (_: String, StringType) =>
+ case (_: Float, FloatType) =>
+ case (_: Byte, ByteType) =>
+ case (_: Short, ShortType) =>
+ case (_: Boolean, BooleanType) =>
+ case (_: Double, DoubleType) =>
+
+ case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
+ }
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * A SparkPlan node that passes its child's rows through unchanged, failing with a
+ * detailed error when a value's runtime type does not match the declared schema.
+ */
+ @DeveloperApi
+ private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
+ import TypeCheck._
+
+ override def nodeName = ""
+
+ /* Only required when defining this class in a REPL.
+ override def makeCopy(args: Array[Object]): this.type =
+ TypeCheck(args(0).asInstanceOf[SparkPlan]).asInstanceOf[this.type]
+ */
+
+ def output = child.output
+
+ def children = child :: Nil
+
+ def execute() = {
+ child.execute().map { row =>
+ try typeCheck(row, child.schema) catch {
+ case e: Exception =>
+ sys.error(
+ s"""
+ |ERROR WHEN TYPE CHECKING QUERY
+ |==============================
+ |$e
+ |======== BAD TREE ============
+ |$child
+ """.stripMargin)
+ }
+ row
+ }
+ }
+ }
}
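The runtime check itself is a plain pattern match, so its behavior can be read off the cases above. A hedged illustration of the three outcomes (the error text comes from the sys.error call in the patch):

  import org.apache.spark.sql.catalyst.types._
  import org.apache.spark.sql.execution.debug.TypeCheck

  TypeCheck.typeCheck(1, IntegerType)    // passes: Int matches IntegerType
  TypeCheck.typeCheck(null, StringType)  // passes: null is accepted for any type
  TypeCheck.typeCheck(1, LongType)       // throws: "Invalid data found: got 1
                                         //   (class java.lang.Integer) expected LongType"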
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
new file mode 100644
index 0000000000..87c28c334d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.debug
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.test.TestSQLContext._
+
+class DebuggingSuite extends FunSuite {
+ test("SchemaRDD.debug()") {
+ testData.debug()
+ }
+
+ test("SchemaRDD.typeCheck()") {
+ testData.typeCheck()
+ }
+}
\ No newline at end of file