author    Wenchen Fan <wenchen@databricks.com>    2016-04-19 21:20:24 -0700
committer Reynold Xin <rxin@databricks.com>       2016-04-19 21:20:24 -0700
commit    85d759ca3aebb7d60b963207dcada83c75502e52 (patch)
tree      b4fbaba9dfce1ae47485f318123881f42cd05e6c /core/src/main/scala
parent    78b38109ed2fc20e97f9a968185d0c02ef83aa42 (diff)
[SPARK-14704][CORE] create accumulators in TaskMetrics
## What changes were proposed in this pull request?

Before this PR, we created accumulators on the driver side (and registered them there), sent them to the executor side, and then built `TaskMetrics` from those accumulators on the executor. After this PR, we create `TaskMetrics` on the driver side and send it to the executor side, so that the accumulators can be created directly inside `TaskMetrics`, which is cleaner.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12472 from cloud-fan/acc.
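To illustrate the ownership change, here is a rough, self-contained sketch of the "after" model: the metrics object creates and owns its accumulators, and the driver registers whatever the object owns before shipping it inside each task. `Accum`, `Registry`, `ToyTaskMetrics`, and `Demo` below are simplified stand-ins for illustration only, not Spark's actual classes.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy named accumulator standing in for Spark's Accumulator[Long].
final class Accum(val name: String) extends Serializable {
  private var _value: Long = 0L
  def add(v: Long): Unit = _value += v
  def value: Long = _value
}

// Stand-in for the driver-side accumulator registry.
object Registry {
  private val registered = ArrayBuffer.empty[Accum]
  def register(a: Accum): Unit = registered += a
  def size: Int = registered.size
}

// The metrics object creates its own accumulators on construction, so no
// externally built Seq[Accumulator[_]] needs to be threaded through anymore.
class ToyTaskMetrics extends Serializable {
  val executorRunTime = new Accum("internal.metrics.executorRunTime")
  val resultSize      = new Accum("internal.metrics.resultSize")

  def internalAccums: Seq[Accum] = Seq(executorRunTime, resultSize)

  // Driver-side registration of everything this object owns.
  def registerAll(): Unit = internalAccums.foreach(Registry.register(_))
}

object Demo extends App {
  // Driver side: build one metrics object per stage attempt and register it.
  val metrics = new ToyTaskMetrics
  metrics.registerAll()

  // The whole metrics object (not a loose sequence of accumulators) is what
  // would be serialized into each Task and sent to the executors.
  metrics.executorRunTime.add(42L)
  println(s"registered ${Registry.size} accums, runTime = ${metrics.executorRunTime.value}")
}
```

In the real patch this corresponds to `Stage.makeNewStageAttempt` creating a `TaskMetrics`, calling `registerAccums`, and passing the object through `StageInfo` into `ResultTask`/`ShuffleMapTask`, as shown in the diff below.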
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/InternalAccumulator.scala      | 127
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala              |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala          |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala        |  32
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala    |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala   |  15
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 49
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala     | 175
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala   |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala     |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala          |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala      |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala           |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala          |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala        |  41
17 files changed, 169 insertions, 384 deletions
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 714c8737a9..0b494c146f 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -17,17 +17,11 @@
package org.apache.spark
-import org.apache.spark.storage.{BlockId, BlockStatus}
-
-
/**
* A collection of fields and methods concerned with internal accumulators that represent
* task level metrics.
*/
private[spark] object InternalAccumulator {
-
- import AccumulatorParam._
-
// Prefixes used in names of internal task level metrics
val METRICS_PREFIX = "internal.metrics."
val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."
@@ -79,125 +73,4 @@ private[spark] object InternalAccumulator {
}
// scalastyle:on
-
- /**
- * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]].
- */
- def create(name: String): Accumulator[_] = {
- require(name.startsWith(METRICS_PREFIX),
- s"internal accumulator name must start with '$METRICS_PREFIX': $name")
- getParam(name) match {
- case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
- case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
- case p @ StringAccumulatorParam => newMetric[String]("", name, p)
- case p @ UpdatedBlockStatusesAccumulatorParam =>
- newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
- case p => throw new IllegalArgumentException(
- s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.")
- }
- }
-
- /**
- * Get the [[AccumulatorParam]] associated with the internal metric name,
- * which must begin with [[METRICS_PREFIX]].
- */
- def getParam(name: String): AccumulatorParam[_] = {
- require(name.startsWith(METRICS_PREFIX),
- s"internal accumulator name must start with '$METRICS_PREFIX': $name")
- name match {
- case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
- case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
- case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
- case _ => LongAccumulatorParam
- }
- }
-
- /**
- * Accumulators for tracking internal metrics.
- */
- def createAll(): Seq[Accumulator[_]] = {
- Seq[String](
- EXECUTOR_DESERIALIZE_TIME,
- EXECUTOR_RUN_TIME,
- RESULT_SIZE,
- JVM_GC_TIME,
- RESULT_SERIALIZATION_TIME,
- MEMORY_BYTES_SPILLED,
- DISK_BYTES_SPILLED,
- PEAK_EXECUTION_MEMORY,
- UPDATED_BLOCK_STATUSES).map(create) ++
- createShuffleReadAccums() ++
- createShuffleWriteAccums() ++
- createInputAccums() ++
- createOutputAccums() ++
- sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq
- }
-
- /**
- * Accumulators for tracking shuffle read metrics.
- */
- def createShuffleReadAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- shuffleRead.REMOTE_BLOCKS_FETCHED,
- shuffleRead.LOCAL_BLOCKS_FETCHED,
- shuffleRead.REMOTE_BYTES_READ,
- shuffleRead.LOCAL_BYTES_READ,
- shuffleRead.FETCH_WAIT_TIME,
- shuffleRead.RECORDS_READ).map(create)
- }
-
- /**
- * Accumulators for tracking shuffle write metrics.
- */
- def createShuffleWriteAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- shuffleWrite.BYTES_WRITTEN,
- shuffleWrite.RECORDS_WRITTEN,
- shuffleWrite.WRITE_TIME).map(create)
- }
-
- /**
- * Accumulators for tracking input metrics.
- */
- def createInputAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- input.BYTES_READ,
- input.RECORDS_READ).map(create)
- }
-
- /**
- * Accumulators for tracking output metrics.
- */
- def createOutputAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- output.BYTES_WRITTEN,
- output.RECORDS_WRITTEN).map(create)
- }
-
- /**
- * Accumulators for tracking internal metrics.
- *
- * These accumulators are created with the stage such that all tasks in the stage will
- * add to the same set of accumulators. We do this to report the distribution of accumulator
- * values across all tasks within each stage.
- */
- def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
- val accums = createAll()
- accums.foreach { accum =>
- Accumulators.register(accum)
- sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
- }
- accums
- }
-
- /**
- * Create a new accumulator representing an internal task metric.
- */
- private def newMetric[T](
- initialValue: T,
- name: String,
- param: AccumulatorParam[T]): Accumulator[T] = {
- new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
- }
-
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 757c1b5116..e7940bd9ed 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -62,12 +62,11 @@ object TaskContext {
protected[spark] def unset(): Unit = taskContext.remove()
/**
- * An empty task context that does not represent an actual task.
+ * An empty task context that does not represent an actual task. This is only used in tests.
*/
private[spark] def empty(): TaskContextImpl = {
new TaskContextImpl(0, 0, 0, 0, null, new Properties, null)
}
-
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index fa0b2d3d28..e8f83c6d14 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -36,15 +36,10 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
- initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
+ override val taskMetrics: TaskMetrics = new TaskMetrics)
extends TaskContext
with Logging {
- /**
- * Metrics associated with this task.
- */
- override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators)
-
/** List of callback functions to execute when the task completes. */
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b20bd11f7d..650f05c309 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -293,16 +293,14 @@ private[spark] class Executor(
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
- for (m <- task.metrics) {
- // Deserialization happens in two parts: first, we deserialize a Task object, which
- // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
- m.setExecutorDeserializeTime(
- (taskStart - deserializeStartTime) + task.executorDeserializeTime)
- // We need to subtract Task.run()'s deserialization time to avoid double-counting
- m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
- m.setJvmGCTime(computeTotalGcTime() - startGCTime)
- m.setResultSerializationTime(afterSerialization - beforeSerialization)
- }
+ // Deserialization happens in two parts: first, we deserialize a Task object, which
+ // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+ task.metrics.setExecutorDeserializeTime(
+ (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+ // We need to subtract Task.run()'s deserialization time to avoid double-counting
+ task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
+ task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+ task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
@@ -357,10 +355,8 @@ private[spark] class Executor(
// Collect latest accumulator values to report back to the driver
val accumulatorUpdates: Seq[AccumulableInfo] =
if (task != null) {
- task.metrics.foreach { m =>
- m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
- m.setJvmGCTime(computeTotalGcTime() - startGCTime)
- }
+ task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+ task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty[AccumulableInfo]
@@ -485,11 +481,9 @@ private[spark] class Executor(
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
- taskRunner.task.metrics.foreach { metrics =>
- metrics.mergeShuffleReadMetrics()
- metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
- accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates()))
- }
+ taskRunner.task.metrics.mergeShuffleReadMetrics()
+ taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
+ accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates()))
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 0ec81d8d35..535352e7dd 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -39,14 +39,11 @@ object DataReadMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about reading data from external systems.
*/
@DeveloperApi
-class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long])
- extends Serializable {
+class InputMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ))
- }
+ private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ)
+ private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ)
/**
* Total number of bytes read.
@@ -61,5 +58,4 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
-
}
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 5b36cc4739..586c98b156 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -38,14 +38,11 @@ object DataWriteMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about writing data to external systems.
*/
@DeveloperApi
-class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long])
- extends Serializable {
-
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN))
- }
+class OutputMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
+
+ private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN)
+ private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN)
/**
* Total number of bytes written.
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 47cfb74b9e..8e9a332b7c 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -27,38 +27,21 @@ import org.apache.spark.annotation.DeveloperApi
* Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleReadMetrics private (
- _remoteBlocksFetched: Accumulator[Int],
- _localBlocksFetched: Accumulator[Int],
- _remoteBytesRead: Accumulator[Long],
- _localBytesRead: Accumulator[Long],
- _fetchWaitTime: Accumulator[Long],
- _recordsRead: Accumulator[Long])
- extends Serializable {
-
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED),
- TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ))
- }
-
- /**
- * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task.
- *
- * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in
- * many places only to merge their values together later. In the future, we should revisit
- * whether this is needed.
- *
- * A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by
- * [[TaskMetrics.mergeShuffleReadMetrics]].
- */
- private[spark] def this() {
- this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap)
- }
+class ShuffleReadMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
+
+ private[executor] val _remoteBlocksFetched =
+ TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED)
+ private[executor] val _localBlocksFetched =
+ TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED)
+ private[executor] val _remoteBytesRead =
+ TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ)
+ private[executor] val _localBytesRead =
+ TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ)
+ private[executor] val _fetchWaitTime =
+ TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME)
+ private[executor] val _recordsRead =
+ TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ)
/**
* Number of remote blocks fetched in this shuffle by this task.
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 704dee747e..7326fba841 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -27,31 +27,15 @@ import org.apache.spark.annotation.DeveloperApi
* Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleWriteMetrics private (
- _bytesWritten: Accumulator[Long],
- _recordsWritten: Accumulator[Long],
- _writeTime: Accumulator[Long])
- extends Serializable {
+class ShuffleWriteMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME))
- }
-
- /**
- * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task.
- *
- * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in
- * many places only to merge their values together later. In the future, we should revisit
- * whether this is needed.
- *
- * A better alternative is [[TaskMetrics.shuffleWriteMetrics]].
- */
- private[spark] def this() {
- this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap)
- }
+ private[executor] val _bytesWritten =
+ TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN)
+ private[executor] val _recordsWritten =
+ TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN)
+ private[executor] val _writeTime =
+ TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME)
/**
* Number of bytes written for the shuffle by this task.
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0198364825..4558fbb4d9 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,10 +17,10 @@
package org.apache.spark.executor
-import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
+import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
@@ -39,65 +39,21 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* The accumulator updates are also sent to the driver periodically (on executor heartbeat)
* and when the task failed with an exception. The [[TaskMetrics]] object itself should never
* be sent to the driver.
- *
- * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on.
- * Each accumulator in this initial set must be uniquely named and marked
- * as internal. Additional accumulators registered later need not satisfy
- * these requirements.
*/
@DeveloperApi
-class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable {
+class TaskMetrics private[spark] () extends Serializable {
import InternalAccumulator._
- // Needed for Java tests
- def this() {
- this(InternalAccumulator.createAll())
- }
-
- /**
- * All accumulators registered with this task.
- */
- private val accums = new ArrayBuffer[Accumulable[_, _]]
- accums ++= initialAccums
-
- /**
- * A map for quickly accessing the initial set of accumulators by name.
- */
- private val initialAccumsMap: Map[String, Accumulator[_]] = {
- val map = new mutable.HashMap[String, Accumulator[_]]
- initialAccums.foreach { a =>
- val name = a.name.getOrElse {
- throw new IllegalArgumentException(
- "initial accumulators passed to TaskMetrics must be named")
- }
- require(a.isInternal,
- s"initial accumulator '$name' passed to TaskMetrics must be marked as internal")
- require(!map.contains(name),
- s"detected duplicate accumulator name '$name' when constructing TaskMetrics")
- map(name) = a
- }
- map.toMap
- }
-
// Each metric is internally represented as an accumulator
- private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME)
- private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME)
- private val _resultSize = getAccum(RESULT_SIZE)
- private val _jvmGCTime = getAccum(JVM_GC_TIME)
- private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME)
- private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED)
- private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED)
- private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY)
- private val _updatedBlockStatuses =
- TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES)
-
- private val _inputMetrics = new InputMetrics(initialAccumsMap)
-
- private val _outputMetrics = new OutputMetrics(initialAccumsMap)
-
- private val _shuffleReadMetrics = new ShuffleReadMetrics(initialAccumsMap)
-
- private val _shuffleWriteMetrics = new ShuffleWriteMetrics(initialAccumsMap)
+ private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME)
+ private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME)
+ private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE)
+ private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME)
+ private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME)
+ private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED)
+ private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED)
+ private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY)
+ private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES)
/**
* Time taken on the executor to deserialize this task.
@@ -164,30 +120,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
_updatedBlockStatuses.setValue(v)
/**
- * Get a Long accumulator from the given map by name, assuming it exists.
- * Note: this only searches the initial set of accumulators passed into the constructor.
- */
- private[spark] def getAccum(name: String): Accumulator[Long] = {
- TaskMetrics.getAccum[Long](initialAccumsMap, name)
- }
-
- /**
* Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
* data, defined only in tasks with input.
*/
- def inputMetrics: InputMetrics = _inputMetrics
+ val inputMetrics: InputMetrics = new InputMetrics()
/**
* Metrics related to writing data externally (e.g. to a distributed filesystem),
* defined only in tasks with output.
*/
- def outputMetrics: OutputMetrics = _outputMetrics
+ val outputMetrics: OutputMetrics = new OutputMetrics()
/**
* Metrics related to shuffle read aggregated across all shuffle dependencies.
* This is defined only if there are shuffle dependencies in this task.
*/
- def shuffleReadMetrics: ShuffleReadMetrics = _shuffleReadMetrics
+ val shuffleReadMetrics: ShuffleReadMetrics = new ShuffleReadMetrics()
+
+ /**
+ * Metrics related to shuffle write, defined only in shuffle map stages.
+ */
+ val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics()
/**
* Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
@@ -217,21 +170,45 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
- _shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
+ shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
}
}
- /**
- * Metrics related to shuffle write, defined only in shuffle map stages.
- */
- def shuffleWriteMetrics: ShuffleWriteMetrics = _shuffleWriteMetrics
+ // Only used for test
+ private[spark] val testAccum =
+ sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM))
+
+ @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = {
+ val in = inputMetrics
+ val out = outputMetrics
+ val sr = shuffleReadMetrics
+ val sw = shuffleWriteMetrics
+ Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime,
+ _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory,
+ _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead,
+ sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten,
+ sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++
+ testAccum
+ }
/* ========================== *
| OTHER THINGS |
* ========================== */
+ private[spark] def registerAccums(sc: SparkContext): Unit = {
+ internalAccums.foreach { accum =>
+ Accumulators.register(accum)
+ sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
+ }
+ }
+
+ /**
+ * External accumulators registered with this task.
+ */
+ @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]]
+
private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
- accums += a
+ externalAccums += a
}
/**
@@ -242,7 +219,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
* not the aggregated value across multiple tasks.
*/
def accumulatorUpdates(): Seq[AccumulableInfo] = {
- accums.map { a => a.toInfo(Some(a.localValue), None) }
+ (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) }
}
}
@@ -256,9 +233,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
* UnsupportedOperationException, we choose not to do so because the overrides would quickly become
* out-of-date when new metrics are added.
*/
-private[spark] class ListenerTaskMetrics(
- initialAccums: Seq[Accumulator[_]],
- accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) {
+private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics {
override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
@@ -272,18 +247,25 @@ private[spark] object TaskMetrics extends Logging {
def empty: TaskMetrics = new TaskMetrics
/**
- * Get an accumulator from the given map by name, assuming it exists.
+ * Create a new accumulator representing an internal task metric.
*/
- def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = {
- require(accumMap.contains(name), s"metric '$name' is missing")
- val accum = accumMap(name)
- try {
- // Note: we can't do pattern matching here because types are erased by compile time
- accum.asInstanceOf[Accumulator[T]]
- } catch {
- case e: ClassCastException =>
- throw new SparkException(s"accumulator $name was of unexpected type", e)
- }
+ private def newMetric[T](
+ initialValue: T,
+ name: String,
+ param: AccumulatorParam[T]): Accumulator[T] = {
+ new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
+ }
+
+ def createLongAccum(name: String): Accumulator[Long] = {
+ newMetric(0L, name, LongAccumulatorParam)
+ }
+
+ def createIntAccum(name: String): Accumulator[Int] = {
+ newMetric(0, name, IntAccumulatorParam)
+ }
+
+ def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = {
+ newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam)
}
/**
@@ -297,18 +279,11 @@ private[spark] object TaskMetrics extends Logging {
* internal task level metrics.
*/
def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
- // Initial accumulators are passed into the TaskMetrics constructor first because these
- // are required to be uniquely named. The rest of the accumulators from this task are
- // registered later because they need not satisfy this requirement.
- val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined }
- val initialAccums = definedAccumUpdates
- .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
- .map { info =>
- val accum = InternalAccumulator.create(info.name.get)
- accum.setValueAny(info.update.get)
- accum
- }
- new ListenerTaskMetrics(initialAccums, definedAccumUpdates)
+ val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
+ val metrics = new ListenerTaskMetrics(definedAccumUpdates)
+ definedAccumUpdates.filter(_.internal).foreach { accum =>
+ metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
+ }
+ metrics
}
-
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c27aad268d..b7fb608ea5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1029,7 +1029,7 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties)
+ taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
}
case stage: ResultStage =>
@@ -1039,7 +1039,7 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators)
+ taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index db6276f75d..75c6018e21 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -23,6 +23,7 @@ import java.util.Properties
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
/**
@@ -40,9 +41,7 @@ import org.apache.spark.rdd.RDD
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
* @param localProperties copy of thread-local properties set by the user on the driver side.
- * @param _initialAccums initial set of accumulators to be used in this task for tracking
- * internal metrics. Other accumulators will be registered later when
- * they are deserialized on the executors.
+ * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
*/
private[spark] class ResultTask[T, U](
stageId: Int,
@@ -52,8 +51,8 @@ private[spark] class ResultTask[T, U](
locs: Seq[TaskLocation],
val outputId: Int,
localProperties: Properties,
- _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
- extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums, localProperties)
+ metrics: TaskMetrics)
+ extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties)
with Serializable {
@transient private[this] val preferredLocs: Seq[TaskLocation] = {
@@ -68,7 +67,6 @@ private[spark] class ResultTask[T, U](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
- metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index b7cab7013e..84b3e5ba6c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -24,6 +24,7 @@ import scala.language.existentials
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriter
@@ -40,9 +41,7 @@ import org.apache.spark.shuffle.ShuffleWriter
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
- * @param _initialAccums initial set of accumulators to be used in this task for tracking
- * internal metrics. Other accumulators will be registered later when
- * they are deserialized on the executors.
+ * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
*/
private[spark] class ShuffleMapTask(
@@ -51,9 +50,9 @@ private[spark] class ShuffleMapTask(
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
- _initialAccums: Seq[Accumulator[_]],
+ metrics: TaskMetrics,
localProperties: Properties)
- extends Task[MapStatus](stageId, stageAttemptId, partition.index, _initialAccums, localProperties)
+ extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties)
with Logging {
/** A constructor used only in test suites. This does not require passing in an RDD. */
@@ -73,7 +72,6 @@ private[spark] class ShuffleMapTask(
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
- metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b6d4e39fe5..d5cf6b82e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import scala.collection.mutable.HashSet
import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.util.CallSite
@@ -110,9 +111,10 @@ private[scheduler] abstract class Stage(
def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
+ val metrics = new TaskMetrics
+ metrics.registerAccums(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
- this, nextAttemptId, Some(numPartitionsToCompute),
- InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences)
+ this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 0fd58c41cd..58349fe250 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -19,8 +19,8 @@ package org.apache.spark.scheduler
import scala.collection.mutable.HashMap
-import org.apache.spark.Accumulator
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.RDDInfo
/**
@@ -36,7 +36,7 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
- val internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
+ val taskMetrics: TaskMetrics = new TaskMetrics,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -81,7 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
- internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
+ taskMetrics: TaskMetrics = new TaskMetrics,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
@@ -94,7 +94,7 @@ private[spark] object StageInfo {
rddInfos,
stage.parents.map(_.id),
stage.details,
- internalAccumulators,
+ taskMetrics,
taskLocalityPreferences)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1ff9d7795f..9f2fa02c69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -23,7 +23,7 @@ import java.util.Properties
import scala.collection.mutable.HashMap
-import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl}
+import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
@@ -44,17 +44,17 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
* @param stageId id of the stage this task belongs to
* @param stageAttemptId attempt id of the stage this task belongs to
* @param partitionId index of the number in the RDD
- * @param initialAccumulators initial set of accumulators to be used in this task for tracking
- * internal metrics. Other accumulators will be registered later when
- * they are deserialized on the executors.
+ * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The default values for `metrics` and `localProperties` are used by tests only.
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
- val initialAccumulators: Seq[Accumulator[_]],
- @transient var localProperties: Properties) extends Serializable {
+ val metrics: TaskMetrics = new TaskMetrics,
+ @transient var localProperties: Properties = new Properties) extends Serializable {
/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
@@ -76,7 +76,7 @@ private[spark] abstract class Task[T](
taskMemoryManager,
localProperties,
metricsSystem,
- initialAccumulators)
+ metrics)
TaskContext.setTaskContext(context)
taskThread = Thread.currentThread()
if (_killed) {
@@ -128,8 +128,6 @@ private[spark] abstract class Task[T](
// Map output tracker epoch. Will be set by TaskScheduler.
var epoch: Long = -1
- var metrics: Option[TaskMetrics] = None
-
// Task context, to be initialized in run().
@transient protected var context: TaskContextImpl = _
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index bd4797ae8e..645e2d2e36 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
- new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty))
+ new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
}
val activeStages = Buffer[StageInfo]()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 38ca3224ff..6c50c72a91 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -304,20 +304,17 @@ private[spark] object JsonProtocol {
* The behavior here must match that of [[accumValueFromJson]]. Exposed for testing.
*/
private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
- import AccumulatorParam._
if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
- (value, InternalAccumulator.getParam(name.get)) match {
- case (v: Int, IntAccumulatorParam) => JInt(v)
- case (v: Long, LongAccumulatorParam) => JInt(v)
- case (v: String, StringAccumulatorParam) => JString(v)
- case (v, UpdatedBlockStatusesAccumulatorParam) =>
+ value match {
+ case v: Int => JInt(v)
+ case v: Long => JInt(v)
+ // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
+ // the blocks accumulator, whose type is `Seq[(BlockId, BlockStatus)]`
+ case v =>
JArray(v.asInstanceOf[Seq[(BlockId, BlockStatus)]].toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
- case (v, p) =>
- throw new IllegalArgumentException(s"unexpected combination of accumulator value " +
- s"type (${v.getClass.getName}) and param (${p.getClass.getName}) in '${name.get}'")
}
} else {
// For all external accumulators, just use strings
@@ -569,7 +566,7 @@ private[spark] object JsonProtocol {
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
stageIds.map { id =>
- new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty)
+ new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")
}
}
SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
@@ -678,7 +675,7 @@ private[spark] object JsonProtocol {
}
val stageInfo = new StageInfo(
- stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty)
+ stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
@@ -735,25 +732,21 @@ private[spark] object JsonProtocol {
* The behavior here must match that of [[accumValueToJson]]. Exposed for testing.
*/
private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = {
- import AccumulatorParam._
if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
- (value, InternalAccumulator.getParam(name.get)) match {
- case (JInt(v), IntAccumulatorParam) => v.toInt
- case (JInt(v), LongAccumulatorParam) => v.toLong
- case (JString(v), StringAccumulatorParam) => v
- case (JArray(v), UpdatedBlockStatusesAccumulatorParam) =>
+ value match {
+ case JInt(v) => v.toLong
+ case JArray(v) =>
v.map { blockJson =>
val id = BlockId((blockJson \ "Block ID").extract[String])
val status = blockStatusFromJson(blockJson \ "Status")
(id, status)
}
- case (v, p) =>
- throw new IllegalArgumentException(s"unexpected combination of accumulator " +
- s"value in JSON ($v) and accumulator param (${p.getClass.getName}) in '${name.get}'")
- }
- } else {
- value.extract[String]
- }
+ case _ => throw new IllegalArgumentException(s"unexpected json value $value for " +
+ "accumulator " + name.get)
+ }
+ } else {
+ value.extract[String]
+ }
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {