| author | Wenchen Fan <wenchen@databricks.com> | 2016-04-19 21:20:24 -0700 |
|---|---|---|
| committer | Reynold Xin <rxin@databricks.com> | 2016-04-19 21:20:24 -0700 |
| commit | 85d759ca3aebb7d60b963207dcada83c75502e52 (patch) | |
| tree | b4fbaba9dfce1ae47485f318123881f42cd05e6c /core | |
| parent | 78b38109ed2fc20e97f9a968185d0c02ef83aa42 (diff) | |
[SPARK-14704][CORE] create accumulators in TaskMetrics
## What changes were proposed in this pull request?
Before this PR, we create accumulators on the driver side (and register them), send them to the executor side, and then construct `TaskMetrics` from these accumulators on the executor side.
After this PR, we create `TaskMetrics` on the driver side and send it to the executor side, so that the accumulators can be created directly inside `TaskMetrics`, which is cleaner.
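The change in ownership can be illustrated with a small, self-contained Scala sketch. This is a simplified stand-in, not Spark's actual API: `SimpleAccum`, `SimpleTaskMetrics`, and the demo object are hypothetical names used only to show the pattern of a metrics object that creates and owns its accumulators, rather than receiving a pre-built `Seq[Accumulator[_]]` through its constructor.

```scala
// A minimal sketch of the pattern this PR moves to (not Spark's real classes):
// the metrics object creates its accumulators internally, the driver builds the
// object up front, and executors update it directly.
class SimpleAccum[T](val name: String, var value: T) extends Serializable

class SimpleTaskMetrics extends Serializable {
  // Accumulators are created *inside* the metrics object, mirroring the new
  // TaskMetrics.createLongAccum(...) helpers in the diff below.
  private val _executorRunTime = new SimpleAccum("internal.metrics.executorRunTime", 0L)
  private val _jvmGCTime       = new SimpleAccum("internal.metrics.jvmGCTime", 0L)

  def setExecutorRunTime(v: Long): Unit = { _executorRunTime.value = v }
  def setJvmGCTime(v: Long): Unit = { _jvmGCTime.value = v }

  // The metrics object can enumerate its own accumulators; in the real patch this
  // role is played by internalAccums, which the driver registers per stage attempt.
  def internalAccums: Seq[SimpleAccum[_]] = Seq(_executorRunTime, _jvmGCTime)
}

object TaskMetricsSketch extends App {
  // "Driver side": build the metrics object and ship it with the task.
  val metrics = new SimpleTaskMetrics
  // "Executor side": update metrics directly, with no Option[TaskMetrics]
  // indirection and no accumulator list threaded through the Task constructor.
  metrics.setExecutorRunTime(42L)
  metrics.internalAccums.foreach(a => println(s"${a.name} = ${a.value}"))
}
```

In the actual diff, this corresponds to `Stage.makeNewStageAttempt` constructing a `TaskMetrics` and calling `registerAccums`, and to `Task` taking a `TaskMetrics` parameter instead of `initialAccumulators`.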
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12472 from cloud-fan/acc.
Diffstat (limited to 'core')
29 files changed, 265 insertions, 773 deletions
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 714c8737a9..0b494c146f 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -17,17 +17,11 @@ package org.apache.spark -import org.apache.spark.storage.{BlockId, BlockStatus} - - /** * A collection of fields and methods concerned with internal accumulators that represent * task level metrics. */ private[spark] object InternalAccumulator { - - import AccumulatorParam._ - // Prefixes used in names of internal task level metrics val METRICS_PREFIX = "internal.metrics." val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read." @@ -79,125 +73,4 @@ private[spark] object InternalAccumulator { } // scalastyle:on - - /** - * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]]. - */ - def create(name: String): Accumulator[_] = { - require(name.startsWith(METRICS_PREFIX), - s"internal accumulator name must start with '$METRICS_PREFIX': $name") - getParam(name) match { - case p @ LongAccumulatorParam => newMetric[Long](0L, name, p) - case p @ IntAccumulatorParam => newMetric[Int](0, name, p) - case p @ StringAccumulatorParam => newMetric[String]("", name, p) - case p @ UpdatedBlockStatusesAccumulatorParam => - newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p) - case p => throw new IllegalArgumentException( - s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.") - } - } - - /** - * Get the [[AccumulatorParam]] associated with the internal metric name, - * which must begin with [[METRICS_PREFIX]]. - */ - def getParam(name: String): AccumulatorParam[_] = { - require(name.startsWith(METRICS_PREFIX), - s"internal accumulator name must start with '$METRICS_PREFIX': $name") - name match { - case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam - case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam - case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam - case _ => LongAccumulatorParam - } - } - - /** - * Accumulators for tracking internal metrics. - */ - def createAll(): Seq[Accumulator[_]] = { - Seq[String]( - EXECUTOR_DESERIALIZE_TIME, - EXECUTOR_RUN_TIME, - RESULT_SIZE, - JVM_GC_TIME, - RESULT_SERIALIZATION_TIME, - MEMORY_BYTES_SPILLED, - DISK_BYTES_SPILLED, - PEAK_EXECUTION_MEMORY, - UPDATED_BLOCK_STATUSES).map(create) ++ - createShuffleReadAccums() ++ - createShuffleWriteAccums() ++ - createInputAccums() ++ - createOutputAccums() ++ - sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq - } - - /** - * Accumulators for tracking shuffle read metrics. - */ - def createShuffleReadAccums(): Seq[Accumulator[_]] = { - Seq[String]( - shuffleRead.REMOTE_BLOCKS_FETCHED, - shuffleRead.LOCAL_BLOCKS_FETCHED, - shuffleRead.REMOTE_BYTES_READ, - shuffleRead.LOCAL_BYTES_READ, - shuffleRead.FETCH_WAIT_TIME, - shuffleRead.RECORDS_READ).map(create) - } - - /** - * Accumulators for tracking shuffle write metrics. - */ - def createShuffleWriteAccums(): Seq[Accumulator[_]] = { - Seq[String]( - shuffleWrite.BYTES_WRITTEN, - shuffleWrite.RECORDS_WRITTEN, - shuffleWrite.WRITE_TIME).map(create) - } - - /** - * Accumulators for tracking input metrics. - */ - def createInputAccums(): Seq[Accumulator[_]] = { - Seq[String]( - input.BYTES_READ, - input.RECORDS_READ).map(create) - } - - /** - * Accumulators for tracking output metrics. 
- */ - def createOutputAccums(): Seq[Accumulator[_]] = { - Seq[String]( - output.BYTES_WRITTEN, - output.RECORDS_WRITTEN).map(create) - } - - /** - * Accumulators for tracking internal metrics. - * - * These accumulators are created with the stage such that all tasks in the stage will - * add to the same set of accumulators. We do this to report the distribution of accumulator - * values across all tasks within each stage. - */ - def createAll(sc: SparkContext): Seq[Accumulator[_]] = { - val accums = createAll() - accums.foreach { accum => - Accumulators.register(accum) - sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) - } - accums - } - - /** - * Create a new accumulator representing an internal task metric. - */ - private def newMetric[T]( - initialValue: T, - name: String, - param: AccumulatorParam[T]): Accumulator[T] = { - new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true) - } - } diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 757c1b5116..e7940bd9ed 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -62,12 +62,11 @@ object TaskContext { protected[spark] def unset(): Unit = taskContext.remove() /** - * An empty task context that does not represent an actual task. + * An empty task context that does not represent an actual task. This is only used in tests. */ private[spark] def empty(): TaskContextImpl = { new TaskContextImpl(0, 0, 0, 0, null, new Properties, null) } - } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index fa0b2d3d28..e8f83c6d14 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -36,15 +36,10 @@ private[spark] class TaskContextImpl( override val taskMemoryManager: TaskMemoryManager, localProperties: Properties, @transient private val metricsSystem: MetricsSystem, - initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll()) + override val taskMetrics: TaskMetrics = new TaskMetrics) extends TaskContext with Logging { - /** - * Metrics associated with this task. - */ - override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators) - /** List of callback functions to execute when the task completes. */ @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener] diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b20bd11f7d..650f05c309 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -293,16 +293,14 @@ private[spark] class Executor( val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() - for (m <- task.metrics) { - // Deserialization happens in two parts: first, we deserialize a Task object, which - // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. 
- m.setExecutorDeserializeTime( - (taskStart - deserializeStartTime) + task.executorDeserializeTime) - // We need to subtract Task.run()'s deserialization time to avoid double-counting - m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) - m.setJvmGCTime(computeTotalGcTime() - startGCTime) - m.setResultSerializationTime(afterSerialization - beforeSerialization) - } + // Deserialization happens in two parts: first, we deserialize a Task object, which + // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. + task.metrics.setExecutorDeserializeTime( + (taskStart - deserializeStartTime) + task.executorDeserializeTime) + // We need to subtract Task.run()'s deserialization time to avoid double-counting + task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) + task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization) // Note: accumulator updates must be collected after TaskMetrics is updated val accumUpdates = task.collectAccumulatorUpdates() @@ -357,10 +355,8 @@ private[spark] class Executor( // Collect latest accumulator values to report back to the driver val accumulatorUpdates: Seq[AccumulableInfo] = if (task != null) { - task.metrics.foreach { m => - m.setExecutorRunTime(System.currentTimeMillis() - taskStart) - m.setJvmGCTime(computeTotalGcTime() - startGCTime) - } + task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart) + task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) task.collectAccumulatorUpdates(taskFailed = true) } else { Seq.empty[AccumulableInfo] @@ -485,11 +481,9 @@ private[spark] class Executor( for (taskRunner <- runningTasks.values().asScala) { if (taskRunner.task != null) { - taskRunner.task.metrics.foreach { metrics => - metrics.mergeShuffleReadMetrics() - metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) - accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates())) - } + taskRunner.task.metrics.mergeShuffleReadMetrics() + taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) + accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates())) } } diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala index 0ec81d8d35..535352e7dd 100644 --- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -39,14 +39,11 @@ object DataReadMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about reading data from external systems. 
*/ @DeveloperApi -class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long]) - extends Serializable { +class InputMetrics private[spark] () extends Serializable { + import InternalAccumulator._ - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ)) - } + private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ) + private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ) /** * Total number of bytes read. @@ -61,5 +58,4 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v) private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v) private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v) - } diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index 5b36cc4739..586c98b156 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -38,14 +38,11 @@ object DataWriteMethod extends Enumeration with Serializable { * A collection of accumulators that represents metrics about writing data to external systems. */ @DeveloperApi -class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long]) - extends Serializable { - - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN)) - } +class OutputMetrics private[spark] () extends Serializable { + import InternalAccumulator._ + + private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN) + private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN) /** * Total number of bytes written. diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index 47cfb74b9e..8e9a332b7c 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -27,38 +27,21 @@ import org.apache.spark.annotation.DeveloperApi * Operations are not thread-safe. 
*/ @DeveloperApi -class ShuffleReadMetrics private ( - _remoteBlocksFetched: Accumulator[Int], - _localBlocksFetched: Accumulator[Int], - _remoteBytesRead: Accumulator[Long], - _localBytesRead: Accumulator[Long], - _fetchWaitTime: Accumulator[Long], - _recordsRead: Accumulator[Long]) - extends Serializable { - - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED), - TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ)) - } - - /** - * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task. - * - * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in - * many places only to merge their values together later. In the future, we should revisit - * whether this is needed. - * - * A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by - * [[TaskMetrics.mergeShuffleReadMetrics]]. - */ - private[spark] def this() { - this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap) - } +class ShuffleReadMetrics private[spark] () extends Serializable { + import InternalAccumulator._ + + private[executor] val _remoteBlocksFetched = + TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED) + private[executor] val _localBlocksFetched = + TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED) + private[executor] val _remoteBytesRead = + TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ) + private[executor] val _localBytesRead = + TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ) + private[executor] val _fetchWaitTime = + TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME) + private[executor] val _recordsRead = + TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ) /** * Number of remote blocks fetched in this shuffle by this task. diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 704dee747e..7326fba841 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{Accumulator, InternalAccumulator} +import org.apache.spark.InternalAccumulator import org.apache.spark.annotation.DeveloperApi @@ -27,31 +27,15 @@ import org.apache.spark.annotation.DeveloperApi * Operations are not thread-safe. 
*/ @DeveloperApi -class ShuffleWriteMetrics private ( - _bytesWritten: Accumulator[Long], - _recordsWritten: Accumulator[Long], - _writeTime: Accumulator[Long]) - extends Serializable { +class ShuffleWriteMetrics private[spark] () extends Serializable { + import InternalAccumulator._ - private[executor] def this(accumMap: Map[String, Accumulator[_]]) { - this( - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN), - TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME)) - } - - /** - * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task. - * - * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in - * many places only to merge their values together later. In the future, we should revisit - * whether this is needed. - * - * A better alternative is [[TaskMetrics.shuffleWriteMetrics]]. - */ - private[spark] def this() { - this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap) - } + private[executor] val _bytesWritten = + TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN) + private[executor] val _recordsWritten = + TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN) + private[executor] val _writeTime = + TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME) /** * Number of bytes written for the shuffle by this task. diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 0198364825..4558fbb4d9 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,10 +17,10 @@ package org.apache.spark.executor -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark._ +import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo @@ -39,65 +39,21 @@ import org.apache.spark.storage.{BlockId, BlockStatus} * The accumulator updates are also sent to the driver periodically (on executor heartbeat) * and when the task failed with an exception. The [[TaskMetrics]] object itself should never * be sent to the driver. - * - * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on. - * Each accumulator in this initial set must be uniquely named and marked - * as internal. Additional accumulators registered later need not satisfy - * these requirements. */ @DeveloperApi -class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable { +class TaskMetrics private[spark] () extends Serializable { import InternalAccumulator._ - // Needed for Java tests - def this() { - this(InternalAccumulator.createAll()) - } - - /** - * All accumulators registered with this task. - */ - private val accums = new ArrayBuffer[Accumulable[_, _]] - accums ++= initialAccums - - /** - * A map for quickly accessing the initial set of accumulators by name. 
- */ - private val initialAccumsMap: Map[String, Accumulator[_]] = { - val map = new mutable.HashMap[String, Accumulator[_]] - initialAccums.foreach { a => - val name = a.name.getOrElse { - throw new IllegalArgumentException( - "initial accumulators passed to TaskMetrics must be named") - } - require(a.isInternal, - s"initial accumulator '$name' passed to TaskMetrics must be marked as internal") - require(!map.contains(name), - s"detected duplicate accumulator name '$name' when constructing TaskMetrics") - map(name) = a - } - map.toMap - } - // Each metric is internally represented as an accumulator - private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME) - private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME) - private val _resultSize = getAccum(RESULT_SIZE) - private val _jvmGCTime = getAccum(JVM_GC_TIME) - private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME) - private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED) - private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED) - private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY) - private val _updatedBlockStatuses = - TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES) - - private val _inputMetrics = new InputMetrics(initialAccumsMap) - - private val _outputMetrics = new OutputMetrics(initialAccumsMap) - - private val _shuffleReadMetrics = new ShuffleReadMetrics(initialAccumsMap) - - private val _shuffleWriteMetrics = new ShuffleWriteMetrics(initialAccumsMap) + private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME) + private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME) + private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE) + private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME) + private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME) + private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED) + private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED) + private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY) + private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES) /** * Time taken on the executor to deserialize this task. @@ -164,30 +120,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se _updatedBlockStatuses.setValue(v) /** - * Get a Long accumulator from the given map by name, assuming it exists. - * Note: this only searches the initial set of accumulators passed into the constructor. - */ - private[spark] def getAccum(name: String): Accumulator[Long] = { - TaskMetrics.getAccum[Long](initialAccumsMap, name) - } - - /** * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted * data, defined only in tasks with input. */ - def inputMetrics: InputMetrics = _inputMetrics + val inputMetrics: InputMetrics = new InputMetrics() /** * Metrics related to writing data externally (e.g. to a distributed filesystem), * defined only in tasks with output. */ - def outputMetrics: OutputMetrics = _outputMetrics + val outputMetrics: OutputMetrics = new OutputMetrics() /** * Metrics related to shuffle read aggregated across all shuffle dependencies. * This is defined only if there are shuffle dependencies in this task. 
*/ - def shuffleReadMetrics: ShuffleReadMetrics = _shuffleReadMetrics + val shuffleReadMetrics: ShuffleReadMetrics = new ShuffleReadMetrics() + + /** + * Metrics related to shuffle write, defined only in shuffle map stages. + */ + val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() /** * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency. @@ -217,21 +170,45 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se */ private[spark] def mergeShuffleReadMetrics(): Unit = synchronized { if (tempShuffleReadMetrics.nonEmpty) { - _shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics) + shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics) } } - /** - * Metrics related to shuffle write, defined only in shuffle map stages. - */ - def shuffleWriteMetrics: ShuffleWriteMetrics = _shuffleWriteMetrics + // Only used for test + private[spark] val testAccum = + sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM)) + + @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = { + val in = inputMetrics + val out = outputMetrics + val sr = shuffleReadMetrics + val sw = shuffleWriteMetrics + Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime, + _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory, + _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead, + sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten, + sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++ + testAccum + } /* ========================== * | OTHER THINGS | * ========================== */ + private[spark] def registerAccums(sc: SparkContext): Unit = { + internalAccums.foreach { accum => + Accumulators.register(accum) + sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) + } + } + + /** + * External accumulators registered with this task. + */ + @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]] + private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = { - accums += a + externalAccums += a } /** @@ -242,7 +219,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se * not the aggregated value across multiple tasks. */ def accumulatorUpdates(): Seq[AccumulableInfo] = { - accums.map { a => a.toInfo(Some(a.localValue), None) } + (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) } } } @@ -256,9 +233,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se * UnsupportedOperationException, we choose not to do so because the overrides would quickly become * out-of-date when new metrics are added. */ -private[spark] class ListenerTaskMetrics( - initialAccums: Seq[Accumulator[_]], - accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) { +private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics { override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates @@ -272,18 +247,25 @@ private[spark] object TaskMetrics extends Logging { def empty: TaskMetrics = new TaskMetrics /** - * Get an accumulator from the given map by name, assuming it exists. + * Create a new accumulator representing an internal task metric. 
*/ - def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = { - require(accumMap.contains(name), s"metric '$name' is missing") - val accum = accumMap(name) - try { - // Note: we can't do pattern matching here because types are erased by compile time - accum.asInstanceOf[Accumulator[T]] - } catch { - case e: ClassCastException => - throw new SparkException(s"accumulator $name was of unexpected type", e) - } + private def newMetric[T]( + initialValue: T, + name: String, + param: AccumulatorParam[T]): Accumulator[T] = { + new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true) + } + + def createLongAccum(name: String): Accumulator[Long] = { + newMetric(0L, name, LongAccumulatorParam) + } + + def createIntAccum(name: String): Accumulator[Int] = { + newMetric(0, name, IntAccumulatorParam) + } + + def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = { + newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam) } /** @@ -297,18 +279,11 @@ private[spark] object TaskMetrics extends Logging { * internal task level metrics. */ def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = { - // Initial accumulators are passed into the TaskMetrics constructor first because these - // are required to be uniquely named. The rest of the accumulators from this task are - // registered later because they need not satisfy this requirement. - val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined } - val initialAccums = definedAccumUpdates - .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) } - .map { info => - val accum = InternalAccumulator.create(info.name.get) - accum.setValueAny(info.update.get) - accum - } - new ListenerTaskMetrics(initialAccums, definedAccumUpdates) + val definedAccumUpdates = accumUpdates.filter(_.update.isDefined) + val metrics = new ListenerTaskMetrics(definedAccumUpdates) + definedAccumUpdates.filter(_.internal).foreach { accum => + metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get)) + } + metrics } - } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index c27aad268d..b7fb608ea5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1029,7 +1029,7 @@ class DAGScheduler( val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties) + taskBinary, part, locs, stage.latestInfo.taskMetrics, properties) } case stage: ResultStage => @@ -1039,7 +1039,7 @@ class DAGScheduler( val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptId, - taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators) + taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index db6276f75d..75c6018e21 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -23,6 +23,7 @@ import java.util.Properties import org.apache.spark._ import 
org.apache.spark.broadcast.Broadcast +import org.apache.spark.executor.TaskMetrics import org.apache.spark.rdd.RDD /** @@ -40,9 +41,7 @@ import org.apache.spark.rdd.RDD * @param outputId index of the task in this job (a job can launch tasks on only a subset of the * input RDD's partitions). * @param localProperties copy of thread-local properties set by the user on the driver side. - * @param _initialAccums initial set of accumulators to be used in this task for tracking - * internal metrics. Other accumulators will be registered later when - * they are deserialized on the executors. + * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. */ private[spark] class ResultTask[T, U]( stageId: Int, @@ -52,8 +51,8 @@ private[spark] class ResultTask[T, U]( locs: Seq[TaskLocation], val outputId: Int, localProperties: Properties, - _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll()) - extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums, localProperties) + metrics: TaskMetrics) + extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { @@ -68,7 +67,6 @@ private[spark] class ResultTask[T, U]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime - metrics = Some(context.taskMetrics) func(context, rdd.iterator(partition, context)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index b7cab7013e..84b3e5ba6c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -24,6 +24,7 @@ import scala.language.existentials import org.apache.spark._ import org.apache.spark.broadcast.Broadcast +import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.ShuffleWriter @@ -40,9 +41,7 @@ import org.apache.spark.shuffle.ShuffleWriter * the type should be (RDD[_], ShuffleDependency[_, _, _]). * @param partition partition of the RDD this task is associated with * @param locs preferred task execution locations for locality scheduling - * @param _initialAccums initial set of accumulators to be used in this task for tracking - * internal metrics. Other accumulators will be registered later when - * they are deserialized on the executors. + * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. */ private[spark] class ShuffleMapTask( @@ -51,9 +50,9 @@ private[spark] class ShuffleMapTask( taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient private var locs: Seq[TaskLocation], - _initialAccums: Seq[Accumulator[_]], + metrics: TaskMetrics, localProperties: Properties) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, _initialAccums, localProperties) + extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. 
*/ @@ -73,7 +72,6 @@ private[spark] class ShuffleMapTask( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime - metrics = Some(context.taskMetrics) var writer: ShuffleWriter[Any, Any] = null try { val manager = SparkEnv.get.shuffleManager diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index b6d4e39fe5..d5cf6b82e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import scala.collection.mutable.HashSet import org.apache.spark._ +import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.util.CallSite @@ -110,9 +111,10 @@ private[scheduler] abstract class Stage( def makeNewStageAttempt( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { + val metrics = new TaskMetrics + metrics.registerAccums(rdd.sparkContext) _latestInfo = StageInfo.fromStage( - this, nextAttemptId, Some(numPartitionsToCompute), - InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences) + this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) nextAttemptId += 1 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 0fd58c41cd..58349fe250 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -19,8 +19,8 @@ package org.apache.spark.scheduler import scala.collection.mutable.HashMap -import org.apache.spark.Accumulator import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.RDDInfo /** @@ -36,7 +36,7 @@ class StageInfo( val rddInfos: Seq[RDDInfo], val parentIds: Seq[Int], val details: String, - val internalAccumulators: Seq[Accumulator[_]] = Seq.empty, + val taskMetrics: TaskMetrics = new TaskMetrics, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. 
*/ var submissionTime: Option[Long] = None @@ -81,7 +81,7 @@ private[spark] object StageInfo { stage: Stage, attemptId: Int, numTasks: Option[Int] = None, - internalAccumulators: Seq[Accumulator[_]] = Seq.empty, + taskMetrics: TaskMetrics = new TaskMetrics, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) @@ -94,7 +94,7 @@ private[spark] object StageInfo { rddInfos, stage.parents.map(_.id), stage.details, - internalAccumulators, + taskMetrics, taskLocalityPreferences) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 1ff9d7795f..9f2fa02c69 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -23,7 +23,7 @@ import java.util.Properties import scala.collection.mutable.HashMap -import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl} +import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl} import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.metrics.MetricsSystem @@ -44,17 +44,17 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti * @param stageId id of the stage this task belongs to * @param stageAttemptId attempt id of the stage this task belongs to * @param partitionId index of the number in the RDD - * @param initialAccumulators initial set of accumulators to be used in this task for tracking - * internal metrics. Other accumulators will be registered later when - * they are deserialized on the executors. + * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. + * + * The default values for `metrics` and `localProperties` are used by tests only. */ private[spark] abstract class Task[T]( val stageId: Int, val stageAttemptId: Int, val partitionId: Int, - val initialAccumulators: Seq[Accumulator[_]], - @transient var localProperties: Properties) extends Serializable { + val metrics: TaskMetrics = new TaskMetrics, + @transient var localProperties: Properties = new Properties) extends Serializable { /** * Called by [[org.apache.spark.executor.Executor]] to run this task. @@ -76,7 +76,7 @@ private[spark] abstract class Task[T]( taskMemoryManager, localProperties, metricsSystem, - initialAccumulators) + metrics) TaskContext.setTaskContext(context) taskThread = Thread.currentThread() if (_killed) { @@ -128,8 +128,6 @@ private[spark] abstract class Task[T]( // Map output tracker epoch. Will be set by TaskScheduler. var epoch: Long = -1 - var metrics: Option[TaskMetrics] = None - // Task context, to be initialized in run(). 
@transient protected var context: TaskContextImpl = _ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index bd4797ae8e..645e2d2e36 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { // This could be empty if the JobProgressListener hasn't received information about the // stage or if the stage information has been garbage collected listener.stageIdToInfo.getOrElse(stageId, - new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty)) + new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown")) } val activeStages = Buffer[StageInfo]() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 38ca3224ff..6c50c72a91 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -304,20 +304,17 @@ private[spark] object JsonProtocol { * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing. */ private[util] def accumValueToJson(name: Option[String], value: Any): JValue = { - import AccumulatorParam._ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) { - (value, InternalAccumulator.getParam(name.get)) match { - case (v: Int, IntAccumulatorParam) => JInt(v) - case (v: Long, LongAccumulatorParam) => JInt(v) - case (v: String, StringAccumulatorParam) => JString(v) - case (v, UpdatedBlockStatusesAccumulatorParam) => + value match { + case v: Int => JInt(v) + case v: Long => JInt(v) + // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be + // the blocks accumulator, whose type is `Seq[(BlockId, BlockStatus)]` + case v => JArray(v.asInstanceOf[Seq[(BlockId, BlockStatus)]].toList.map { case (id, status) => ("Block ID" -> id.toString) ~ ("Status" -> blockStatusToJson(status)) }) - case (v, p) => - throw new IllegalArgumentException(s"unexpected combination of accumulator value " + - s"type (${v.getClass.getName}) and param (${p.getClass.getName}) in '${name.get}'") } } else { // For all external accumulators, just use strings @@ -569,7 +566,7 @@ private[spark] object JsonProtocol { val stageInfos = Utils.jsonOption(json \ "Stage Infos") .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse { stageIds.map { id => - new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty) + new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown") } } SparkListenerJobStart(jobId, submissionTime, stageInfos, properties) @@ -678,7 +675,7 @@ private[spark] object JsonProtocol { } val stageInfo = new StageInfo( - stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty) + stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details) stageInfo.submissionTime = submissionTime stageInfo.completionTime = completionTime stageInfo.failureReason = failureReason @@ -735,25 +732,21 @@ private[spark] object JsonProtocol { * The behavior here must match that of [[accumValueToJson]]. Exposed for testing. 
*/ private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = { - import AccumulatorParam._ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) { - (value, InternalAccumulator.getParam(name.get)) match { - case (JInt(v), IntAccumulatorParam) => v.toInt - case (JInt(v), LongAccumulatorParam) => v.toLong - case (JString(v), StringAccumulatorParam) => v - case (JArray(v), UpdatedBlockStatusesAccumulatorParam) => + value match { + case JInt(v) => v.toLong + case JArray(v) => v.map { blockJson => val id = BlockId((blockJson \ "Block ID").extract[String]) val status = blockStatusFromJson(blockJson \ "Status") (id, status) } - case (v, p) => - throw new IllegalArgumentException(s"unexpected combination of accumulator " + - s"value in JSON ($v) and accumulator param (${p.getClass.getName}) in '${name.get}'") - } - } else { - value.extract[String] - } + case _ => throw new IllegalArgumentException(s"unexpected json value $value for " + + "accumulator " + name.get) + } + } else { + value.extract[String] + } } def taskMetricsFromJson(json: JValue): TaskMetrics = { diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 37879d11ca..454c42517c 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark -import java.util.Properties import java.util.concurrent.Semaphore import javax.annotation.concurrent.GuardedBy @@ -29,6 +28,7 @@ import scala.util.control.NonFatal import org.scalatest.Matchers import org.scalatest.exceptions.TestFailedException +import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.serializer.JavaSerializer @@ -278,16 +278,13 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false) val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false) val externalAccums = Seq(acc1, acc2) - val internalAccums = InternalAccumulator.createAll() + val taskMetrics = new TaskMetrics // Set some values; these should not be observed later on the "executors" acc1.setValue(10) acc2.setValue(20L) - internalAccums - .find(_.name == Some(InternalAccumulator.TEST_ACCUM)) - .get.asInstanceOf[Accumulator[Long]] - .setValue(30L) + taskMetrics.testAccum.get.asInstanceOf[Accumulator[Long]].setValue(30L) // Simulate the task being serialized and sent to the executors. - val dummyTask = new DummyTask(internalAccums, externalAccums) + val dummyTask = new DummyTask(taskMetrics, externalAccums) val serInstance = new JavaSerializer(new SparkConf).newInstance() val taskSer = Task.serializeWithDependencies( dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance) @@ -298,7 +295,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex taskBytes, Thread.currentThread.getContextClassLoader) // Assert that executors see only zeros taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) } - taskDeser.internalAccums.foreach { a => assert(a.localValue == a.zero) } + taskDeser.metrics.internalAccums.foreach { a => assert(a.localValue == a.zero) } } } @@ -402,8 +399,7 @@ private class SaveInfoListener extends SparkListener { * A dummy [[Task]] that contains internal and external [[Accumulator]]s. 
*/ private[spark] class DummyTask( - val internalAccums: Seq[Accumulator[_]], - val externalAccums: Seq[Accumulator[_]]) - extends Task[Int](0, 0, 0, internalAccums, new Properties) { + metrics: TaskMetrics, + val externalAccums: Seq[Accumulator[_]]) extends Task[Int](0, 0, 0, metrics) { override def runTask(c: TaskContext): Int = 1 } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ee6b991461..c130649830 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -929,7 +929,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", - Seq.empty, taskLocalityPreferences) + taskLocalityPreferences = taskLocalityPreferences) } private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = { diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index db087a9c3c..b074b95424 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -19,14 +19,13 @@ package org.apache.spark import scala.collection.mutable.ArrayBuffer +import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.storage.{BlockId, BlockStatus} class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { import InternalAccumulator._ - import AccumulatorParam._ override def afterEach(): Unit = { try { @@ -36,120 +35,12 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { } } - test("get param") { - assert(getParam(EXECUTOR_DESERIALIZE_TIME) === LongAccumulatorParam) - assert(getParam(EXECUTOR_RUN_TIME) === LongAccumulatorParam) - assert(getParam(RESULT_SIZE) === LongAccumulatorParam) - assert(getParam(JVM_GC_TIME) === LongAccumulatorParam) - assert(getParam(RESULT_SERIALIZATION_TIME) === LongAccumulatorParam) - assert(getParam(MEMORY_BYTES_SPILLED) === LongAccumulatorParam) - assert(getParam(DISK_BYTES_SPILLED) === LongAccumulatorParam) - assert(getParam(PEAK_EXECUTION_MEMORY) === LongAccumulatorParam) - assert(getParam(UPDATED_BLOCK_STATUSES) === UpdatedBlockStatusesAccumulatorParam) - assert(getParam(TEST_ACCUM) === LongAccumulatorParam) - // shuffle read - assert(getParam(shuffleRead.REMOTE_BLOCKS_FETCHED) === IntAccumulatorParam) - assert(getParam(shuffleRead.LOCAL_BLOCKS_FETCHED) === IntAccumulatorParam) - assert(getParam(shuffleRead.REMOTE_BYTES_READ) === LongAccumulatorParam) - assert(getParam(shuffleRead.LOCAL_BYTES_READ) === LongAccumulatorParam) - assert(getParam(shuffleRead.FETCH_WAIT_TIME) === LongAccumulatorParam) - assert(getParam(shuffleRead.RECORDS_READ) === LongAccumulatorParam) - // shuffle write - assert(getParam(shuffleWrite.BYTES_WRITTEN) === LongAccumulatorParam) - assert(getParam(shuffleWrite.RECORDS_WRITTEN) === LongAccumulatorParam) - assert(getParam(shuffleWrite.WRITE_TIME) === LongAccumulatorParam) - // input - assert(getParam(input.RECORDS_READ) === LongAccumulatorParam) - assert(getParam(input.BYTES_READ) === LongAccumulatorParam) 
- // output - assert(getParam(output.RECORDS_WRITTEN) === LongAccumulatorParam) - assert(getParam(output.BYTES_WRITTEN) === LongAccumulatorParam) - // default to Long - assert(getParam(METRICS_PREFIX + "anything") === LongAccumulatorParam) - intercept[IllegalArgumentException] { - getParam("something that does not start with the right prefix") - } - } - - test("create by name") { - val executorRunTime = create(EXECUTOR_RUN_TIME) - val updatedBlockStatuses = create(UPDATED_BLOCK_STATUSES) - val shuffleRemoteBlocksRead = create(shuffleRead.REMOTE_BLOCKS_FETCHED) - assert(executorRunTime.name === Some(EXECUTOR_RUN_TIME)) - assert(updatedBlockStatuses.name === Some(UPDATED_BLOCK_STATUSES)) - assert(shuffleRemoteBlocksRead.name === Some(shuffleRead.REMOTE_BLOCKS_FETCHED)) - assert(executorRunTime.value.isInstanceOf[Long]) - assert(updatedBlockStatuses.value.isInstanceOf[Seq[_]]) - // We cannot assert the type of the value directly since the type parameter is erased. - // Instead, try casting a `Seq` of expected type and see if it fails in run time. - updatedBlockStatuses.setValueAny(Seq.empty[(BlockId, BlockStatus)]) - assert(shuffleRemoteBlocksRead.value.isInstanceOf[Int]) - // default to Long - val anything = create(METRICS_PREFIX + "anything") - assert(anything.value.isInstanceOf[Long]) - } - - test("create") { - val accums = createAll() - val shuffleReadAccums = createShuffleReadAccums() - val shuffleWriteAccums = createShuffleWriteAccums() - val inputAccums = createInputAccums() - val outputAccums = createOutputAccums() - // assert they're all internal - assert(accums.forall(_.isInternal)) - assert(shuffleReadAccums.forall(_.isInternal)) - assert(shuffleWriteAccums.forall(_.isInternal)) - assert(inputAccums.forall(_.isInternal)) - assert(outputAccums.forall(_.isInternal)) - // assert they all count on failures - assert(accums.forall(_.countFailedValues)) - assert(shuffleReadAccums.forall(_.countFailedValues)) - assert(shuffleWriteAccums.forall(_.countFailedValues)) - assert(inputAccums.forall(_.countFailedValues)) - assert(outputAccums.forall(_.countFailedValues)) - // assert they all have names - assert(accums.forall(_.name.isDefined)) - assert(shuffleReadAccums.forall(_.name.isDefined)) - assert(shuffleWriteAccums.forall(_.name.isDefined)) - assert(inputAccums.forall(_.name.isDefined)) - assert(outputAccums.forall(_.name.isDefined)) - // assert `accums` is a strict superset of the others - val accumNames = accums.map(_.name.get).toSet - val shuffleReadAccumNames = shuffleReadAccums.map(_.name.get).toSet - val shuffleWriteAccumNames = shuffleWriteAccums.map(_.name.get).toSet - val inputAccumNames = inputAccums.map(_.name.get).toSet - val outputAccumNames = outputAccums.map(_.name.get).toSet - assert(shuffleReadAccumNames.subsetOf(accumNames)) - assert(shuffleWriteAccumNames.subsetOf(accumNames)) - assert(inputAccumNames.subsetOf(accumNames)) - assert(outputAccumNames.subsetOf(accumNames)) - } - - test("naming") { - val accums = createAll() - val shuffleReadAccums = createShuffleReadAccums() - val shuffleWriteAccums = createShuffleWriteAccums() - val inputAccums = createInputAccums() - val outputAccums = createOutputAccums() - // assert that prefixes are properly namespaced - assert(SHUFFLE_READ_METRICS_PREFIX.startsWith(METRICS_PREFIX)) - assert(SHUFFLE_WRITE_METRICS_PREFIX.startsWith(METRICS_PREFIX)) - assert(INPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX)) - assert(OUTPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX)) - assert(accums.forall(_.name.get.startsWith(METRICS_PREFIX))) - // 
assert they all start with the expected prefixes - assert(shuffleReadAccums.forall(_.name.get.startsWith(SHUFFLE_READ_METRICS_PREFIX))) - assert(shuffleWriteAccums.forall(_.name.get.startsWith(SHUFFLE_WRITE_METRICS_PREFIX))) - assert(inputAccums.forall(_.name.get.startsWith(INPUT_METRICS_PREFIX))) - assert(outputAccums.forall(_.name.get.startsWith(OUTPUT_METRICS_PREFIX))) - } - test("internal accumulators in TaskContext") { val taskContext = TaskContext.empty() val accumUpdates = taskContext.taskMetrics.accumulatorUpdates() assert(accumUpdates.size > 0) assert(accumUpdates.forall(_.internal)) - val testAccum = taskContext.taskMetrics.getAccum(TEST_ACCUM) + val testAccum = taskContext.taskMetrics.testAccum.get assert(accumUpdates.exists(_.id == testAccum.id)) } @@ -160,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { sc.addSparkListener(listener) // Have each task add 1 to the internal accumulator val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter => - TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1 + TaskContext.get().taskMetrics().testAccum.get += 1 iter } // Register asserts in job completion callback to avoid flakiness @@ -196,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { val rdd = sc.parallelize(1 to 100, numPartitions) .map { i => (i, i) } .mapPartitions { iter => - TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1 + TaskContext.get().taskMetrics().testAccum.get += 1 iter } .reduceByKey { case (x, y) => x + y } .mapPartitions { iter => - TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 10 + TaskContext.get().taskMetrics().testAccum.get += 10 iter } .repartition(numPartitions * 2) .mapPartitions { iter => - TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 100 + TaskContext.get().taskMetrics().testAccum.get += 100 iter } // Register asserts in job completion callback to avoid flakiness @@ -236,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { // This should retry both stages in the scheduler. Note that we only want to fail the // first stage attempt because we want the stage to eventually succeed. 
val x = sc.parallelize(1 to 100, numPartitions) - .mapPartitions { iter => TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1; iter } + .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter } .groupBy(identity) val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId val rdd = x.mapPartitionsWithIndex { case (i, iter) => @@ -294,15 +185,15 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { } assert(Accumulators.originals.isEmpty) sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count() - val internalAccums = InternalAccumulator.createAll() + val numInternalAccums = TaskMetrics.empty.internalAccums.length // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage - assert(Accumulators.originals.size === internalAccums.size * 2) + assert(Accumulators.originals.size === numInternalAccums * 2) val accumsRegistered = sc.cleaner match { case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup case _ => Seq.empty[Long] } // Make sure the same set of accumulators is registered for cleanup - assert(accumsRegistered.size === internalAccums.size * 2) + assert(accumsRegistered.size === numInternalAccums * 2) assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet) } diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 079109d137..a854f5bb9b 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -336,16 +336,14 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC // first attempt -- its successful val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0, - new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem, - InternalAccumulator.createAll(sc))) + new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem)) val data1 = (1 to 10).map { x => x -> x} // second attempt -- also successful. We'll write out different data, // just to simulate the fact that the records may get written differently // depending on what gets spilled, what gets combined, etc. 
 val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
- new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.createAll(sc)))
+ new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem))
 val data2 = (11 to 20).map { x => x -> x}
 // interleave writes of both attempts -- we want to test that both attempts can occur
@@ -373,8 +371,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
 }
 val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
- new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.createAll(sc)))
+ new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem))
 val readData = reader.read().toIndexedSeq
 assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index a263fce8ab..fbc2fae08d 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -26,102 +26,20 @@ import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId
 class TaskMetricsSuite extends SparkFunSuite {
 import AccumulatorParam._
- import InternalAccumulator._
 import StorageLevel._
 import TaskMetricsSuite._
- test("create with unnamed accum") {
- intercept[IllegalArgumentException] {
- new TaskMetrics(
- InternalAccumulator.createAll() ++ Seq(
- new Accumulator(0, IntAccumulatorParam, None, internal = true)))
- }
- }
-
- test("create with duplicate name accum") {
- intercept[IllegalArgumentException] {
- new TaskMetrics(
- InternalAccumulator.createAll() ++ Seq(
- new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
- }
- }
-
- test("create with external accum") {
- intercept[IllegalArgumentException] {
- new TaskMetrics(
- InternalAccumulator.createAll() ++ Seq(
- new Accumulator(0, IntAccumulatorParam, Some("x"))))
- }
- }
-
- test("create shuffle read metrics") {
- import shuffleRead._
- val accums = InternalAccumulator.createShuffleReadAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(REMOTE_BLOCKS_FETCHED).setValueAny(1)
- accums(LOCAL_BLOCKS_FETCHED).setValueAny(2)
- accums(REMOTE_BYTES_READ).setValueAny(3L)
- accums(LOCAL_BYTES_READ).setValueAny(4L)
- accums(FETCH_WAIT_TIME).setValueAny(5L)
- accums(RECORDS_READ).setValueAny(6L)
- val sr = new ShuffleReadMetrics(accums)
- assert(sr.remoteBlocksFetched === 1)
- assert(sr.localBlocksFetched === 2)
- assert(sr.remoteBytesRead === 3L)
- assert(sr.localBytesRead === 4L)
- assert(sr.fetchWaitTime === 5L)
- assert(sr.recordsRead === 6L)
- }
-
- test("create shuffle write metrics") {
- import shuffleWrite._
- val accums = InternalAccumulator.createShuffleWriteAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(BYTES_WRITTEN).setValueAny(1L)
- accums(RECORDS_WRITTEN).setValueAny(2L)
- accums(WRITE_TIME).setValueAny(3L)
- val sw = new ShuffleWriteMetrics(accums)
- assert(sw.bytesWritten === 1L)
- assert(sw.recordsWritten === 2L)
- assert(sw.writeTime === 3L)
- }
-
- test("create input metrics") {
- import input._
- val accums = InternalAccumulator.createInputAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(BYTES_READ).setValueAny(1L)
- accums(RECORDS_READ).setValueAny(2L)
- val im = new InputMetrics(accums)
- assert(im.bytesRead === 1L)
- assert(im.recordsRead === 2L)
- }
-
- test("create output metrics") {
- import output._
- val accums = InternalAccumulator.createOutputAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(BYTES_WRITTEN).setValueAny(1L)
- accums(RECORDS_WRITTEN).setValueAny(2L)
- val om = new OutputMetrics(accums)
- assert(om.bytesWritten === 1L)
- assert(om.recordsWritten === 2L)
- }
-
 test("mutating values") {
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- // initial values
- assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
- assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 0L)
- assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 0L)
- assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 0L)
- assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 0L)
- assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 0L)
- assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 0L)
- assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 0L)
- assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
- Seq.empty[(BlockId, BlockStatus)])
+ val tm = new TaskMetrics
+ assert(tm.executorDeserializeTime == 0L)
+ assert(tm.executorRunTime == 0L)
+ assert(tm.resultSize == 0L)
+ assert(tm.jvmGCTime == 0L)
+ assert(tm.resultSerializationTime == 0L)
+ assert(tm.memoryBytesSpilled == 0L)
+ assert(tm.diskBytesSpilled == 0L)
+ assert(tm.peakExecutionMemory == 0L)
+ assert(tm.updatedBlockStatuses.isEmpty)
 // set or increment values
 tm.setExecutorDeserializeTime(100L)
 tm.setExecutorDeserializeTime(1L) // overwrite
@@ -144,36 +62,27 @@ class TaskMetricsSuite extends SparkFunSuite {
 tm.incUpdatedBlockStatuses(Seq(block1))
 tm.incUpdatedBlockStatuses(Seq(block2))
 // assert new values exist
- assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 1L)
- assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 2L)
- assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 3L)
- assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 4L)
- assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 5L)
- assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 606L)
- assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 707L)
- assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 808L)
- assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
- Seq(block1, block2))
+ assert(tm.executorDeserializeTime == 1L)
+ assert(tm.executorRunTime == 2L)
+ assert(tm.resultSize == 3L)
+ assert(tm.jvmGCTime == 4L)
+ assert(tm.resultSerializationTime == 5L)
+ assert(tm.memoryBytesSpilled == 606L)
+ assert(tm.diskBytesSpilled == 707L)
+ assert(tm.peakExecutionMemory == 808L)
+ assert(tm.updatedBlockStatuses == Seq(block1, block2))
 }
 test("mutating shuffle read metrics values") {
- import shuffleRead._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics), accums, name, value)
- }
- // create shuffle read metrics
- tm.createTempShuffleReadMetrics()
- tm.mergeShuffleReadMetrics()
+ val tm = new TaskMetrics
 val sr = tm.shuffleReadMetrics
 // initial values
- assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
- assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
- assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 0L)
- assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 0L)
- assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 0L)
- assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ assert(sr.remoteBlocksFetched == 0)
+ assert(sr.localBlocksFetched == 0)
+ assert(sr.remoteBytesRead == 0L)
+ assert(sr.localBytesRead == 0L)
+ assert(sr.fetchWaitTime == 0L)
+ assert(sr.recordsRead == 0L)
 // set and increment values
 sr.setRemoteBlocksFetched(100)
 sr.setRemoteBlocksFetched(10)
@@ -200,27 +109,21 @@ class TaskMetricsSuite extends SparkFunSuite {
 sr.incRecordsRead(6L)
 sr.incRecordsRead(6L)
 // assert new values exist
- assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 12)
- assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 24)
- assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 36L)
- assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 48L)
- assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 60L)
- assertValEquals(_.recordsRead, RECORDS_READ, 72L)
+ assert(sr.remoteBlocksFetched == 12)
+ assert(sr.localBlocksFetched == 24)
+ assert(sr.remoteBytesRead == 36L)
+ assert(sr.localBytesRead == 48L)
+ assert(sr.fetchWaitTime == 60L)
+ assert(sr.recordsRead == 72L)
 }
 test("mutating shuffle write metrics values") {
- import shuffleWrite._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics), accums, name, value)
- }
- // create shuffle write metrics
+ val tm = new TaskMetrics
 val sw = tm.shuffleWriteMetrics
 // initial values
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
- assertValEquals(_.writeTime, WRITE_TIME, 0L)
+ assert(sw.bytesWritten == 0L)
+ assert(sw.recordsWritten == 0L)
+ assert(sw.writeTime == 0L)
 // increment and decrement values
 sw.incBytesWritten(100L)
 sw.incBytesWritten(10L) // 100 + 10
@@ -233,55 +136,41 @@ class TaskMetricsSuite extends SparkFunSuite {
 sw.incWriteTime(300L)
 sw.incWriteTime(30L)
 // assert new values exist
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 108L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 216L)
- assertValEquals(_.writeTime, WRITE_TIME, 330L)
+ assert(sw.bytesWritten == 108L)
+ assert(sw.recordsWritten == 216L)
+ assert(sw.writeTime == 330L)
 }
 test("mutating input metrics values") {
- import input._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.inputMetrics), accums, name, value,
- (x: Any, y: Any) => assert(x.toString === y.toString))
- }
- // create input metrics
+ val tm = new TaskMetrics
 val in = tm.inputMetrics
 // initial values
- assertValEquals(_.bytesRead, BYTES_READ, 0L)
- assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ assert(in.bytesRead == 0L)
+ assert(in.recordsRead == 0L)
 // set and increment values
 in.setBytesRead(1L)
 in.setBytesRead(2L)
 in.incRecordsRead(1L)
 in.incRecordsRead(2L)
 // assert new values exist
- assertValEquals(_.bytesRead, BYTES_READ, 2L)
- assertValEquals(_.recordsRead, RECORDS_READ, 3L)
+ assert(in.bytesRead == 2L)
+ assert(in.recordsRead == 3L)
 }
 test("mutating output metrics values") {
- import output._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.outputMetrics), accums, name, value,
- (x: Any, y: Any) => assert(x.toString === y.toString))
- }
- // create input metrics
+ val tm = new TaskMetrics
 val out = tm.outputMetrics
 // initial values
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
+ assert(out.bytesWritten == 0L)
+ assert(out.recordsWritten == 0L)
 // set values
 out.setBytesWritten(1L)
 out.setBytesWritten(2L)
 out.setRecordsWritten(3L)
 out.setRecordsWritten(4L)
 // assert new values exist
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
+ assert(out.bytesWritten == 2L)
+ assert(out.recordsWritten == 4L)
 }
 test("merging multiple shuffle read metrics") {
@@ -305,9 +194,7 @@ class TaskMetricsSuite extends SparkFunSuite {
 }
 test("additional accumulables") {
- val internalAccums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(internalAccums)
- assert(tm.accumulatorUpdates().size === internalAccums.size)
+ val tm = new TaskMetrics
 val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
 val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
 val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
@@ -338,47 +225,11 @@ class TaskMetricsSuite extends SparkFunSuite {
 assert(newUpdates(acc4.id).countFailedValues)
 assert(newUpdates.values.map(_.update).forall(_.isDefined))
 assert(newUpdates.values.map(_.value).forall(_.isEmpty))
- assert(newUpdates.size === internalAccums.size + 4)
- }
-
- test("existing values in shuffle read accums") {
- // set shuffle read accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
- assert(srAccum.isDefined)
- srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm = new TaskMetrics(accums)
- }
-
- test("existing values in shuffle write accums") {
- // set shuffle write accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
- assert(swAccum.isDefined)
- swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm = new TaskMetrics(accums)
- }
-
- test("existing values in input accums") {
- // set input accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
- assert(inAccum.isDefined)
- inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm = new TaskMetrics(accums)
- }
-
- test("existing values in output accums") {
- // set output accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
- assert(outAccum.isDefined)
- outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm4 = new TaskMetrics(accums)
+ assert(newUpdates.size === tm.internalAccums.size + 4)
 }
 test("from accumulator updates") {
- val accumUpdates1 = InternalAccumulator.createAll().map { a =>
+ val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
 AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
 }
 val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
@@ -413,29 +264,6 @@ class TaskMetricsSuite extends SparkFunSuite {
 private[spark] object TaskMetricsSuite extends Assertions {
 /**
- * Assert that the following three things are equal to `value`:
- * (1) TaskMetrics value
- * (2) TaskMetrics accumulator update value
- * (3) Original accumulator value
- */
- def assertValueEquals(
- tm: TaskMetrics,
- tmValue: TaskMetrics => Any,
- accums: Seq[Accumulator[_]],
- metricName: String,
- value: Any,
- assertEquals: (Any, Any) => Unit = (x: Any, y: Any) => assert(x === y)): Unit = {
- assertEquals(tmValue(tm), value)
- val accum = accums.find(_.name == Some(metricName))
- assert(accum.isDefined)
- assertEquals(accum.get.value, value)
- val accumUpdate = tm.accumulatorUpdates().find(_.name == Some(metricName))
- assert(accumUpdate.isDefined)
- assert(accumUpdate.get.value === None)
- assertEquals(accumUpdate.get.update, Some(value))
- }
-
- /**
 * Assert that two lists of accumulator updates are equal.
 * Note: this does NOT check accumulator ID equality.
 */
@@ -458,5 +286,4 @@ private[spark] object TaskMetricsSuite extends Assertions {
 * info as an accumulator update.
 */
 def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
-
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index fd96fb04f8..b76c0a4bd1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
 // the shuffle map output is still available from stage 0); make sure we've still got internal
 // accumulators setup
- assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty)
+ assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics != null)
 completeShuffleMapStageSuccessfully(2, 0, 2)
 completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
 assert(results === Map(0 -> 1234, 1 -> 1235))
@@ -2010,7 +2010,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
 taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
 val accumUpdates = reason match {
- case Success => task.initialAccumulators.map { a => a.toInfo(Some(a.zero), None) }
+ case Success => task.metrics.accumulatorUpdates()
 case ef: ExceptionFailure => ef.accumUpdates
 case _ => Seq.empty[AccumulableInfo]
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index e3e6df6831..4fe705b201 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -17,14 +17,11 @@ package org.apache.spark.scheduler
-import java.util.Properties
-
 import org.apache.spark.TaskContext
 class FakeTask(
 stageId: Int,
- prefLocs: Seq[TaskLocation] = Nil)
- extends Task[Int](stageId, 0, 0, Seq.empty, new Properties) {
+ prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 0) {
 override def runTask(context: TaskContext): Int = 0
 override def preferredLocations: Seq[TaskLocation] = prefLocs
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
index 76a7087645..255be6f46b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
@@ -18,7 +18,6 @@ package org.apache.spark.scheduler
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-import java.util.Properties
 import org.apache.spark.TaskContext
@@ -26,7 +25,7 @@ import org.apache.spark.TaskContext
 * A Task implementation that fails to serialize.
 */
 private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int)
- extends Task[Array[Byte]](stageId, 0, 0, Seq.empty, new Properties) {
+ extends Task[Array[Byte]](stageId, 0, 0) {
 override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte]
 override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 86911d2211..bda4c996b2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -24,7 +24,7 @@ import org.mockito.Mockito._
 import org.scalatest.BeforeAndAfter
 import org.apache.spark._
-import org.apache.spark.executor.{Executor, TaskMetricsSuite}
+import org.apache.spark.executor.{Executor, TaskMetrics, TaskMetricsSuite}
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.JvmSource
 import org.apache.spark.network.util.JavaUtils
@@ -62,7 +62,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
 val func = (c: TaskContext, i: Iterator[String]) => i.next()
 val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
 val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties)
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
 intercept[RuntimeException] {
 task.run(0, 0, null)
 }
@@ -83,7 +83,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
 val func = (c: TaskContext, i: Iterator[String]) => i.next()
 val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
 val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties)
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
 intercept[RuntimeException] {
 task.run(0, 0, null)
 }
@@ -171,26 +171,27 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
 val param = AccumulatorParam.LongAccumulatorParam
 val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
 val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
- val initialAccums = InternalAccumulator.createAll()
 // Create a dummy task. We won't end up running this; we just want to collect
 // accumulator updates from it.
- val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]], new Properties) {
+ val taskMetrics = new TaskMetrics
+ val task = new Task[Int](0, 0, 0) {
 context = new TaskContextImpl(0, 0, 0L, 0,
 new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
 new Properties,
 SparkEnv.get.metricsSystem,
- initialAccums)
- context.taskMetrics.registerAccumulator(acc1)
- context.taskMetrics.registerAccumulator(acc2)
+ taskMetrics)
+ taskMetrics.registerAccumulator(acc1)
+ taskMetrics.registerAccumulator(acc2)
 override def runTask(tc: TaskContext): Int = 0
 }
 // First, simulate task success. This should give us all the accumulators.
 val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
- val accumUpdates2 = (initialAccums ++ Seq(acc1, acc2)).map(TaskMetricsSuite.makeInfo)
+ val accumUpdates2 = (taskMetrics.internalAccums ++ Seq(acc1, acc2))
+ .map(TaskMetricsSuite.makeInfo)
 TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
 // Now, simulate task failures. This should give us only the accums that count failed values.
 val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
- val accumUpdates4 = (initialAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
+ val accumUpdates4 = (taskMetrics.internalAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
 TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ade8e84d84..ecf4b76da5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,9 +17,8 @@ package org.apache.spark.scheduler
-import java.util.{Properties, Random}
+import java.util.Random
-import scala.collection.Map
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -138,7 +137,8 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
 /**
 * A Task implementation that results in a large serialized task.
 */
-class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0, Seq.empty, new Properties) {
+class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {
+
 val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
 val random = new Random(0)
 random.nextBytes(randomBuffer)
@@ -166,7 +166,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 val taskSet = FakeTask.createTaskSet(1)
 val clock = new ManualClock
 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a => a.toInfo(Some(0L), None) }
+ val accumUpdates =
+ taskSet.tasks.head.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
 // Offer a host with NO_PREF as the constraint,
 // we should get a nopref task immediately since that's what we only have
@@ -185,7 +186,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 val taskSet = FakeTask.createTaskSet(3)
 val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
- task.initialAccumulators.map { a => a.toInfo(Some(0L), None) }
+ task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
 }
 // First three offers should all find tasks
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
index 88817dccf3..d223af1496 100644
--- a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
@@ -38,7 +38,7 @@ class AllStagesResourceSuite extends SparkFunSuite {
 stageUiData.taskData = tasks
 val status = StageStatus.ACTIVE
 val stageInfo = new StageInfo(
- 1, 1, "stage 1", 10, Seq.empty, Seq.empty, "details abc", Seq.empty)
+ 1, 1, "stage 1", 10, Seq.empty, Seq.empty, "details abc")
 val stageData = AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, false)
 stageData.firstTaskLaunchedTime
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 85c877e3dd..221124829f 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,9 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
 val execId = "exe-1"
 def makeTaskMetrics(base: Int): TaskMetrics = {
- val accums = InternalAccumulator.createAll()
- accums.foreach(Accumulators.register)
- val taskMetrics = new TaskMetrics(accums)
+ val taskMetrics = new TaskMetrics
 val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
 val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
 val inputMetrics = taskMetrics.inputMetrics
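Taken together, the updated tests above exercise the new TaskMetrics pattern: a metrics object is constructed directly, its internal accumulators are created inside it, and values are read and mutated through its fields rather than looked up by accumulator name. The following is a minimal sketch of that pattern, illustrative only and not part of the patch; it assumes the private[spark] constructor and accessors at this commit, so it only compiles inside Spark's own source tree (for example a test under org.apache.spark.executor):

    // Illustrative sketch, not part of the diff: the usage the updated tests rely on.
    import org.apache.spark.executor.TaskMetrics

    val tm = new TaskMetrics                  // internal accumulators are created inside TaskMetrics
    tm.setExecutorDeserializeTime(1L)         // scalar metrics are mutated through setters/incrementers
    tm.inputMetrics.incRecordsRead(3L)        // nested metrics are plain fields, no lookup by metric name
    assert(tm.executorDeserializeTime == 1L)
    assert(tm.inputMetrics.recordsRead == 3L)
    // The backing accumulators live on the metrics object itself, which is what the
    // InternalAccumulatorSuite and TaskSetManagerSuite changes above now count:
    val numInternalAccums = tm.internalAccums.length

The TaskContextImpl, ResultTask, and Task constructor changes above show the same object being passed in directly where a Seq of accumulators used to be.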