From c6e041d171e3d9882ab15e2bd7a7217dc19647f6 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Mon, 9 Jun 2014 14:24:19 -0700
Subject: [SQL] Simple framework for debugging query execution

Only records number of tuples and unique dataTypes output right now...

Example:
```scala
scala> import org.apache.spark.sql.execution.debug._
scala> hql("SELECT value FROM src WHERE key > 10").debug(sparkContext)

Results returned: 489
== Project [value#1:0] ==
Tuples output: 489
 value StringType: {java.lang.String}

== Filter (key#0:1 > 10) ==
Tuples output: 489
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}

== HiveTableScan [value#1,key#0], (MetastoreRelation default, src, None), None ==
Tuples output: 500
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
```

Author: Michael Armbrust

Closes #1005 from marmbrus/debug and squashes the following commits:

dcc3ca6 [Michael Armbrust] Add comments.
c9dded2 [Michael Armbrust] Simple framework for debugging query execution
---
 .../scala/org/apache/spark/sql/SQLContext.scala     |   5 -
 .../org/apache/spark/sql/execution/debug.scala      |  45 --------
 .../apache/spark/sql/execution/debug/package.scala  | 119 +++++++++++++++++++++
 3 files changed, 119 insertions(+), 50 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e371c82d81..5626f0da22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -285,11 +285,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
       """.stripMargin.trim
-
-    /**
-     * Runs the query after interposing operators that print the result of each intermediate step.
-     */
-    def debugExec() = DebugQuery(executedPlan).execute().collect()
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
deleted file mode 100644
index a0d29100f5..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-private[sql] object DebugQuery {
-  def apply(plan: SparkPlan): SparkPlan = {
-    val visited = new collection.mutable.HashSet[Long]()
-    plan transform {
-      case s: SparkPlan if !visited.contains(s.id) =>
-        visited += s.id
-        DebugNode(s)
-    }
-  }
-}
-
-private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
-  def references = Set.empty
-  def output = child.output
-  def execute() = {
-    val childRdd = child.execute()
-    println(
-      s"""
-        |=========================
-        |${child.simpleString}
-        |=========================
-      """.stripMargin)
-    childRdd.foreach(println(_))
-    childRdd
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
new file mode 100644
index 0000000000..c6fbd6d2f6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.SparkContext._
+import org.apache.spark.sql.{SchemaRDD, Row}
+
+/**
+ * :: DeveloperApi ::
+ * Contains methods for debugging query execution.
+ *
+ * Usage:
+ * {{{
+ *   sql("SELECT key FROM src").debug
+ * }}}
+ */
+package object debug {
+
+  /**
+   * :: DeveloperApi ::
+   * Augments SchemaRDDs with debug methods.
+   */
+  @DeveloperApi
+  implicit class DebugQuery(query: SchemaRDD) {
+    def debug(implicit sc: SparkContext): Unit = {
+      val plan = query.queryExecution.executedPlan
+      val visited = new collection.mutable.HashSet[Long]()
+      val debugPlan = plan transform {
+        case s: SparkPlan if !visited.contains(s.id) =>
+          visited += s.id
+          DebugNode(sc, s)
+      }
+      println(s"Results returned: ${debugPlan.execute().count()}")
+      debugPlan.foreach {
+        case d: DebugNode => d.dumpStats()
+        case _ =>
+      }
+    }
+  }
+
+  private[sql] case class DebugNode(
+      @transient sparkContext: SparkContext,
+      child: SparkPlan) extends UnaryNode {
+    def references = Set.empty
+
+    def output = child.output
+
+    implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
+      def zero(initialValue: HashSet[String]): HashSet[String] = {
+        initialValue.clear()
+        initialValue
+      }
+
+      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
+        v1 ++= v2
+        v1
+      }
+    }
+
+    /**
+     * A collection of stats for each column of output.
+     * @param elementTypes the actual runtime types for the output. Useful when there are bugs
+     *        causing the wrong data to be projected.
+     */
+    case class ColumnStat(
+      elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+    val tupleCount = sparkContext.accumulator[Int](0)
+
+    val numColumns = child.output.size
+    val columnStats = Array.fill(child.output.size)(new ColumnStat())
+
+    def dumpStats(): Unit = {
+      println(s"== ${child.simpleString} ==")
+      println(s"Tuples output: ${tupleCount.value}")
+      child.output.zip(columnStats).foreach { case (attr, stat) =>
+        val actualDataTypes = stat.elementTypes.value.mkString("{", ",", "}")
+        println(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
+      }
+    }
+
+    def execute() = {
+      child.execute().mapPartitions { iter =>
+        new Iterator[Row] {
+          def hasNext = iter.hasNext
+          def next() = {
+            val currentRow = iter.next()
+            tupleCount += 1
+            var i = 0
+            while (i < numColumns) {
+              val value = currentRow(i)
+              columnStats(i).elementTypes += HashSet(value.getClass.getName)
+              i += 1
+            }
+            currentRow
+          }
+        }
+      }
+    }
+  }
+}
--
cgit v1.2.3
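
For readers who want to try the new hook outside the Hive shell session shown in the commit message, below is a minimal, self-contained sketch against the Spark 1.0-era API (`SQLContext`, `SchemaRDD`, `registerAsTable`). The `Record` case class, the table contents, and the app name are illustrative assumptions, not part of the patch; only the `import org.apache.spark.sql.execution.debug._` and the `.debug(...)` call come from the change itself.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
// Package object added by this patch; brings the implicit DebugQuery wrapper
// that adds a debug() method to SchemaRDD.
import org.apache.spark.sql.execution.debug._

// Hypothetical record type used only for this sketch.
case class Record(key: Int, value: String)

object DebugExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("debug-example").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    // Register a small in-memory table so the query has something to scan.
    val records = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    records.registerAsTable("src")

    // debug() wraps every operator in the physical plan in a DebugNode, runs
    // the query, and prints the tuple count plus the runtime classes observed
    // for each output column of each operator.
    sqlContext.sql("SELECT value FROM src WHERE key > 10").debug(sc)

    sc.stop()
  }
}
```

Because the per-column type sets are gathered in accumulators, the printed stats reflect what every partition actually produced, which is what makes the output useful for spotting projection bugs where the wrong data ends up in a column.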