path: root/core/src/main
Diffstat (limited to 'core/src/main')
-rw-r--r-- core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/Accumulable.scala | 86
-rw-r--r-- core/src/main/scala/org/apache/spark/Accumulator.scala | 101
-rw-r--r-- core/src/main/scala/org/apache/spark/Aggregator.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/InternalAccumulator.scala | 199
-rw-r--r-- core/src/main/scala/org/apache/spark/TaskContext.scala | 19
-rw-r--r-- core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 30
-rw-r--r-- core/src/main/scala/org/apache/spark/TaskEndReason.scala | 29
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/Executor.scala | 45
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/InputMetrics.scala | 81
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala | 71
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 104
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala | 62
-rw-r--r-- core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 370
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 24
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 22
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala | 55
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 102
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala | 5
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 7
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/Task.scala | 41
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala | 28
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala | 27
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 13
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 14
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala | 3
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 18
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 36
-rw-r--r-- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 124
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 3
38 files changed, 1184 insertions(+), 592 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index d3d79a27ea..128a82579b 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -444,13 +444,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public Option<MapStatus> stop(boolean success) {
try {
- // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
- Map<String, Accumulator<Object>> internalAccumulators =
- taskContext.internalMetricsToAccumulators();
- if (internalAccumulators != null) {
- internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
- .add(getPeakMemoryUsedBytes());
- }
+ taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes());
if (stopping) {
return Option.apply(null);
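The hunk above replaces the name-keyed accumulator lookup with a direct call on TaskMetrics. A minimal sketch of the new pattern, assuming the calling code lives under org.apache.spark (the increment methods are private[spark]) and that a task context is available; recordPeakMemory is a hypothetical helper:

import org.apache.spark.TaskContext

object PeakMemorySketch {
  def recordPeakMemory(peakMemoryUsedBytes: Long): Unit = {
    // TaskContext.get() is null outside a running task (e.g. in some suites),
    // so guard before touching the metrics.
    val ctx = TaskContext.get()
    if (ctx != null) {
      ctx.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
    }
  }
}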
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index a456d420b8..bde136141f 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -35,40 +35,67 @@ import org.apache.spark.util.Utils
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
+ * All accumulators created on the driver to be used on the executors must be registered with
+ * [[Accumulators]]. This is already done automatically for accumulators created by the user.
+ * Internal accumulators must be explicitly registered by the caller.
+ *
+ * Operations are not thread-safe.
+ *
+ * @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
* to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
* thread safe so that they can be reported correctly.
+ * @param countFailedValues whether to accumulate values from failed tasks. This is set to true
+ * for system and time metrics like serialization time or bytes spilled,
+ * and false for things with absolute values like number of input rows.
+ * This should be used for internal metrics only.
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
-class Accumulable[R, T] private[spark] (
- initialValue: R,
+class Accumulable[R, T] private (
+ val id: Long,
+ @transient initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
- internal: Boolean)
+ internal: Boolean,
+ private[spark] val countFailedValues: Boolean)
extends Serializable {
private[spark] def this(
- @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
- this(initialValue, param, None, internal)
+ initialValue: R,
+ param: AccumulableParam[R, T],
+ name: Option[String],
+ internal: Boolean,
+ countFailedValues: Boolean) = {
+ this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
}
- def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
- this(initialValue, param, name, false)
+ private[spark] def this(
+ initialValue: R,
+ param: AccumulableParam[R, T],
+ name: Option[String],
+ internal: Boolean) = {
+ this(initialValue, param, name, internal, false /* countFailedValues */)
+ }
- def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
- this(initialValue, param, None)
+ def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
+ this(initialValue, param, name, false /* internal */)
- val id: Long = Accumulators.newId
+ def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
- @volatile @transient private var value_ : R = initialValue // Current value on master
- val zero = param.zero(initialValue) // Zero value to be passed to workers
+ @volatile @transient private var value_ : R = initialValue // Current value on driver
+ val zero = param.zero(initialValue) // Zero value to be passed to executors
private var deserialized = false
- Accumulators.register(this)
+ // In many places we create internal accumulators without access to the active context cleaner,
+ // so if we register them here then we may never unregister these accumulators. To avoid memory
+ // leaks, we require the caller to explicitly register internal accumulators elsewhere.
+ if (!internal) {
+ Accumulators.register(this)
+ }
/**
* If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
@@ -78,6 +105,17 @@ class Accumulable[R, T] private[spark] (
private[spark] def isInternal: Boolean = internal
/**
+ * Return a copy of this [[Accumulable]].
+ *
+ * The copy will have the same ID as the original and will not be registered with
+ * [[Accumulators]] again. This method exists so that the caller can avoid passing the
+ * same mutable instance around.
+ */
+ private[spark] def copy(): Accumulable[R, T] = {
+ new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
+ }
+
+ /**
* Add more data to this accumulator / accumulable
* @param term the data to add
*/
@@ -106,7 +144,7 @@ class Accumulable[R, T] private[spark] (
def merge(term: R) { value_ = param.addInPlace(value_, term)}
/**
- * Access the accumulator's current value; only allowed on master.
+ * Access the accumulator's current value; only allowed on driver.
*/
def value: R = {
if (!deserialized) {
@@ -128,7 +166,7 @@ class Accumulable[R, T] private[spark] (
def localValue: R = value_
/**
- * Set the accumulator's value; only allowed on master.
+ * Set the accumulator's value; only allowed on driver.
*/
def value_= (newValue: R) {
if (!deserialized) {
@@ -139,22 +177,24 @@ class Accumulable[R, T] private[spark] (
}
/**
- * Set the accumulator's value; only allowed on master
+ * Set the accumulator's value. For internal use only.
*/
- def setValue(newValue: R) {
- this.value = newValue
- }
+ def setValue(newValue: R): Unit = { value_ = newValue }
+
+ /**
+ * Set the accumulator's value. For internal use only.
+ */
+ private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }
// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
+
// Automatically register the accumulator when it is deserialized with the task closure.
- //
- // Note internal accumulators sent with task are deserialized before the TaskContext is created
- // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
- // metrics, still need to register here.
+ // This is for external accumulators and internal ones that do not represent task level
+ // metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
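A sketch of the registration contract described in the scaladoc above: user accumulables register themselves on construction, internal ones must be registered by the caller. It assumes the code sits under org.apache.spark, since the multi-argument constructor and Accumulators.register are private[spark]; the accumulator names are hypothetical:

import org.apache.spark.{Accumulable, AccumulatorParam, Accumulators}

object AccumulableRegistrationSketch {
  // User accumulable: the public constructor registers it with Accumulators.
  val userSum = new Accumulable[Long, Long](
    0L, AccumulatorParam.LongAccumulatorParam, Some("mySum"))

  // Internal accumulable: construction skips registration, so the caller must
  // register it (and tie it to the context cleaner) explicitly to avoid leaks.
  val internalSum = new Accumulable[Long, Long](
    0L, AccumulatorParam.LongAccumulatorParam, Some("internal.metrics.example"),
    internal = true, countFailedValues = true)
  Accumulators.register(internalSum)
}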
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 007136e6ae..558bd447e2 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,9 +17,14 @@
package org.apache.spark
-import scala.collection.{mutable, Map}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
import scala.ref.WeakReference
+import org.apache.spark.storage.{BlockId, BlockStatus}
+
/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
@@ -49,14 +54,18 @@ import scala.ref.WeakReference
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
+ * @param name human-readable name associated with this accumulator
+ * @param internal whether this accumulator is used internally within Spark only
+ * @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
- internal: Boolean)
- extends Accumulable[T, T](initialValue, param, name, internal) {
+ internal: Boolean,
+ override val countFailedValues: Boolean = false)
+ extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
@@ -75,43 +84,63 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
+ * TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
*/
+ @GuardedBy("Accumulators")
val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
- private var lastId: Long = 0
+ private val nextId = new AtomicLong(0L)
- def newId(): Long = synchronized {
- lastId += 1
- lastId
- }
+ /**
+ * Return a globally unique ID for a new [[Accumulable]].
+ * Note: Once you copy the [[Accumulable]] the ID is no longer unique.
+ */
+ def newId(): Long = nextId.getAndIncrement
+ /**
+ * Register an [[Accumulable]] created on the driver such that it can be used on the executors.
+ *
+ * All accumulators registered here can later be used as a container for accumulating partial
+ * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
+ * Note: if an accumulator is registered here, it should also be registered with the active
+ * context cleaner for cleanup so as to avoid memory leaks.
+ *
+ * If an [[Accumulable]] with the same ID was already registered, this does nothing instead
+ * of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct
+ * [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
+ */
def register(a: Accumulable[_, _]): Unit = synchronized {
- originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+ if (!originals.contains(a.id)) {
+ originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+ }
}
- def remove(accId: Long) {
- synchronized {
- originals.remove(accId)
- }
+ /**
+ * Unregister the [[Accumulable]] with the given ID, if any.
+ */
+ def remove(accId: Long): Unit = synchronized {
+ originals.remove(accId)
}
- // Add values to the original accumulators with some given IDs
- def add(values: Map[Long, Any]): Unit = synchronized {
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- // Since we are now storing weak references, we must check whether the underlying data
- // is valid.
- originals(id).get match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
- case None =>
- throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
- }
- } else {
- logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
+ /**
+ * Return the [[Accumulable]] registered with the given ID, if any.
+ */
+ def get(id: Long): Option[Accumulable[_, _]] = synchronized {
+ originals.get(id).map { weakRef =>
+ // Since we are storing weak references, we must check whether the underlying data is valid.
+ weakRef.get.getOrElse {
+ throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
}
}
}
+ /**
+ * Clear all registered [[Accumulable]]s. For testing only.
+ */
+ def clear(): Unit = synchronized {
+ originals.clear()
+ }
+
}
@@ -156,5 +185,23 @@ object AccumulatorParam {
def zero(initialValue: Float): Float = 0f
}
- // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+ // Note: when merging values, this param just adopts the newer value. This is used only
+ // internally for things that shouldn't really be accumulated across tasks, like input
+ // read method, which should be the same across all tasks in the same stage.
+ private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
+ def addInPlace(t1: String, t2: String): String = t2
+ def zero(initialValue: String): String = ""
+ }
+
+ // Note: this is expensive as it makes a copy of the list every time the caller adds an item.
+ // A better way to use this is to first accumulate the values yourself and then add them all at once.
+ private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
+ def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
+ def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
+ }
+
+ // For the internal metric that records what blocks are updated in a particular task
+ private[spark] object UpdatedBlockStatusesAccumulatorParam
+ extends ListAccumulatorParam[(BlockId, BlockStatus)]
+
}
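The two new params above merge differently: the string param simply adopts the newer value, while the list param concatenates (hence the advice to batch your own updates first). A standalone sketch of those semantics, independent of the Spark classes:

object ParamSemanticsSketch {
  // Mirrors StringAccumulatorParam: merging keeps only the newer value.
  def mergeString(older: String, newer: String): String = newer

  // Mirrors ListAccumulatorParam: merging copies and concatenates.
  def mergeList[T](acc: Seq[T], update: Seq[T]): Seq[T] = acc ++ update

  def main(args: Array[String]): Unit = {
    assert(mergeString("Hadoop", "Memory") == "Memory")
    assert(mergeList(Seq(1, 2), Seq(3)) == Seq(1, 2, 3))
  }
}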
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 62629000cf..e493d9a3cf 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -57,8 +57,7 @@ case class Aggregator[K, V, C] (
Option(context).foreach { c =>
c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
- c.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+ c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index e03977828b..45b20c0e8d 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] case class Heartbeat(
executorId: String,
- taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates
blockManagerId: BlockManagerId)
/**
@@ -119,14 +119,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)
// Messages received from executors
- case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+ case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
- executorId, taskMetrics, blockManagerId)
+ executorId, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
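Since Heartbeat is a case class, the receiver-side match shown above keeps working against the new payload shape. A small sketch, assuming spark-internal visibility (the class is private[spark]); summarize is a hypothetical helper:

import org.apache.spark.Heartbeat

object HeartbeatPayloadSketch {
  // One entry per running task: taskId -> that task's accumulator updates.
  def summarize(hb: Heartbeat): String = hb match {
    case Heartbeat(executorId, accumUpdates, blockManagerId) =>
      s"executor $executorId at $blockManagerId reported updates for " +
        s"${accumUpdates.length} running task(s)"
  }
}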
diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
index 6ea997c079..c191122c06 100644
--- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
+++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala
@@ -17,42 +17,193 @@
package org.apache.spark
+import org.apache.spark.storage.{BlockId, BlockStatus}
-// This is moved to its own file because many more things will be added to it in SPARK-10620.
+
+/**
+ * A collection of fields and methods concerned with internal accumulators that represent
+ * task level metrics.
+ */
private[spark] object InternalAccumulator {
- val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
- val TEST_ACCUMULATOR = "testAccumulator"
-
- // For testing only.
- // This needs to be a def since we don't want to reuse the same accumulator across stages.
- private def maybeTestAccumulator: Option[Accumulator[Long]] = {
- if (sys.props.contains("spark.testing")) {
- Some(new Accumulator(
- 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
- } else {
- None
+
+ import AccumulatorParam._
+
+ // Prefixes used in names of internal task level metrics
+ val METRICS_PREFIX = "internal.metrics."
+ val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."
+ val SHUFFLE_WRITE_METRICS_PREFIX = METRICS_PREFIX + "shuffle.write."
+ val OUTPUT_METRICS_PREFIX = METRICS_PREFIX + "output."
+ val INPUT_METRICS_PREFIX = METRICS_PREFIX + "input."
+
+ // Names of internal task level metrics
+ val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime"
+ val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
+ val RESULT_SIZE = METRICS_PREFIX + "resultSize"
+ val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
+ val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"
+ val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
+ val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
+ val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
+ val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
+ val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"
+
+ // scalastyle:off
+
+ // Names of shuffle read metrics
+ object shuffleRead {
+ val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched"
+ val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched"
+ val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead"
+ val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
+ val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
+ val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+ }
+
+ // Names of shuffle write metrics
+ object shuffleWrite {
+ val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten"
+ val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten"
+ val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime"
+ }
+
+ // Names of output metrics
+ object output {
+ val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod"
+ val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten"
+ val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten"
+ }
+
+ // Names of input metrics
+ object input {
+ val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod"
+ val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead"
+ val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead"
+ }
+
+ // scalastyle:on
+
+ /**
+ * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]].
+ */
+ def create(name: String): Accumulator[_] = {
+ require(name.startsWith(METRICS_PREFIX),
+ s"internal accumulator name must start with '$METRICS_PREFIX': $name")
+ getParam(name) match {
+ case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
+ case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
+ case p @ StringAccumulatorParam => newMetric[String]("", name, p)
+ case p @ UpdatedBlockStatusesAccumulatorParam =>
+ newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
+ case p => throw new IllegalArgumentException(
+ s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.")
+ }
+ }
+
+ /**
+ * Get the [[AccumulatorParam]] associated with the internal metric name,
+ * which must begin with [[METRICS_PREFIX]].
+ */
+ def getParam(name: String): AccumulatorParam[_] = {
+ require(name.startsWith(METRICS_PREFIX),
+ s"internal accumulator name must start with '$METRICS_PREFIX': $name")
+ name match {
+ case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
+ case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
+ case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
+ case input.READ_METHOD => StringAccumulatorParam
+ case output.WRITE_METHOD => StringAccumulatorParam
+ case _ => LongAccumulatorParam
}
}
/**
* Accumulators for tracking internal metrics.
+ */
+ def create(): Seq[Accumulator[_]] = {
+ Seq[String](
+ EXECUTOR_DESERIALIZE_TIME,
+ EXECUTOR_RUN_TIME,
+ RESULT_SIZE,
+ JVM_GC_TIME,
+ RESULT_SERIALIZATION_TIME,
+ MEMORY_BYTES_SPILLED,
+ DISK_BYTES_SPILLED,
+ PEAK_EXECUTION_MEMORY,
+ UPDATED_BLOCK_STATUSES).map(create) ++
+ createShuffleReadAccums() ++
+ createShuffleWriteAccums() ++
+ createInputAccums() ++
+ createOutputAccums() ++
+ sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq
+ }
+
+ /**
+ * Accumulators for tracking shuffle read metrics.
+ */
+ def createShuffleReadAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ shuffleRead.REMOTE_BLOCKS_FETCHED,
+ shuffleRead.LOCAL_BLOCKS_FETCHED,
+ shuffleRead.REMOTE_BYTES_READ,
+ shuffleRead.LOCAL_BYTES_READ,
+ shuffleRead.FETCH_WAIT_TIME,
+ shuffleRead.RECORDS_READ).map(create)
+ }
+
+ /**
+ * Accumulators for tracking shuffle write metrics.
+ */
+ def createShuffleWriteAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ shuffleWrite.BYTES_WRITTEN,
+ shuffleWrite.RECORDS_WRITTEN,
+ shuffleWrite.WRITE_TIME).map(create)
+ }
+
+ /**
+ * Accumulators for tracking input metrics.
+ */
+ def createInputAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ input.READ_METHOD,
+ input.BYTES_READ,
+ input.RECORDS_READ).map(create)
+ }
+
+ /**
+ * Accumulators for tracking output metrics.
+ */
+ def createOutputAccums(): Seq[Accumulator[_]] = {
+ Seq[String](
+ output.WRITE_METHOD,
+ output.BYTES_WRITTEN,
+ output.RECORDS_WRITTEN).map(create)
+ }
+
+ /**
+ * Accumulators for tracking internal metrics.
*
* These accumulators are created with the stage such that all tasks in the stage will
* add to the same set of accumulators. We do this to report the distribution of accumulator
* values across all tasks within each stage.
*/
- def create(sc: SparkContext): Seq[Accumulator[Long]] = {
- val internalAccumulators = Seq(
- // Execution memory refers to the memory used by internal data structures created
- // during shuffles, aggregations and joins. The value of this accumulator should be
- // approximately the sum of the peak sizes across all such data structures created
- // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
- new Accumulator(
- 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
- ) ++ maybeTestAccumulator.toSeq
- internalAccumulators.foreach { accumulator =>
- sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+ def create(sc: SparkContext): Seq[Accumulator[_]] = {
+ val accums = create()
+ accums.foreach { accum =>
+ Accumulators.register(accum)
+ sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
}
- internalAccumulators
+ accums
+ }
+
+ /**
+ * Create a new accumulator representing an internal task metric.
+ */
+ private def newMetric[T](
+ initialValue: T,
+ name: String,
+ param: AccumulatorParam[T]): Accumulator[T] = {
+ new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
}
+
}
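A sketch of how the naming scheme and the create/getParam factories above fit together, assuming spark-internal visibility (InternalAccumulator and Accumulators are private[spark]):

import org.apache.spark.{Accumulators, InternalAccumulator}

object InternalAccumSketch {
  // Fully qualified metric names carry the "internal.metrics." prefix.
  val name: String = InternalAccumulator.shuffleRead.RECORDS_READ

  // create(name) picks the AccumulatorParam from the name (Long here, via the
  // default branch of getParam) and marks the accumulator as internal with
  // countFailedValues = true.
  val recordsRead = InternalAccumulator.create(name)

  // Internal accumulators are not self-registering; register explicitly.
  Accumulators.register(recordsRead)
}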
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 7704abc134..9f49cf1c4c 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -64,7 +64,7 @@ object TaskContext {
* An empty task context that does not represent an actual task.
*/
private[spark] def empty(): TaskContextImpl = {
- new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty)
+ new TaskContextImpl(0, 0, 0, 0, null, null)
}
}
@@ -138,7 +138,6 @@ abstract class TaskContext extends Serializable {
*/
def taskAttemptId(): Long
- /** ::DeveloperApi:: */
@DeveloperApi
def taskMetrics(): TaskMetrics
@@ -161,20 +160,4 @@ abstract class TaskContext extends Serializable {
*/
private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit
- /**
- * Return the local values of internal accumulators that belong to this task. The key of the Map
- * is the accumulator id and the value of the Map is the latest accumulator local value.
- */
- private[spark] def collectInternalAccumulators(): Map[Long, Any]
-
- /**
- * Return the local values of accumulators that belong to this task. The key of the Map is the
- * accumulator id and the value of the Map is the latest accumulator local value.
- */
- private[spark] def collectAccumulators(): Map[Long, Any]
-
- /**
- * Accumulators for tracking internal metrics indexed by the name.
- */
- private[spark] val internalMetricsToAccumulators: Map[String, Accumulator[Long]]
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 94ff884b74..27ca46f73d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
@@ -32,11 +32,15 @@ private[spark] class TaskContextImpl(
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
- internalAccumulators: Seq[Accumulator[Long]],
- val taskMetrics: TaskMetrics = TaskMetrics.empty)
+ initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create())
extends TaskContext
with Logging {
+ /**
+ * Metrics associated with this task.
+ */
+ override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators)
+
// List of callback functions to execute when the task completes.
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
@@ -91,24 +95,8 @@ private[spark] class TaskContextImpl(
override def getMetricsSources(sourceName: String): Seq[Source] =
metricsSystem.getSourcesByName(sourceName)
- @transient private val accumulators = new HashMap[Long, Accumulable[_, _]]
-
- private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = synchronized {
- accumulators(a.id) = a
- }
-
- private[spark] override def collectInternalAccumulators(): Map[Long, Any] = synchronized {
- accumulators.filter(_._2.isInternal).mapValues(_.localValue).toMap
+ private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = {
+ taskMetrics.registerAccumulator(a)
}
- private[spark] override def collectAccumulators(): Map[Long, Any] = synchronized {
- accumulators.mapValues(_.localValue).toMap
- }
-
- private[spark] override val internalMetricsToAccumulators: Map[String, Accumulator[Long]] = {
- // Explicitly register internal accumulators here because these are
- // not captured in the task closure and are already deserialized
- internalAccumulators.foreach(registerAccumulator)
- internalAccumulators.map { a => (a.name.get, a) }.toMap
- }
}
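Mirroring TaskContext.empty() above, a sketch of the new wiring, assuming spark-internal visibility: the context builds its TaskMetrics from the initial accumulators (defaulting to InternalAccumulator.create()), and registerAccumulator simply forwards to it. The metric name below is hypothetical:

import org.apache.spark.{Accumulator, AccumulatorParam, TaskContextImpl}

object TaskContextWiringSketch {
  // Positional arguments as in TaskContext.empty(); null stands in for the
  // memory manager and metrics system, which are unused here.
  val context = new TaskContextImpl(0, 0, 0, 0, null, null)

  // A later registration (e.g. a per-operator SQL metric) lands in taskMetrics.
  val rowCount = new Accumulator[Long](
    0L, AccumulatorParam.LongAccumulatorParam, Some("number of rows"), internal = true)
  context.registerAccumulator(rowCount)
}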
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 13241b77bf..68340cc704 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -19,8 +19,11 @@ package org.apache.spark
import java.io.{ObjectInputStream, ObjectOutputStream}
+import scala.util.Try
+
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils
@@ -115,22 +118,34 @@ case class ExceptionFailure(
description: String,
stackTrace: Array[StackTraceElement],
fullStackTrace: String,
- metrics: Option[TaskMetrics],
- private val exceptionWrapper: Option[ThrowableSerializationWrapper])
+ exceptionWrapper: Option[ThrowableSerializationWrapper],
+ accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo])
extends TaskFailedReason {
+ @deprecated("use accumUpdates instead", "2.0.0")
+ val metrics: Option[TaskMetrics] = {
+ if (accumUpdates.nonEmpty) {
+ Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption
+ } else {
+ None
+ }
+ }
+
/**
* `preserveCause` is used to keep the exception itself so it is available to the
* driver. This may be set to `false` in the event that the exception is not in fact
* serializable.
*/
- private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], preserveCause: Boolean) {
- this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics,
- if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None)
+ private[spark] def this(
+ e: Throwable,
+ accumUpdates: Seq[AccumulableInfo],
+ preserveCause: Boolean) {
+ this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e),
+ if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None, accumUpdates)
}
- private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
- this(e, metrics, preserveCause = true)
+ private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo]) {
+ this(e, accumUpdates, preserveCause = true)
}
def exception: Option[Throwable] = exceptionWrapper.flatMap {
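A sketch of consuming the new failure shape on the driver side; describeFailure is a hypothetical helper:

import org.apache.spark.ExceptionFailure

object ExceptionFailureSketch {
  def describeFailure(f: ExceptionFailure): String = {
    // accumUpdates travels with the failure; the deprecated `metrics` field is
    // rebuilt from those updates lazily, and only when they are non-empty.
    s"${f.description}: ${f.accumUpdates.size} accumulator update(s) from the failed task"
  }
}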
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 8ba3f5e241..06b5101b1f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -370,6 +370,14 @@ object SparkHadoopUtil {
val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+ /**
+ * Number of records to update input metrics when reading from HadoopRDDs.
+ *
+ * Each update is potentially expensive because we need to use reflection to access the
+ * Hadoop FileSystem API of interest (only available in 2.5), so we should do this sparingly.
+ */
+ private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000
+
def get: SparkHadoopUtil = {
// Check each time to support changing to/from YARN
val yarnMode = java.lang.Boolean.valueOf(
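A standalone sketch of the batching pattern this constant enables: refresh the expensive, reflection-based bytes-read counter only once every N records. The helper below is illustrative, not the HadoopRDD code itself:

object InputMetricsBatchingSketch {
  val UpdateInputMetricsIntervalRecords = 1000

  def readAll[T](records: Iterator[T])(refreshBytesRead: () => Unit): Long = {
    var recordsRead = 0L
    records.foreach { _ =>
      recordsRead += 1
      if (recordsRead % UpdateInputMetricsIntervalRecords == 0) {
        refreshBytesRead() // e.g. poll the Hadoop FileSystem statistics
      }
    }
    recordsRead
  }
}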
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 030ae41db4..51c000ea5c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -31,7 +31,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
+import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
@@ -210,7 +210,7 @@ private[spark] class Executor(
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
var threwException = true
- val (value, accumUpdates) = try {
+ val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
@@ -249,10 +249,11 @@ private[spark] class Executor(
m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
- m.updateAccumulators()
}
- val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
+ // Note: accumulator updates must be collected after TaskMetrics is updated
+ val accumUpdates = task.collectAccumulatorUpdates()
+ val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
@@ -297,21 +298,25 @@ private[spark] class Executor(
// the default uncaught exception handler, which will terminate the Executor.
logError(s"Exception in $taskName (TID $taskId)", t)
- val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
- task.metrics.map { m =>
+ // Collect latest accumulator values to report back to the driver
+ val accumulatorUpdates: Seq[AccumulableInfo] =
+ if (task != null) {
+ task.metrics.foreach { m =>
m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
m.setJvmGCTime(computeTotalGcTime() - startGCTime)
- m.updateAccumulators()
- m
}
+ task.collectAccumulatorUpdates(taskFailed = true)
+ } else {
+ Seq.empty[AccumulableInfo]
}
+
val serializedTaskEndReason = {
try {
- ser.serialize(new ExceptionFailure(t, metrics))
+ ser.serialize(new ExceptionFailure(t, accumulatorUpdates))
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
- ser.serialize(new ExceptionFailure(t, metrics, false))
+ ser.serialize(new ExceptionFailure(t, accumulatorUpdates, preserveCause = false))
}
}
execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
@@ -418,33 +423,21 @@ private[spark] class Executor(
/** Reports heartbeat and metrics for active tasks to the driver. */
private def reportHeartBeat(): Unit = {
- // list of (task id, metrics) to send back to the driver
- val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
+ // list of (task id, accumUpdates) to send back to the driver
+ val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulableInfo])]()
val curGCTime = computeTotalGcTime()
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.foreach { metrics =>
metrics.mergeShuffleReadMetrics()
- metrics.updateInputMetrics()
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
- metrics.updateAccumulators()
-
- if (isLocal) {
- // JobProgressListener will hold an reference of it during
- // onExecutorMetricsUpdate(), then JobProgressListener can not see
- // the changes of metrics any more, so make a deep copy of it
- val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
- tasksMetrics += ((taskRunner.taskId, copiedMetrics))
- } else {
- // It will be copied by serialization
- tasksMetrics += ((taskRunner.taskId, metrics))
- }
+ accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates()))
}
}
}
- val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
+ val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
try {
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
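A sketch of the new heartbeat collection, assuming spark-internal visibility: one (taskId, accumulator updates) pair per running task, replacing the deep-copied TaskMetrics. collectPayload is a hypothetical helper:

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.AccumulableInfo

object HeartbeatCollectionSketch {
  def collectPayload(
      running: Seq[(Long, TaskMetrics)]): Array[(Long, Seq[AccumulableInfo])] = {
    running.map { case (taskId, metrics) =>
      metrics.mergeShuffleReadMetrics()       // fold temp shuffle read metrics first
      (taskId, metrics.accumulatorUpdates())  // then snapshot the current values
    }.toArray
  }
}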
diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 8f1d7f89a4..ed9e157ce7 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -17,13 +17,15 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
- * Method by which input data was read. Network means that the data was read over the network
+ * Method by which input data was read. Network means that the data was read over the network
* from a remote block manager (which may have stored the data on-disk or in-memory).
+ * Operations are not thread-safe.
*/
@DeveloperApi
object DataReadMethod extends Enumeration with Serializable {
@@ -34,44 +36,75 @@ object DataReadMethod extends Enumeration with Serializable {
/**
* :: DeveloperApi ::
- * Metrics about reading input data.
+ * A collection of accumulators that represents metrics about reading data from external systems.
*/
@DeveloperApi
-case class InputMetrics(readMethod: DataReadMethod.Value) {
+class InputMetrics private (
+ _bytesRead: Accumulator[Long],
+ _recordsRead: Accumulator[Long],
+ _readMethod: Accumulator[String])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ),
+ TaskMetrics.getAccum[String](accumMap, InternalAccumulator.input.READ_METHOD))
+ }
/**
- * This is volatile so that it is visible to the updater thread.
+ * Create a new [[InputMetrics]] that is not associated with any particular task.
+ *
+ * This mainly exists because of SPARK-5225, where we are forced to use a dummy [[InputMetrics]]
+ * because we want to ignore metrics from a second read method. In the future, we should revisit
+ * whether this is needed.
+ *
+ * A better alternative is [[TaskMetrics.registerInputMetrics]].
*/
- @volatile @transient var bytesReadCallback: Option[() => Long] = None
+ private[executor] def this() {
+ this(InternalAccumulator.createInputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
+ }
/**
- * Total bytes read.
+ * Total number of bytes read.
*/
- private var _bytesRead: Long = _
- def bytesRead: Long = _bytesRead
- def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
+ def bytesRead: Long = _bytesRead.localValue
/**
- * Total records read.
+ * Total number of records read.
*/
- private var _recordsRead: Long = _
- def recordsRead: Long = _recordsRead
- def incRecordsRead(records: Long): Unit = _recordsRead += records
+ def recordsRead: Long = _recordsRead.localValue
/**
- * Invoke the bytesReadCallback and mutate bytesRead.
+ * The source from which this task reads its input.
*/
- def updateBytesRead() {
- bytesReadCallback.foreach { c =>
- _bytesRead = c()
- }
+ def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
+
+ @deprecated("incrementing input metrics is for internal use only", "2.0.0")
+ def incBytesRead(v: Long): Unit = _bytesRead.add(v)
+ @deprecated("incrementing input metrics is for internal use only", "2.0.0")
+ def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
+ private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
+ private[spark] def setReadMethod(v: DataReadMethod.Value): Unit =
+ _readMethod.setValue(v.toString)
+
+}
+
+/**
+ * Deprecated methods to preserve case class matching behavior before Spark 2.0.
+ */
+object InputMetrics {
+
+ @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
+ def apply(readMethod: DataReadMethod.Value): InputMetrics = {
+ val im = new InputMetrics
+ im.setReadMethod(readMethod)
+ im
}
- /**
- * Register a function that can be called to get up-to-date information on how many bytes the task
- * has read from an input source.
- */
- def setBytesReadCallback(f: Option[() => Long]) {
- bytesReadCallback = f
+ @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
+ def unapply(input: InputMetrics): Option[DataReadMethod.Value] = {
+ Some(input.readMethod)
}
}
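A sketch of the compatibility path the companion object above provides: pre-2.0 case-class style construction and matching still compile through the deprecated apply/unapply (with deprecation warnings). DataReadMethod.Hadoop is assumed to be one of the enumeration's values:

import org.apache.spark.executor.{DataReadMethod, InputMetrics}

object InputMetricsCompatSketch {
  val im: InputMetrics = InputMetrics(DataReadMethod.Hadoop) // deprecated apply

  val method: String = im match {
    case InputMetrics(readMethod) => readMethod.toString // deprecated unapply
  }
}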
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index ad132d004c..0b37d559c7 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -17,12 +17,14 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Method by which output data was written.
+ * Operations are not thread-safe.
*/
@DeveloperApi
object DataWriteMethod extends Enumeration with Serializable {
@@ -33,21 +35,70 @@ object DataWriteMethod extends Enumeration with Serializable {
/**
* :: DeveloperApi ::
- * Metrics about writing output data.
+ * A collection of accumulators that represents metrics about writing data to external systems.
*/
@DeveloperApi
-case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+class OutputMetrics private (
+ _bytesWritten: Accumulator[Long],
+ _recordsWritten: Accumulator[Long],
+ _writeMethod: Accumulator[String])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN),
+ TaskMetrics.getAccum[String](accumMap, InternalAccumulator.output.WRITE_METHOD))
+ }
+
+ /**
+ * Create a new [[OutputMetrics]] that is not associated with any particular task.
+ *
+ * This is only used for preserving matching behavior on [[OutputMetrics]], which used to be
+ * a case class before Spark 2.0. Once we remove support for matching on [[OutputMetrics]]
+ * we can remove this constructor as well.
+ */
+ private[executor] def this() {
+ this(InternalAccumulator.createOutputAccums()
+ .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
+ }
+
+ /**
+ * Total number of bytes written.
+ */
+ def bytesWritten: Long = _bytesWritten.localValue
+
/**
- * Total bytes written
+ * Total number of records written.
*/
- private var _bytesWritten: Long = _
- def bytesWritten: Long = _bytesWritten
- private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value
+ def recordsWritten: Long = _recordsWritten.localValue
/**
- * Total records written
+ * The source to which this task writes its output.
*/
- private var _recordsWritten: Long = 0L
- def recordsWritten: Long = _recordsWritten
- private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
+ def writeMethod: DataWriteMethod.Value = DataWriteMethod.withName(_writeMethod.localValue)
+
+ private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v)
+ private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v)
+ private[spark] def setWriteMethod(v: DataWriteMethod.Value): Unit =
+ _writeMethod.setValue(v.toString)
+
+}
+
+/**
+ * Deprecated methods to preserve case class matching behavior before Spark 2.0.
+ */
+object OutputMetrics {
+
+ @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
+ def apply(writeMethod: DataWriteMethod.Value): OutputMetrics = {
+ val om = new OutputMetrics
+ om.setWriteMethod(writeMethod)
+ om
+ }
+
+ @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
+ def unapply(output: OutputMetrics): Option[DataWriteMethod.Value] = {
+ Some(output.writeMethod)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index e985b35ace..50bb645d97 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -17,71 +17,103 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
- * Metrics pertaining to shuffle data read in a given task.
+ * A collection of accumulators that represent metrics about reading shuffle data.
+ * Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleReadMetrics extends Serializable {
+class ShuffleReadMetrics private (
+ _remoteBlocksFetched: Accumulator[Int],
+ _localBlocksFetched: Accumulator[Int],
+ _remoteBytesRead: Accumulator[Long],
+ _localBytesRead: Accumulator[Long],
+ _fetchWaitTime: Accumulator[Long],
+ _recordsRead: Accumulator[Long])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED),
+ TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ))
+ }
+
/**
- * Number of remote blocks fetched in this shuffle by this task
+ * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task.
+ *
+ * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in
+ * many places only to merge their values together later. In the future, we should revisit
+ * whether this is needed.
+ *
+ * A better alternative is [[TaskMetrics.registerTempShuffleReadMetrics]] followed by
+ * [[TaskMetrics.mergeShuffleReadMetrics]].
*/
- private var _remoteBlocksFetched: Int = _
- def remoteBlocksFetched: Int = _remoteBlocksFetched
- private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
- private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
+ private[spark] def this() {
+ this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap)
+ }
/**
- * Number of local blocks fetched in this shuffle by this task
+ * Number of remote blocks fetched in this shuffle by this task.
*/
- private var _localBlocksFetched: Int = _
- def localBlocksFetched: Int = _localBlocksFetched
- private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
- private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
+ def remoteBlocksFetched: Int = _remoteBlocksFetched.localValue
/**
- * Time the task spent waiting for remote shuffle blocks. This only includes the time
- * blocking on shuffle input data. For instance if block B is being fetched while the task is
- * still not finished processing block A, it is not considered to be blocking on block B.
+ * Number of local blocks fetched in this shuffle by this task.
*/
- private var _fetchWaitTime: Long = _
- def fetchWaitTime: Long = _fetchWaitTime
- private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
- private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
+ def localBlocksFetched: Int = _localBlocksFetched.localValue
/**
- * Total number of remote bytes read from the shuffle by this task
+ * Total number of remote bytes read from the shuffle by this task.
*/
- private var _remoteBytesRead: Long = _
- def remoteBytesRead: Long = _remoteBytesRead
- private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
- private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
+ def remoteBytesRead: Long = _remoteBytesRead.localValue
/**
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
- private var _localBytesRead: Long = _
- def localBytesRead: Long = _localBytesRead
- private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
+ def localBytesRead: Long = _localBytesRead.localValue
/**
- * Total bytes fetched in the shuffle by this task (both remote and local).
+ * Time the task spent waiting for remote shuffle blocks. This only includes the time
+ * blocking on shuffle input data. For instance if block B is being fetched while the task is
+ * still not finished processing block A, it is not considered to be blocking on block B.
+ */
+ def fetchWaitTime: Long = _fetchWaitTime.localValue
+
+ /**
+ * Total number of records read from the shuffle by this task.
*/
- def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
+ def recordsRead: Long = _recordsRead.localValue
/**
- * Number of blocks fetched in this shuffle by this task (remote or local)
+ * Total bytes fetched in the shuffle by this task (both remote and local).
*/
- def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
+ def totalBytesRead: Long = remoteBytesRead + localBytesRead
/**
- * Total number of records read from the shuffle by this task
+ * Number of blocks fetched in this shuffle by this task (remote or local).
*/
- private var _recordsRead: Long = _
- def recordsRead: Long = _recordsRead
- private[spark] def incRecordsRead(value: Long) = _recordsRead += value
- private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
+ def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
+
+ private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v)
+ private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v)
+ private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v)
+ private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v)
+ private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v)
+ private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
+
+ private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v)
+ private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v)
+ private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v)
+ private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v)
+ private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
+ private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
+
}
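The getters above now read localValue from the backing accumulators, and the totals are derived rather than stored. A standalone sketch of the derived-total relationship (a plain case class, not the Spark class):

object ShuffleReadTotalsSketch {
  final case class ShuffleReadSnapshot(
      remoteBlocksFetched: Int,
      localBlocksFetched: Int,
      remoteBytesRead: Long,
      localBytesRead: Long) {
    // Derived exactly as in the accessors above.
    def totalBytesRead: Long = remoteBytesRead + localBytesRead
    def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched
  }

  val snap = ShuffleReadSnapshot(2, 3, 1024L, 512L)
  assert(snap.totalBytesRead == 1536L && snap.totalBlocksFetched == 5)
}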
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
index 24795f8600..c7aaabb561 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
@@ -17,40 +17,66 @@
package org.apache.spark.executor
+import org.apache.spark.{Accumulator, InternalAccumulator}
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
- * Metrics pertaining to shuffle data written in a given task.
+ * A collection of accumulators that represent metrics about writing shuffle data.
+ * Operations are not thread-safe.
*/
@DeveloperApi
-class ShuffleWriteMetrics extends Serializable {
+class ShuffleWriteMetrics private (
+ _bytesWritten: Accumulator[Long],
+ _recordsWritten: Accumulator[Long],
+ _writeTime: Accumulator[Long])
+ extends Serializable {
+
+ private[executor] def this(accumMap: Map[String, Accumulator[_]]) {
+ this(
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN),
+ TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME))
+ }
/**
- * Number of bytes written for the shuffle by this task
+ * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task.
+ *
+ * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in
+ * many places only to merge their values together later. In the future, we should revisit
+ * whether this is needed.
+ *
+ * A better alternative is [[TaskMetrics.registerShuffleWriteMetrics]].
*/
- @volatile private var _bytesWritten: Long = _
- def bytesWritten: Long = _bytesWritten
- private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
- private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value
+ private[spark] def this() {
+ this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap)
+ }
/**
- * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
+ * Number of bytes written for the shuffle by this task.
*/
- @volatile private var _writeTime: Long = _
- def writeTime: Long = _writeTime
- private[spark] def incWriteTime(value: Long) = _writeTime += value
- private[spark] def decWriteTime(value: Long) = _writeTime -= value
+ def bytesWritten: Long = _bytesWritten.localValue
/**
- * Total number of records written to the shuffle by this task
+ * Total number of records written to the shuffle by this task.
*/
- @volatile private var _recordsWritten: Long = _
- def recordsWritten: Long = _recordsWritten
- private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
- private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
- private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
+ def recordsWritten: Long = _recordsWritten.localValue
+
+ /**
+ * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds.
+ */
+ def writeTime: Long = _writeTime.localValue
+
+ private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v)
+ private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v)
+ private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
+ private[spark] def decBytesWritten(v: Long): Unit = {
+ _bytesWritten.setValue(bytesWritten - v)
+ }
+ private[spark] def decRecordsWritten(v: Long): Unit = {
+ _recordsWritten.setValue(recordsWritten - v)
+ }
// Legacy methods for backward compatibility.
// TODO: remove these once we make this class private.
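Accumulators expose add and setValue but no subtract, so the decrements above are written as setValue(current - v). A standalone sketch of that workaround:

object AddOnlyCounterSketch {
  final class AddOnlyCounter {
    private var current: Long = 0L
    def add(v: Long): Unit = current += v
    def setValue(v: Long): Unit = current = v
    def value: Long = current
    // Decrement expressed through setValue, as in decBytesWritten above.
    def dec(v: Long): Unit = setValue(value - v)
  }
}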
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 32ef5a9b56..8d10bf588e 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,90 +17,161 @@
package org.apache.spark.executor
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentHashMap
-
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.DataReadMethod.DataReadMethod
+import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}
-import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
* Metrics tracked during the execution of a task.
*
- * This class is used to house metrics both for in-progress and completed tasks. In executors,
- * both the task thread and the heartbeat thread write to the TaskMetrics. The heartbeat thread
- * reads it to send in-progress metrics, and the task thread reads it to send metrics along with
- * the completed task.
+ * This class is a wrapper around a collection of internal accumulators that represent metrics
+ * associated with a task. The local values of these accumulators are sent from the executor
+ * to the driver when the task completes. These values are then merged into the corresponding
+ * accumulator previously registered on the driver.
+ *
+ * The accumulator updates are also sent to the driver periodically (on executor heartbeat)
+ * and when the task fails with an exception. The [[TaskMetrics]] object itself should never
+ * be sent to the driver.
*
- * So, when adding new fields, take into consideration that the whole object can be serialized for
- * shipping off at any time to consumers of the SparkListener interface.
+ * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on.
+ * Each accumulator in this initial set must be uniquely named and marked
+ * as internal. Additional accumulators registered later need not satisfy
+ * these requirements.
*/
@DeveloperApi
-class TaskMetrics extends Serializable {
+class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable {
+
+ import InternalAccumulator._
+
+ // Needed for Java tests
+ def this() {
+ this(InternalAccumulator.create())
+ }
+
+ /**
+ * All accumulators registered with this task.
+ */
+ private val accums = new ArrayBuffer[Accumulable[_, _]]
+ accums ++= initialAccums
+
+ /**
+ * A map for quickly accessing the initial set of accumulators by name.
+ */
+ private val initialAccumsMap: Map[String, Accumulator[_]] = {
+ val map = new mutable.HashMap[String, Accumulator[_]]
+ initialAccums.foreach { a =>
+ val name = a.name.getOrElse {
+ throw new IllegalArgumentException(
+ "initial accumulators passed to TaskMetrics must be named")
+ }
+ require(a.isInternal,
+ s"initial accumulator '$name' passed to TaskMetrics must be marked as internal")
+ require(!map.contains(name),
+ s"detected duplicate accumulator name '$name' when constructing TaskMetrics")
+ map(name) = a
+ }
+ map.toMap
+ }
+
+ // Each metric is internally represented as an accumulator
+ private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME)
+ private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME)
+ private val _resultSize = getAccum(RESULT_SIZE)
+ private val _jvmGCTime = getAccum(JVM_GC_TIME)
+ private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME)
+ private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED)
+ private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED)
+ private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY)
+ private val _updatedBlockStatuses =
+ TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES)
+
/**
- * Host's name the task runs on
+ * Time taken on the executor to deserialize this task.
*/
- private var _hostname: String = _
- def hostname: String = _hostname
- private[spark] def setHostname(value: String) = _hostname = value
+ def executorDeserializeTime: Long = _executorDeserializeTime.localValue
/**
- * Time taken on the executor to deserialize this task
+ * Time the executor spends actually running the task (including fetching shuffle data).
*/
- private var _executorDeserializeTime: Long = _
- def executorDeserializeTime: Long = _executorDeserializeTime
- private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
+ def executorRunTime: Long = _executorRunTime.localValue
+ /**
+ * The number of bytes this task transmitted back to the driver as the TaskResult.
+ */
+ def resultSize: Long = _resultSize.localValue
/**
- * Time the executor spends actually running the task (including fetching shuffle data)
+ * Amount of time the JVM spent in garbage collection while executing this task.
*/
- private var _executorRunTime: Long = _
- def executorRunTime: Long = _executorRunTime
- private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value
+ def jvmGCTime: Long = _jvmGCTime.localValue
/**
- * The number of bytes this task transmitted back to the driver as the TaskResult
+ * Amount of time spent serializing the task result.
*/
- private var _resultSize: Long = _
- def resultSize: Long = _resultSize
- private[spark] def setResultSize(value: Long) = _resultSize = value
+ def resultSerializationTime: Long = _resultSerializationTime.localValue
+ /**
+ * The number of in-memory bytes spilled by this task.
+ */
+ def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue
/**
- * Amount of time the JVM spent in garbage collection while executing this task
+ * The number of on-disk bytes spilled by this task.
*/
- private var _jvmGCTime: Long = _
- def jvmGCTime: Long = _jvmGCTime
- private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value
+ def diskBytesSpilled: Long = _diskBytesSpilled.localValue
/**
- * Amount of time spent serializing the task result
+ * Peak memory used by internal data structures created during shuffles, aggregations and
+ * joins. The value of this accumulator should be approximately the sum of the peak sizes
+ * across all such data structures created in this task. For SQL jobs, this only tracks all
+ * unsafe operators and ExternalSort.
*/
- private var _resultSerializationTime: Long = _
- def resultSerializationTime: Long = _resultSerializationTime
- private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value
+ def peakExecutionMemory: Long = _peakExecutionMemory.localValue
/**
- * The number of in-memory bytes spilled by this task
+ * Storage statuses of any blocks that have been updated as a result of this task.
*/
- private var _memoryBytesSpilled: Long = _
- def memoryBytesSpilled: Long = _memoryBytesSpilled
- private[spark] def incMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled += value
- private[spark] def decMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled -= value
+ def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
+
+ @deprecated("use updatedBlockStatuses instead", "2.0.0")
+ def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
+ if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
+ }
+
+ // Setters and increment-ers
+ private[spark] def setExecutorDeserializeTime(v: Long): Unit =
+ _executorDeserializeTime.setValue(v)
+ private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v)
+ private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v)
+ private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
+ private[spark] def setResultSerializationTime(v: Long): Unit =
+ _resultSerializationTime.setValue(v)
+ private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
+ private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
+ private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
+ private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
+ _updatedBlockStatuses.add(v)
+ private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
+ _updatedBlockStatuses.setValue(v)
/**
- * The number of on-disk bytes spilled by this task
+ * Get a Long accumulator from the given map by name, assuming it exists.
+ * Note: this only searches the initial set of accumulators passed into the constructor.
*/
- private var _diskBytesSpilled: Long = _
- def diskBytesSpilled: Long = _diskBytesSpilled
- private[spark] def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value
- private[spark] def decDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled -= value
+ private[spark] def getAccum(name: String): Accumulator[Long] = {
+ TaskMetrics.getAccum[Long](initialAccumsMap, name)
+ }
+
+
+ /* ========================== *
+ | INPUT METRICS |
+ * ========================== */
private var _inputMetrics: Option[InputMetrics] = None
@@ -116,7 +187,8 @@ class TaskMetrics extends Serializable {
private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = {
synchronized {
val metrics = _inputMetrics.getOrElse {
- val metrics = new InputMetrics(readMethod)
+ val metrics = new InputMetrics(initialAccumsMap)
+ metrics.setReadMethod(readMethod)
_inputMetrics = Some(metrics)
metrics
}
@@ -128,18 +200,17 @@ class TaskMetrics extends Serializable {
if (metrics.readMethod == readMethod) {
metrics
} else {
- new InputMetrics(readMethod)
+ val m = new InputMetrics
+ m.setReadMethod(readMethod)
+ m
}
}
}
- /**
- * This should only be used when recreating TaskMetrics, not when updating input metrics in
- * executors
- */
- private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
- _inputMetrics = inputMetrics
- }
+
+ /* ============================ *
+ | OUTPUT METRICS |
+ * ============================ */
private var _outputMetrics: Option[OutputMetrics] = None
@@ -149,23 +220,24 @@ class TaskMetrics extends Serializable {
*/
def outputMetrics: Option[OutputMetrics] = _outputMetrics
- @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
- def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
- _outputMetrics = om
- }
-
/**
* Get or create a new [[OutputMetrics]] associated with this task.
*/
private[spark] def registerOutputMetrics(
writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized {
_outputMetrics.getOrElse {
- val metrics = new OutputMetrics(writeMethod)
+ val metrics = new OutputMetrics(initialAccumsMap)
+ metrics.setWriteMethod(writeMethod)
_outputMetrics = Some(metrics)
metrics
}
}
+
+ /* ================================== *
+ | SHUFFLE READ METRICS |
+ * ================================== */
+
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
/**
@@ -175,20 +247,12 @@ class TaskMetrics extends Serializable {
def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics
/**
- * This should only be used when recreating TaskMetrics, not when updating read metrics in
- * executors.
- */
- private[spark] def setShuffleReadMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics]) {
- _shuffleReadMetrics = shuffleReadMetrics
- }
-
- /**
* Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
*
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
* issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
* each dependency and merge these metrics before reporting them to the driver.
- */
+ */
@transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]
/**
@@ -210,19 +274,21 @@ class TaskMetrics extends Serializable {
*/
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
- val merged = new ShuffleReadMetrics
- for (depMetrics <- tempShuffleReadMetrics) {
- merged.incFetchWaitTime(depMetrics.fetchWaitTime)
- merged.incLocalBlocksFetched(depMetrics.localBlocksFetched)
- merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
- merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
- merged.incLocalBytesRead(depMetrics.localBytesRead)
- merged.incRecordsRead(depMetrics.recordsRead)
- }
- _shuffleReadMetrics = Some(merged)
+ val metrics = new ShuffleReadMetrics(initialAccumsMap)
+ metrics.setRemoteBlocksFetched(tempShuffleReadMetrics.map(_.remoteBlocksFetched).sum)
+ metrics.setLocalBlocksFetched(tempShuffleReadMetrics.map(_.localBlocksFetched).sum)
+ metrics.setFetchWaitTime(tempShuffleReadMetrics.map(_.fetchWaitTime).sum)
+ metrics.setRemoteBytesRead(tempShuffleReadMetrics.map(_.remoteBytesRead).sum)
+ metrics.setLocalBytesRead(tempShuffleReadMetrics.map(_.localBytesRead).sum)
+ metrics.setRecordsRead(tempShuffleReadMetrics.map(_.recordsRead).sum)
+ _shuffleReadMetrics = Some(metrics)
}
}
+ /* =================================== *
+ | SHUFFLE WRITE METRICS |
+ * =================================== */
+
private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
/**
@@ -230,86 +296,120 @@ class TaskMetrics extends Serializable {
*/
def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
- @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
- def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
- _shuffleWriteMetrics = swm
- }
-
/**
* Get or create a new [[ShuffleWriteMetrics]] associated with this task.
*/
private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized {
_shuffleWriteMetrics.getOrElse {
- val metrics = new ShuffleWriteMetrics
+ val metrics = new ShuffleWriteMetrics(initialAccumsMap)
_shuffleWriteMetrics = Some(metrics)
metrics
}
}
- private var _updatedBlockStatuses: Seq[(BlockId, BlockStatus)] =
- Seq.empty[(BlockId, BlockStatus)]
- /**
- * Storage statuses of any blocks that have been updated as a result of this task.
- */
- def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses
+ /* ========================== *
+ | OTHER THINGS |
+ * ========================== */
- @deprecated("setting updated blocks is for internal use only", "2.0.0")
- def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
- _updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)])
+ private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+ accums += a
}
- private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
- _updatedBlockStatuses ++= v
+ /**
+ * Return the latest updates of accumulators in this task.
+ *
+ * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]]
+ * field is always empty, since this represents the partial updates recorded in this task,
+ * not the aggregated value across multiple tasks.
+ */
+ def accumulatorUpdates(): Seq[AccumulableInfo] = accums.map { a =>
+ new AccumulableInfo(a.id, a.name, Some(a.localValue), None, a.isInternal, a.countFailedValues)
}
- private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = {
- _updatedBlockStatuses = v
+ // If we are reconstructing this TaskMetrics on the driver, some metrics may already be set.
+ // If so, initialize all relevant metrics classes so listeners can access them downstream.
+ {
+ var (hasShuffleRead, hasShuffleWrite, hasInput, hasOutput) = (false, false, false, false)
+ initialAccums
+ .filter { a => a.localValue != a.zero }
+ .foreach { a =>
+ a.name.get match {
+ case sr if sr.startsWith(SHUFFLE_READ_METRICS_PREFIX) => hasShuffleRead = true
+ case sw if sw.startsWith(SHUFFLE_WRITE_METRICS_PREFIX) => hasShuffleWrite = true
+ case in if in.startsWith(INPUT_METRICS_PREFIX) => hasInput = true
+ case out if out.startsWith(OUTPUT_METRICS_PREFIX) => hasOutput = true
+ case _ =>
+ }
+ }
+ if (hasShuffleRead) { _shuffleReadMetrics = Some(new ShuffleReadMetrics(initialAccumsMap)) }
+ if (hasShuffleWrite) { _shuffleWriteMetrics = Some(new ShuffleWriteMetrics(initialAccumsMap)) }
+ if (hasInput) { _inputMetrics = Some(new InputMetrics(initialAccumsMap)) }
+ if (hasOutput) { _outputMetrics = Some(new OutputMetrics(initialAccumsMap)) }
}
- @deprecated("use updatedBlockStatuses instead", "2.0.0")
- def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
- if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None
- }
+}
- private[spark] def updateInputMetrics(): Unit = synchronized {
- inputMetrics.foreach(_.updateBytesRead())
- }
+private[spark] object TaskMetrics extends Logging {
- @throws(classOf[IOException])
- private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
- in.defaultReadObject()
- // Get the hostname from cached data, since hostname is the order of number of nodes in
- // cluster, so using cached hostname will decrease the object number and alleviate the GC
- // overhead.
- _hostname = TaskMetrics.getCachedHostName(_hostname)
- }
-
- private var _accumulatorUpdates: Map[Long, Any] = Map.empty
- @transient private var _accumulatorsUpdater: () => Map[Long, Any] = null
+ def empty: TaskMetrics = new TaskMetrics
- private[spark] def updateAccumulators(): Unit = synchronized {
- _accumulatorUpdates = _accumulatorsUpdater()
+ /**
+ * Get an accumulator from the given map by name, assuming it exists.
+ */
+ def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = {
+ require(accumMap.contains(name), s"metric '$name' is missing")
+ val accum = accumMap(name)
+ try {
+ // Note: we can't do pattern matching here because types are erased by compile time
+ accum.asInstanceOf[Accumulator[T]]
+ } catch {
+ case e: ClassCastException =>
+ throw new SparkException(s"accumulator $name was of unexpected type", e)
+ }
}
/**
- * Return the latest updates of accumulators in this task.
+ * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
+ *
+ * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we
+ * need the latter to post task end events to listeners, so we need to reconstruct the metrics
+ * on the driver.
+ *
+ * This assumes the provided updates contain the initial set of accumulators representing
+ * internal task level metrics.
*/
- def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates
-
- private[spark] def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = {
- _accumulatorsUpdater = accumulatorsUpdater
+ def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
+ // Initial accumulators are passed into the TaskMetrics constructor first because these
+ // are required to be uniquely named. The rest of the accumulators from this task are
+ // registered later because they need not satisfy this requirement.
+ val (initialAccumInfos, otherAccumInfos) = accumUpdates
+ .filter { info => info.update.isDefined }
+ .partition { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
+ val initialAccums = initialAccumInfos.map { info =>
+ val accum = InternalAccumulator.create(info.name.get)
+ accum.setValueAny(info.update.get)
+ accum
+ }
+ // We don't know the types of the rest of the accumulators, so we try to find the same ones
+ // that were previously registered here on the driver and make copies of them. It is important
+ // that we copy the accumulators here since they are used across many tasks and we want to
+ // maintain a snapshot of their local task values when we post them to listeners downstream.
+ val otherAccums = otherAccumInfos.flatMap { info =>
+ val id = info.id
+ val acc = Accumulators.get(id).map { a =>
+ val newAcc = a.copy()
+ newAcc.setValueAny(info.update.get)
+ newAcc
+ }
+ if (acc.isEmpty) {
+ logWarning(s"encountered unregistered accumulator $id when reconstructing task metrics.")
+ }
+ acc
+ }
+ val metrics = new TaskMetrics(initialAccums)
+ otherAccums.foreach(metrics.registerAccumulator)
+ metrics
}
-}
-
-private[spark] object TaskMetrics {
- private val hostNameCache = new ConcurrentHashMap[String, String]()
-
- def empty: TaskMetrics = new TaskMetrics
-
- def getCachedHostName(host: String): String = {
- val canonicalHost = hostNameCache.putIfAbsent(host, host)
- if (canonicalHost != null) canonicalHost else host
- }
}
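
For illustration only, not part of this patch: a standalone sketch of the name-keyed accumulator lookup that the new TaskMetrics builds, including the duplicate-name check and the cast that getAccum has to perform because element types are erased. NamedAccum and the metric names below are assumptions used only for this example.

import scala.collection.mutable

// Local stand-in for a named Accumulator[T].
class NamedAccum[T](val name: String, var value: T)

object AccumMapSketch extends App {
  val initial: Seq[NamedAccum[_]] = Seq(
    new NamedAccum("internal.metrics.executorRunTime", 0L),
    new NamedAccum("internal.metrics.resultSize", 0L))

  // Build the name -> accumulator map, rejecting duplicates as the TaskMetrics constructor does.
  val byName: Map[String, NamedAccum[_]] = {
    val map = new mutable.HashMap[String, NamedAccum[_]]
    initial.foreach { a =>
      require(!map.contains(a.name), s"detected duplicate accumulator name '${a.name}'")
      map(a.name) = a
    }
    map.toMap
  }

  // Element types are erased, so the typed lookup has to cast, like TaskMetrics.getAccum.
  def getAccum[T](name: String): NamedAccum[T] = {
    require(byName.contains(name), s"metric '$name' is missing")
    byName(name).asInstanceOf[NamedAccum[T]]
  }

  getAccum[Long]("internal.metrics.executorRunTime").value = 42L
  assert(getAccum[Long]("internal.metrics.executorRunTime").value == 42L)
}
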
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 3587e7eb1a..d9b0824b38 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -153,8 +153,7 @@ class CoGroupedRDD[K: ClassTag](
}
context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+ context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index a79ab86d49..3204e6adce 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -212,6 +212,8 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
+ // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
+
val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
// Sets the thread local variable for the file's name
@@ -222,14 +224,17 @@ class HadoopRDD[K, V](
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
- split.inputSplit.value match {
- case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
+ val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ case _ => None
+ }
+
+ def updateBytesRead(): Unit = {
+ getBytesReadCallback.foreach { getBytesRead =>
+ inputMetrics.setBytesRead(getBytesRead())
}
}
- inputMetrics.setBytesReadCallback(bytesReadCallback)
var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -252,6 +257,9 @@ class HadoopRDD[K, V](
if (!finished) {
inputMetrics.incRecordsRead(1)
}
+ if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ updateBytesRead()
+ }
(key, value)
}
@@ -272,8 +280,8 @@ class HadoopRDD[K, V](
} finally {
reader = null
}
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
+ if (getBytesReadCallback.isDefined) {
+ updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
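
For illustration only, not part of this patch: a standalone sketch of the callback-plus-periodic-refresh pattern introduced above, with a plain function standing in for the Hadoop FileSystem statistics callback and local counters standing in for InputMetrics. The interval value is an assumption for the example, not a claim about the real constant.

object BytesReadSketch extends App {
  val UPDATE_INTERVAL_RECORDS = 1000L   // stand-in for SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS

  var fsBytesRead = 0L                                 // what the FS statistics would report
  val getBytesReadCallback: Option[() => Long] = Some(() => fsBytesRead)

  var bytesRead = 0L                                   // stand-in for inputMetrics.setBytesRead
  var recordsRead = 0L                                 // stand-in for inputMetrics.incRecordsRead

  def updateBytesRead(): Unit =
    getBytesReadCallback.foreach { getBytesRead => bytesRead = getBytesRead() }

  // Simulate reading records; refresh the byte counter only every UPDATE_INTERVAL_RECORDS.
  (1 to 2500).foreach { _ =>
    fsBytesRead += 10L
    recordsRead += 1
    if (recordsRead % UPDATE_INTERVAL_RECORDS == 0) updateBytesRead()
  }
  updateBytesRead()                                    // final refresh, as in the close() path
  assert(bytesRead == fsBytesRead)
}
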
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 5cc9c81cc6..4d2816e335 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -133,14 +133,17 @@ class NewHadoopRDD[K, V](
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
- val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
- split.serializableHadoopSplit.value match {
- case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
+ val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
+ case _: FileSplit | _: CombineFileSplit =>
+ SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+ case _ => None
+ }
+
+ def updateBytesRead(): Unit = {
+ getBytesReadCallback.foreach { getBytesRead =>
+ inputMetrics.setBytesRead(getBytesRead())
}
}
- inputMetrics.setBytesReadCallback(bytesReadCallback)
val format = inputFormatClass.newInstance
format match {
@@ -182,6 +185,9 @@ class NewHadoopRDD[K, V](
if (!finished) {
inputMetrics.incRecordsRead(1)
}
+ if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
+ updateBytesRead()
+ }
(reader.getCurrentKey, reader.getCurrentValue)
}
@@ -201,8 +207,8 @@ class NewHadoopRDD[K, V](
} finally {
reader = null
}
- if (bytesReadCallback.isDefined) {
- inputMetrics.updateBytesRead()
+ if (getBytesReadCallback.isDefined) {
+ updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index 146cfb9ba8..9d45fff921 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -19,47 +19,58 @@ package org.apache.spark.scheduler
import org.apache.spark.annotation.DeveloperApi
+
/**
* :: DeveloperApi ::
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
+ *
+ * Note: once this is JSON serialized the types of `update` and `value` will be lost and be
+ * cast to strings. This is because the user can define an accumulator of any type and it will
+ * be difficult to preserve the type in consumers of the event log. This does not apply to
+ * internal accumulators that represent task level metrics.
+ *
+ * @param id accumulator ID
+ * @param name accumulator name
+ * @param update partial value from a task, may be None if used on driver to describe a stage
+ * @param value total accumulated value so far, may be None if used on executors to describe a task
+ * @param internal whether this accumulator was internal
+ * @param countFailedValues whether to count this accumulator's partial value if the task failed
*/
@DeveloperApi
-class AccumulableInfo private[spark] (
- val id: Long,
- val name: String,
- val update: Option[String], // represents a partial update within a task
- val value: String,
- val internal: Boolean) {
-
- override def equals(other: Any): Boolean = other match {
- case acc: AccumulableInfo =>
- this.id == acc.id && this.name == acc.name &&
- this.update == acc.update && this.value == acc.value &&
- this.internal == acc.internal
- case _ => false
- }
+case class AccumulableInfo private[spark] (
+ id: Long,
+ name: Option[String],
+ update: Option[Any], // represents a partial update within a task
+ value: Option[Any],
+ private[spark] val internal: Boolean,
+ private[spark] val countFailedValues: Boolean)
- override def hashCode(): Int = {
- val state = Seq(id, name, update, value, internal)
- state.map(_.hashCode).reduceLeft(31 * _ + _)
- }
-}
+/**
+ * A collection of deprecated constructors. This will be removed soon.
+ */
object AccumulableInfo {
+
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(
id: Long,
name: String,
update: Option[String],
value: String,
internal: Boolean): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal)
+ new AccumulableInfo(
+ id, Option(name), update, Option(value), internal, countFailedValues = false)
}
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal = false)
+ new AccumulableInfo(
+ id, Option(name), update, Option(value), internal = false, countFailedValues = false)
}
+ @deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, None, value, internal = false)
+ new AccumulableInfo(
+ id, Option(name), None, Option(value), internal = false, countFailedValues = false)
}
}
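
For illustration only, not part of this patch: a standalone stand-in for the new case-class shape, showing the "partial update from a task" versus "total value on the driver" convention described in the scaladoc above. AccumInfoSketch and the example values are local assumptions, since the real constructor is private[spark].

case class AccumInfoSketch(
    id: Long,
    name: Option[String],
    update: Option[Any],   // partial value from one task; None when describing a stage
    value: Option[Any])    // total value so far; None when describing a single task

object AccumInfoSketchDemo extends App {
  // What an executor would report for one task:
  val fromTask = AccumInfoSketch(1L, Some("internal.metrics.resultSize"), update = Some(128L), value = None)
  // What the driver would record after merging:
  val onDriver = fromTask.copy(update = None, value = Some(4096L))
  // Case-class equality comes for free, unlike the old hand-written equals/hashCode.
  assert(fromTask != onDriver)
}
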
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6b01a10fc1..897479b500 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -208,11 +208,10 @@ class DAGScheduler(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics): Unit = {
+ accumUpdates: Seq[AccumulableInfo],
+ taskInfo: TaskInfo): Unit = {
eventProcessLoop.post(
- CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
+ CompletionEvent(task, reason, result, accumUpdates, taskInfo))
}
/**
@@ -222,9 +221,10 @@ class DAGScheduler(
*/
def executorHeartbeatReceived(
execId: String,
- taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics)
+ // (taskId, stageId, stageAttemptId, accumUpdates)
+ accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
- listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
+ listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
blockManagerMaster.driverEndpoint.askWithRetry[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
@@ -1074,39 +1074,43 @@ class DAGScheduler(
}
}
- /** Merge updates from a task to our local accumulator values */
+ /**
+ * Merge local values from a task into the corresponding accumulators previously registered
+ * here on the driver.
+ *
+ * Although accumulators themselves are not thread-safe, this method is called only from one
+ * thread, the one that runs the scheduling loop. This means we only handle one task
+ * completion event at a time so we don't need to worry about locking the accumulators.
+ * This still doesn't stop the caller from updating the accumulator outside the scheduler,
+ * but that's not our problem since there's nothing we can do about that.
+ */
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
- if (event.accumUpdates != null) {
- try {
- Accumulators.add(event.accumUpdates)
-
- event.accumUpdates.foreach { case (id, partialValue) =>
- // In this instance, although the reference in Accumulators.originals is a WeakRef,
- // it's guaranteed to exist since the event.accumUpdates Map exists
-
- val acc = Accumulators.originals(id).get match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
- case None => throw new NullPointerException("Non-existent reference to Accumulator")
- }
-
- // To avoid UI cruft, ignore cases where value wasn't updated
- if (acc.name.isDefined && partialValue != acc.zero) {
- val name = acc.name.get
- val value = s"${acc.value}"
- stage.latestInfo.accumulables(id) =
- new AccumulableInfo(id, name, None, value, acc.isInternal)
- event.taskInfo.accumulables +=
- new AccumulableInfo(id, name, Some(s"$partialValue"), value, acc.isInternal)
- }
+ try {
+ event.accumUpdates.foreach { ainfo =>
+ assert(ainfo.update.isDefined, "accumulator from task should have a partial value")
+ val id = ainfo.id
+ val partialValue = ainfo.update.get
+ // Find the corresponding accumulator on the driver and update it
+ val acc: Accumulable[Any, Any] = Accumulators.get(id) match {
+ case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
+ case None =>
+ throw new SparkException(s"attempted to access non-existent accumulator $id")
+ }
+ acc ++= partialValue
+ // To avoid UI cruft, ignore cases where value wasn't updated
+ if (acc.name.isDefined && partialValue != acc.zero) {
+ val name = acc.name
+ stage.latestInfo.accumulables(id) = new AccumulableInfo(
+ id, name, None, Some(acc.value), acc.isInternal, acc.countFailedValues)
+ event.taskInfo.accumulables += new AccumulableInfo(
+ id, name, Some(partialValue), Some(acc.value), acc.isInternal, acc.countFailedValues)
}
- } catch {
- // If we see an exception during accumulator update, just log the
- // error and move on.
- case e: Exception =>
- logError(s"Failed to update accumulators for $task", e)
}
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Failed to update accumulators for task ${task.partitionId}", e)
}
}
@@ -1116,6 +1120,7 @@ class DAGScheduler(
*/
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
+ val taskId = event.taskInfo.id
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
@@ -1125,12 +1130,26 @@ class DAGScheduler(
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
- // The success case is dealt with separately below, since we need to compute accumulator
- // updates before posting.
+ // Reconstruct task metrics. Note: this may be null if the task has failed.
+ val taskMetrics: TaskMetrics =
+ if (event.accumUpdates.nonEmpty) {
+ try {
+ TaskMetrics.fromAccumulatorUpdates(event.accumUpdates)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
+ null
+ }
+ } else {
+ null
+ }
+
+ // The success case is dealt with separately below.
+ // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
if (event.reason != Success) {
val attemptId = task.stageAttemptId
- listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
- event.taskInfo, event.taskMetrics))
+ listenerBus.post(SparkListenerTaskEnd(
+ stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
}
if (!stageIdToStage.contains(task.stageId)) {
@@ -1142,7 +1161,7 @@ class DAGScheduler(
event.reason match {
case Success =>
listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
- event.reason, event.taskInfo, event.taskMetrics))
+ event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
@@ -1291,7 +1310,8 @@ class DAGScheduler(
// Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
case exceptionFailure: ExceptionFailure =>
- // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
+ // Tasks failed with exceptions might still have accumulator updates.
+ updateAccumulators(event)
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
@@ -1637,7 +1657,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
- case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
+ case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
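
For illustration only, not part of this patch: a standalone model of the driver-side merge performed by updateAccumulators above: look up the registered accumulator by id and fold the task's partial value into it. The registry and DriverAccum class are local stand-ins, not Spark's Accumulators object.

import scala.collection.mutable

class DriverAccum(val id: Long, val name: Option[String], var value: Long) {
  def ++=(partial: Long): Unit = value += partial
}

object UpdateAccumulatorsSketch extends App {
  val registry = mutable.Map[Long, DriverAccum](
    1L -> new DriverAccum(1L, Some("internal.metrics.executorRunTime"), 0L))

  // (accumulator id, partial update from one completed task)
  val accumUpdates: Seq[(Long, Long)] = Seq((1L, 350L), (1L, 410L))

  accumUpdates.foreach { case (id, partialValue) =>
    val acc = registry.getOrElse(id,
      throw new IllegalStateException(s"attempted to access non-existent accumulator $id"))
    acc ++= partialValue
  }
  assert(registry(1L).value == 760L)
}
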
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index dda3b6cc7f..d5cd2da7a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -73,9 +73,8 @@ private[scheduler] case class CompletionEvent(
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics)
+ accumUpdates: Seq[AccumulableInfo],
+ taskInfo: TaskInfo)
extends DAGSchedulerEvent
private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 6590cf6ffd..885f70e89f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
* See [[Task]] for more information.
*
* @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
* @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
* partition of the given RDD. Once deserialized, the type should be
* (RDD[T], (TaskContext, Iterator[T]) => U).
@@ -37,6 +38,9 @@ import org.apache.spark.rdd.RDD
* @param locs preferred task execution locations for locality scheduling
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
+ * @param _initialAccums initial set of accumulators to be used in this task for tracking
+ * internal metrics. Other accumulators will be registered later when
+ * they are deserialized on the executors.
*/
private[spark] class ResultTask[T, U](
stageId: Int,
@@ -45,8 +49,8 @@ private[spark] class ResultTask[T, U](
partition: Partition,
locs: Seq[TaskLocation],
val outputId: Int,
- internalAccumulators: Seq[Accumulator[Long]])
- extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
+ _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create())
+ extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums)
with Serializable {
@transient private[this] val preferredLocs: Seq[TaskLocation] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ea97ef0e74..89207dd175 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -33,10 +33,14 @@ import org.apache.spark.shuffle.ShuffleWriter
* See [[org.apache.spark.scheduler.Task]] for more information.
*
* @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
* @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
+ * @param _initialAccums initial set of accumulators to be used in this task for tracking
+ * internal metrics. Other accumulators will be registered later when
+ * they are deserialized on the executors.
*/
private[spark] class ShuffleMapTask(
stageId: Int,
@@ -44,8 +48,8 @@ private[spark] class ShuffleMapTask(
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation],
- internalAccumulators: Seq[Accumulator[Long]])
- extends Task[MapStatus](stageId, stageAttemptId, partition.index, internalAccumulators)
+ _initialAccums: Seq[Accumulator[_]])
+ extends Task[MapStatus](stageId, stageAttemptId, partition.index, _initialAccums)
with Logging {
/** A constructor used only in test suites. This does not require passing in an RDD. */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 6c6883d703..ed3adbd81c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler
import java.util.Properties
+import javax.annotation.Nullable
import scala.collection.Map
import scala.collection.mutable
@@ -60,7 +61,7 @@ case class SparkListenerTaskEnd(
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
- taskMetrics: TaskMetrics)
+ @Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent
@DeveloperApi
@@ -111,12 +112,12 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
/**
* Periodic updates from executors.
* @param execId executor id
- * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
+ * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
*/
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
execId: String,
- taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
+ accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
extends SparkListenerEvent
@DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 7ea24a217b..c1c8b47128 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,10 +74,10 @@ private[scheduler] abstract class Stage(
val name: String = callSite.shortForm
val details: String = callSite.longForm
- private var _internalAccumulators: Seq[Accumulator[Long]] = Seq.empty
+ private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty
/** Internal accumulators shared across all tasks in this stage. */
- def internalAccumulators: Seq[Accumulator[Long]] = _internalAccumulators
+ def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators
/**
* Re-initialize the internal accumulators associated with this stage.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index fca57928ec..a49f3716e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
-import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
@@ -41,32 +41,29 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
* and divides the task output to multiple buckets (based on the task's partitioner).
*
* @param stageId id of the stage this task belongs to
+ * @param stageAttemptId attempt id of the stage this task belongs to
* @param partitionId index of the partition in the RDD
+ * @param initialAccumulators initial set of accumulators to be used in this task for tracking
+ * internal metrics. Other accumulators will be registered later when
+ * they are deserialized on the executors.
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
- internalAccumulators: Seq[Accumulator[Long]]) extends Serializable {
+ val initialAccumulators: Seq[Accumulator[_]]) extends Serializable {
/**
- * The key of the Map is the accumulator id and the value of the Map is the latest accumulator
- * local value.
- */
- type AccumulatorUpdates = Map[Long, Any]
-
- /**
- * Called by [[Executor]] to run this task.
+ * Called by [[org.apache.spark.executor.Executor]] to run this task.
*
* @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
* @param attemptNumber how many times this task has been attempted (0 for the first attempt)
* @return the result of the task along with updates of Accumulators.
*/
final def run(
- taskAttemptId: Long,
- attemptNumber: Int,
- metricsSystem: MetricsSystem)
- : (T, AccumulatorUpdates) = {
+ taskAttemptId: Long,
+ attemptNumber: Int,
+ metricsSystem: MetricsSystem): T = {
context = new TaskContextImpl(
stageId,
partitionId,
@@ -74,16 +71,14 @@ private[spark] abstract class Task[T](
attemptNumber,
taskMemoryManager,
metricsSystem,
- internalAccumulators)
+ initialAccumulators)
TaskContext.setTaskContext(context)
- context.taskMetrics.setHostname(Utils.localHostName())
- context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
}
try {
- (runTask(context), context.collectAccumulators())
+ runTask(context)
} finally {
context.markTaskCompleted()
try {
@@ -141,6 +136,18 @@ private[spark] abstract class Task[T](
def executorDeserializeTime: Long = _executorDeserializeTime
/**
+ * Collect the latest values of accumulators used in this task. If the task failed,
+ * filter out the accumulators whose values should not be included on failures.
+ */
+ def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo] = {
+ if (context != null) {
+ context.taskMetrics.accumulatorUpdates().filter { a => !taskFailed || a.countFailedValues }
+ } else {
+ Seq.empty[AccumulableInfo]
+ }
+ }
+
+ /**
* Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
* code and user code to properly handle the flag. This function should be idempotent so it can
* be called multiple times.
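
For illustration only, not part of this patch: the failure-time filtering rule from collectAccumulatorUpdates above, applied to a local stand-in for AccumulableInfo.

case class UpdateSketch(id: Long, countFailedValues: Boolean)

object CollectOnFailureSketch extends App {
  val updates = Seq(
    UpdateSketch(1L, countFailedValues = true),
    UpdateSketch(2L, countFailedValues = false))

  def collect(taskFailed: Boolean): Seq[UpdateSketch] =
    updates.filter { a => !taskFailed || a.countFailedValues }

  assert(collect(taskFailed = false).size == 2)             // success: report everything
  assert(collect(taskFailed = true).map(_.id) == Seq(1L))   // failure: only opted-in values
}
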
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index b82c7f3fa5..03135e63d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -20,11 +20,9 @@ package org.apache.spark.scheduler
import java.io._
import java.nio.ByteBuffer
-import scala.collection.Map
-import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkEnv
-import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockId
import org.apache.spark.util.Utils
@@ -36,31 +34,24 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int)
extends TaskResult[T] with Serializable
/** A TaskResult that contains the task's return value and accumulator updates. */
-private[spark]
-class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any],
- var metrics: TaskMetrics)
+private[spark] class DirectTaskResult[T](
+ var valueBytes: ByteBuffer,
+ var accumUpdates: Seq[AccumulableInfo])
extends TaskResult[T] with Externalizable {
private var valueObjectDeserialized = false
private var valueObject: T = _
- def this() = this(null.asInstanceOf[ByteBuffer], null, null)
+ def this() = this(null.asInstanceOf[ByteBuffer], null)
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
-
- out.writeInt(valueBytes.remaining);
+ out.writeInt(valueBytes.remaining)
Utils.writeByteBuffer(valueBytes, out)
-
out.writeInt(accumUpdates.size)
- for ((key, value) <- accumUpdates) {
- out.writeLong(key)
- out.writeObject(value)
- }
- out.writeObject(metrics)
+ accumUpdates.foreach(out.writeObject)
}
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
-
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
in.readFully(byteVal)
@@ -70,13 +61,12 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
if (numUpdates == 0) {
accumUpdates = null
} else {
- val _accumUpdates = mutable.Map[Long, Any]()
+ val _accumUpdates = new ArrayBuffer[AccumulableInfo]
for (i <- 0 until numUpdates) {
- _accumUpdates(in.readLong()) = in.readObject()
+ _accumUpdates += in.readObject.asInstanceOf[AccumulableInfo]
}
accumUpdates = _accumUpdates
}
- metrics = in.readObject().asInstanceOf[TaskMetrics]
valueObjectDeserialized = false
}
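
For illustration only, not part of this patch: the write-count-then-each-element pattern that DirectTaskResult now uses for accumUpdates, shown on a standalone Externalizable class holding strings instead of AccumulableInfo.

import java.io._
import scala.collection.mutable.ArrayBuffer

class UpdatesHolder(var updates: Seq[String]) extends Externalizable {
  def this() = this(null)                                   // no-arg constructor required by Externalizable
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(updates.size)
    updates.foreach(out.writeObject)
  }
  override def readExternal(in: ObjectInput): Unit = {
    val n = in.readInt()
    val buf = new ArrayBuffer[String]
    (0 until n).foreach { _ => buf += in.readObject().asInstanceOf[String] }
    updates = buf
  }
}

object UpdatesHolderDemo extends App {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(new UpdatesHolder(Seq("a", "b")))
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  assert(in.readObject().asInstanceOf[UpdatesHolder].updates == Seq("a", "b"))
}
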
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index f4965994d8..c94c4f55e9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler
import java.nio.ByteBuffer
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{ExecutorService, RejectedExecutionException}
import scala.language.existentials
import scala.util.control.NonFatal
@@ -35,9 +35,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
extends Logging {
private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
- private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
- THREADS, "task-result-getter")
+ // Exposed for testing.
+ protected val getTaskResultExecutor: ExecutorService =
+ ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter")
+
+ // Exposed for testing.
protected val serializer = new ThreadLocal[SerializerInstance] {
override def initialValue(): SerializerInstance = {
sparkEnv.closureSerializer.newInstance()
@@ -45,7 +48,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
}
def enqueueSuccessfulTask(
- taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ serializedData: ByteBuffer): Unit = {
getTaskResultExecutor.execute(new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions {
try {
@@ -82,7 +87,19 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
(deserializedResult, size)
}
- result.metrics.setResultSize(size)
+ // Set the task result size in the accumulator updates received from the executors.
+ // We need to do this here on the driver because if we did this on the executors then
+ // we would have to serialize the result again after updating the size.
+ result.accumUpdates = result.accumUpdates.map { a =>
+ if (a.name == Some(InternalAccumulator.RESULT_SIZE)) {
+ assert(a.update == Some(0L),
+ "task result size should not have been set on the executors")
+ a.copy(update = Some(size.toLong))
+ } else {
+ a
+ }
+ }
+
scheduler.handleSuccessfulTask(taskSetManager, tid, result)
} catch {
case cnf: ClassNotFoundException =>
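
For illustration only, not part of this patch: the driver-side rewrite of the result-size entry shown above, using a local case class in place of AccumulableInfo. The metric name string is an assumption for the example.

case class ResultUpdate(name: Option[String], update: Option[Any])

object PatchResultSizeSketch extends App {
  val RESULT_SIZE = "internal.metrics.resultSize"   // name assumed for illustration
  val fromExecutor = Seq(
    ResultUpdate(Some(RESULT_SIZE), Some(0L)),      // executors leave this as zero
    ResultUpdate(Some("internal.metrics.executorRunTime"), Some(1234L)))

  val size = 2048
  val patched = fromExecutor.map { a =>
    if (a.name == Some(RESULT_SIZE)) {
      assert(a.update == Some(0L), "task result size should not have been set on the executors")
      a.copy(update = Some(size.toLong))
    } else {
      a
    }
  }
  assert(patched.head.update == Some(2048L))
}
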
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 7c0b007db7..fccd6e0699 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -65,8 +65,10 @@ private[spark] trait TaskScheduler {
* alive. Return true if the driver knows about the given block manager. Otherwise, return false,
* indicating that the block manager should re-register.
*/
- def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
- blockManagerId: BlockManagerId): Boolean
+ def executorHeartbeatReceived(
+ execId: String,
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ blockManagerId: BlockManagerId): Boolean
/**
* Get an application ID associated with the job.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6e3ef0e54f..29341dfe30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -30,7 +30,6 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.BlockManagerId
@@ -380,17 +379,17 @@ private[spark] class TaskSchedulerImpl(
*/
override def executorHeartbeatReceived(
execId: String,
- taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
+ accumUpdates: Array[(Long, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
-
- val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
- taskMetrics.flatMap { case (id, metrics) =>
+ // (taskId, stageId, stageAttemptId, accumUpdates)
+ val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
+ accumUpdates.flatMap { case (id, updates) =>
taskIdToTaskSetManager.get(id).map { taskSetMgr =>
- (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics)
+ (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, updates)
}
}
}
- dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
+ dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
}
def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
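
For illustration only, not part of this patch: the flatMap-through-Option shape used above to attach stage ids to each task's updates and silently drop task ids with no known task set manager. The maps and update payloads below are simplified stand-ins.

object HeartbeatMappingSketch extends App {
  // taskId -> (stageId, stageAttemptId), standing in for taskIdToTaskSetManager
  val taskIdToStage = Map(7L -> (1, 0), 8L -> (1, 0))

  // taskId -> accumulator update names, standing in for Seq[AccumulableInfo]
  val accumUpdates: Array[(Long, Seq[String])] =
    Array((7L, Seq("executorRunTime")), (99L, Seq("executorRunTime")))

  val withStageIds: Array[(Long, Int, Int, Seq[String])] =
    accumUpdates.flatMap { case (id, updates) =>
      taskIdToStage.get(id).map { case (stageId, attempt) => (id, stageId, attempt, updates) }
    }

  assert(withStageIds.map(_._1).toSeq == Seq(7L))   // task 99 is unknown and dropped
}
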
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index aa39b59d8c..cf97877476 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -621,8 +621,7 @@ private[spark] class TaskSetManager(
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
// Note: "result.value()" only deserializes the value when it's called at the first time, so
// here "result.value()" just returns the value and won't block other threads.
- sched.dagScheduler.taskEnded(
- tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
+ sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
@@ -653,8 +652,7 @@ private[spark] class TaskSetManager(
info.markFailed()
val index = info.index
copiesRunning(index) -= 1
- var taskMetrics : TaskMetrics = null
-
+ var accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
reason.asInstanceOf[TaskFailedReason].toErrorString
val failureException: Option[Throwable] = reason match {
@@ -669,7 +667,8 @@ private[spark] class TaskSetManager(
None
case ef: ExceptionFailure =>
- taskMetrics = ef.metrics.orNull
+ // ExceptionFailures might have accumulator updates
+ accumUpdates = ef.accumUpdates
if (ef.className == classOf[NotSerializableException].getName) {
// If the task result wasn't serializable, there's no point in trying to re-execute it.
logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
@@ -721,7 +720,7 @@ private[spark] class TaskSetManager(
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTimeMillis())
- sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
+ sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED
&& reason.isInstanceOf[TaskFailedReason]
@@ -793,7 +792,8 @@ private[spark] class TaskSetManager(
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
- sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null)
+ sched.dagScheduler.taskEnded(
+ tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index a57e5b0bfb..acbe16001f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -103,8 +103,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
+ context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 078718ba11..9c92a50150 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -237,7 +237,8 @@ private[v1] object AllStagesResource {
}
def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = {
- new AccumulableInfo(acc.id, acc.name, acc.update, acc.value)
+ new AccumulableInfo(
+ acc.id, acc.name.orNull, acc.update.map(_.toString), acc.value.map(_.toString).orNull)
}
def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 4a9f8b3052..b2aa8bfbe7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -325,12 +325,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
+ val metrics = new TaskMetrics
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
- stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
+ stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo, Some(metrics)))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -387,9 +388,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
(Some(e.toErrorString), None)
}
- if (!metrics.isEmpty) {
+ metrics.foreach { m =>
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
- updateAggregateMetrics(stageData, info.executorId, metrics.get, oldMetrics)
+ updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
@@ -489,19 +490,18 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
- for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
+ for ((taskId, sid, sAttempt, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
logWarning("Metrics update for task in unknown stage " + sid)
new StageUIData
})
val taskData = stageData.taskData.get(taskId)
- taskData.map { t =>
+ val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
+ taskData.foreach { t =>
if (!t.taskInfo.finished) {
- updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics,
- t.taskMetrics)
-
+ updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.taskMetrics)
// Overwrite task metrics
- t.taskMetrics = Some(taskMetrics)
+ t.taskMetrics = Some(metrics)
}
}
}
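onExecutorMetricsUpdate no longer receives a TaskMetrics object; it rebuilds one from the accumulator updates via TaskMetrics.fromAccumulatorUpdates. A hedged, heavily reduced sketch of that reconstruction idea, assuming accumulator names carry the internal.metrics. prefix used by InternalAccumulator.METRICS_PREFIX (the names and the snapshot type below are illustrative stand-ins):

// Fold named accumulator updates into a small metrics snapshot, ignoring
// anything this listener does not care about.
final case class AccUpdate(name: Option[String], update: Option[Any])
final case class MetricsSnapshot(executorRunTime: Long = 0L, peakExecutionMemory: Long = 0L)

def fromAccumulatorUpdates(updates: Seq[AccUpdate]): MetricsSnapshot =
  updates.foldLeft(MetricsSnapshot()) { (m, u) =>
    (u.name, u.update) match {
      case (Some("internal.metrics.executorRunTime"), Some(v: Long)) => m.copy(executorRunTime = v)
      case (Some("internal.metrics.peakExecutionMemory"), Some(v: Long)) => m.copy(peakExecutionMemory = v)
      case _ => m
    }
  }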
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 914f6183cc..29c5ff0b5c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -271,8 +271,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}
val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
- def accumulableRow(acc: AccumulableInfo): Elem =
- <tr><td>{acc.name}</td><td>{acc.value}</td></tr>
+ def accumulableRow(acc: AccumulableInfo): Seq[Node] = {
+ (acc.name, acc.value) match {
+ case (Some(name), Some(value)) => <tr><td>{name}</td><td>{value}</td></tr>
+ case _ => Seq.empty[Node]
+ }
+ }
val accumulableTable = UIUtils.listingTable(
accumulableHeaders,
accumulableRow,
@@ -404,13 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)
- val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
- info.accumulables
- .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
- .map { acc => acc.update.getOrElse("0").toLong }
- .getOrElse(0L)
- .toDouble
- }
+ val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.peakExecutionMemory.toDouble
+ }
val peakExecutionMemoryQuantiles = {
<td>
<span data-toggle="tooltip"
@@ -891,15 +891,15 @@ private[ui] class TaskDataSource(
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = getGettingResultTime(info, currentTime)
- val (taskInternalAccumulables, taskExternalAccumulables) =
- info.accumulables.partition(_.internal)
- val externalAccumulableReadable = taskExternalAccumulables.map { acc =>
- StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
- }
- val peakExecutionMemoryUsed = taskInternalAccumulables
- .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
- .map { acc => acc.update.getOrElse("0").toLong }
- .getOrElse(0L)
+ val externalAccumulableReadable = info.accumulables
+ .filterNot(_.internal)
+ .flatMap { a =>
+ (a.name, a.update) match {
+ case (Some(name), Some(update)) => Some(StringEscapeUtils.escapeHtml4(s"$name: $update"))
+ case _ => None
+ }
+ }
+ val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L)
val maybeInput = metrics.flatMap(_.inputMetrics)
val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
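With name, update, and value now Options, the stage page renders an accumulable only when both halves are present. A self-contained example of that pattern, using scala.xml (the node type StagePage already works with) and a stand-in AccInfo case class:

import scala.xml.Node

final case class AccInfo(name: Option[String], value: Option[String], update: Option[String], internal: Boolean)

// Emit a table row only when both the name and the value are defined,
// as accumulableRow does above; otherwise render nothing.
def accumulableRow(acc: AccInfo): Seq[Node] = (acc.name, acc.value) match {
  case (Some(name), Some(value)) => <tr><td>{name}</td><td>{value}</td></tr>
  case _ => Seq.empty[Node]
}

// The same Option-pairing builds the readable strings for external accumulators.
def readable(accs: Seq[AccInfo]): Seq[String] =
  accs.filterNot(_.internal).flatMap { a =>
    (a.name, a.update) match {
      case (Some(name), Some(update)) => Some(s"$name: $update")
      case _ => None
    }
  }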
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index efa22b9993..dc8070cf8a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -233,14 +233,14 @@ private[spark] object JsonProtocol {
def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
val execId = metricsUpdate.execId
- val taskMetrics = metricsUpdate.taskMetrics
+ val accumUpdates = metricsUpdate.accumUpdates
("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
("Executor ID" -> execId) ~
- ("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) =>
+ ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
("Task ID" -> taskId) ~
("Stage ID" -> stageId) ~
("Stage Attempt ID" -> stageAttemptId) ~
- ("Task Metrics" -> taskMetricsToJson(metrics))
+ ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
})
}
@@ -265,7 +265,7 @@ private[spark] object JsonProtocol {
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
("Accumulables" -> JArray(
- stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
+ stageInfo.accumulables.values.map(accumulableInfoToJson).toList))
}
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -284,11 +284,44 @@ private[spark] object JsonProtocol {
}
def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
+ val name = accumulableInfo.name
("ID" -> accumulableInfo.id) ~
- ("Name" -> accumulableInfo.name) ~
- ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~
- ("Value" -> accumulableInfo.value) ~
- ("Internal" -> accumulableInfo.internal)
+ ("Name" -> name) ~
+ ("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~
+ ("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~
+ ("Internal" -> accumulableInfo.internal) ~
+ ("Count Failed Values" -> accumulableInfo.countFailedValues)
+ }
+
+ /**
+ * Serialize the value of an accumulator to JSON.
+ *
+ * For accumulators representing internal task metrics, this looks up the relevant
+ * [[AccumulatorParam]] to serialize the value accordingly. For all other accumulators,
+ * this will simply serialize the value as a string.
+ *
+ * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing.
+ */
+ private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
+ import AccumulatorParam._
+ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
+ (value, InternalAccumulator.getParam(name.get)) match {
+ case (v: Int, IntAccumulatorParam) => JInt(v)
+ case (v: Long, LongAccumulatorParam) => JInt(v)
+ case (v: String, StringAccumulatorParam) => JString(v)
+ case (v, UpdatedBlockStatusesAccumulatorParam) =>
+ JArray(v.asInstanceOf[Seq[(BlockId, BlockStatus)]].toList.map { case (id, status) =>
+ ("Block ID" -> id.toString) ~
+ ("Status" -> blockStatusToJson(status))
+ })
+ case (v, p) =>
+ throw new IllegalArgumentException(s"unexpected combination of accumulator value " +
+ s"type (${v.getClass.getName}) and param (${p.getClass.getName}) in '${name.get}'")
+ }
+ } else {
+ // For all external accumulators, just use strings
+ JString(value.toString)
+ }
}
def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -303,9 +336,9 @@ private[spark] object JsonProtocol {
}.getOrElse(JNothing)
val shuffleWriteMetrics: JValue =
taskMetrics.shuffleWriteMetrics.map { wm =>
- ("Shuffle Bytes Written" -> wm.shuffleBytesWritten) ~
- ("Shuffle Write Time" -> wm.shuffleWriteTime) ~
- ("Shuffle Records Written" -> wm.shuffleRecordsWritten)
+ ("Shuffle Bytes Written" -> wm.bytesWritten) ~
+ ("Shuffle Write Time" -> wm.writeTime) ~
+ ("Shuffle Records Written" -> wm.recordsWritten)
}.getOrElse(JNothing)
val inputMetrics: JValue =
taskMetrics.inputMetrics.map { im =>
@@ -324,7 +357,6 @@ private[spark] object JsonProtocol {
("Block ID" -> id.toString) ~
("Status" -> blockStatusToJson(status))
})
- ("Host Name" -> taskMetrics.hostname) ~
("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
("Executor Run Time" -> taskMetrics.executorRunTime) ~
("Result Size" -> taskMetrics.resultSize) ~
@@ -352,12 +384,12 @@ private[spark] object JsonProtocol {
("Message" -> fetchFailed.message)
case exceptionFailure: ExceptionFailure =>
val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
- val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
+ val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
("Class Name" -> exceptionFailure.className) ~
("Description" -> exceptionFailure.description) ~
("Stack Trace" -> stackTrace) ~
("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
- ("Metrics" -> metrics)
+ ("Accumulator Updates" -> accumUpdates)
case taskCommitDenied: TaskCommitDenied =>
("Job ID" -> taskCommitDenied.jobID) ~
("Partition ID" -> taskCommitDenied.partitionID) ~
@@ -619,14 +651,15 @@ private[spark] object JsonProtocol {
def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
val execInfo = (json \ "Executor ID").extract[String]
- val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
+ val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
val taskId = (json \ "Task ID").extract[Long]
val stageId = (json \ "Stage ID").extract[Int]
val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
- val metrics = taskMetricsFromJson(json \ "Task Metrics")
- (taskId, stageId, stageAttemptId, metrics)
+ val updates =
+ (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
+ (taskId, stageId, stageAttemptId, updates)
}
- SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics)
+ SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
}
/** --------------------------------------------------------------------- *
@@ -647,7 +680,7 @@ private[spark] object JsonProtocol {
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
- case Some(values) => values.map(accumulableInfoFromJson(_))
+ case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
}
@@ -675,7 +708,7 @@ private[spark] object JsonProtocol {
val finishTime = (json \ "Finish Time").extract[Long]
val failed = (json \ "Failed").extract[Boolean]
val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
- case Some(values) => values.map(accumulableInfoFromJson(_))
+ case Some(values) => values.map(accumulableInfoFromJson)
case None => Seq[AccumulableInfo]()
}
@@ -690,11 +723,43 @@ private[spark] object JsonProtocol {
def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
val id = (json \ "ID").extract[Long]
- val name = (json \ "Name").extract[String]
- val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
- val value = (json \ "Value").extract[String]
+ val name = (json \ "Name").extractOpt[String]
+ val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
+ val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
- AccumulableInfo(id, name, update, value, internal)
+ val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
+ new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+ }
+
+ /**
+ * Deserialize the value of an accumulator from JSON.
+ *
+ * For accumulators representing internal task metrics, this looks up the relevant
+ * [[AccumulatorParam]] to deserialize the value accordingly. For all other
+ * accumulators, this will simply deserialize the value as a string.
+ *
+ * The behavior here must match that of [[accumValueToJson]]. Exposed for testing.
+ */
+ private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = {
+ import AccumulatorParam._
+ if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
+ (value, InternalAccumulator.getParam(name.get)) match {
+ case (JInt(v), IntAccumulatorParam) => v.toInt
+ case (JInt(v), LongAccumulatorParam) => v.toLong
+ case (JString(v), StringAccumulatorParam) => v
+ case (JArray(v), UpdatedBlockStatusesAccumulatorParam) =>
+ v.map { blockJson =>
+ val id = BlockId((blockJson \ "Block ID").extract[String])
+ val status = blockStatusFromJson(blockJson \ "Status")
+ (id, status)
+ }
+ case (v, p) =>
+ throw new IllegalArgumentException(s"unexpected combination of accumulator " +
+ s"value in JSON ($v) and accumulator param (${p.getClass.getName}) in '${name.get}'")
+ }
+ } else {
+ value.extract[String]
+ }
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
@@ -702,7 +767,6 @@ private[spark] object JsonProtocol {
return TaskMetrics.empty
}
val metrics = new TaskMetrics
- metrics.setHostname((json \ "Host Name").extract[String])
metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
metrics.setResultSize((json \ "Result Size").extract[Long])
@@ -787,10 +851,12 @@ private[spark] object JsonProtocol {
val className = (json \ "Class Name").extract[String]
val description = (json \ "Description").extract[String]
val stackTrace = stackTraceFromJson(json \ "Stack Trace")
- val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
- map(_.extract[String]).orNull
- val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
- ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
+ val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
+ // Fall back on accumulator updates from TaskMetrics, which Spark 1.x logged under "Metrics"
+ val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
+ .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
+ .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulatorUpdates())
+ ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
case `taskResultLost` => TaskResultLost
case `taskKilled` => TaskKilled
case `taskCommitDenied` =>
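The JsonProtocol changes hinge on one rule: accumulators for internal task metrics serialize with their real types, everything else serializes as a string, and accumValueToJson/accumValueFromJson must agree on both directions. A reduced sketch of that round trip using json4s AST types directly (plain pattern matching, no extractors; block-status handling omitted; the prefix constant is a stand-in for InternalAccumulator.METRICS_PREFIX):

import org.json4s.JsonAST.{JValue, JInt, JString}

val metricsPrefix = "internal.metrics."  // assumed value, stand-in for the real constant

def valueToJson(name: Option[String], value: Any): JValue =
  if (name.exists(_.startsWith(metricsPrefix))) {
    value match {
      case v: Int  => JInt(v)
      case v: Long => JInt(v)
      case v       => JString(v.toString)  // simplified: typed block-status handling omitted
    }
  } else {
    JString(value.toString)                // external accumulators are always strings
  }

def valueFromJson(name: Option[String], json: JValue): Any =
  if (name.exists(_.startsWith(metricsPrefix))) {
    json match {
      case JInt(v)    => v.toLong          // simplified: the real code picks Int vs Long by param
      case JString(v) => v
      case other      => other.toString
    }
  } else {
    json match {
      case JString(v) => v
      case other      => other.toString
    }
  }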
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index df9e0502e7..5afd6d6e22 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -682,8 +682,7 @@ private[spark] class ExternalSorter[K, V, C](
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
- context.internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemoryUsedBytes)
+ context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}