author     Wenchen Fan <wenchen@databricks.com>    2016-04-19 21:20:24 -0700
committer  Reynold Xin <rxin@databricks.com>       2016-04-19 21:20:24 -0700
commit     85d759ca3aebb7d60b963207dcada83c75502e52
tree       b4fbaba9dfce1ae47485f318123881f42cd05e6c
parent     78b38109ed2fc20e97f9a968185d0c02ef83aa42
[SPARK-14704][CORE] create accumulators in TaskMetrics
## What changes were proposed in this pull request?

Before this PR, we created accumulators on the driver side (and registered them), sent them to the executor side, and then created `TaskMetrics` with these accumulators on the executor side. After this PR, we create `TaskMetrics` on the driver side and send it to the executor side, so that we can create accumulators inside `TaskMetrics` directly, which is cleaner.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12472 from cloud-fan/acc.
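To illustrate the new flow for readers skimming the diff, here is a minimal, self-contained sketch. The `SimpleAccum` and `SimpleTaskMetrics` types are hypothetical stand-ins for illustration, not Spark's real `Accumulator` and `TaskMetrics` API, and the metric names only model the `internal.metrics.*` prefix used by the internal accumulators. The point is that the metrics object now creates its own accumulators on the driver and is shipped to the executor as a whole.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for a named internal accumulator.
class SimpleAccum(val name: String) extends Serializable {
  private var value: Long = 0L
  def add(v: Long): Unit = { value += v }
  def localValue: Long = value
}

// Hypothetical, simplified stand-in for TaskMetrics: it creates its own accumulators,
// instead of being constructed from a set of accumulators created elsewhere.
class SimpleTaskMetrics extends Serializable {
  val executorRunTime = new SimpleAccum("internal.metrics.executorRunTime")
  val jvmGCTime = new SimpleAccum("internal.metrics.jvmGCTime")
  def internalAccums: Seq[SimpleAccum] = Seq(executorRunTime, jvmGCTime)
}

object DriverSideFlow {
  def main(args: Array[String]): Unit = {
    // Driver side: build TaskMetrics up front; its accumulators exist immediately.
    val metrics = new SimpleTaskMetrics
    // Driver side: register the internal accumulators (here we only record their names).
    val registered = ArrayBuffer[String]()
    metrics.internalAccums.foreach(a => registered += a.name)

    // Executor side (simulated): the task updates the metrics object shipped with it.
    metrics.executorRunTime.add(42L)
    println(metrics.internalAccums.map(a => a.name -> a.localValue))
  }
}
```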
-rw-r--r--  core/src/main/scala/org/apache/spark/InternalAccumulator.scala | 127
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/InputMetrics.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 49
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala | 34
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 175
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 41
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala | 129
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 9
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala | 277
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala | 3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 21
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 11
-rw-r--r--  core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 4
-rw-r--r--  project/MimaExcludes.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 4
31 files changed, 271 insertions(+), 775 deletions(-)
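The largest hunk below (`TaskMetrics.scala`) replaces the old name-based accumulator lookup with a small factory (`newMetric`, `createLongAccum`, `createIntAccum`, `createBlocksAccum`). A sketch of that pattern follows, using simplified, hypothetical `SketchAccum`/`SketchParam` stand-ins rather than Spark's real `Accumulator`/`AccumulatorParam` classes; only the shape of the factory mirrors the diff.

```scala
// Simplified, hypothetical accumulator machinery for illustration only.
trait SketchParam[T] {
  def zero: T
  def addInPlace(a: T, b: T): T
}
object LongParam extends SketchParam[Long] {
  val zero = 0L
  def addInPlace(a: Long, b: Long): Long = a + b
}
object IntParam extends SketchParam[Int] {
  val zero = 0
  def addInPlace(a: Int, b: Int): Int = a + b
}

class SketchAccum[T](
    initialValue: T,
    val param: SketchParam[T],
    val name: Option[String],
    val internal: Boolean,
    val countFailedValues: Boolean) extends Serializable {
  private var v: T = initialValue
  def add(x: T): Unit = { v = param.addInPlace(v, x) }
  def localValue: T = v
}

object MetricFactory {
  // Mirrors the shape of TaskMetrics.newMetric in the diff: every internal metric
  // is named, marked internal, and counts values from failed tasks.
  private def newMetric[T](initialValue: T, name: String, param: SketchParam[T]): SketchAccum[T] =
    new SketchAccum[T](initialValue, param, Some(name), internal = true, countFailedValues = true)

  def createLongAccum(name: String): SketchAccum[Long] = newMetric(0L, name, LongParam)
  def createIntAccum(name: String): SketchAccum[Int] = newMetric(0, name, IntParam)
}
```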
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 714c8737a9..0b494c146f 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -17,17 +17,11 @@
package org.apache.spark
-import org.apache.spark.storage.{BlockId, BlockStatus}
-
-
/**
* A collection of fields and methods concerned with internal accumulators that represent
* task level metrics.
*/
private[spark] object InternalAccumulator {
-
- import AccumulatorParam._
-
// Prefixes used in names of internal task level metrics
val METRICS_PREFIX = "internal.metrics."
val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."
@@ -79,125 +73,4 @@ private[spark] object InternalAccumulator {
}
// scalastyle:on
-
- /**
- * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]].
- */
- def create(name: String): Accumulator[_] = {
- require(name.startsWith(METRICS_PREFIX),
- s"internal accumulator name must start with '$METRICS_PREFIX': $name")
- getParam(name) match {
- case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
- case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
- case p @ StringAccumulatorParam => newMetric[String]("", name, p)
- case p @ UpdatedBlockStatusesAccumulatorParam =>
- newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
- case p => throw new IllegalArgumentException(
- s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.")
- }
- }
-
- /**
- * Get the [[AccumulatorParam]] associated with the internal metric name,
- * which must begin with [[METRICS_PREFIX]].
- */
- def getParam(name: String): AccumulatorParam[_] = {
- require(name.startsWith(METRICS_PREFIX),
- s"internal accumulator name must start with '$METRICS_PREFIX': $name")
- name match {
- case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
- case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
- case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
- case _ => LongAccumulatorParam
- }
- }
-
- /**
- * Accumulators for tracking internal metrics.
- */
- def createAll(): Seq[Accumulator[_]] = {
- Seq[String](
- EXECUTOR_DESERIALIZE_TIME,
- EXECUTOR_RUN_TIME,
- RESULT_SIZE,
- JVM_GC_TIME,
- RESULT_SERIALIZATION_TIME,
- MEMORY_BYTES_SPILLED,
- DISK_BYTES_SPILLED,
- PEAK_EXECUTION_MEMORY,
- UPDATED_BLOCK_STATUSES).map(create) ++
- createShuffleReadAccums() ++
- createShuffleWriteAccums() ++
- createInputAccums() ++
- createOutputAccums() ++
- sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq
- }
-
- /**
- * Accumulators for tracking shuffle read metrics.
- */
- def createShuffleReadAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- shuffleRead.REMOTE_BLOCKS_FETCHED,
- shuffleRead.LOCAL_BLOCKS_FETCHED,
- shuffleRead.REMOTE_BYTES_READ,
- shuffleRead.LOCAL_BYTES_READ,
- shuffleRead.FETCH_WAIT_TIME,
- shuffleRead.RECORDS_READ).map(create)
- }
-
- /**
- * Accumulators for tracking shuffle write metrics.
- */
- def createShuffleWriteAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- shuffleWrite.BYTES_WRITTEN,
- shuffleWrite.RECORDS_WRITTEN,
- shuffleWrite.WRITE_TIME).map(create)
- }
-
- /**
- * Accumulators for tracking input metrics.
- */
- def createInputAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- input.BYTES_READ,
- input.RECORDS_READ).map(create)
- }
-
- /**
- * Accumulators for tracking output metrics.
- */
- def createOutputAccums(): Seq[Accumulator[_]] = {
- Seq[String](
- output.BYTES_WRITTEN,
- output.RECORDS_WRITTEN).map(create)
- }
-
- /**
- * Accumulators for tracking internal metrics.
- *
- * These accumulators are created with the stage such that all tasks in the stage will
- * add to the same set of accumulators. We do this to report the distribution of accumulator
- * values across all tasks within each stage.
- */
- def createAll(sc: SparkContext): Seq[Accumulator[_]] = {
- val accums = createAll()
- accums.foreach { accum =>
- Accumulators.register(accum)
- sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
- }
- accums
- }
-
- /**
- * Create a new accumulator representing an internal task metric.
- */
- private def newMetric[T](
- initialValue: T,
- name: String,
- param: AccumulatorParam[T]): Accumulator[T] = {
- new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
- }
-
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 757c1b5116..e7940bd9ed 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -62,12 +62,11 @@ object TaskContext {
protected[spark] def unset(): Unit = taskContext.remove()
/**
- * An empty task context that does not represent an actual task.
+ * An empty task context that does not represent an actual task. This is only used in tests.
*/
private[spark] def empty(): TaskContextImpl = {
new TaskContextImpl(0, 0, 0, 0, null, new Properties, null)
}
-
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index fa0b2d3d28..e8f83c6d14 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -36,15 +36,10 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
- initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.createAll())
+ override val taskMetrics: TaskMetrics = new TaskMetrics)
extends TaskContext
with Logging {
- /**
- * Metrics associated with this task.
- */
- override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators)
-
/** List of callback functions to execute when the task completes. */
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b20bd11f7d..650f05c309 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -293,16 +293,14 @@ private[spark] class Executor(
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
- for (m <- task.metrics) {
- // Deserialization happens in two parts: first, we deserialize a Task object, which
- // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
- m.setExecutorDeserializeTime(
- (taskStart - deserializeStartTime) + task.executorDeserializeTime)
- // We need to subtract Task.run()'s deserialization time to avoid double-counting
- m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
- m.setJvmGCTime(computeTotalGcTime() - startGCTime)
- m.setResultSerializationTime(afterSerialization - beforeSerialization)
- }
+ // Deserialization happens in two parts: first, we deserialize a Task object, which
+ // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+ task.metrics.setExecutorDeserializeTime(
+ (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+ // We need to subtract Task.run()'s deserialization time to avoid double-counting
+ task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
+ task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
+ task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
@@ -357,10 +355,8 @@ private[spark] class Executor(
// Collect latest accumulator values to report back to the driver
val accumulatorUpdates: Seq[AccumulableInfo] =
if (task != null) {
- task.metrics.foreach { m =>
- m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
- m.setJvmGCTime(computeTotalGcTime() - startGCTime)
- }
+ task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
+ task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.collectAccumulatorUpdates(taskFailed = true)
} else {
Seq.empty[AccumulableInfo]
@@ -485,11 +481,9 @@ private[spark] class Executor(
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
- taskRunner.task.metrics.foreach { metrics =>
- metrics.mergeShuffleReadMetrics()
- metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
- accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates()))
- }
+ taskRunner.task.metrics.mergeShuffleReadMetrics()
+ taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
+ accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulatorUpdates()))
}
}
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 0ec81d8d35..535352e7dd 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -39,14 +39,11 @@ object DataReadMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about reading data from external systems.
*/
@DeveloperApi
-class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumulator[Long])
- extends Serializable {
+class InputMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ))
- }
+ private[executor] val _bytesRead = TaskMetrics.createLongAccum(input.BYTES_READ)
+ private[executor] val _recordsRead = TaskMetrics.createLongAccum(input.RECORDS_READ)
/**
* Total number of bytes read.
@@ -61,5 +58,4 @@ class InputMetrics private (_bytesRead: Accumulator[Long], _recordsRead: Accumul
private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
-
}
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 5b36cc4739..586c98b156 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -38,14 +38,11 @@ object DataWriteMethod extends Enumeration with Serializable {
* A collection of accumulators that represents metrics about writing data to external systems.
*/
@DeveloperApi
-class OutputMetrics private (_bytesWritten: Accumulator[Long], _recordsWritten: Accumulator[Long])
- extends Serializable {
-
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN))
- }
+class OutputMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
+
+ private[executor] val _bytesWritten = TaskMetrics.createLongAccum(output.BYTES_WRITTEN)
+ private[executor] val _recordsWritten = TaskMetrics.createLongAccum(output.RECORDS_WRITTEN)
/**
* Total number of bytes written.
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 47cfb74b9e..8e9a332b7c 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -27,38 +27,21 @@ import org.apache.spark.annotation.DeveloperApi
* Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleReadMetrics private (
- _remoteBlocksFetched: Accumulator[Int],
- _localBlocksFetched: Accumulator[Int],
- _remoteBytesRead: Accumulator[Long],
- _localBytesRead: Accumulator[Long],
- _fetchWaitTime: Accumulator[Long],
- _recordsRead: Accumulator[Long])
- extends Serializable {
-
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED),
- TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ))
- }
-
- /**
- * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task.
- *
- * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in
- * many places only to merge their values together later. In the future, we should revisit
- * whether this is needed.
- *
- * A better alternative is [[TaskMetrics.createTempShuffleReadMetrics]] followed by
- * [[TaskMetrics.mergeShuffleReadMetrics]].
- */
- private[spark] def this() {
- this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap)
- }
+class ShuffleReadMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
+
+ private[executor] val _remoteBlocksFetched =
+ TaskMetrics.createIntAccum(shuffleRead.REMOTE_BLOCKS_FETCHED)
+ private[executor] val _localBlocksFetched =
+ TaskMetrics.createIntAccum(shuffleRead.LOCAL_BLOCKS_FETCHED)
+ private[executor] val _remoteBytesRead =
+ TaskMetrics.createLongAccum(shuffleRead.REMOTE_BYTES_READ)
+ private[executor] val _localBytesRead =
+ TaskMetrics.createLongAccum(shuffleRead.LOCAL_BYTES_READ)
+ private[executor] val _fetchWaitTime =
+ TaskMetrics.createLongAccum(shuffleRead.FETCH_WAIT_TIME)
+ private[executor] val _recordsRead =
+ TaskMetrics.createLongAccum(shuffleRead.RECORDS_READ)
/**
* Number of remote blocks fetched in this shuffle by this task.
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 704dee747e..7326fba841 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import org.apache.spark.{Accumulator, InternalAccumulator}
+import org.apache.spark.InternalAccumulator
import org.apache.spark.annotation.DeveloperApi
@@ -27,31 +27,15 @@ import org.apache.spark.annotation.DeveloperApi
* Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleWriteMetrics private (
- _bytesWritten: Accumulator[Long],
- _recordsWritten: Accumulator[Long],
- _writeTime: Accumulator[Long])
- extends Serializable {
+class ShuffleWriteMetrics private[spark] () extends Serializable {
+ import InternalAccumulator._
- private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
- this(
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN),
- TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME))
- }
-
- /**
- * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task.
- *
- * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in
- * many places only to merge their values together later. In the future, we should revisit
- * whether this is needed.
- *
- * A better alternative is [[TaskMetrics.shuffleWriteMetrics]].
- */
- private[spark] def this() {
- this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap)
- }
+ private[executor] val _bytesWritten =
+ TaskMetrics.createLongAccum(shuffleWrite.BYTES_WRITTEN)
+ private[executor] val _recordsWritten =
+ TaskMetrics.createLongAccum(shuffleWrite.RECORDS_WRITTEN)
+ private[executor] val _writeTime =
+ TaskMetrics.createLongAccum(shuffleWrite.WRITE_TIME)
/**
* Number of bytes written for the shuffle by this task.
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 0198364825..4558fbb4d9 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,10 +17,10 @@
package org.apache.spark.executor
-import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
+import org.apache.spark.AccumulatorParam.{IntAccumulatorParam, LongAccumulatorParam, UpdatedBlockStatusesAccumulatorParam}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
@@ -39,65 +39,21 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* The accumulator updates are also sent to the driver periodically (on executor heartbeat)
* and when the task failed with an exception. The [[TaskMetrics]] object itself should never
* be sent to the driver.
- *
- * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on.
- * Each accumulator in this initial set must be uniquely named and marked
- * as internal. Additional accumulators registered later need not satisfy
- * these requirements.
*/
@DeveloperApi
-class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable {
+class TaskMetrics private[spark] () extends Serializable {
import InternalAccumulator._
- // Needed for Java tests
- def this() {
- this(InternalAccumulator.createAll())
- }
-
- /**
- * All accumulators registered with this task.
- */
- private val accums = new ArrayBuffer[Accumulable[_, _]]
- accums ++= initialAccums
-
- /**
- * A map for quickly accessing the initial set of accumulators by name.
- */
- private val initialAccumsMap: Map[String, Accumulator[_]] = {
- val map = new mutable.HashMap[String, Accumulator[_]]
- initialAccums.foreach { a =>
- val name = a.name.getOrElse {
- throw new IllegalArgumentException(
- "initial accumulators passed to TaskMetrics must be named")
- }
- require(a.isInternal,
- s"initial accumulator '$name' passed to TaskMetrics must be marked as internal")
- require(!map.contains(name),
- s"detected duplicate accumulator name '$name' when constructing TaskMetrics")
- map(name) = a
- }
- map.toMap
- }
-
// Each metric is internally represented as an accumulator
- private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME)
- private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME)
- private val _resultSize = getAccum(RESULT_SIZE)
- private val _jvmGCTime = getAccum(JVM_GC_TIME)
- private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME)
- private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED)
- private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED)
- private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY)
- private val _updatedBlockStatuses =
- TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES)
-
- private val _inputMetrics = new InputMetrics(initialAccumsMap)
-
- private val _outputMetrics = new OutputMetrics(initialAccumsMap)
-
- private val _shuffleReadMetrics = new ShuffleReadMetrics(initialAccumsMap)
-
- private val _shuffleWriteMetrics = new ShuffleWriteMetrics(initialAccumsMap)
+ private val _executorDeserializeTime = TaskMetrics.createLongAccum(EXECUTOR_DESERIALIZE_TIME)
+ private val _executorRunTime = TaskMetrics.createLongAccum(EXECUTOR_RUN_TIME)
+ private val _resultSize = TaskMetrics.createLongAccum(RESULT_SIZE)
+ private val _jvmGCTime = TaskMetrics.createLongAccum(JVM_GC_TIME)
+ private val _resultSerializationTime = TaskMetrics.createLongAccum(RESULT_SERIALIZATION_TIME)
+ private val _memoryBytesSpilled = TaskMetrics.createLongAccum(MEMORY_BYTES_SPILLED)
+ private val _diskBytesSpilled = TaskMetrics.createLongAccum(DISK_BYTES_SPILLED)
+ private val _peakExecutionMemory = TaskMetrics.createLongAccum(PEAK_EXECUTION_MEMORY)
+ private val _updatedBlockStatuses = TaskMetrics.createBlocksAccum(UPDATED_BLOCK_STATUSES)
/**
* Time taken on the executor to deserialize this task.
@@ -164,30 +120,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
_updatedBlockStatuses.setValue(v)
/**
- * Get a Long accumulator from the given map by name, assuming it exists.
- * Note: this only searches the initial set of accumulators passed into the constructor.
- */
- private[spark] def getAccum(name: String): Accumulator[Long] = {
- TaskMetrics.getAccum[Long](initialAccumsMap, name)
- }
-
- /**
* Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
* data, defined only in tasks with input.
*/
- def inputMetrics: InputMetrics = _inputMetrics
+ val inputMetrics: InputMetrics = new InputMetrics()
/**
* Metrics related to writing data externally (e.g. to a distributed filesystem),
* defined only in tasks with output.
*/
- def outputMetrics: OutputMetrics = _outputMetrics
+ val outputMetrics: OutputMetrics = new OutputMetrics()
/**
* Metrics related to shuffle read aggregated across all shuffle dependencies.
* This is defined only if there are shuffle dependencies in this task.
*/
- def shuffleReadMetrics: ShuffleReadMetrics = _shuffleReadMetrics
+ val shuffleReadMetrics: ShuffleReadMetrics = new ShuffleReadMetrics()
+
+ /**
+ * Metrics related to shuffle write, defined only in shuffle map stages.
+ */
+ val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics()
/**
* Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
@@ -217,21 +170,45 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
*/
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
- _shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
+ shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
}
}
- /**
- * Metrics related to shuffle write, defined only in shuffle map stages.
- */
- def shuffleWriteMetrics: ShuffleWriteMetrics = _shuffleWriteMetrics
+ // Only used for test
+ private[spark] val testAccum =
+ sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM))
+
+ @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = {
+ val in = inputMetrics
+ val out = outputMetrics
+ val sr = shuffleReadMetrics
+ val sw = shuffleWriteMetrics
+ Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime,
+ _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory,
+ _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead,
+ sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten,
+ sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++
+ testAccum
+ }
/* ========================== *
| OTHER THINGS |
* ========================== */
+ private[spark] def registerAccums(sc: SparkContext): Unit = {
+ internalAccums.foreach { accum =>
+ Accumulators.register(accum)
+ sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
+ }
+ }
+
+ /**
+ * External accumulators registered with this task.
+ */
+ @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]]
+
private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
- accums += a
+ externalAccums += a
}
/**
@@ -242,7 +219,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
* not the aggregated value across multiple tasks.
*/
def accumulatorUpdates(): Seq[AccumulableInfo] = {
- accums.map { a => a.toInfo(Some(a.localValue), None) }
+ (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) }
}
}
@@ -256,9 +233,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
* UnsupportedOperationException, we choose not to do so because the overrides would quickly become
* out-of-date when new metrics are added.
*/
-private[spark] class ListenerTaskMetrics(
- initialAccums: Seq[Accumulator[_]],
- accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) {
+private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics {
override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
@@ -272,18 +247,25 @@ private[spark] object TaskMetrics extends Logging {
def empty: TaskMetrics = new TaskMetrics
/**
- * Get an accumulator from the given map by name, assuming it exists.
+ * Create a new accumulator representing an internal task metric.
*/
- def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = {
- require(accumMap.contains(name), s"metric '$name' is missing")
- val accum = accumMap(name)
- try {
- // Note: we can't do pattern matching here because types are erased by compile time
- accum.asInstanceOf[Accumulator[T]]
- } catch {
- case e: ClassCastException =>
- throw new SparkException(s"accumulator $name was of unexpected type", e)
- }
+ private def newMetric[T](
+ initialValue: T,
+ name: String,
+ param: AccumulatorParam[T]): Accumulator[T] = {
+ new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
+ }
+
+ def createLongAccum(name: String): Accumulator[Long] = {
+ newMetric(0L, name, LongAccumulatorParam)
+ }
+
+ def createIntAccum(name: String): Accumulator[Int] = {
+ newMetric(0, name, IntAccumulatorParam)
+ }
+
+ def createBlocksAccum(name: String): Accumulator[Seq[(BlockId, BlockStatus)]] = {
+ newMetric(Nil, name, UpdatedBlockStatusesAccumulatorParam)
}
/**
@@ -297,18 +279,11 @@ private[spark] object TaskMetrics extends Logging {
* internal task level metrics.
*/
def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
- // Initial accumulators are passed into the TaskMetrics constructor first because these
- // are required to be uniquely named. The rest of the accumulators from this task are
- // registered later because they need not satisfy this requirement.
- val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined }
- val initialAccums = definedAccumUpdates
- .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
- .map { info =>
- val accum = InternalAccumulator.create(info.name.get)
- accum.setValueAny(info.update.get)
- accum
- }
- new ListenerTaskMetrics(initialAccums, definedAccumUpdates)
+ val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
+ val metrics = new ListenerTaskMetrics(definedAccumUpdates)
+ definedAccumUpdates.filter(_.internal).foreach { accum =>
+ metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
+ }
+ metrics
}
-
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c27aad268d..b7fb608ea5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1029,7 +1029,7 @@ class DAGScheduler(
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, stage.latestInfo.internalAccumulators, properties)
+ taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
}
case stage: ResultStage =>
@@ -1039,7 +1039,7 @@ class DAGScheduler(
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
- taskBinary, part, locs, id, properties, stage.latestInfo.internalAccumulators)
+ taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
}
}
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index db6276f75d..75c6018e21 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -23,6 +23,7 @@ import java.util.Properties
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.RDD
/**
@@ -40,9 +41,7 @@ import org.apache.spark.rdd.RDD
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
* @param localProperties copy of thread-local properties set by the user on the driver side.
- * @param _initialAccums initial set of accumulators to be used in this task for tracking
- * internal metrics. Other accumulators will be registered later when
- * they are deserialized on the executors.
+ * @param metrics a [[TaskMetrics]] that is created on the driver side and sent to the executor side.
*/
private[spark] class ResultTask[T, U](
stageId: Int,
@@ -52,8 +51,8 @@ private[spark] class ResultTask[T, U](
locs: Seq[TaskLocation],
val outputId: Int,
localProperties: Properties,
- _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.createAll())
- extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums, localProperties)
+ metrics: TaskMetrics)
+ extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties)
with Serializable {
@transient private[this] val preferredLocs: Seq[TaskLocation] = {
@@ -68,7 +67,6 @@ private[spark] class ResultTask[T, U](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
- metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index b7cab7013e..84b3e5ba6c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -24,6 +24,7 @@ import scala.language.existentials
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriter
@@ -40,9 +41,7 @@ import org.apache.spark.shuffle.ShuffleWriter
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
- * @param _initialAccums initial set of accumulators to be used in this task for tracking
- * internal metrics. Other accumulators will be registered later when
- * they are deserialized on the executors.
+ * @param metrics a [[TaskMetrics]] that is created on the driver side and sent to the executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
*/
private[spark] class ShuffleMapTask(
@@ -51,9 +50,9 @@ private[spark] class ShuffleMapTask(
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
- _initialAccums: Seq[Accumulator[_]],
+ metrics: TaskMetrics,
localProperties: Properties)
- extends Task[MapStatus](stageId, stageAttemptId, partition.index, _initialAccums, localProperties)
+ extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties)
with Logging {
/** A constructor used only in test suites. This does not require passing in an RDD. */
@@ -73,7 +72,6 @@ private[spark] class ShuffleMapTask(
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
- metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b6d4e39fe5..d5cf6b82e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import scala.collection.mutable.HashSet
import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.util.CallSite
@@ -110,9 +111,10 @@ private[scheduler] abstract class Stage(
def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
+ val metrics = new TaskMetrics
+ metrics.registerAccums(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
- this, nextAttemptId, Some(numPartitionsToCompute),
- InternalAccumulator.createAll(rdd.sparkContext), taskLocalityPreferences)
+ this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 0fd58c41cd..58349fe250 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -19,8 +19,8 @@ package org.apache.spark.scheduler
import scala.collection.mutable.HashMap
-import org.apache.spark.Accumulator
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.RDDInfo
/**
@@ -36,7 +36,7 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
- val internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
+ val taskMetrics: TaskMetrics = new TaskMetrics,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -81,7 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
- internalAccumulators: Seq[Accumulator[_]] = Seq.empty,
+ taskMetrics: TaskMetrics = new TaskMetrics,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
@@ -94,7 +94,7 @@ private[spark] object StageInfo {
rddInfos,
stage.parents.map(_.id),
stage.details,
- internalAccumulators,
+ taskMetrics,
taskLocalityPreferences)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1ff9d7795f..9f2fa02c69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -23,7 +23,7 @@ import java.util.Properties
import scala.collection.mutable.HashMap
-import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl}
+import org.apache.spark.{SparkEnv, TaskContext, TaskContextImpl}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
@@ -44,17 +44,17 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
* @param stageId id of the stage this task belongs to
* @param stageAttemptId attempt id of the stage this task belongs to
* @param partitionId index of the number in the RDD
- * @param initialAccumulators initial set of accumulators to be used in this task for tracking
- * internal metrics. Other accumulators will be registered later when
- * they are deserialized on the executors.
+ * @param metrics a [[TaskMetrics]] that is created on the driver side and sent to the executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
+ *
+ * The default values for `metrics` and `localProperties` are used by tests only.
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
- val initialAccumulators: Seq[Accumulator[_]],
- @transient var localProperties: Properties) extends Serializable {
+ val metrics: TaskMetrics = new TaskMetrics,
+ @transient var localProperties: Properties = new Properties) extends Serializable {
/**
* Called by [[org.apache.spark.executor.Executor]] to run this task.
@@ -76,7 +76,7 @@ private[spark] abstract class Task[T](
taskMemoryManager,
localProperties,
metricsSystem,
- initialAccumulators)
+ metrics)
TaskContext.setTaskContext(context)
taskThread = Thread.currentThread()
if (_killed) {
@@ -128,8 +128,6 @@ private[spark] abstract class Task[T](
// Map output tracker epoch. Will be set by TaskScheduler.
var epoch: Long = -1
- var metrics: Option[TaskMetrics] = None
-
// Task context, to be initialized in run().
@transient protected var context: TaskContextImpl = _
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index bd4797ae8e..645e2d2e36 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -203,7 +203,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
- new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown", Seq.empty))
+ new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
}
val activeStages = Buffer[StageInfo]()
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 38ca3224ff..6c50c72a91 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -304,20 +304,17 @@ private[spark] object JsonProtocol {
* The behavior here must match that of [[accumValueFromJson]]. Exposed for testing.
*/
private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
- import AccumulatorParam._
if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
- (value, InternalAccumulator.getParam(name.get)) match {
- case (v: Int, IntAccumulatorParam) => JInt(v)
- case (v: Long, LongAccumulatorParam) => JInt(v)
- case (v: String, StringAccumulatorParam) => JString(v)
- case (v, UpdatedBlockStatusesAccumulatorParam) =>
+ value match {
+ case v: Int => JInt(v)
+ case v: Long => JInt(v)
+ // We only have 3 kinds of internal accumulator values, so if it's not an int or a long, it must be
+ // the blocks accumulator, whose type is `Seq[(BlockId, BlockStatus)]`
+ case v =>
JArray(v.asInstanceOf[Seq[(BlockId, BlockStatus)]].toList.map { case (id, status) =>
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
- case (v, p) =>
- throw new IllegalArgumentException(s"unexpected combination of accumulator value " +
- s"type (${v.getClass.getName}) and param (${p.getClass.getName}) in '${name.get}'")
}
} else {
// For all external accumulators, just use strings
@@ -569,7 +566,7 @@ private[spark] object JsonProtocol {
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
stageIds.map { id =>
- new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown", Seq.empty)
+ new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown")
}
}
SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
@@ -678,7 +675,7 @@ private[spark] object JsonProtocol {
}
val stageInfo = new StageInfo(
- stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details, Seq.empty)
+ stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
@@ -735,25 +732,21 @@ private[spark] object JsonProtocol {
* The behavior here must match that of [[accumValueToJson]]. Exposed for testing.
*/
private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = {
- import AccumulatorParam._
if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
- (value, InternalAccumulator.getParam(name.get)) match {
- case (JInt(v), IntAccumulatorParam) => v.toInt
- case (JInt(v), LongAccumulatorParam) => v.toLong
- case (JString(v), StringAccumulatorParam) => v
- case (JArray(v), UpdatedBlockStatusesAccumulatorParam) =>
+ value match {
+ case JInt(v) => v.toLong
+ case JArray(v) =>
v.map { blockJson =>
val id = BlockId((blockJson \ "Block ID").extract[String])
val status = blockStatusFromJson(blockJson \ "Status")
(id, status)
}
- case (v, p) =>
- throw new IllegalArgumentException(s"unexpected combination of accumulator " +
- s"value in JSON ($v) and accumulator param (${p.getClass.getName}) in '${name.get}'")
- }
- } else {
- value.extract[String]
- }
+ case _ => throw new IllegalArgumentException(s"unexpected json value $value for " +
+ "accumulator " + name.get)
+ }
+ } else {
+ value.extract[String]
+ }
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 37879d11ca..454c42517c 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark
-import java.util.Properties
import java.util.concurrent.Semaphore
import javax.annotation.concurrent.GuardedBy
@@ -29,6 +28,7 @@ import scala.util.control.NonFatal
import org.scalatest.Matchers
import org.scalatest.exceptions.TestFailedException
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.serializer.JavaSerializer
@@ -278,16 +278,13 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
val externalAccums = Seq(acc1, acc2)
- val internalAccums = InternalAccumulator.createAll()
+ val taskMetrics = new TaskMetrics
// Set some values; these should not be observed later on the "executors"
acc1.setValue(10)
acc2.setValue(20L)
- internalAccums
- .find(_.name == Some(InternalAccumulator.TEST_ACCUM))
- .get.asInstanceOf[Accumulator[Long]]
- .setValue(30L)
+ taskMetrics.testAccum.get.asInstanceOf[Accumulator[Long]].setValue(30L)
// Simulate the task being serialized and sent to the executors.
- val dummyTask = new DummyTask(internalAccums, externalAccums)
+ val dummyTask = new DummyTask(taskMetrics, externalAccums)
val serInstance = new JavaSerializer(new SparkConf).newInstance()
val taskSer = Task.serializeWithDependencies(
dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
@@ -298,7 +295,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
taskBytes, Thread.currentThread.getContextClassLoader)
// Assert that executors see only zeros
taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
- taskDeser.internalAccums.foreach { a => assert(a.localValue == a.zero) }
+ taskDeser.metrics.internalAccums.foreach { a => assert(a.localValue == a.zero) }
}
}
@@ -402,8 +399,7 @@ private class SaveInfoListener extends SparkListener {
* A dummy [[Task]] that contains internal and external [[Accumulator]]s.
*/
private[spark] class DummyTask(
- val internalAccums: Seq[Accumulator[_]],
- val externalAccums: Seq[Accumulator[_]])
- extends Task[Int](0, 0, 0, internalAccums, new Properties) {
+ metrics: TaskMetrics,
+ val externalAccums: Seq[Accumulator[_]]) extends Task[Int](0, 0, 0, metrics) {
override def runTask(c: TaskContext): Int = 1
}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index ee6b991461..c130649830 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -929,7 +929,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
- Seq.empty, taskLocalityPreferences)
+ taskLocalityPreferences = taskLocalityPreferences)
}
private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index db087a9c3c..b074b95424 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -19,14 +19,13 @@ package org.apache.spark
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.storage.{BlockId, BlockStatus}
class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
import InternalAccumulator._
- import AccumulatorParam._
override def afterEach(): Unit = {
try {
@@ -36,120 +35,12 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
}
- test("get param") {
- assert(getParam(EXECUTOR_DESERIALIZE_TIME) === LongAccumulatorParam)
- assert(getParam(EXECUTOR_RUN_TIME) === LongAccumulatorParam)
- assert(getParam(RESULT_SIZE) === LongAccumulatorParam)
- assert(getParam(JVM_GC_TIME) === LongAccumulatorParam)
- assert(getParam(RESULT_SERIALIZATION_TIME) === LongAccumulatorParam)
- assert(getParam(MEMORY_BYTES_SPILLED) === LongAccumulatorParam)
- assert(getParam(DISK_BYTES_SPILLED) === LongAccumulatorParam)
- assert(getParam(PEAK_EXECUTION_MEMORY) === LongAccumulatorParam)
- assert(getParam(UPDATED_BLOCK_STATUSES) === UpdatedBlockStatusesAccumulatorParam)
- assert(getParam(TEST_ACCUM) === LongAccumulatorParam)
- // shuffle read
- assert(getParam(shuffleRead.REMOTE_BLOCKS_FETCHED) === IntAccumulatorParam)
- assert(getParam(shuffleRead.LOCAL_BLOCKS_FETCHED) === IntAccumulatorParam)
- assert(getParam(shuffleRead.REMOTE_BYTES_READ) === LongAccumulatorParam)
- assert(getParam(shuffleRead.LOCAL_BYTES_READ) === LongAccumulatorParam)
- assert(getParam(shuffleRead.FETCH_WAIT_TIME) === LongAccumulatorParam)
- assert(getParam(shuffleRead.RECORDS_READ) === LongAccumulatorParam)
- // shuffle write
- assert(getParam(shuffleWrite.BYTES_WRITTEN) === LongAccumulatorParam)
- assert(getParam(shuffleWrite.RECORDS_WRITTEN) === LongAccumulatorParam)
- assert(getParam(shuffleWrite.WRITE_TIME) === LongAccumulatorParam)
- // input
- assert(getParam(input.RECORDS_READ) === LongAccumulatorParam)
- assert(getParam(input.BYTES_READ) === LongAccumulatorParam)
- // output
- assert(getParam(output.RECORDS_WRITTEN) === LongAccumulatorParam)
- assert(getParam(output.BYTES_WRITTEN) === LongAccumulatorParam)
- // default to Long
- assert(getParam(METRICS_PREFIX + "anything") === LongAccumulatorParam)
- intercept[IllegalArgumentException] {
- getParam("something that does not start with the right prefix")
- }
- }
-
- test("create by name") {
- val executorRunTime = create(EXECUTOR_RUN_TIME)
- val updatedBlockStatuses = create(UPDATED_BLOCK_STATUSES)
- val shuffleRemoteBlocksRead = create(shuffleRead.REMOTE_BLOCKS_FETCHED)
- assert(executorRunTime.name === Some(EXECUTOR_RUN_TIME))
- assert(updatedBlockStatuses.name === Some(UPDATED_BLOCK_STATUSES))
- assert(shuffleRemoteBlocksRead.name === Some(shuffleRead.REMOTE_BLOCKS_FETCHED))
- assert(executorRunTime.value.isInstanceOf[Long])
- assert(updatedBlockStatuses.value.isInstanceOf[Seq[_]])
- // We cannot assert the type of the value directly since the type parameter is erased.
- // Instead, try casting a `Seq` of expected type and see if it fails in run time.
- updatedBlockStatuses.setValueAny(Seq.empty[(BlockId, BlockStatus)])
- assert(shuffleRemoteBlocksRead.value.isInstanceOf[Int])
- // default to Long
- val anything = create(METRICS_PREFIX + "anything")
- assert(anything.value.isInstanceOf[Long])
- }
-
- test("create") {
- val accums = createAll()
- val shuffleReadAccums = createShuffleReadAccums()
- val shuffleWriteAccums = createShuffleWriteAccums()
- val inputAccums = createInputAccums()
- val outputAccums = createOutputAccums()
- // assert they're all internal
- assert(accums.forall(_.isInternal))
- assert(shuffleReadAccums.forall(_.isInternal))
- assert(shuffleWriteAccums.forall(_.isInternal))
- assert(inputAccums.forall(_.isInternal))
- assert(outputAccums.forall(_.isInternal))
- // assert they all count on failures
- assert(accums.forall(_.countFailedValues))
- assert(shuffleReadAccums.forall(_.countFailedValues))
- assert(shuffleWriteAccums.forall(_.countFailedValues))
- assert(inputAccums.forall(_.countFailedValues))
- assert(outputAccums.forall(_.countFailedValues))
- // assert they all have names
- assert(accums.forall(_.name.isDefined))
- assert(shuffleReadAccums.forall(_.name.isDefined))
- assert(shuffleWriteAccums.forall(_.name.isDefined))
- assert(inputAccums.forall(_.name.isDefined))
- assert(outputAccums.forall(_.name.isDefined))
- // assert `accums` is a strict superset of the others
- val accumNames = accums.map(_.name.get).toSet
- val shuffleReadAccumNames = shuffleReadAccums.map(_.name.get).toSet
- val shuffleWriteAccumNames = shuffleWriteAccums.map(_.name.get).toSet
- val inputAccumNames = inputAccums.map(_.name.get).toSet
- val outputAccumNames = outputAccums.map(_.name.get).toSet
- assert(shuffleReadAccumNames.subsetOf(accumNames))
- assert(shuffleWriteAccumNames.subsetOf(accumNames))
- assert(inputAccumNames.subsetOf(accumNames))
- assert(outputAccumNames.subsetOf(accumNames))
- }
-
- test("naming") {
- val accums = createAll()
- val shuffleReadAccums = createShuffleReadAccums()
- val shuffleWriteAccums = createShuffleWriteAccums()
- val inputAccums = createInputAccums()
- val outputAccums = createOutputAccums()
- // assert that prefixes are properly namespaced
- assert(SHUFFLE_READ_METRICS_PREFIX.startsWith(METRICS_PREFIX))
- assert(SHUFFLE_WRITE_METRICS_PREFIX.startsWith(METRICS_PREFIX))
- assert(INPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX))
- assert(OUTPUT_METRICS_PREFIX.startsWith(METRICS_PREFIX))
- assert(accums.forall(_.name.get.startsWith(METRICS_PREFIX)))
- // assert they all start with the expected prefixes
- assert(shuffleReadAccums.forall(_.name.get.startsWith(SHUFFLE_READ_METRICS_PREFIX)))
- assert(shuffleWriteAccums.forall(_.name.get.startsWith(SHUFFLE_WRITE_METRICS_PREFIX)))
- assert(inputAccums.forall(_.name.get.startsWith(INPUT_METRICS_PREFIX)))
- assert(outputAccums.forall(_.name.get.startsWith(OUTPUT_METRICS_PREFIX)))
- }
-
test("internal accumulators in TaskContext") {
val taskContext = TaskContext.empty()
val accumUpdates = taskContext.taskMetrics.accumulatorUpdates()
assert(accumUpdates.size > 0)
assert(accumUpdates.forall(_.internal))
- val testAccum = taskContext.taskMetrics.getAccum(TEST_ACCUM)
+ val testAccum = taskContext.taskMetrics.testAccum.get
assert(accumUpdates.exists(_.id == testAccum.id))
}
@@ -160,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
sc.addSparkListener(listener)
// Have each task add 1 to the internal accumulator
val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
- TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1
+ TaskContext.get().taskMetrics().testAccum.get += 1
iter
}
// Register asserts in job completion callback to avoid flakiness
@@ -196,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
val rdd = sc.parallelize(1 to 100, numPartitions)
.map { i => (i, i) }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1
+ TaskContext.get().taskMetrics().testAccum.get += 1
iter
}
.reduceByKey { case (x, y) => x + y }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 10
+ TaskContext.get().taskMetrics().testAccum.get += 10
iter
}
.repartition(numPartitions * 2)
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 100
+ TaskContext.get().taskMetrics().testAccum.get += 100
iter
}
// Register asserts in job completion callback to avoid flakiness
@@ -236,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
// This should retry both stages in the scheduler. Note that we only want to fail the
// first stage attempt because we want the stage to eventually succeed.
val x = sc.parallelize(1 to 100, numPartitions)
- .mapPartitions { iter => TaskContext.get().taskMetrics().getAccum(TEST_ACCUM) += 1; iter }
+ .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter }
.groupBy(identity)
val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
@@ -294,15 +185,15 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
}
assert(Accumulators.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
- val internalAccums = InternalAccumulator.createAll()
+ val numInternalAccums = TaskMetrics.empty.internalAccums.length
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
- assert(Accumulators.originals.size === internalAccums.size * 2)
+ assert(Accumulators.originals.size === numInternalAccums * 2)
val accumsRegistered = sc.cleaner match {
case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
case _ => Seq.empty[Long]
}
// Make sure the same set of accumulators is registered for cleanup
- assert(accumsRegistered.size === internalAccums.size * 2)
+ assert(accumsRegistered.size === numInternalAccums * 2)
assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet)
}
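
Taken together, the InternalAccumulatorSuite changes above replace name-based lookups (getAccum(TEST_ACCUM), InternalAccumulator.createAll()) with typed members owned by TaskMetrics. A minimal sketch of the new access pattern, assuming code compiled inside the org.apache.spark package (these members are spark-internal) and noting that testAccum is an Option that is only populated in test builds:

    import org.apache.spark.{SparkContext, TaskContext}
    import org.apache.spark.executor.TaskMetrics

    def bumpTestAccum(sc: SparkContext): Unit = {
      sc.parallelize(1 to 100, 4).mapPartitions { iter =>
        // The test accumulator now lives inside the task's TaskMetrics.
        TaskContext.get().taskMetrics().testAccum.get += 1
        iter
      }.count()
    }

    // The per-stage count of internal accumulators can be read off an empty TaskMetrics.
    val accumsPerStage = TaskMetrics.empty.internalAccums.length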
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 079109d137..a854f5bb9b 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -336,16 +336,14 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
    // first attempt -- it's successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
- new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.createAll(sc)))
+ new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem))
val data1 = (1 to 10).map { x => x -> x}
// second attempt -- also successful. We'll write out different data,
// just to simulate the fact that the records may get written differently
// depending on what gets spilled, what gets combined, etc.
val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
- new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.createAll(sc)))
+ new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem))
val data2 = (11 to 20).map { x => x -> x}
// interleave writes of both attempts -- we want to test that both attempts can occur
@@ -373,8 +371,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
- new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.createAll(sc)))
+ new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem))
val readData = reader.read().toIndexedSeq
assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
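
The ShuffleSuite hunks drop the trailing Seq[Accumulator[_]] argument from TaskContextImpl. A rough sketch of constructing a bare test context under the trimmed signature, assuming an active SparkEnv and that the TaskMetrics parameter seen elsewhere in this patch has a default value (as these call sites suggest):

    import java.util.Properties
    import org.apache.spark.{SparkEnv, TaskContextImpl}
    import org.apache.spark.memory.TaskMemoryManager

    val taskMemoryManager = new TaskMemoryManager(SparkEnv.get.memoryManager, 0L)
    // Positional arguments: stage id, partition id, task attempt id, attempt number,
    // memory manager, local properties, metrics system.
    val context = new TaskContextImpl(
      0, 0, 0L, 0, taskMemoryManager, new Properties, SparkEnv.get.metricsSystem)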
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index a263fce8ab..fbc2fae08d 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -26,102 +26,20 @@ import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId
class TaskMetricsSuite extends SparkFunSuite {
import AccumulatorParam._
- import InternalAccumulator._
import StorageLevel._
import TaskMetricsSuite._
- test("create with unnamed accum") {
- intercept[IllegalArgumentException] {
- new TaskMetrics(
- InternalAccumulator.createAll() ++ Seq(
- new Accumulator(0, IntAccumulatorParam, None, internal = true)))
- }
- }
-
- test("create with duplicate name accum") {
- intercept[IllegalArgumentException] {
- new TaskMetrics(
- InternalAccumulator.createAll() ++ Seq(
- new Accumulator(0, IntAccumulatorParam, Some(RESULT_SIZE), internal = true)))
- }
- }
-
- test("create with external accum") {
- intercept[IllegalArgumentException] {
- new TaskMetrics(
- InternalAccumulator.createAll() ++ Seq(
- new Accumulator(0, IntAccumulatorParam, Some("x"))))
- }
- }
-
- test("create shuffle read metrics") {
- import shuffleRead._
- val accums = InternalAccumulator.createShuffleReadAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(REMOTE_BLOCKS_FETCHED).setValueAny(1)
- accums(LOCAL_BLOCKS_FETCHED).setValueAny(2)
- accums(REMOTE_BYTES_READ).setValueAny(3L)
- accums(LOCAL_BYTES_READ).setValueAny(4L)
- accums(FETCH_WAIT_TIME).setValueAny(5L)
- accums(RECORDS_READ).setValueAny(6L)
- val sr = new ShuffleReadMetrics(accums)
- assert(sr.remoteBlocksFetched === 1)
- assert(sr.localBlocksFetched === 2)
- assert(sr.remoteBytesRead === 3L)
- assert(sr.localBytesRead === 4L)
- assert(sr.fetchWaitTime === 5L)
- assert(sr.recordsRead === 6L)
- }
-
- test("create shuffle write metrics") {
- import shuffleWrite._
- val accums = InternalAccumulator.createShuffleWriteAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(BYTES_WRITTEN).setValueAny(1L)
- accums(RECORDS_WRITTEN).setValueAny(2L)
- accums(WRITE_TIME).setValueAny(3L)
- val sw = new ShuffleWriteMetrics(accums)
- assert(sw.bytesWritten === 1L)
- assert(sw.recordsWritten === 2L)
- assert(sw.writeTime === 3L)
- }
-
- test("create input metrics") {
- import input._
- val accums = InternalAccumulator.createInputAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(BYTES_READ).setValueAny(1L)
- accums(RECORDS_READ).setValueAny(2L)
- val im = new InputMetrics(accums)
- assert(im.bytesRead === 1L)
- assert(im.recordsRead === 2L)
- }
-
- test("create output metrics") {
- import output._
- val accums = InternalAccumulator.createOutputAccums()
- .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]
- accums(BYTES_WRITTEN).setValueAny(1L)
- accums(RECORDS_WRITTEN).setValueAny(2L)
- val om = new OutputMetrics(accums)
- assert(om.bytesWritten === 1L)
- assert(om.recordsWritten === 2L)
- }
-
test("mutating values") {
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- // initial values
- assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 0L)
- assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 0L)
- assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 0L)
- assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 0L)
- assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 0L)
- assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 0L)
- assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 0L)
- assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 0L)
- assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
- Seq.empty[(BlockId, BlockStatus)])
+ val tm = new TaskMetrics
+ assert(tm.executorDeserializeTime == 0L)
+ assert(tm.executorRunTime == 0L)
+ assert(tm.resultSize == 0L)
+ assert(tm.jvmGCTime == 0L)
+ assert(tm.resultSerializationTime == 0L)
+ assert(tm.memoryBytesSpilled == 0L)
+ assert(tm.diskBytesSpilled == 0L)
+ assert(tm.peakExecutionMemory == 0L)
+ assert(tm.updatedBlockStatuses.isEmpty)
// set or increment values
tm.setExecutorDeserializeTime(100L)
tm.setExecutorDeserializeTime(1L) // overwrite
@@ -144,36 +62,27 @@ class TaskMetricsSuite extends SparkFunSuite {
tm.incUpdatedBlockStatuses(Seq(block1))
tm.incUpdatedBlockStatuses(Seq(block2))
// assert new values exist
- assertValueEquals(tm, _.executorDeserializeTime, accums, EXECUTOR_DESERIALIZE_TIME, 1L)
- assertValueEquals(tm, _.executorRunTime, accums, EXECUTOR_RUN_TIME, 2L)
- assertValueEquals(tm, _.resultSize, accums, RESULT_SIZE, 3L)
- assertValueEquals(tm, _.jvmGCTime, accums, JVM_GC_TIME, 4L)
- assertValueEquals(tm, _.resultSerializationTime, accums, RESULT_SERIALIZATION_TIME, 5L)
- assertValueEquals(tm, _.memoryBytesSpilled, accums, MEMORY_BYTES_SPILLED, 606L)
- assertValueEquals(tm, _.diskBytesSpilled, accums, DISK_BYTES_SPILLED, 707L)
- assertValueEquals(tm, _.peakExecutionMemory, accums, PEAK_EXECUTION_MEMORY, 808L)
- assertValueEquals(tm, _.updatedBlockStatuses, accums, UPDATED_BLOCK_STATUSES,
- Seq(block1, block2))
+ assert(tm.executorDeserializeTime == 1L)
+ assert(tm.executorRunTime == 2L)
+ assert(tm.resultSize == 3L)
+ assert(tm.jvmGCTime == 4L)
+ assert(tm.resultSerializationTime == 5L)
+ assert(tm.memoryBytesSpilled == 606L)
+ assert(tm.diskBytesSpilled == 707L)
+ assert(tm.peakExecutionMemory == 808L)
+ assert(tm.updatedBlockStatuses == Seq(block1, block2))
}
test("mutating shuffle read metrics values") {
- import shuffleRead._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals[T](tmValue: ShuffleReadMetrics => T, name: String, value: T): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.shuffleReadMetrics), accums, name, value)
- }
- // create shuffle read metrics
- tm.createTempShuffleReadMetrics()
- tm.mergeShuffleReadMetrics()
+ val tm = new TaskMetrics
val sr = tm.shuffleReadMetrics
// initial values
- assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 0)
- assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 0)
- assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 0L)
- assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 0L)
- assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 0L)
- assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ assert(sr.remoteBlocksFetched == 0)
+ assert(sr.localBlocksFetched == 0)
+ assert(sr.remoteBytesRead == 0L)
+ assert(sr.localBytesRead == 0L)
+ assert(sr.fetchWaitTime == 0L)
+ assert(sr.recordsRead == 0L)
// set and increment values
sr.setRemoteBlocksFetched(100)
sr.setRemoteBlocksFetched(10)
@@ -200,27 +109,21 @@ class TaskMetricsSuite extends SparkFunSuite {
sr.incRecordsRead(6L)
sr.incRecordsRead(6L)
// assert new values exist
- assertValEquals(_.remoteBlocksFetched, REMOTE_BLOCKS_FETCHED, 12)
- assertValEquals(_.localBlocksFetched, LOCAL_BLOCKS_FETCHED, 24)
- assertValEquals(_.remoteBytesRead, REMOTE_BYTES_READ, 36L)
- assertValEquals(_.localBytesRead, LOCAL_BYTES_READ, 48L)
- assertValEquals(_.fetchWaitTime, FETCH_WAIT_TIME, 60L)
- assertValEquals(_.recordsRead, RECORDS_READ, 72L)
+ assert(sr.remoteBlocksFetched == 12)
+ assert(sr.localBlocksFetched == 24)
+ assert(sr.remoteBytesRead == 36L)
+ assert(sr.localBytesRead == 48L)
+ assert(sr.fetchWaitTime == 60L)
+ assert(sr.recordsRead == 72L)
}
test("mutating shuffle write metrics values") {
- import shuffleWrite._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals[T](tmValue: ShuffleWriteMetrics => T, name: String, value: T): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.shuffleWriteMetrics), accums, name, value)
- }
- // create shuffle write metrics
+ val tm = new TaskMetrics
val sw = tm.shuffleWriteMetrics
// initial values
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
- assertValEquals(_.writeTime, WRITE_TIME, 0L)
+ assert(sw.bytesWritten == 0L)
+ assert(sw.recordsWritten == 0L)
+ assert(sw.writeTime == 0L)
// increment and decrement values
sw.incBytesWritten(100L)
sw.incBytesWritten(10L) // 100 + 10
@@ -233,55 +136,41 @@ class TaskMetricsSuite extends SparkFunSuite {
sw.incWriteTime(300L)
sw.incWriteTime(30L)
// assert new values exist
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 108L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 216L)
- assertValEquals(_.writeTime, WRITE_TIME, 330L)
+ assert(sw.bytesWritten == 108L)
+ assert(sw.recordsWritten == 216L)
+ assert(sw.writeTime == 330L)
}
test("mutating input metrics values") {
- import input._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals(tmValue: InputMetrics => Any, name: String, value: Any): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.inputMetrics), accums, name, value,
- (x: Any, y: Any) => assert(x.toString === y.toString))
- }
- // create input metrics
+ val tm = new TaskMetrics
val in = tm.inputMetrics
// initial values
- assertValEquals(_.bytesRead, BYTES_READ, 0L)
- assertValEquals(_.recordsRead, RECORDS_READ, 0L)
+ assert(in.bytesRead == 0L)
+ assert(in.recordsRead == 0L)
// set and increment values
in.setBytesRead(1L)
in.setBytesRead(2L)
in.incRecordsRead(1L)
in.incRecordsRead(2L)
// assert new values exist
- assertValEquals(_.bytesRead, BYTES_READ, 2L)
- assertValEquals(_.recordsRead, RECORDS_READ, 3L)
+ assert(in.bytesRead == 2L)
+ assert(in.recordsRead == 3L)
}
test("mutating output metrics values") {
- import output._
- val accums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(accums)
- def assertValEquals(tmValue: OutputMetrics => Any, name: String, value: Any): Unit = {
- assertValueEquals(tm, tm => tmValue(tm.outputMetrics), accums, name, value,
- (x: Any, y: Any) => assert(x.toString === y.toString))
- }
- // create input metrics
+ val tm = new TaskMetrics
val out = tm.outputMetrics
// initial values
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 0L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 0L)
+ assert(out.bytesWritten == 0L)
+ assert(out.recordsWritten == 0L)
// set values
out.setBytesWritten(1L)
out.setBytesWritten(2L)
out.setRecordsWritten(3L)
out.setRecordsWritten(4L)
// assert new values exist
- assertValEquals(_.bytesWritten, BYTES_WRITTEN, 2L)
- assertValEquals(_.recordsWritten, RECORDS_WRITTEN, 4L)
+ assert(out.bytesWritten == 2L)
+ assert(out.recordsWritten == 4L)
}
test("merging multiple shuffle read metrics") {
@@ -305,9 +194,7 @@ class TaskMetricsSuite extends SparkFunSuite {
}
test("additional accumulables") {
- val internalAccums = InternalAccumulator.createAll()
- val tm = new TaskMetrics(internalAccums)
- assert(tm.accumulatorUpdates().size === internalAccums.size)
+ val tm = new TaskMetrics
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
@@ -338,47 +225,11 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(newUpdates(acc4.id).countFailedValues)
assert(newUpdates.values.map(_.update).forall(_.isDefined))
assert(newUpdates.values.map(_.value).forall(_.isEmpty))
- assert(newUpdates.size === internalAccums.size + 4)
- }
-
- test("existing values in shuffle read accums") {
- // set shuffle read accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val srAccum = accums.find(_.name === Some(shuffleRead.FETCH_WAIT_TIME))
- assert(srAccum.isDefined)
- srAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm = new TaskMetrics(accums)
- }
-
- test("existing values in shuffle write accums") {
- // set shuffle write accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val swAccum = accums.find(_.name === Some(shuffleWrite.RECORDS_WRITTEN))
- assert(swAccum.isDefined)
- swAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm = new TaskMetrics(accums)
- }
-
- test("existing values in input accums") {
- // set input accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val inAccum = accums.find(_.name === Some(input.RECORDS_READ))
- assert(inAccum.isDefined)
- inAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm = new TaskMetrics(accums)
- }
-
- test("existing values in output accums") {
- // set output accum before passing it into TaskMetrics
- val accums = InternalAccumulator.createAll()
- val outAccum = accums.find(_.name === Some(output.RECORDS_WRITTEN))
- assert(outAccum.isDefined)
- outAccum.get.asInstanceOf[Accumulator[Long]] += 10L
- val tm4 = new TaskMetrics(accums)
+ assert(newUpdates.size === tm.internalAccums.size + 4)
}
test("from accumulator updates") {
- val accumUpdates1 = InternalAccumulator.createAll().map { a =>
+ val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
}
val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
@@ -413,29 +264,6 @@ class TaskMetricsSuite extends SparkFunSuite {
private[spark] object TaskMetricsSuite extends Assertions {
/**
- * Assert that the following three things are equal to `value`:
- * (1) TaskMetrics value
- * (2) TaskMetrics accumulator update value
- * (3) Original accumulator value
- */
- def assertValueEquals(
- tm: TaskMetrics,
- tmValue: TaskMetrics => Any,
- accums: Seq[Accumulator[_]],
- metricName: String,
- value: Any,
- assertEquals: (Any, Any) => Unit = (x: Any, y: Any) => assert(x === y)): Unit = {
- assertEquals(tmValue(tm), value)
- val accum = accums.find(_.name == Some(metricName))
- assert(accum.isDefined)
- assertEquals(accum.get.value, value)
- val accumUpdate = tm.accumulatorUpdates().find(_.name == Some(metricName))
- assert(accumUpdate.isDefined)
- assert(accumUpdate.get.value === None)
- assertEquals(accumUpdate.get.update, Some(value))
- }
-
- /**
* Assert that two lists of accumulator updates are equal.
* Note: this does NOT check accumulator ID equality.
*/
@@ -458,5 +286,4 @@ private[spark] object TaskMetricsSuite extends Assertions {
* info as an accumulator update.
*/
def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
-
}
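
The rewritten TaskMetricsSuite reduces to one pattern: construct TaskMetrics directly and read or mutate its sub-metrics as plain fields, with no separate accumulator list to cross-check (which is why the assertValueEquals helper above disappears). A condensed sketch, assuming spark-internal visibility for the mutators exercised in these tests:

    import org.apache.spark.executor.TaskMetrics

    val tm = new TaskMetrics                       // accumulators are created inside TaskMetrics
    tm.setExecutorDeserializeTime(1L)              // task-level scalar metric
    tm.inputMetrics.setBytesRead(2L)               // sub-metrics always exist now
    tm.shuffleWriteMetrics.incBytesWritten(108L)
    assert(tm.executorDeserializeTime == 1L)
    assert(tm.inputMetrics.bytesRead == 2L)
    assert(tm.shuffleWriteMetrics.bytesWritten == 108L)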
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index fd96fb04f8..b76c0a4bd1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// SPARK-9809 -- this stage is submitted without a task for each partition (because some of
// the shuffle map output is still available from stage 0); make sure we've still got internal
// accumulators setup
- assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty)
+ assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics != null)
completeShuffleMapStageSuccessfully(2, 0, 2)
completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
assert(results === Map(0 -> 1234, 1 -> 1235))
@@ -2010,7 +2010,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
val accumUpdates = reason match {
- case Success => task.initialAccumulators.map { a => a.toInfo(Some(a.zero), None) }
+ case Success => task.metrics.accumulatorUpdates()
case ef: ExceptionFailure => ef.accumUpdates
case _ => Seq.empty[AccumulableInfo]
}
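
In DAGSchedulerSuite, a successful task's accumulator updates now come from the task's own metrics rather than from initialAccumulators. Roughly, for a task: Task[_] built by the test (a sketch against the internal API shown above):

    import org.apache.spark.scheduler.AccumulableInfo

    // Every internal accumulator reports an update, even for a task that did no work;
    // the resulting infos carry `update` values and leave `value` empty.
    val updates: Seq[AccumulableInfo] = task.metrics.accumulatorUpdates()
    assert(updates.forall(_.update.isDefined))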
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index e3e6df6831..4fe705b201 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -17,14 +17,11 @@
package org.apache.spark.scheduler
-import java.util.Properties
-
import org.apache.spark.TaskContext
class FakeTask(
stageId: Int,
- prefLocs: Seq[TaskLocation] = Nil)
- extends Task[Int](stageId, 0, 0, Seq.empty, new Properties) {
+ prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 0) {
override def runTask(context: TaskContext): Int = 0
override def preferredLocations: Seq[TaskLocation] = prefLocs
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
index 76a7087645..255be6f46b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-import java.util.Properties
import org.apache.spark.TaskContext
@@ -26,7 +25,7 @@ import org.apache.spark.TaskContext
* A Task implementation that fails to serialize.
*/
private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int)
- extends Task[Array[Byte]](stageId, 0, 0, Seq.empty, new Properties) {
+ extends Task[Array[Byte]](stageId, 0, 0) {
override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte]
override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 86911d2211..bda4c996b2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -24,7 +24,7 @@ import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.apache.spark._
-import org.apache.spark.executor.{Executor, TaskMetricsSuite}
+import org.apache.spark.executor.{Executor, TaskMetrics, TaskMetricsSuite}
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.network.util.JavaUtils
@@ -62,7 +62,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties)
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
intercept[RuntimeException] {
task.run(0, 0, null)
}
@@ -83,7 +83,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val func = (c: TaskContext, i: Iterator[String]) => i.next()
val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func))))
val task = new ResultTask[String, String](
- 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties)
+ 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics)
intercept[RuntimeException] {
task.run(0, 0, null)
}
@@ -171,26 +171,27 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
val param = AccumulatorParam.LongAccumulatorParam
val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
- val initialAccums = InternalAccumulator.createAll()
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
- val task = new Task[Int](0, 0, 0, Seq.empty[Accumulator[_]], new Properties) {
+ val taskMetrics = new TaskMetrics
+ val task = new Task[Int](0, 0, 0) {
context = new TaskContextImpl(0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
new Properties,
SparkEnv.get.metricsSystem,
- initialAccums)
- context.taskMetrics.registerAccumulator(acc1)
- context.taskMetrics.registerAccumulator(acc2)
+ taskMetrics)
+ taskMetrics.registerAccumulator(acc1)
+ taskMetrics.registerAccumulator(acc2)
override def runTask(tc: TaskContext): Int = 0
}
// First, simulate task success. This should give us all the accumulators.
val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
- val accumUpdates2 = (initialAccums ++ Seq(acc1, acc2)).map(TaskMetricsSuite.makeInfo)
+ val accumUpdates2 = (taskMetrics.internalAccums ++ Seq(acc1, acc2))
+ .map(TaskMetricsSuite.makeInfo)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
// Now, simulate task failures. This should give us only the accums that count failed values.
val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
- val accumUpdates4 = (initialAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
+ val accumUpdates4 = (taskMetrics.internalAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
}
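
The TaskContextSuite hunk gives the clearest picture of the new wiring: one TaskMetrics is created up front, handed to the TaskContextImpl, used to register external accumulators, and later harvested by the task. A trimmed sketch under the same assumptions (spark-internal test code, active SparkEnv):

    import java.util.Properties
    import org.apache.spark.{Accumulator, AccumulatorParam, SparkEnv, TaskContext, TaskContextImpl}
    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.memory.TaskMemoryManager
    import org.apache.spark.scheduler.Task

    val acc = new Accumulator(0L, AccumulatorParam.LongAccumulatorParam, Some("x"),
      internal = false, countFailedValues = true)
    val taskMetrics = new TaskMetrics
    val task = new Task[Int](0, 0, 0) {
      context = new TaskContextImpl(0, 0, 0L, 0,
        new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
        new Properties, SparkEnv.get.metricsSystem, taskMetrics)
      taskMetrics.registerAccumulator(acc)   // external accumulators register with TaskMetrics
      override def runTask(tc: TaskContext): Int = 0
    }
    // Success reports every accumulator; failure reports only those that count failed values.
    val onSuccess = task.collectAccumulatorUpdates(taskFailed = false)
    val onFailure = task.collectAccumulatorUpdates(taskFailed = true)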
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ade8e84d84..ecf4b76da5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,9 +17,8 @@
package org.apache.spark.scheduler
-import java.util.{Properties, Random}
+import java.util.Random
-import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -138,7 +137,8 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
/**
* A Task implementation that results in a large serialized task.
*/
-class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0, Seq.empty, new Properties) {
+class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0, 0) {
+
val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
val random = new Random(0)
random.nextBytes(randomBuffer)
@@ -166,7 +166,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- val accumUpdates = taskSet.tasks.head.initialAccumulators.map { a => a.toInfo(Some(0L), None) }
+ val accumUpdates =
+ taskSet.tasks.head.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
@@ -185,7 +186,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
- task.initialAccumulators.map { a => a.toInfo(Some(0L), None) }
+ task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
}
// First three offers should all find tasks
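
TaskSetManagerSuite seeds its expected updates the same way, by turning each internal accumulator of a task from the test's task set into an AccumulableInfo carrying a zero update (sketch, internal API):

    // One zero-valued update per internal accumulator, mirroring what a no-op task reports.
    val zeroUpdates = task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }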
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
index 88817dccf3..d223af1496 100644
--- a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
@@ -38,7 +38,7 @@ class AllStagesResourceSuite extends SparkFunSuite {
stageUiData.taskData = tasks
val status = StageStatus.ACTIVE
val stageInfo = new StageInfo(
- 1, 1, "stage 1", 10, Seq.empty, Seq.empty, "details abc", Seq.empty)
+ 1, 1, "stage 1", 10, Seq.empty, Seq.empty, "details abc")
val stageData = AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, false)
stageData.firstTaskLaunchedTime
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 85c877e3dd..221124829f 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -269,9 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"
def makeTaskMetrics(base: Int): TaskMetrics = {
- val accums = InternalAccumulator.createAll()
- accums.foreach(Accumulators.register)
- val taskMetrics = new TaskMetrics(accums)
+ val taskMetrics = new TaskMetrics
val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
val inputMetrics = taskMetrics.inputMetrics
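
JobProgressListenerSuite's makeTaskMetrics no longer pre-registers accumulators; it builds a TaskMetrics and fills it in directly. Shuffle reads still flow through a temporary object that is merged back, roughly as below, assuming the createTempShuffleReadMetrics/mergeShuffleReadMetrics pair referenced in this patch is otherwise unchanged and that the temp object exposes the same inc* mutators as ShuffleReadMetrics:

    import org.apache.spark.executor.TaskMetrics

    val taskMetrics = new TaskMetrics
    val tempRead = taskMetrics.createTempShuffleReadMetrics()   // one temp object per shuffle dependency
    tempRead.incRecordsRead(10L)
    taskMetrics.mergeShuffleReadMetrics()                       // fold temps into shuffleReadMetrics
    assert(taskMetrics.shuffleReadMetrics.recordsRead == 10L)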
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7b15f58558..65b0a97e4d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -656,6 +656,10 @@ object MimaExcludes {
// [SPARK-14407] Hides HadoopFsRelation related data source API into execution package
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.OutputWriter"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.OutputWriterFactory")
+ ) ++ Seq(
+ // SPARK-14704: Create accumulators in TaskMetrics
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.this")
)
case v if v.startsWith("1.6") =>
Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 01687877ee..53105e0b24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File}
import java.util.Properties
import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -113,8 +114,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
(i, converter(Row(i)))
}
val taskMemoryManager = new TaskMemoryManager(sc.env.memoryManager, 0)
- val taskContext = new TaskContextImpl(
- 0, 0, 0, 0, taskMemoryManager, new Properties, null, InternalAccumulator.createAll(sc))
+ val taskContext = new TaskContextImpl(0, 0, 0, 0, taskMemoryManager, new Properties, null)
val sorter = new ExternalSorter[Int, UnsafeRow, UnsafeRow](
taskContext,