|  |  |  |
|---|---|---|
| author | Wenchen Fan <wenchen@databricks.com> | 2016-04-19 21:20:24 -0700 |
| committer | Reynold Xin <rxin@databricks.com> | 2016-04-19 21:20:24 -0700 |
| commit | 85d759ca3aebb7d60b963207dcada83c75502e52 (patch) | |
| tree | b4fbaba9dfce1ae47485f318123881f42cd05e6c /core/src/main/scala | |
| parent | 78b38109ed2fc20e97f9a968185d0c02ef83aa42 (diff) | |
[SPARK-14704][CORE] create accumulators in TaskMetrics
## What changes were proposed in this pull request?
Before this PR, we create accumulators on the driver side (and register them) and send them to the executor side; we then create `TaskMetrics` from these accumulators on the executor side.
After this PR, we create `TaskMetrics` on the driver side and send it to the executor side, so that the accumulators can be created directly inside `TaskMetrics`, which is cleaner.
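
To make the before/after concrete, here is a minimal, self-contained sketch of the new pattern. The names `Accum`, `SimpleTaskMetrics`, and `registerOnDriver` are toy stand-ins invented for illustration, not Spark's real `Accumulator`/`TaskMetrics` API: the metrics object creates its accumulators as fields, is instantiated and registered on the driver, and is then shipped to the executor together with the task.

```scala
// Toy stand-in for Spark's named internal accumulator.
class Accum(val name: String) extends Serializable {
  private var _value = 0L
  def add(v: Long): Unit = _value += v
  def value: Long = _value
}

// Simplified metrics object: accumulators are created *inside* it,
// mirroring how this patch moves accumulator creation into TaskMetrics.
class SimpleTaskMetrics extends Serializable {
  val executorRunTime = new Accum("internal.metrics.executorRunTime")
  val resultSize      = new Accum("internal.metrics.resultSize")
  def internalAccums: Seq[Accum] = Seq(executorRunTime, resultSize)
}

object Demo {
  // Stand-in for driver-side registration of the internal accumulators.
  def registerOnDriver(metrics: SimpleTaskMetrics): Unit =
    metrics.internalAccums.foreach(a => println(s"registered ${a.name}"))

  def main(args: Array[String]): Unit = {
    val metrics = new SimpleTaskMetrics   // created on the driver
    registerOnDriver(metrics)             // registered on the driver
    // The object is then serialized and shipped to the executor with the task,
    // where running code updates the accumulators directly:
    metrics.executorRunTime.add(123L)
    println(metrics.executorRunTime.value)
  }
}
```

In the actual patch, the driver-side registration step corresponds to `TaskMetrics.registerAccums`, which is called from `Stage.makeNewStageAttempt`.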
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12472 from cloud-fan/acc.
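
As a reading aid for the diff below, here is a hedged toy model of the ownership chain the patch sets up (classes prefixed with `Toy` are invented for illustration, not the real scheduler types): the stage attempt creates the metrics, `StageInfo` carries it, each task is constructed with it, and the executor-side `TaskContextImpl` simply exposes the task's copy instead of building its own from a list of accumulators.

```scala
// Toy model of the ownership chain after this patch.
case class ToyTaskMetrics(stageAttemptId: Int) extends Serializable

// Stage attempt info carries the metrics object created on the driver.
class ToyStageInfo(val taskMetrics: ToyTaskMetrics)

// Each task is constructed with the stage attempt's metrics object.
class ToyTask(val metrics: ToyTaskMetrics) extends Serializable

// The executor-side context exposes the task's metrics directly.
class ToyTaskContext(val taskMetrics: ToyTaskMetrics)

object Flow {
  def main(args: Array[String]): Unit = {
    val stageInfo = new ToyStageInfo(ToyTaskMetrics(stageAttemptId = 0)) // driver side
    val task      = new ToyTask(stageInfo.taskMetrics)                   // driver side
    val context   = new ToyTaskContext(task.metrics)                     // executor side
    println(context.taskMetrics)
  }
}
```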
Diffstat (limited to 'core/src/main/scala')
17 files changed, 169 insertions, 384 deletions
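
One recurring change in the diff below is that the string-keyed factories `InternalAccumulator.create`/`getParam` are replaced by small typed helpers on the `TaskMetrics` companion object (`createLongAccum`, `createIntAccum`, `createBlocksAccum`). A rough sketch of that factory style, using a hypothetical `NamedAccum` type rather than Spark's `Accumulator` (which also carries an `AccumulatorParam`, an internal flag, and `countFailedValues`), could look like this:

```scala
// Hypothetical generic named accumulator used only for this sketch.
class NamedAccum[T](val name: String, initial: T) extends Serializable {
  private var _value: T = initial
  def add(v: T)(implicit num: Numeric[T]): Unit = _value = num.plus(_value, v)
  def value: T = _value
}

object MetricAccums {
  // Typed helpers in the spirit of TaskMetrics.createLongAccum / createIntAccum:
  // each metric picks its factory at the call site, so no name-to-type lookup
  // table (like the removed InternalAccumulator.getParam) is needed.
  def createLongAccum(name: String): NamedAccum[Long] = new NamedAccum(name, 0L)
  def createIntAccum(name: String): NamedAccum[Int]   = new NamedAccum(name, 0)
}

object Example extends App {
  val bytesRead = MetricAccums.createLongAccum("internal.metrics.input.bytesRead")
  bytesRead.add(1024L)
  println(s"${bytesRead.name} = ${bytesRead.value}")
}
```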
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 714c8737a9..0b494c146f 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -17,17 +17,11 @@ package org.apache.spark -import org.apache.spark.storage.{BlockId, BlockStatus} - - /** * A collection of fields and methods concerned with internal accumulators that represent * task level metrics. */ private[spark] object InternalAccumulator { - - import AccumulatorParam._ - // Prefixes used in names of internal task level metrics val METRICS_PREFIX = "internal.metrics." val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read." @@ -79,125 +73,4 @@ private[spark] object InternalAccumulator { } // scalastyle:on - - /** - * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]]. - */ - def create(name: String): Accumulator[_] = { - require(name.startsWith(METRICS_PREFIX), - s"internal accumulator name must start with '$METRICS_PREFIX': $name") - getParam(name) match { - case p @ LongAccumulatorParam => newMetric[Long](0L, name, p) - case p @ IntAccumulatorParam => newMetric[Int](0, name, p) - case p @ StringAccumulatorParam => newMetric[String]("", name, p) - case p @ UpdatedBlockStatusesAccumulatorParam => - newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p) - case p => throw new IllegalArgumentException( - s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.") - } - } - - /** - * Get the [[AccumulatorParam]] associated with the internal metric name, - * which must begin with [[METRICS_PREFIX]]. - */ - def getParam(name: String): AccumulatorParam[_] = { - require(name.startsWith(METRICS_PREFIX), - s"internal accumulator name must start with '$METRICS_PREFIX': $name") - name match { - case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam - case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam - case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam - case _ => LongAccumulatorParam - } - } - - /** - * Accumulators for tracking internal metrics. - */ - def createAll(): Seq[Accumulator[_]] = { - Seq[String]( - EXECUTOR_DESERIALIZE_TIME, - EXECUTOR_RUN_TIME, - RESULT_SIZE, - JVM_GC_TIME, - RESULT_SERIALIZATION_TIME, - MEMORY_BYTES_SPILLED, - DISK_BYTES_SPILLED, - PEAK_EXECUTION_MEMORY, - UPDATED_BLOCK_STATUSES).map(create) ++ - createShuffleReadAccums() ++ - createShuffleWriteAccums() ++ - createInputAccums() ++ - createOutputAccums() ++ - sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq - } - - /** - * Accumulators for tracking shuffle read metrics. - */ - def createShuffleReadAccums(): Seq[Accumulator[_]] = { - Seq[String]( - shuffleRead.REMOTE_BLOCKS_FETCHED, - shuffleRead.LOCAL_BLOCKS_FETCHED, - shuffleRead.REMOTE_BYTES_READ, - shuffleRead.LOCAL_BYTES_READ, - shuffleRead.FETCH_WAIT_TIME, - shuffleRead.RECORDS_READ).map(create) - } - - /** - * Accumulators for tracking shuffle write metrics. - */ - def createShuffleWriteAccums(): Seq[Accumulator[_]] = { - Seq[String]( - shuffleWrite.BYTES_WRITTEN, - shuffleWrite.RECORDS_WRITTEN, - shuffleWrite.WRITE_TIME).map(create) - } - - /** - * Accumulators for tracking input metrics. - */ - def createInputAccums(): Seq[Accumulator[_]] = { - Seq[String]( - input.BYTES_READ, - input.RECORDS_READ).map(create) - } - - /** - * Accumulators for tracking output metrics. 
- */ - def createOutputAccums(): Seq[Accumulator[_]] = { - Seq[String]( - output.BYTES_WRITTEN, - output.RECORDS_WRITTEN).map(create) - } - - /** - * Accumulators for tracking internal metrics. - * - * These accumulators are created with the stage such that all tasks in the stage will - * add to the same set of accumulators. We do this to report the distribution of accumulator - * values across all tasks within each stage. - */ - def createAll(sc: SparkContext): Seq[Accumulator[_]] = { - val accums = createAll() - accums.foreach { accum => - Accumulators.register(accum) - sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) - } - accums - } - - /** - * Create a new accumulator representing an internal task metric. - */ - private def newMetric[T]( - initialValue: T, - name: String, - param: AccumulatorParam[T]): Accumulator[T] = { - new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true) - } - } diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 757c1b5116..e7940bd9ed 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -62,12 +62,11 @@ object TaskContext { protected[spark] def unset(): Unit = taskContext.remove() /** - * An empty task context that does not represent an actual task. + * An empty task context that does not represent an actual task. This is only used in tests. */ private[spark] def empty(): TaskContextImpl = { new TaskContextImpl(0, 0, 0, 0, null, new Properties, null) } - } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index fa0b2d3d28..e8f83c6d14 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -36,15 +36,10 @@ private[spark] class TaskContextImpl( override val taskMemoryManager: TaskMemoryManager, localProperties: Properties, @transient private val metricsSystem: MetricsSystem, - initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll()) + override val taskMetrics: TaskMetrics = new TaskMetrics) extends TaskContext with Logging { - /** - * Metrics associated with this task. - */ - override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators) - /** List of callback functions to execute when the task completes. */ @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener] diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b20bd11f7d..650f05c309 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -293,16 +293,14 @@ private[spark] class Executor( val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() - for (m <- task.metrics) { - // Deserialization happens in two parts: first, we deserialize a Task object, which - // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. 
- m.setExecutorDeserializeTime( - (taskStart - deserializeStartTime) + task.executorDeserializeTime) - // We need to subtract Task.run()'s deserialization time to avoid double-counting - m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) - m.setJvmGCTime(computeTotalGcTime() - startGCTime) - m.setResultSerializationTime(afterSerialization - beforeSerialization) - } + // Deserialization happens in two parts: first, we deserialize a Task object, which + // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. + task.metrics.setExecutorDeserializeTime( + (taskStart - deserializeStartTime) + task.executorDeserializeTime) + // We need to subtract Task.run()'s deserialization time to avoid double-counting + task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization) // Note: accumulator updates must be collected after TaskMetrics is updated val accumUpdates = task.collectAccumulatorUpdates() @@ -357,10 +355,8 @@ private[spark] class Executor( // Collect latest accumulator values to report back to the driver val accumulatorUpdates: Seq[AccumulableInfo] = if (task != null) { - task.metrics.foreach { m => - m.setExecutorRunTime(System.currentTimeMillis() - taskStart) - m.setJvmGCTime(computeTotalGcTime() - startGCTime) - } + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) task.collectAccumulatorUpdates(taskFailed = true) } else { Seq.empty[AccumulableInfo] @@ -485,11 +481,9 @@ private[spark] class Executor( for (taskRunner <- runningTasks.values().asScala) { if (taskRunner.task != null) { - taskRunner.task.metrics.foreach { metrics => - metrics.mergeShuffleReadMetrics() - metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) - accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates())) - } + taskRunner.task.metrics.mergeShuffleReadMetrics() + taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) + accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates())) } } diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala index 0ec81d8d35..535352e7dd 100644 --- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -39,14 +39,11 @@ object DataReadMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about reading data from external systems. 
*/ @DeveloperApi -class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long]) - extends Serializable { +class InputMetrics private[spark] () extends Serializable { + import InternalAccumulator._ - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ)) - } + private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ) + private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ) /** * Total number of bytes read. @@ -61,5 +58,4 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v) private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v) private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v) - } diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index 5b36cc4739..586c98b156 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -38,14 +38,11 @@ object DataWriteMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about writing data to external systems. */ @DeveloperApi -class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long]) - extends Serializable { - - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN)) - } +class OutputMetrics private[spark] () extends Serializable { + import InternalAccumulator._ + + private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN) + private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN) /** * Total number of bytes written. diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index 47cfb74b9e..8e9a332b7c 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -27,38 +27,21 @@ import org.apache.spark.annotation.DeveloperApi * Operations are not thread-safe. 
*/ @DeveloperApi -class ShuffleReadMetrics private ( - _remoteBlocksFetched: Accumulator[Int], - _localBlocksFetched: Accumulator[Int], - _remoteBytesRead: Accumulator[Long], - _localBytesRead: Accumulator[Long], - _fetchWaitTime: Accumulator[Long], - _recordsRead: Accumulator[Long]) - extends Serializable { - - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED), - TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ)) - } - - /** - * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task. - * - * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in - * many places only to merge their values together later. In the future, we should revisit - * whether this is needed. - * - * A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by - * [[TaskMetrics.mergeShuffleReadMetrics]]. - */ - private[spark] def this() { - this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap) - } +class ShuffleReadMetrics private[spark] () extends Serializable { + import InternalAccumulator._ + + private[executor] val _remoteBlocksFetched = + TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED) + private[executor] val _localBlocksFetched = + TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED) + private[executor] val _remoteBytesRead = + TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ) + private[executor] val _localBytesRead = + TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ) + private[executor] val _fetchWaitTime = + TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME) + private[executor] val _recordsRead = + TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ) /** * Number of remote blocks fetched in this shuffle by this task. diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 704dee747e..7326fba841 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -27,31 +27,15 @@ import org.apache.spark.annotation.DeveloperApi * Operations are not thread-safe. 
*/ @DeveloperApi -class ShuffleWriteMetrics private ( - _bytesWritten: Accumulator[Long], - _recordsWritten: Accumulator[Long], - _writeTime: Accumulator[Long]) - extends Serializable { +class ShuffleWriteMetrics private[spark] () extends Serializable { + import InternalAccumulator._ - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME)) - } - - /** - * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task. - * - * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in - * many places only to merge their values together later. In the future, we should revisit - * whether this is needed. - * - * A better alternative is [[TaskMetrics.shuffleWriteMetrics]]. - */ - private[spark] def this() { - this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap) - } + private[executor] val _bytesWritten = + TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN) + private[executor] val _recordsWritten = + TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN) + private[executor] val _writeTime = + TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME) /** * Number of bytes written for the shuffle by this task. diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 0198364825..4558fbb4d9 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,10 +17,10 @@ package org.apache.spark.executor -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark._ +import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo @@ -39,65 +39,21 @@ import org.apache.spark.storage.{BlockId, BlockStatus} * The accumulator updates are also sent to the driver periodically (on executor heartbeat) * and when the task failed with an exception. The [[TaskMetrics]] object itself should never * be sent to the driver. - * - * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on. - * Each accumulator in this initial set must be uniquely named and marked - * as internal. Additional accumulators registered later need not satisfy - * these requirements. */ @DeveloperApi -class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable { +class TaskMetrics private[spark] () extends Serializable { import InternalAccumulator._ - // Needed for Java tests - def this() { - this(InternalAccumulator.createAll()) - } - - /** - * All accumulators registered with this task. - */ - private val accums = new ArrayBuffer[Accumulable[_, _]] - accums ++= initialAccums - - /** - * A map for quickly accessing the initial set of accumulators by name. 
- */ - private val initialAccumsMap: Map[String, Accumulator[_]] = { - val map = new mutable.HashMap[String, Accumulator[_]] - initialAccums.foreach { a => - val name = a.name.getOrElse { - throw new IllegalArgumentException( - "initial accumulators passed to TaskMetrics must be named") - } - require(a.isInternal, - s"initial accumulator '$name' passed to TaskMetrics must be marked as internal") - require(!map.contains(name), - s"detected duplicate accumulator name '$name' when constructing TaskMetrics") - map(name) = a - } - map.toMap - } - // Each metric is internally represented as an accumulator - private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME) - private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME) - private val _resultSize = getAccum(RESULT_SIZE) - private val _jvmGCTime = getAccum(JVM_GC_TIME) - private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME) - private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED) - private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED) - private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY) - private val _updatedBlockStatuses = - TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES) - - private val _inputMetrics = new InputMetrics(initialAccumsMap) - - private val _outputMetrics = new OutputMetrics(initialAccumsMap) - - private val _shuffleReadMetrics = new ShuffleReadMetrics(initialAccumsMap) - - private val _shuffleWriteMetrics = new ShuffleWriteMetrics(initialAccumsMap) + private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME) + private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME) + private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE) + private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME) + private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME) + private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED) + private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED) + private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY) + private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES) /** * Time taken on the executor to deserialize this task. @@ -164,30 +120,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se _updatedBlockStatuses.setValue(v) /** - * Get a Long accumulator from the given map by name, assuming it exists. - * Note: this only searches the initial set of accumulators passed into the constructor. - */ - private[spark] def getAccum(name: String): Accumulator[Long] = { - TaskMetrics.getAccum[Long](initialAccumsMap, name) - } - - /** * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted * data, defined only in tasks with input. */ - def inputMetrics: InputMetrics = _inputMetrics + val inputMetrics: InputMetrics = new InputMetrics() /** * Metrics related to writing data externally (e.g. to a distributed filesystem), * defined only in tasks with output. */ - def outputMetrics: OutputMetrics = _outputMetrics + val outputMetrics: OutputMetrics = new OutputMetrics() /** * Metrics related to shuffle read aggregated across all shuffle dependencies. * This is defined only if there are shuffle dependencies in this task. 
*/ - def shuffleReadMetrics: ShuffleReadMetrics = _shuffleReadMetrics + val shuffleReadMetrics: ShuffleReadMetrics = new ShuffleReadMetrics() + + /** + * Metrics related to shuffle write, defined only in shuffle map stages. + */ + val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() /** * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency. @@ -217,21 +170,45 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se */ private[spark] def mergeShuffleReadMetrics(): Unit = synchronized { if (tempShuffleReadMetrics.nonEmpty) { - _shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics) + shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics) } } - /** - * Metrics related to shuffle write, defined only in shuffle map stages. - */ - def shuffleWriteMetrics: ShuffleWriteMetrics = _shuffleWriteMetrics + // Only used for test + private[spark] val testAccum = + sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM)) + + @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = { + val in = inputMetrics + val out = outputMetrics + val sr = shuffleReadMetrics + val sw = shuffleWriteMetrics + Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime, + _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory, + _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead, + sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten, + sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++ + testAccum + } /* ========================== * | OTHER THINGS | * ========================== */ + private[spark] def registerAccums(sc: SparkContext): Unit = { + internalAccums.foreach { accum => + Accumulators.register(accum) + sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) + } + } + + /** + * External accumulators registered with this task. + */ + @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]] + private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = { - accums += a + externalAccums += a } /** @@ -242,7 +219,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se * not the aggregated value across multiple tasks. */ def accumulatorUpdates(): Seq[AccumulableInfo] = { - accums.map { a => a.toInfo(Some(a.localValue), None) } + (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) } } } @@ -256,9 +233,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se * UnsupportedOperationException, we choose not to do so because the overrides would quickly become * out-of-date when new metrics are added. */ -private[spark] class ListenerTaskMetrics( - initialAccums: Seq[Accumulator[_]], - accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) { +private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics { override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates @@ -272,18 +247,25 @@ private[spark] object TaskMetrics extends Logging { def empty: TaskMetrics = new TaskMetrics /** - * Get an accumulator from the given map by name, assuming it exists. + * Create a new accumulator representing an internal task metric. 
*/ - def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = { - require(accumMap.contains(name), s"metric '$name' is missing") - val accum = accumMap(name) - try { - // Note: we can't do pattern matching here because types are erased by compile time - accum.asInstanceOf[Accumulator[T]] - } catch { - case e: ClassCastException => - throw new SparkException(s"accumulator $name was of unexpected type", e) - } + private def newMetric[T]( + initialValue: T, + name: String, + param: AccumulatorParam[T]): Accumulator[T] = { + new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true) + } + + def createLongAccum(name: String): Accumulator[Long] = { + newMetric(0L, name, LongAccumulatorParam) + } + + def createIntAccum(name: String): Accumulator[Int] = { + newMetric(0, name, IntAccumulatorParam) + } + + def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = { + newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam) } /** @@ -297,18 +279,11 @@ private[spark] object TaskMetrics extends Logging { * internal task level metrics. */ def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = { - // Initial accumulators are passed into the TaskMetrics constructor first because these - // are required to be uniquely named. The rest of the accumulators from this task are - // registered later because they need not satisfy this requirement. - val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined } - val initialAccums = definedAccumUpdates - .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) } - .map { info => - val accum = InternalAccumulator.create(info.name.get) - accum.setValueAny(info.update.get) - accum - } - new ListenerTaskMetrics(initialAccums, definedAccumUpdates) + val definedAccumUpdates = accumUpdates.filter(_.update.isDefined) + val metrics = new ListenerTaskMetrics(definedAccumUpdates) + definedAccumUpdates.filter(_.internal).foreach { accum => + metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get)) + } + metrics } - } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c27aad268d..b7fb608ea5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1029,7 +1029,7 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties) + taskBinary, part, locs, stage.latestInfo.taskMetrics, properties) } case stage: ResultStage => @@ -1039,7 +1039,7 @@ class DAGScheduler( val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators) + taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index db6276f75d..75c6018e21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -23,6 +23,7 @@ import java.util.Properties import org.apache.spark._ import 
org.apache.spark.broadcast.Broadcast +import org.apache.spark.executor.TaskMetrics import org.apache.spark.rdd.RDD /** @@ -40,9 +41,7 @@ import org.apache.spark.rdd.RDD * @param outputId index of the task in this job (a job can launch tasks on only a subset of the * input RDD's partitions). * @param localProperties copy of thread-local properties set by the user on the driver side. - * @param _initialAccums initial set of accumulators to be used in this task for tracking - * internal metrics. Other accumulators will be registered later when - * they are deserialized on the executors. + * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. */ private[spark] class ResultTask[T, U]( stageId: Int, @@ -52,8 +51,8 @@ private[spark] class ResultTask[T, U]( locs: Seq[TaskLocation], val outputId: Int, localProperties: Properties, - _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll()) - extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums, localProperties) + metrics: TaskMetrics) + extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { @@ -68,7 +67,6 @@ private[spark] class ResultTask[T, U]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime - metrics = Some(context.taskMetrics) func(context, rdd.iterator(partition, context)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index b7cab7013e..84b3e5ba6c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -24,6 +24,7 @@ import scala.language.existentials import org.apache.spark._ import org.apache.spark.broadcast.Broadcast +import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.ShuffleWriter @@ -40,9 +41,7 @@ import org.apache.spark.shuffle.ShuffleWriter * the type should be (RDD[_], ShuffleDependency[_, _, _]). * @param partition partition of the RDD this task is associated with * @param locs preferred task execution locations for locality scheduling - * @param _initialAccums initial set of accumulators to be used in this task for tracking - * internal metrics. Other accumulators will be registered later when - * they are deserialized on the executors. + * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. */ private[spark] class ShuffleMapTask( @@ -51,9 +50,9 @@ private[spark] class ShuffleMapTask( taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient private var locs: Seq[TaskLocation], - _initialAccums: Seq[Accumulator[_]], + metrics: TaskMetrics, localProperties: Properties) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, _initialAccums, localProperties) + extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. 
*/ @@ -73,7 +72,6 @@ private[spark] class ShuffleMapTask( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime - metrics = Some(context.taskMetrics) var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index b6d4e39fe5..d5cf6b82e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import scala.collection.mutable.HashSet import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.util.CallSite @@ -110,9 +111,10 @@ private[scheduler] abstract class Stage( def makeNewStageAttempt( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { + val metrics = new TaskMetrics + metrics.registerAccums(rdd.sparkContext) _latestInfo = StageInfo.fromStage( - this, nextAttemptId, Some(numPartitionsToCompute), - InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences) + this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) nextAttemptId += 1 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 0fd58c41cd..58349fe250 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -19,8 +19,8 @@ package org.apache.spark.scheduler import scala.collection.mutable.HashMap -import org.apache.spark.Accumulator import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.RDDInfo /** @@ -36,7 +36,7 @@ class StageInfo( val rddInfos: Seq[RDDInfo], val parentIds: Seq[Int], val details: String, - val internalAccumulators: Seq[Accumulator[_]] = Seq.empty, + val taskMetrics: TaskMetrics = new TaskMetrics, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. 
*/ var submissionTime: Option[Long] = None @@ -81,7 +81,7 @@ private[spark] object StageInfo { stage: Stage, attemptId: Int, numTasks: Option[Int] = None, - internalAccumulators: Seq[Accumulator[_]] = Seq.empty, + taskMetrics: TaskMetrics = new TaskMetrics, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) @@ -94,7 +94,7 @@ private[spark] object StageInfo { rddInfos, stage.parents.map(_.id), stage.details, - internalAccumulators, + taskMetrics, taskLocalityPreferences) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1ff9d7795f..9f2fa02c69 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -23,7 +23,7 @@ import java.util.Properties import scala.collection.mutable.HashMap -import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl} +import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl} import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.metrics.MetricsSystem @@ -44,17 +44,17 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti * @param stageId id of the stage this task belongs to * @param stageAttemptId attempt id of the stage this task belongs to * @param partitionId index of the number in the RDD - * @param initialAccumulators initial set of accumulators to be used in this task for tracking - * internal metrics. Other accumulators will be registered later when - * they are deserialized on the executors. + * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. + * + * The default values for `metrics` and `localProperties` are used by tests only. */ private[spark] abstract class Task[T]( val stageId: Int, val stageAttemptId: Int, val partitionId: Int, - val initialAccumulators: Seq[Accumulator[_]], - @transient var localProperties: Properties) extends Serializable { + val metrics: TaskMetrics = new TaskMetrics, + @transient var localProperties: Properties = new Properties) extends Serializable { /** * Called by [[org.apache.spark.executor.Executor]] to run this task. @@ -76,7 +76,7 @@ private[spark] abstract class Task[T]( taskMemoryManager, localProperties, metricsSystem, - initialAccumulators) + metrics) TaskContext.setTaskContext(context) taskThread = Thread.currentThread() if (_killed) { @@ -128,8 +128,6 @@ private[spark] abstract class Task[T]( // Map output tracker epoch. Will be set by TaskScheduler. var epoch: Long = -1 - var metrics: Option[TaskMetrics] = None - // Task context, to be initialized in run(). 
@transient protected var context: TaskContextImpl = _ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index bd4797ae8e..645e2d2e36 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { // This could be empty if the JobProgressListener hasn't received information about the // stage or if the stage information has been garbage collected listener.stageIdToInfo.getOrElse(stageId, - new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty)) + new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown")) } val activeStages = Buffer[StageInfo]() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 38ca3224ff..6c50c72a91 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -304,20 +304,17 @@ private[spark] object JsonProtocol { * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing. */ private[util] def accumValueToJson(name: Option[String], value: Any): JValue = { - import AccumulatorParam._ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) { - (value, InternalAccumulator.getParam(name.get)) match { - case (v: Int, IntAccumulatorParam) => JInt(v) - case (v: Long, LongAccumulatorParam) => JInt(v) - case (v: String, StringAccumulatorParam) => JString(v) - case (v, UpdatedBlockStatusesAccumulatorParam) => + value match { + case v: Int => JInt(v) + case v: Long => JInt(v) + // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be + // the blocks accumulator, whose type is `Seq[(BlockId, BlockStatus)]` + case v => JArray(v.asInstanceOf[Seq[(BlockId, BlockStatus)]].toList.map { case (id, status) => ("Block ID" -> id.toString) ~ ("Status" -> blockStatusToJson(status)) }) - case (v, p) => - throw new IllegalArgumentException(s"unexpected combination of accumulator value " + - s"type (${v.getClass.getName}) and param (${p.getClass.getName}) in '${name.get}'") } } else { // For all external accumulators, just use strings @@ -569,7 +566,7 @@ private[spark] object JsonProtocol { val stageInfos = Utils.jsonOption(json \ "Stage Infos") .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse { stageIds.map { id => - new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty) + new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown") } } SparkListenerJobStart(jobId, submissionTime, stageInfos, properties) @@ -678,7 +675,7 @@ private[spark] object JsonProtocol { } val stageInfo = new StageInfo( - stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty) + stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason @@ -735,25 +732,21 @@ private[spark] object JsonProtocol { * The behavior here must match that of [[accumValueToJson]]. Exposed for testing. 
*/ private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = { - import AccumulatorParam._ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) { - (value, InternalAccumulator.getParam(name.get)) match { - case (JInt(v), IntAccumulatorParam) => v.toInt - case (JInt(v), LongAccumulatorParam) => v.toLong - case (JString(v), StringAccumulatorParam) => v - case (JArray(v), UpdatedBlockStatusesAccumulatorParam) => + value match { + case JInt(v) => v.toLong + case JArray(v) => v.map { blockJson => val id = BlockId((blockJson \ "Block ID").extract[String]) val status = blockStatusFromJson(blockJson \ "Status") (id, status) } - case (v, p) => - throw new IllegalArgumentException(s"unexpected combination of accumulator " + - s"value in JSON ($v) and accumulator param (${p.getClass.getName}) in '${name.get}'") - } - } else { - value.extract[String] - } + case _ => throw new IllegalArgumentException(s"unexpected json value $value for " + + "accumulator " + name.get) + } + } else { + value.extract[String] + } } def taskMetricsFromJson(json: JValue): TaskMetrics = { |