path: root/sql
author    Michael Armbrust <michael@databricks.com>  2014-06-09 14:24:19 -0700
committer Reynold Xin <rxin@apache.org>              2014-06-09 14:24:19 -0700
commit    c6e041d171e3d9882ab15e2bd7a7217dc19647f6 (patch)
tree      513e584258e8ece81ad155cbd72d1bdeaea2939d /sql
parent    e273447684779a18bd61d733bfe7958b78657ffd (diff)
download  spark-c6e041d171e3d9882ab15e2bd7a7217dc19647f6.tar.gz
          spark-c6e041d171e3d9882ab15e2bd7a7217dc19647f6.tar.bz2
          spark-c6e041d171e3d9882ab15e2bd7a7217dc19647f6.zip
[SQL] Simple framework for debugging query execution
Only records number of tuples and unique dataTypes output right now...

Example:
```scala
scala> import org.apache.spark.sql.execution.debug._
scala> hql("SELECT value FROM src WHERE key > 10").debug(sparkContext)

Results returned: 489
== Project [value#1:0] ==
Tuples output: 489
 value StringType: {java.lang.String}
== Filter (key#0:1 > 10) ==
Tuples output: 489
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
== HiveTableScan [value#1,key#0], (MetastoreRelation default, src, None), None ==
Tuples output: 500
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
```

Author: Michael Armbrust <michael@databricks.com>

Closes #1005 from marmbrus/debug and squashes the following commits:

dcc3ca6 [Michael Armbrust] Add comments.
c9dded2 [Michael Armbrust] Simple framework for debugging query execution
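For orientation, here is a minimal, hypothetical standalone driver (not part of this commit) that exercises the same `.debug` call against a plain `SQLContext` instead of `hql`; the `Record` case class, the `records` table, and the local master setting are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.debug._  // brings the implicit DebugQuery class into scope

// Hypothetical example; names and data are illustrative only.
object DebugExample {
  case class Record(key: Int, value: String)

  def main(args: Array[String]): Unit = {
    // .debug takes the SparkContext implicitly, so mark it implicit here.
    implicit val sc = new SparkContext(
      new SparkConf().setAppName("debug-example").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD  // implicit RDD[Product] -> SchemaRDD conversion

    sc.parallelize(1 to 100)
      .map(i => Record(i, s"val_$i"))
      .registerAsTable("records")

    // Wraps every physical operator in a DebugNode, runs the query, and prints
    // per-operator tuple counts plus the runtime types observed in each column.
    sqlContext.sql("SELECT value FROM records WHERE key > 10").debug
  }
}
```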
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala              |   5 -
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala         |  45 -
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 119 +
3 files changed, 119 insertions(+), 50 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e371c82d81..5626f0da22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -285,11 +285,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
|== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
-
- /**
- * Runs the query after interposing operators that print the result of each intermediate step.
- */
- def debugExec() = DebugQuery(executedPlan).execute().collect()
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
deleted file mode 100644
index a0d29100f5..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-private[sql] object DebugQuery {
- def apply(plan: SparkPlan): SparkPlan = {
- val visited = new collection.mutable.HashSet[Long]()
- plan transform {
- case s: SparkPlan if !visited.contains(s.id) =>
- visited += s.id
- DebugNode(s)
- }
- }
-}
-
-private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
- def references = Set.empty
- def output = child.output
- def execute() = {
- val childRdd = child.execute()
- println(
- s"""
- |=========================
- |${child.simpleString}
- |=========================
- """.stripMargin)
- childRdd.foreach(println(_))
- childRdd
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
new file mode 100644
index 0000000000..c6fbd6d2f6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.SparkContext._
+import org.apache.spark.sql.{SchemaRDD, Row}
+
+/**
+ * :: DeveloperApi ::
+ * Contains methods for debugging query execution.
+ *
+ * Usage:
+ * {{{
+ * sql("SELECT key FROM src").debug
+ * }}}
+ */
+package object debug {
+
+ /**
+ * :: DeveloperApi ::
+ * Augments SchemaRDDs with debug methods.
+ */
+ @DeveloperApi
+ implicit class DebugQuery(query: SchemaRDD) {
+ def debug(implicit sc: SparkContext): Unit = {
+ val plan = query.queryExecution.executedPlan
+ val visited = new collection.mutable.HashSet[Long]()
+ val debugPlan = plan transform {
+ case s: SparkPlan if !visited.contains(s.id) =>
+ visited += s.id
+ DebugNode(sc, s)
+ }
+ println(s"Results returned: ${debugPlan.execute().count()}")
+ debugPlan.foreach {
+ case d: DebugNode => d.dumpStats()
+ case _ =>
+ }
+ }
+ }
+
+ private[sql] case class DebugNode(
+ @transient sparkContext: SparkContext,
+ child: SparkPlan) extends UnaryNode {
+ def references = Set.empty
+
+ def output = child.output
+
+ implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
+ def zero(initialValue: HashSet[String]): HashSet[String] = {
+ initialValue.clear()
+ initialValue
+ }
+
+ def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
+ v1 ++= v2
+ v1
+ }
+ }
+
+ /**
+ * A collection of stats for each column of output.
+ * @param elementTypes the actual runtime types for the output. Useful when there are bugs
+ * causing the wrong data to be projected.
+ */
+ case class ColumnStat(
+ elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+ val tupleCount = sparkContext.accumulator[Int](0)
+
+ val numColumns = child.output.size
+ val columnStats = Array.fill(child.output.size)(new ColumnStat())
+
+ def dumpStats(): Unit = {
+ println(s"== ${child.simpleString} ==")
+ println(s"Tuples output: ${tupleCount.value}")
+ child.output.zip(columnStats).foreach { case(attr, stat) =>
+ val actualDataTypes = stat.elementTypes.value.mkString("{", ",", "}")
+ println(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
+ }
+ }
+
+ def execute() = {
+ child.execute().mapPartitions { iter =>
+ new Iterator[Row] {
+ def hasNext = iter.hasNext
+ def next() = {
+ val currentRow = iter.next()
+ tupleCount += 1
+ var i = 0
+ while (i < numColumns) {
+ val value = currentRow(i)
+ columnStats(i).elementTypes += HashSet(value.getClass.getName)
+ i += 1
+ }
+ currentRow
+ }
+ }
+ }
+ }
+ }
+}
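As a side note on the mechanism above: `DebugNode` aggregates the runtime class names it observes through an accumulator whose merge operation is set union. Below is a minimal, self-contained sketch of that pattern against the Spark 1.x accumulator API (not part of this commit; the `distinctClassNames` helper and its input data are hypothetical).

```scala
import scala.collection.mutable.HashSet

import org.apache.spark.{AccumulatorParam, SparkContext}

object SetAccumulatorSketch {
  // Merge partial results by set union, mirroring SetAccumulatorParam in DebugNode.
  implicit object StringSetParam extends AccumulatorParam[HashSet[String]] {
    def zero(initialValue: HashSet[String]): HashSet[String] = HashSet.empty

    def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
      v1 ++= v2
      v1
    }
  }

  // Hypothetical helper: collect the distinct runtime class names seen in a dataset.
  def distinctClassNames(sc: SparkContext, data: Seq[Any]): Set[String] = {
    val seen = sc.accumulator(HashSet.empty[String])
    sc.parallelize(data).foreach(v => seen += HashSet(v.getClass.getName))
    seen.value.toSet
  }
}
```

Calling `distinctClassNames(sc, Seq(1, "a", 2.0))` would return something like `Set("java.lang.Integer", "java.lang.String", "java.lang.Double")`, which is the same kind of per-column summary that `dumpStats()` prints.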