author     Andrew Or <andrew@databricks.com>        2016-01-29 13:45:03 -0800
committer  Shixiong Zhu <shixiong@databricks.com>   2016-01-29 13:45:03 -0800
commit     e38b0baa38c6894335f187eaa4c8ea5c02d4563b (patch)
tree       e531add805da14e0315ff508346f02fb79938711 /core
parent     2b027e9a386fe4009f61ad03b169335af5a9a5c6 (diff)
[SPARK-13055] SQLHistoryListener throws ClassCastException
This is an existing issue uncovered recently by #10835. The exception occurred because the `SQLHistoryListener` receives all sorts of accumulators, not just the ones that represent SQL metrics. For example, the listener receives `internal.metrics.shuffleRead.remoteBlocksFetched`, which is an Int, and then tries to cast that Int to a Long, which fails. The fix is to mark accumulators that represent SQL metrics with some internal metadata, so we can identify them and process only those in the `SQLHistoryListener`.

Author: Andrew Or <andrew@databricks.com>

Closes #10971 from andrewor14/fix-sql-history.
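The listener-side change itself lives in the sql module, so it does not appear in the core diffstat below. A rough sketch of the intended filtering, assuming a hypothetical "sql" metadata tag (not the real constant) and a hypothetical package so the private[spark] metadata field is reachable:

    package org.apache.spark.sql.sketch  // hypothetical location; metadata is private[spark]

    import org.apache.spark.scheduler.AccumulableInfo

    // Sketch only: the real SQLHistoryListener lives in the sql module and is not part
    // of this core diff. The "sql" tag is an assumed stand-in for whatever identifier
    // the sql module stores in the new AccumulableInfo.metadata field.
    object SQLMetricFilterSketch {

      private def isSQLMetric(info: AccumulableInfo): Boolean =
        info.metadata == Some("sql")

      /** Keep only SQL-metric updates and read their Long values. */
      def sqlMetricUpdates(accumUpdates: Seq[AccumulableInfo]): Seq[(Long, Long)] = {
        // Accumulators that are not tagged (e.g. internal.metrics.shuffleRead.remoteBlocksFetched,
        // which holds an Int) are skipped here, so the cast below no longer triggers the
        // ClassCastException described above.
        accumUpdates.filter(isSQLMetric).flatMap { info =>
          info.update.map(update => (info.id, update.asInstanceOf[Long]))
        }
      }
    }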
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulable.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 16
9 files changed, 37 insertions, 35 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 52f572b63f..601b503d12 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, Serializable}
import scala.collection.generic.Growable
import scala.reflect.ClassTag
+import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.Utils
@@ -187,6 +188,13 @@ class Accumulable[R, T] private (
*/
private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }
+ /**
+ * Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
+ */
+ private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
+ new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+ }
+
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
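Since toInfo is private[spark], it is only callable from Spark's own code. A minimal usage sketch (the helper object and its location are hypothetical) showing that the per-task update and the driver-side aggregate now come from the same place:

    package org.apache.spark  // hypothetical location; toInfo is private[spark]

    import org.apache.spark.scheduler.AccumulableInfo

    // Sketch only: both representations are built by the same helper, so id, name,
    // internal and countFailedValues can no longer drift apart between call sites.
    object ToInfoSketch {
      def infosFor(acc: Accumulable[_, _]): (AccumulableInfo, AccumulableInfo) = {
        val taskUpdate  = acc.toInfo(Some(acc.localValue), None) // partial value from one task
        val driverValue = acc.toInfo(None, Some(acc.value))      // aggregated value on the driver
        (taskUpdate, driverValue)
      }
    }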
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 8d10bf588e..0a6ebcb3e0 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -323,8 +323,8 @@ class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
* field is always empty, since this represents the partial updates recorded in this task,
* not the aggregated value across multiple tasks.
*/
- def accumulatorUpdates(): Seq[AccumulableInfo] = accums.map { a =>
- new AccumulableInfo(a.id, a.name, Some(a.localValue), None, a.isInternal, a.countFailedValues)
+ def accumulatorUpdates(): Seq[AccumulableInfo] = {
+ accums.map { a => a.toInfo(Some(a.localValue), None) }
}
// If we are reconstructing this TaskMetrics on the driver, some metrics may already be set.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index 9d45fff921..cedacad44a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -35,6 +35,7 @@ import org.apache.spark.annotation.DeveloperApi
* @param value total accumulated value so far, maybe None if used on executors to describe a task
* @param internal whether this accumulator was internal
* @param countFailedValues whether to count this accumulator's partial value if the task failed
+ * @param metadata internal metadata associated with this accumulator, if any
*/
@DeveloperApi
case class AccumulableInfo private[spark] (
@@ -43,7 +44,9 @@ case class AccumulableInfo private[spark] (
update: Option[Any], // represents a partial update within a task
value: Option[Any],
private[spark] val internal: Boolean,
- private[spark] val countFailedValues: Boolean)
+ private[spark] val countFailedValues: Boolean,
+ // TODO: use this to identify internal task metrics instead of encoding it in the name
+ private[spark] val metadata: Option[String] = None)
/**
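Because metadata defaults to None, every existing AccumulableInfo call site keeps compiling unchanged; only code that wants to tag an accumulator passes it explicitly. The actual SQL-metric tagging lives in the sql module and is not part of this core diff; a hedged sketch of what a tagged info could look like (the constructor is private[spark] and the "sql" string is illustrative):

    package org.apache.spark  // hypothetical location; the constructor is private[spark]

    import org.apache.spark.scheduler.AccumulableInfo

    // Sketch only: tag a Long accumulator's info so that a listener can later
    // recognize it as a SQL metric via the metadata field.
    object TaggedInfoSketch {
      def sqlMetricInfo(acc: Accumulable[Long, Long]): AccumulableInfo =
        new AccumulableInfo(
          id = acc.id,
          name = acc.name,
          update = Some(acc.localValue),
          value = None,
          internal = true,                      // SQL metrics are internal accumulators
          countFailedValues = acc.countFailedValues,
          metadata = Some("sql"))               // illustrative tag, not the real constant
    }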
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 897479b500..ee0b8a1c95 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,11 +1101,8 @@ class DAGScheduler(
acc ++= partialValue
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
- val name = acc.name
- stage.latestInfo.accumulables(id) = new AccumulableInfo(
- id, name, None, Some(acc.value), acc.isInternal, acc.countFailedValues)
- event.taskInfo.accumulables += new AccumulableInfo(
- id, name, Some(partialValue), Some(acc.value), acc.isInternal, acc.countFailedValues)
+ stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value))
+ event.taskInfo.accumulables += acc.toInfo(Some(partialValue), Some(acc.value))
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index dc8070cf8a..a2487eeb04 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -290,7 +290,8 @@ private[spark] object JsonProtocol {
("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~
("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~
("Internal" -> accumulableInfo.internal) ~
- ("Count Failed Values" -> accumulableInfo.countFailedValues)
+ ("Count Failed Values" -> accumulableInfo.countFailedValues) ~
+ ("Metadata" -> accumulableInfo.metadata)
}
/**
@@ -728,7 +729,8 @@ private[spark] object JsonProtocol {
val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
- new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+ val metadata = (json \ "Metadata").extractOpt[String]
+ new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
}
/**
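On the wire this adds a single optional "Metadata" field to the serialized AccumulableInfo. Events written by older Spark versions simply lack that key, and extractOpt then falls back to None, which the backward-compatibility test added further down verifies. A round-trip sketch (JsonProtocol is Spark-internal; the object and package here are hypothetical):

    package org.apache.spark  // hypothetical location; JsonProtocol is private[spark]

    import org.apache.spark.scheduler.AccumulableInfo
    import org.apache.spark.util.JsonProtocol

    // Sketch only: serialize and deserialize an info and confirm the new field survives.
    object MetadataRoundTripSketch {
      def roundTrip(info: AccumulableInfo): AccumulableInfo = {
        val json = JsonProtocol.accumulableInfoToJson(info)  // now carries a "Metadata" entry
        JsonProtocol.accumulableInfoFromJson(json)           // metadata comes back unchanged
      }
      // For a history file written by an older Spark, the "Metadata" key is absent and
      // accumulableInfoFromJson simply returns an info with metadata == None.
    }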
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 15be0b194e..67c4595ed1 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -551,8 +551,6 @@ private[spark] object TaskMetricsSuite extends Assertions {
* Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
* info as an accumulator update.
*/
- def makeInfo(a: Accumulable[_, _]): AccumulableInfo = {
- new AccumulableInfo(a.id, a.name, Some(a.value), None, a.isInternal, a.countFailedValues)
- }
+ def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d9c71ec2ea..62972a0738 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1581,12 +1581,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(Accumulators.get(acc1.id).isDefined)
assert(Accumulators.get(acc2.id).isDefined)
assert(Accumulators.get(acc3.id).isDefined)
- val accInfo1 = new AccumulableInfo(
- acc1.id, acc1.name, Some(15L), None, internal = false, countFailedValues = false)
- val accInfo2 = new AccumulableInfo(
- acc2.id, acc2.name, Some(13L), None, internal = false, countFailedValues = false)
- val accInfo3 = new AccumulableInfo(
- acc3.id, acc3.name, Some(18L), None, internal = false, countFailedValues = false)
+ val accInfo1 = acc1.toInfo(Some(15L), None)
+ val accInfo2 = acc2.toInfo(Some(13L), None)
+ val accInfo3 = acc3.toInfo(Some(18L), None)
val accumUpdates = Seq(accInfo1, accInfo2, accInfo3)
val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates)
submit(new MyRDD(sc, 1, Nil), Array(0))
@@ -1954,10 +1951,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
val accumUpdates = reason match {
- case Success =>
- task.initialAccumulators.map { a =>
- new AccumulableInfo(a.id, a.name, Some(a.zero), None, a.isInternal, a.countFailedValues)
- }
+ case Success => task.initialAccumulators.map { a => a.toInfo(Some(a.zero), None) }
case ef: ExceptionFailure => ef.accumUpdates
case _ => Seq.empty[AccumulableInfo]
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index a2e7436564..2c99dd5afb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -165,9 +165,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a =>
- new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
- }
+ val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a => a.toInfo(Some(0L), None) }
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
@@ -186,9 +184,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
- task.initialAccumulators.map { a =>
- new AccumulableInfo(a.id, a.name, Some(0L), None, a.isInternal, a.countFailedValues)
- }
+ task.initialAccumulators.map { a => a.toInfo(Some(0L), None) }
}
// First three offers should all find tasks
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 57021d1d3d..48951c3168 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -374,15 +374,18 @@ class JsonProtocolSuite extends SparkFunSuite {
test("AccumulableInfo backward compatibility") {
// "Internal" property of AccumulableInfo was added in 1.5.1
val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
- val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
- .removeField({ _._1 == "Internal" })
+ val accumulableInfoJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
+ val oldJson = accumulableInfoJson.removeField({ _._1 == "Internal" })
val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
assert(!oldInfo.internal)
// "Count Failed Values" property of AccumulableInfo was added in 2.0.0
- val oldJson2 = JsonProtocol.accumulableInfoToJson(accumulableInfo)
- .removeField({ _._1 == "Count Failed Values" })
+ val oldJson2 = accumulableInfoJson.removeField({ _._1 == "Count Failed Values" })
val oldInfo2 = JsonProtocol.accumulableInfoFromJson(oldJson2)
assert(!oldInfo2.countFailedValues)
+ // "Metadata" property of AccumulableInfo was added in 2.0.0
+ val oldJson3 = accumulableInfoJson.removeField({ _._1 == "Metadata" })
+ val oldInfo3 = JsonProtocol.accumulableInfoFromJson(oldJson3)
+ assert(oldInfo3.metadata.isEmpty)
}
test("ExceptionFailure backward compatibility: accumulator updates") {
@@ -820,9 +823,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
private def makeAccumulableInfo(
id: Int,
internal: Boolean = false,
- countFailedValues: Boolean = false): AccumulableInfo =
+ countFailedValues: Boolean = false,
+ metadata: Option[String] = None): AccumulableInfo =
new AccumulableInfo(id, Some(s"Accumulable$id"), Some(s"delta$id"), Some(s"val$id"),
- internal, countFailedValues)
+ internal, countFailedValues, metadata)
/**
* Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is