author     Andrew Or <andrew@databricks.com>          2016-01-29 13:45:03 -0800
committer  Shixiong Zhu <shixiong@databricks.com>     2016-01-29 13:45:03 -0800
commit     e38b0baa38c6894335f187eaa4c8ea5c02d4563b (patch)
tree       e531add805da14e0315ff508346f02fb79938711 /sql
parent     2b027e9a386fe4009f61ad03b169335af5a9a5c6 (diff)
[SPARK-13055] SQLHistoryListener throws ClassCastException
This is an existing issue uncovered recently by #10835. The exception occurred because the `SQLHistoryListener` receives all sorts of accumulators, not just the ones that represent SQL metrics. For example, the listener receives `internal.metrics.shuffleRead.remoteBlocksFetched`, which is an Int, and then attempts to cast that Int to a Long, which fails. The fix is to mark accumulators representing SQL metrics with some internal metadata, so the `SQLHistoryListener` can identify which accumulators are SQL metrics and process only those.

Author: Andrew Or <andrew@databricks.com>

Closes #10971 from andrewor14/fix-sql-history.
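For context, a minimal standalone Scala sketch (not part of this patch; the object and variable names are purely illustrative) of the failure mode described above, where an accumulator update holding a boxed Int is cast to Long, analogous to the cast removed from SQLListener.scala in the diff below:

    // Hypothetical sketch of the pre-SPARK-13055 failure mode.
    object ClassCastSketch {
      def main(args: Array[String]): Unit = {
        // A non-SQL accumulator update (e.g. shuffleRead.remoteBlocksFetched)
        // arrives as Any whose runtime value is a boxed Int.
        val update: Any = 42
        try {
          // Blindly casting every update to Long throws, because
          // java.lang.Integer cannot be cast to java.lang.Long.
          val asLong = update.asInstanceOf[Long]
          println(s"unexpectedly succeeded: $asLong")
        } catch {
          case e: ClassCastException => println(s"fails as described: $e")
        }
      }
    }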
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala      | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala         | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 24
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala    | 38
4 files changed, 96 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 950dc78162..6b43d273fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.metric
import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext}
+import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils
/**
@@ -27,9 +28,16 @@ import org.apache.spark.util.Utils
* An implementation of SQLMetric should override `+=` and `add` to avoid boxing.
*/
private[sql] abstract class SQLMetric[R <: SQLMetricValue[T], T](
- name: String, val param: SQLMetricParam[R, T])
+ name: String,
+ val param: SQLMetricParam[R, T])
extends Accumulable[R, T](param.zero, param, Some(name), internal = true) {
+ // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
+ override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
+ new AccumulableInfo(id, Some(name), update, value, isInternal, countFailedValues,
+ Some(SQLMetrics.ACCUM_IDENTIFIER))
+ }
+
def reset(): Unit = {
this.value = param.zero
}
@@ -73,6 +81,14 @@ private[sql] class LongSQLMetricValue(private var _value : Long) extends SQLMetr
// Although there is a boxing here, it's fine because it's only called in SQLListener
override def value: Long = _value
+
+ // Needed for SQLListenerSuite
+ override def equals(other: Any): Boolean = {
+ other match {
+ case o: LongSQLMetricValue => value == o.value
+ case _ => false
+ }
+ }
}
/**
@@ -126,6 +142,9 @@ private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
private[sql] object SQLMetrics {
+ // Identifier for distinguishing SQL metrics from other accumulators
+ private[sql] val ACCUM_IDENTIFIER = "sql"
+
private def createLongMetric(
sc: SparkContext,
name: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 544606f116..835e7ba6c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -23,7 +23,7 @@ import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.metric._
import org.apache.spark.ui.SparkUI
@DeveloperApi
@@ -314,14 +314,17 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
+
+/**
+ * A [[SQLListener]] for rendering the SQL UI in the history server.
+ */
private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
extends SQLListener(conf) {
private var sqlTabAttached = false
- override def onExecutorMetricsUpdate(
- executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
- // Do nothing
+ override def onExecutorMetricsUpdate(u: SparkListenerExecutorMetricsUpdate): Unit = {
+ // Do nothing; these events are not logged
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
@@ -329,9 +332,15 @@ private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
- taskEnd.taskInfo.accumulables.map { a =>
- val newValue = new LongSQLMetricValue(a.update.map(_.asInstanceOf[Long]).getOrElse(0L))
- a.copy(update = Some(newValue))
+ taskEnd.taskInfo.accumulables.flatMap { a =>
+ // Filter out accumulators that are not SQL metrics
+ // For now we assume all SQL metrics are Long's that have been JSON serialized as String's
+ if (a.metadata.exists(_ == SQLMetrics.ACCUM_IDENTIFIER)) {
+ val newValue = new LongSQLMetricValue(a.update.map(_.toString.toLong).getOrElse(0L))
+ Some(a.copy(update = Some(newValue)))
+ } else {
+ None
+ }
},
finishTask = true)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 82f6811503..2260e48702 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{JsonProtocol, Utils}
class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
@@ -356,6 +356,28 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
}
}
+ test("metrics can be loaded by history server") {
+ val metric = new LongSQLMetric("zanzibar", LongSQLMetricParam)
+ metric += 10L
+ val metricInfo = metric.toInfo(Some(metric.localValue), None)
+ metricInfo.update match {
+ case Some(v: LongSQLMetricValue) => assert(v.value === 10L)
+ case Some(v) => fail(s"metric value was not a LongSQLMetricValue: ${v.getClass.getName}")
+ case _ => fail("metric update is missing")
+ }
+ assert(metricInfo.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+ // After serializing to JSON, the original value type is lost, but we can still
+ // identify that it's a SQL metric from the metadata
+ val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo)
+ val metricInfoDeser = JsonProtocol.accumulableInfoFromJson(metricInfoJson)
+ metricInfoDeser.update match {
+ case Some(v: String) => assert(v.toLong === 10L)
+ case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}")
+ case _ => fail("deserialized metric update is missing")
+ }
+ assert(metricInfoDeser.metadata === Some(SQLMetrics.ACCUM_IDENTIFIER))
+ }
+
}
private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2c408c8878..085e4a49a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -26,8 +26,9 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
-import org.apache.spark.sql.execution.metric.LongSQLMetricValue
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.ui.SparkUI
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
import testImplicits._
@@ -335,8 +336,43 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
}
+ test("SPARK-13055: history listener only tracks SQL metrics") {
+ val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI]))
+ // We need to post other events for the listener to track our accumulators.
+ // These are largely just boilerplate unrelated to what we're trying to test.
+ val df = createTestDataFrame
+ val executionStart = SparkListenerSQLExecutionStart(
+ 0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
+ val stageInfo = createStageInfo(0, 0)
+ val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
+ val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
+ // This task has both accumulators that are SQL metrics and accumulators that are not.
+ // The listener should only track the ones that are actually SQL metrics.
+ val sqlMetric = SQLMetrics.createLongMetric(sparkContext, "beach umbrella")
+ val nonSqlMetric = sparkContext.accumulator[Int](0, "baseball")
+ val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.localValue), None)
+ val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.localValue), None)
+ val taskInfo = createTaskInfo(0, 0)
+ taskInfo.accumulables ++= Seq(sqlMetricInfo, nonSqlMetricInfo)
+ val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
+ listener.onOtherEvent(executionStart)
+ listener.onJobStart(jobStart)
+ listener.onStageSubmitted(stageSubmitted)
+ // Before SPARK-13055, this throws ClassCastException because the history listener would
+ // assume that the accumulator value is of type Long, but this may not be true for
+ // accumulators that are not SQL metrics.
+ listener.onTaskEnd(taskEnd)
+ val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics =>
+ stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
+ }
+ // Listener tracks only SQL metrics, not other accumulators
+ assert(trackedAccums.size === 1)
+ assert(trackedAccums.head === sqlMetricInfo)
+ }
+
}
+
class SQLListenerMemoryLeakSuite extends SparkFunSuite {
test("no memory leak") {