path: root/sql/core/src
author    Michael Armbrust <michael@databricks.com>  2014-10-13 13:46:34 -0700
committer Michael Armbrust <michael@databricks.com>  2014-10-13 13:46:34 -0700
commit    371321cadee7df39258bd374eb59c1e32451d96b (patch)
tree      6b5fae061f90f4acb94ed8e58554ec91cccc88a3 /sql/core/src
parent    e10d71e7e58bf2ec0f1942cb2f0602396ab866b4 (diff)
[SQL] Add type checking debugging functions
Adds some functions that were very useful when trying to track down the bug from #2656. This change also updates the tree output for query plans: unresolved nodes now get a `'` prefix, and nodes that refer to non-existent attributes get a `!` prefix.

Author: Michael Armbrust <michael@databricks.com>

Closes #2657 from marmbrus/debugging and squashes the following commits:

654b926 [Michael Armbrust] Clean-up, add tests
763af15 [Michael Armbrust] Add typeChecking debugging functions
8c69303 [Michael Armbrust] Add inputSet, references to QueryPlan. Improve tree string with a prefix to denote invalid or unresolved nodes.
fbeab54 [Michael Armbrust] Better toString, factories for AttributeSet.
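For orientation before the diff, here is a minimal sketch of how the new helpers are invoked, mirroring the calls in DebuggingSuite below; the sqlContext and table name are placeholders, not part of this patch.

    import org.apache.spark.sql.execution.debug._   // brings the implicit SchemaRDD enrichment into scope

    val query = sqlContext.sql("SELECT key, value FROM src")   // any SchemaRDD
    query.debug()       // wraps each physical operator in DebugNode and prints debugging output while executing
    query.typeCheck()   // wraps each operator in TypeCheck; prints "Results returned: N" or the deepest error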
Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala         85
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala  33
2 files changed, 118 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index a9535a750b..61be5ed2db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext._
import org.apache.spark.sql.{SchemaRDD, Row}
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.types._
/**
* :: DeveloperApi ::
@@ -56,6 +57,23 @@ package object debug {
case _ =>
}
}
+
+ def typeCheck(): Unit = {
+ val plan = query.queryExecution.executedPlan
+ val visited = new collection.mutable.HashSet[TreeNodeRef]()
+ val debugPlan = plan transform {
+ case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
+ visited += new TreeNodeRef(s)
+ TypeCheck(s)
+ }
+ try {
+ println(s"Results returned: ${debugPlan.execute().count()}")
+ } catch {
+ case e: Exception =>
+ def unwrap(e: Throwable): Throwable = if (e.getCause == null) e else unwrap(e.getCause)
+ println(s"Deepest Error: ${unwrap(e)}")
+ }
+ }
}
private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
@@ -115,4 +133,71 @@ package object debug {
}
}
}
+
+ /**
+ * :: DeveloperApi ::
+ * Helper functions for checking that runtime types match a given schema.
+ */
+ @DeveloperApi
+ object TypeCheck {
+ def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
+ case (null, _) =>
+
+ case (row: Row, StructType(fields)) =>
+ row.zip(fields.map(_.dataType)).foreach { case (d, t) => typeCheck(d, t) }
+ case (s: Seq[_], ArrayType(elemType, _)) =>
+ s.foreach(typeCheck(_, elemType))
+ case (m: Map[_, _], MapType(keyType, valueType, _)) =>
+ m.keys.foreach(typeCheck(_, keyType))
+ m.values.foreach(typeCheck(_, valueType))
+
+ case (_: Long, LongType) =>
+ case (_: Int, IntegerType) =>
+ case (_: String, StringType) =>
+ case (_: Float, FloatType) =>
+ case (_: Byte, ByteType) =>
+ case (_: Short, ShortType) =>
+ case (_: Boolean, BooleanType) =>
+ case (_: Double, DoubleType) =>
+
+ case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
+ }
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Wraps a SparkPlan and verifies at runtime that every row it produces matches the child's schema.
+ */
+ @DeveloperApi
+ private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
+ import TypeCheck._
+
+ override def nodeName = ""
+
+ /* Only required when defining this class in a REPL.
+ override def makeCopy(args: Array[Object]): this.type =
+ TypeCheck(args(0).asInstanceOf[SparkPlan]).asInstanceOf[this.type]
+ */
+
+ def output = child.output
+
+ def children = child :: Nil
+
+ def execute() = {
+ child.execute().map { row =>
+ try typeCheck(row, child.schema) catch {
+ case e: Exception =>
+ sys.error(
+ s"""
+ |ERROR WHEN TYPE CHECKING QUERY
+ |==============================
+ |$e
+ |======== BAD TREE ============
+ |$child
+ """.stripMargin)
+ }
+ row
+ }
+ }
+ }
}
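The heart of the patch is the recursive match in TypeCheck.typeCheck, which walks a value and its declared DataType in lockstep and fails on the first mismatch. Below is a minimal, self-contained sketch of that technique against a toy schema ADT; the types here are local stand-ins for illustration, not Spark's Catalyst classes.

    object TypeCheckSketch extends App {
      // Toy schema ADT mirroring the shape of Catalyst's data types.
      sealed trait DataType
      case object IntegerType extends DataType
      case object StringType extends DataType
      case class ArrayType(elementType: DataType) extends DataType
      case class StructType(fieldTypes: Seq[DataType]) extends DataType

      // Recursively check that a value's runtime class matches its declared type.
      // Nulls are always accepted; the first mismatch aborts with an error.
      def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
        case (null, _) =>
        case (_: Int, IntegerType) =>
        case (_: String, StringType) =>
        case (s: Seq[_], ArrayType(elemType)) => s.foreach(typeCheck(_, elemType))
        case (row: Seq[_], StructType(fieldTypes)) =>
          row.zip(fieldTypes).foreach { case (d, t) => typeCheck(d, t) }
        case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}) expected $t")
      }

      val schema = StructType(Seq(IntegerType, ArrayType(StringType)))
      typeCheck(Seq(1, Seq("a", "b")), schema)   // passes silently
      try typeCheck(Seq("oops", Seq("a", "b")), schema)
      catch { case e: RuntimeException => println(e.getMessage) }
    }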
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
new file mode 100644
index 0000000000..87c28c334d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.debug
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.test.TestSQLContext._
+
+class DebuggingSuite extends FunSuite {
+ test("SchemaRDD.debug()") {
+ testData.debug()
+ }
+
+ test("SchemaRDD.typeCheck()") {
+ testData.typeCheck()
+ }
+}
\ No newline at end of file
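One detail worth calling out from typeCheck() above: Spark surfaces job failures wrapped in several layers of exceptions, so the patch recursively follows getCause to report only the root cause. A self-contained sketch of that unwrapping, with a fabricated exception chain for illustration:

    object UnwrapSketch extends App {
      // Follow the cause chain to its root, as typeCheck() does before printing.
      def unwrap(e: Throwable): Throwable =
        if (e.getCause == null) e else unwrap(e.getCause)

      val nested = new RuntimeException("stage failed",
        new IllegalStateException("task failed",
          new NullPointerException("the actual bug")))

      println(s"Deepest Error: ${unwrap(nested)}")
      // prints: Deepest Error: java.lang.NullPointerException: the actual bug
    }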