Diffstat (limited to 'core/src/main')
38 files changed, 1184 insertions, 592 deletions
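The thrust of the changes below is that call sites no longer fetch internal accumulators by name; they update metrics through typed methods on TaskMetrics, which now owns the accumulators. A minimal before/after sketch of that call pattern, modeled on the UnsafeShuffleWriter and Aggregator hunks below (context and map are assumed to belong to a running task):

    // Before this change: look the internal accumulator up by name and add to it.
    // context.internalMetricsToAccumulators(InternalAccumulator.PEAK_EXECUTION_MEMORY)
    //   .add(map.peakMemoryUsedBytes)

    // After this change: TaskMetrics owns the accumulator and exposes a typed incrementer.
    context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)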
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index d3d79a27ea..128a82579b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -444,13 +444,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { @Override public Option<MapStatus> stop(boolean success) { try { - // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite) - Map<String, Accumulator<Object>> internalAccumulators = - taskContext.internalMetricsToAccumulators(); - if (internalAccumulators != null) { - internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY()) - .add(getPeakMemoryUsedBytes()); - } + taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes()); if (stopping) { return Option.apply(null); diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala index a456d420b8..bde136141f 100644 --- a/core/src/main/scala/org/apache/spark/Accumulable.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -35,40 +35,67 @@ import org.apache.spark.util.Utils * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are * accumulating a set. You will add items to the set, and you will union two sets together. * + * All accumulators created on the driver to be used on the executors must be registered with + * [[Accumulators]]. This is already done automatically for accumulators created by the user. + * Internal accumulators must be explicitly registered by the caller. + * + * Operations are not thread-safe. + * + * @param id ID of this accumulator; for internal use only. * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `R` and `T` * @param name human-readable name for use in Spark's web UI * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported * to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be * thread safe so that they can be reported correctly. + * @param countFailedValues whether to accumulate values from failed tasks. This is set to true + * for system and time metrics like serialization time or bytes spilled, + * and false for things with absolute values like number of input rows. + * This should be used for internal metrics only. 
* @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in */ -class Accumulable[R, T] private[spark] ( - initialValue: R, +class Accumulable[R, T] private ( + val id: Long, + @transient initialValue: R, param: AccumulableParam[R, T], val name: Option[String], - internal: Boolean) + internal: Boolean, + private[spark] val countFailedValues: Boolean) extends Serializable { private[spark] def this( - @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = { - this(initialValue, param, None, internal) + initialValue: R, + param: AccumulableParam[R, T], + name: Option[String], + internal: Boolean, + countFailedValues: Boolean) = { + this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues) } - def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = - this(initialValue, param, name, false) + private[spark] def this( + initialValue: R, + param: AccumulableParam[R, T], + name: Option[String], + internal: Boolean) = { + this(initialValue, param, name, internal, false /* countFailedValues */) + } - def this(@transient initialValue: R, param: AccumulableParam[R, T]) = - this(initialValue, param, None) + def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = + this(initialValue, param, name, false /* internal */) - val id: Long = Accumulators.newId + def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None) - @volatile @transient private var value_ : R = initialValue // Current value on master - val zero = param.zero(initialValue) // Zero value to be passed to workers + @volatile @transient private var value_ : R = initialValue // Current value on driver + val zero = param.zero(initialValue) // Zero value to be passed to executors private var deserialized = false - Accumulators.register(this) + // In many places we create internal accumulators without access to the active context cleaner, + // so if we register them here then we may never unregister these accumulators. To avoid memory + // leaks, we require the caller to explicitly register internal accumulators elsewhere. + if (!internal) { + Accumulators.register(this) + } /** * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver @@ -78,6 +105,17 @@ class Accumulable[R, T] private[spark] ( private[spark] def isInternal: Boolean = internal /** + * Return a copy of this [[Accumulable]]. + * + * The copy will have the same ID as the original and will not be registered with + * [[Accumulators]] again. This method exists so that the caller can avoid passing the + * same mutable instance around. + */ + private[spark] def copy(): Accumulable[R, T] = { + new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues) + } + + /** * Add more data to this accumulator / accumulable * @param term the data to add */ @@ -106,7 +144,7 @@ class Accumulable[R, T] private[spark] ( def merge(term: R) { value_ = param.addInPlace(value_, term)} /** - * Access the accumulator's current value; only allowed on master. + * Access the accumulator's current value; only allowed on driver. */ def value: R = { if (!deserialized) { @@ -128,7 +166,7 @@ class Accumulable[R, T] private[spark] ( def localValue: R = value_ /** - * Set the accumulator's value; only allowed on master. + * Set the accumulator's value; only allowed on driver. 
*/ def value_= (newValue: R) { if (!deserialized) { @@ -139,22 +177,24 @@ class Accumulable[R, T] private[spark] ( } /** - * Set the accumulator's value; only allowed on master + * Set the accumulator's value. For internal use only. */ - def setValue(newValue: R) { - this.value = newValue - } + def setValue(newValue: R): Unit = { value_ = newValue } + + /** + * Set the accumulator's value. For internal use only. + */ + private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) } // Called by Java when deserializing an object private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() value_ = zero deserialized = true + // Automatically register the accumulator when it is deserialized with the task closure. - // - // Note internal accumulators sent with task are deserialized before the TaskContext is created - // and are registered in the TaskContext constructor. Other internal accumulators, such SQL - // metrics, still need to register here. + // This is for external accumulators and internal ones that do not represent task level + // metrics, e.g. internal SQL metrics, which are per-operator. val taskContext = TaskContext.get() if (taskContext != null) { taskContext.registerAccumulator(this) diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala index 007136e6ae..558bd447e2 100644 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -17,9 +17,14 @@ package org.apache.spark -import scala.collection.{mutable, Map} +import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable import scala.ref.WeakReference +import org.apache.spark.storage.{BlockId, BlockStatus} + /** * A simpler value of [[Accumulable]] where the result type being accumulated is the same @@ -49,14 +54,18 @@ import scala.ref.WeakReference * * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `T` + * @param name human-readable name associated with this accumulator + * @param internal whether this accumulator is used internally within Spark only + * @param countFailedValues whether to accumulate values from failed tasks * @tparam T result type */ class Accumulator[T] private[spark] ( @transient private[spark] val initialValue: T, param: AccumulatorParam[T], name: Option[String], - internal: Boolean) - extends Accumulable[T, T](initialValue, param, name, internal) { + internal: Boolean, + override val countFailedValues: Boolean = false) + extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) { def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { this(initialValue, param, name, false) @@ -75,43 +84,63 @@ private[spark] object Accumulators extends Logging { * This global map holds the original accumulator objects that are created on the driver. * It keeps weak references to these objects so that accumulators can be garbage-collected * once the RDDs and user-code that reference them are cleaned up. + * TODO: Don't use a global map; these should be tied to a SparkContext at the very least. 
*/ + @GuardedBy("Accumulators") val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() - private var lastId: Long = 0 + private val nextId = new AtomicLong(0L) - def newId(): Long = synchronized { - lastId += 1 - lastId - } + /** + * Return a globally unique ID for a new [[Accumulable]]. + * Note: Once you copy the [[Accumulable]] the ID is no longer unique. + */ + def newId(): Long = nextId.getAndIncrement + /** + * Register an [[Accumulable]] created on the driver such that it can be used on the executors. + * + * All accumulators registered here can later be used as a container for accumulating partial + * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. + * Note: if an accumulator is registered here, it should also be registered with the active + * context cleaner for cleanup so as to avoid memory leaks. + * + * If an [[Accumulable]] with the same ID was already registered, this does nothing instead + * of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct + * [[org.apache.spark.executor.TaskMetrics]] from accumulator updates. + */ def register(a: Accumulable[_, _]): Unit = synchronized { - originals(a.id) = new WeakReference[Accumulable[_, _]](a) + if (!originals.contains(a.id)) { + originals(a.id) = new WeakReference[Accumulable[_, _]](a) + } } - def remove(accId: Long) { - synchronized { - originals.remove(accId) - } + /** + * Unregister the [[Accumulable]] with the given ID, if any. + */ + def remove(accId: Long): Unit = synchronized { + originals.remove(accId) } - // Add values to the original accumulators with some given IDs - def add(values: Map[Long, Any]): Unit = synchronized { - for ((id, value) <- values) { - if (originals.contains(id)) { - // Since we are now storing weak references, we must check whether the underlying data - // is valid. - originals(id).get match { - case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value - case None => - throw new IllegalAccessError("Attempted to access garbage collected Accumulator.") - } - } else { - logWarning(s"Ignoring accumulator update for unknown accumulator id $id") + /** + * Return the [[Accumulable]] registered with the given ID, if any. + */ + def get(id: Long): Option[Accumulable[_, _]] = synchronized { + originals.get(id).map { weakRef => + // Since we are storing weak references, we must check whether the underlying data is valid. + weakRef.get.getOrElse { + throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id") } } } + /** + * Clear all registered [[Accumulable]]s. For testing only. + */ + def clear(): Unit = synchronized { + originals.clear() + } + } @@ -156,5 +185,23 @@ object AccumulatorParam { def zero(initialValue: Float): Float = 0f } - // TODO: Add AccumulatorParams for other types, e.g. lists and strings + // Note: when merging values, this param just adopts the newer value. This is used only + // internally for things that shouldn't really be accumulated across tasks, like input + // read method, which should be the same across all tasks in the same stage. + private[spark] object StringAccumulatorParam extends AccumulatorParam[String] { + def addInPlace(t1: String, t2: String): String = t2 + def zero(initialValue: String): String = "" + } + + // Note: this is expensive as it makes a copy of the list every time the caller adds an item. + // A better way to use this is to first accumulate the values yourself then them all at once. 
+ private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] { + def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2 + def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T] + } + + // For the internal metric that records what blocks are updated in a particular task + private[spark] object UpdatedBlockStatusesAccumulatorParam + extends ListAccumulatorParam[(BlockId, BlockStatus)] + } diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 62629000cf..e493d9a3cf 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -57,8 +57,7 @@ case class Aggregator[K, V, C] ( Option(context).foreach { c => c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) - c.internalMetricsToAccumulators( - InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes) + c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes) } } } diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index e03977828b..45b20c0e8d 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} */ private[spark] case class Heartbeat( executorId: String, - taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics + accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates blockManagerId: BlockManagerId) /** @@ -119,14 +119,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) context.reply(true) // Messages received from executors - case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) => + case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = !scheduler.executorHeartbeatReceived( - executorId, taskMetrics, blockManagerId) + executorId, accumUpdates, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala index 6ea997c079..c191122c06 100644 --- a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -17,42 +17,193 @@ package org.apache.spark +import org.apache.spark.storage.{BlockId, BlockStatus} -// This is moved to its own file because many more things will be added to it in SPARK-10620. + +/** + * A collection of fields and methods concerned with internal accumulators that represent + * task level metrics. + */ private[spark] object InternalAccumulator { - val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" - val TEST_ACCUMULATOR = "testAccumulator" - - // For testing only. - // This needs to be a def since we don't want to reuse the same accumulator across stages. 
- private def maybeTestAccumulator: Option[Accumulator[Long]] = { - if (sys.props.contains("spark.testing")) { - Some(new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true)) - } else { - None + + import AccumulatorParam._ + + // Prefixes used in names of internal task level metrics + val METRICS_PREFIX = "internal.metrics." + val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read." + val SHUFFLE_WRITE_METRICS_PREFIX = METRICS_PREFIX + "shuffle.write." + val OUTPUT_METRICS_PREFIX = METRICS_PREFIX + "output." + val INPUT_METRICS_PREFIX = METRICS_PREFIX + "input." + + // Names of internal task level metrics + val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime" + val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime" + val RESULT_SIZE = METRICS_PREFIX + "resultSize" + val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime" + val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime" + val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled" + val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled" + val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory" + val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses" + val TEST_ACCUM = METRICS_PREFIX + "testAccumulator" + + // scalastyle:off + + // Names of shuffle read metrics + object shuffleRead { + val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched" + val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched" + val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead" + val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead" + val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime" + val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead" + } + + // Names of shuffle write metrics + object shuffleWrite { + val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten" + val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten" + val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime" + } + + // Names of output metrics + object output { + val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod" + val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten" + val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten" + } + + // Names of input metrics + object input { + val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod" + val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead" + val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead" + } + + // scalastyle:on + + /** + * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]]. + */ + def create(name: String): Accumulator[_] = { + require(name.startsWith(METRICS_PREFIX), + s"internal accumulator name must start with '$METRICS_PREFIX': $name") + getParam(name) match { + case p @ LongAccumulatorParam => newMetric[Long](0L, name, p) + case p @ IntAccumulatorParam => newMetric[Int](0, name, p) + case p @ StringAccumulatorParam => newMetric[String]("", name, p) + case p @ UpdatedBlockStatusesAccumulatorParam => + newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p) + case p => throw new IllegalArgumentException( + s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.") + } + } + + /** + * Get the [[AccumulatorParam]] associated with the internal metric name, + * which must begin with [[METRICS_PREFIX]]. 
+ */ + def getParam(name: String): AccumulatorParam[_] = { + require(name.startsWith(METRICS_PREFIX), + s"internal accumulator name must start with '$METRICS_PREFIX': $name") + name match { + case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam + case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam + case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam + case input.READ_METHOD => StringAccumulatorParam + case output.WRITE_METHOD => StringAccumulatorParam + case _ => LongAccumulatorParam } } /** * Accumulators for tracking internal metrics. + */ + def create(): Seq[Accumulator[_]] = { + Seq[String]( + EXECUTOR_DESERIALIZE_TIME, + EXECUTOR_RUN_TIME, + RESULT_SIZE, + JVM_GC_TIME, + RESULT_SERIALIZATION_TIME, + MEMORY_BYTES_SPILLED, + DISK_BYTES_SPILLED, + PEAK_EXECUTION_MEMORY, + UPDATED_BLOCK_STATUSES).map(create) ++ + createShuffleReadAccums() ++ + createShuffleWriteAccums() ++ + createInputAccums() ++ + createOutputAccums() ++ + sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq + } + + /** + * Accumulators for tracking shuffle read metrics. + */ + def createShuffleReadAccums(): Seq[Accumulator[_]] = { + Seq[String]( + shuffleRead.REMOTE_BLOCKS_FETCHED, + shuffleRead.LOCAL_BLOCKS_FETCHED, + shuffleRead.REMOTE_BYTES_READ, + shuffleRead.LOCAL_BYTES_READ, + shuffleRead.FETCH_WAIT_TIME, + shuffleRead.RECORDS_READ).map(create) + } + + /** + * Accumulators for tracking shuffle write metrics. + */ + def createShuffleWriteAccums(): Seq[Accumulator[_]] = { + Seq[String]( + shuffleWrite.BYTES_WRITTEN, + shuffleWrite.RECORDS_WRITTEN, + shuffleWrite.WRITE_TIME).map(create) + } + + /** + * Accumulators for tracking input metrics. + */ + def createInputAccums(): Seq[Accumulator[_]] = { + Seq[String]( + input.READ_METHOD, + input.BYTES_READ, + input.RECORDS_READ).map(create) + } + + /** + * Accumulators for tracking output metrics. + */ + def createOutputAccums(): Seq[Accumulator[_]] = { + Seq[String]( + output.WRITE_METHOD, + output.BYTES_WRITTEN, + output.RECORDS_WRITTEN).map(create) + } + + /** + * Accumulators for tracking internal metrics. * * These accumulators are created with the stage such that all tasks in the stage will * add to the same set of accumulators. We do this to report the distribution of accumulator * values across all tasks within each stage. */ - def create(sc: SparkContext): Seq[Accumulator[Long]] = { - val internalAccumulators = Seq( - // Execution memory refers to the memory used by internal data structures created - // during shuffles, aggregations and joins. The value of this accumulator should be - // approximately the sum of the peak sizes across all such data structures created - // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. - new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) - ) ++ maybeTestAccumulator.toSeq - internalAccumulators.foreach { accumulator => - sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator)) + def create(sc: SparkContext): Seq[Accumulator[_]] = { + val accums = create() + accums.foreach { accum => + Accumulators.register(accum) + sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) } - internalAccumulators + accums + } + + /** + * Create a new accumulator representing an internal task metric. 
+ */ + private def newMetric[T]( + initialValue: T, + name: String, + param: AccumulatorParam[T]): Accumulator[T] = { + new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true) } + } diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 7704abc134..9f49cf1c4c 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -64,7 +64,7 @@ object TaskContext { * An empty task context that does not represent an actual task. */ private[spark] def empty(): TaskContextImpl = { - new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty) + new TaskContextImpl(0, 0, 0, 0, null, null) } } @@ -138,7 +138,6 @@ abstract class TaskContext extends Serializable { */ def taskAttemptId(): Long - /** ::DeveloperApi:: */ @DeveloperApi def taskMetrics(): TaskMetrics @@ -161,20 +160,4 @@ abstract class TaskContext extends Serializable { */ private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit - /** - * Return the local values of internal accumulators that belong to this task. The key of the Map - * is the accumulator id and the value of the Map is the latest accumulator local value. - */ - private[spark] def collectInternalAccumulators(): Map[Long, Any] - - /** - * Return the local values of accumulators that belong to this task. The key of the Map is the - * accumulator id and the value of the Map is the latest accumulator local value. - */ - private[spark] def collectAccumulators(): Map[Long, Any] - - /** - * Accumulators for tracking internal metrics indexed by the name. - */ - private[spark] val internalMetricsToAccumulators: Map[String, Accumulator[Long]] } diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index 94ff884b74..27ca46f73d 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -17,7 +17,7 @@ package org.apache.spark -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager @@ -32,11 +32,15 @@ private[spark] class TaskContextImpl( override val attemptNumber: Int, override val taskMemoryManager: TaskMemoryManager, @transient private val metricsSystem: MetricsSystem, - internalAccumulators: Seq[Accumulator[Long]], - val taskMetrics: TaskMetrics = TaskMetrics.empty) + initialAccumulators: Seq[Accumulator[_]] = InternalAccumulator.create()) extends TaskContext with Logging { + /** + * Metrics associated with this task. + */ + override val taskMetrics: TaskMetrics = new TaskMetrics(initialAccumulators) + // List of callback functions to execute when the task completes. 
@transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener] @@ -91,24 +95,8 @@ private[spark] class TaskContextImpl( override def getMetricsSources(sourceName: String): Seq[Source] = metricsSystem.getSourcesByName(sourceName) - @transient private val accumulators = new HashMap[Long, Accumulable[_, _]] - - private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = synchronized { - accumulators(a.id) = a - } - - private[spark] override def collectInternalAccumulators(): Map[Long, Any] = synchronized { - accumulators.filter(_._2.isInternal).mapValues(_.localValue).toMap + private[spark] override def registerAccumulator(a: Accumulable[_, _]): Unit = { + taskMetrics.registerAccumulator(a) } - private[spark] override def collectAccumulators(): Map[Long, Any] = synchronized { - accumulators.mapValues(_.localValue).toMap - } - - private[spark] override val internalMetricsToAccumulators: Map[String, Accumulator[Long]] = { - // Explicitly register internal accumulators here because these are - // not captured in the task closure and are already deserialized - internalAccumulators.foreach(registerAccumulator) - internalAccumulators.map { a => (a.name.get, a) }.toMap - } } diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 13241b77bf..68340cc704 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -19,8 +19,11 @@ package org.apache.spark import java.io.{ObjectInputStream, ObjectOutputStream} +import scala.util.Try + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils @@ -115,22 +118,34 @@ case class ExceptionFailure( description: String, stackTrace: Array[StackTraceElement], fullStackTrace: String, - metrics: Option[TaskMetrics], - private val exceptionWrapper: Option[ThrowableSerializationWrapper]) + exceptionWrapper: Option[ThrowableSerializationWrapper], + accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]) extends TaskFailedReason { + @deprecated("use accumUpdates instead", "2.0.0") + val metrics: Option[TaskMetrics] = { + if (accumUpdates.nonEmpty) { + Try(TaskMetrics.fromAccumulatorUpdates(accumUpdates)).toOption + } else { + None + } + } + /** * `preserveCause` is used to keep the exception itself so it is available to the * driver. This may be set to `false` in the event that the exception is not in fact * serializable. 
*/ - private[spark] def this(e: Throwable, metrics: Option[TaskMetrics], preserveCause: Boolean) { - this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics, - if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None) + private[spark] def this( + e: Throwable, + accumUpdates: Seq[AccumulableInfo], + preserveCause: Boolean) { + this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), + if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None, accumUpdates) } - private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) { - this(e, metrics, preserveCause = true) + private[spark] def this(e: Throwable, accumUpdates: Seq[AccumulableInfo]) { + this(e, accumUpdates, preserveCause = true) } def exception: Option[Throwable] = exceptionWrapper.flatMap { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 8ba3f5e241..06b5101b1f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -370,6 +370,14 @@ object SparkHadoopUtil { val SPARK_YARN_CREDS_COUNTER_DELIM = "-" + /** + * Number of records to update input metrics when reading from HadoopRDDs. + * + * Each update is potentially expensive because we need to use reflection to access the + * Hadoop FileSystem API of interest (only available in 2.5), so we should do this sparingly. + */ + private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000 + def get: SparkHadoopUtil = { // Check each time to support changing to/from YARN val yarnMode = java.lang.Boolean.valueOf( diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 030ae41db4..51c000ea5c 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -31,7 +31,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.rpc.RpcTimeout -import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task} +import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTaskResult, Task} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util._ @@ -210,7 +210,7 @@ private[spark] class Executor( // Run the actual task and measure its runtime. 
taskStart = System.currentTimeMillis() var threwException = true - val (value, accumUpdates) = try { + val value = try { val res = task.run( taskAttemptId = taskId, attemptNumber = attemptNumber, @@ -249,10 +249,11 @@ private[spark] class Executor( m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime) m.setJvmGCTime(computeTotalGcTime() - startGCTime) m.setResultSerializationTime(afterSerialization - beforeSerialization) - m.updateAccumulators() } - val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull) + // Note: accumulator updates must be collected after TaskMetrics is updated + val accumUpdates = task.collectAccumulatorUpdates() + val directResult = new DirectTaskResult(valueBytes, accumUpdates) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit @@ -297,21 +298,25 @@ private[spark] class Executor( // the default uncaught exception handler, which will terminate the Executor. logError(s"Exception in $taskName (TID $taskId)", t) - val metrics: Option[TaskMetrics] = Option(task).flatMap { task => - task.metrics.map { m => + // Collect latest accumulator values to report back to the driver + val accumulatorUpdates: Seq[AccumulableInfo] = + if (task != null) { + task.metrics.foreach { m => m.setExecutorRunTime(System.currentTimeMillis() - taskStart) m.setJvmGCTime(computeTotalGcTime() - startGCTime) - m.updateAccumulators() - m } + task.collectAccumulatorUpdates(taskFailed = true) + } else { + Seq.empty[AccumulableInfo] } + val serializedTaskEndReason = { try { - ser.serialize(new ExceptionFailure(t, metrics)) + ser.serialize(new ExceptionFailure(t, accumulatorUpdates)) } catch { case _: NotSerializableException => // t is not serializable so just send the stacktrace - ser.serialize(new ExceptionFailure(t, metrics, false)) + ser.serialize(new ExceptionFailure(t, accumulatorUpdates, preserveCause = false)) } } execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) @@ -418,33 +423,21 @@ private[spark] class Executor( /** Reports heartbeat and metrics for active tasks to the driver. 
*/ private def reportHeartBeat(): Unit = { - // list of (task id, metrics) to send back to the driver - val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() + // list of (task id, accumUpdates) to send back to the driver + val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulableInfo])]() val curGCTime = computeTotalGcTime() for (taskRunner <- runningTasks.values().asScala) { if (taskRunner.task != null) { taskRunner.task.metrics.foreach { metrics => metrics.mergeShuffleReadMetrics() - metrics.updateInputMetrics() metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) - metrics.updateAccumulators() - - if (isLocal) { - // JobProgressListener will hold an reference of it during - // onExecutorMetricsUpdate(), then JobProgressListener can not see - // the changes of metrics any more, so make a deep copy of it - val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics)) - tasksMetrics += ((taskRunner.taskId, copiedMetrics)) - } else { - // It will be copied by serialization - tasksMetrics += ((taskRunner.taskId, metrics)) - } + accumUpdates += ((taskRunner.taskId, metrics.accumulatorUpdates())) } } } - val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) + val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) try { val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse]( message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala index 8f1d7f89a4..ed9e157ce7 100644 --- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -17,13 +17,15 @@ package org.apache.spark.executor +import org.apache.spark.{Accumulator, InternalAccumulator} import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: - * Method by which input data was read. Network means that the data was read over the network + * Method by which input data was read. Network means that the data was read over the network * from a remote block manager (which may have stored the data on-disk or in-memory). + * Operations are not thread-safe. */ @DeveloperApi object DataReadMethod extends Enumeration with Serializable { @@ -34,44 +36,75 @@ object DataReadMethod extends Enumeration with Serializable { /** * :: DeveloperApi :: - * Metrics about reading input data. + * A collection of accumulators that represents metrics about reading data from external systems. */ @DeveloperApi -case class InputMetrics(readMethod: DataReadMethod.Value) { +class InputMetrics private ( + _bytesRead: Accumulator[Long], + _recordsRead: Accumulator[Long], + _readMethod: Accumulator[String]) + extends Serializable { + + private[executor] def this(accumMap: Map[String, Accumulator[_]]) { + this( + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.BYTES_READ), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.input.RECORDS_READ), + TaskMetrics.getAccum[String](accumMap, InternalAccumulator.input.READ_METHOD)) + } /** - * This is volatile so that it is visible to the updater thread. + * Create a new [[InputMetrics]] that is not associated with any particular task. + * + * This mainly exists because of SPARK-5225, where we are forced to use a dummy [[InputMetrics]] + * because we want to ignore metrics from a second read method. 
In the future, we should revisit + * whether this is needed. + * + * A better alternative is [[TaskMetrics.registerInputMetrics]]. */ - @volatile @transient var bytesReadCallback: Option[() => Long] = None + private[executor] def this() { + this(InternalAccumulator.createInputAccums() + .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]) + } /** - * Total bytes read. + * Total number of bytes read. */ - private var _bytesRead: Long = _ - def bytesRead: Long = _bytesRead - def incBytesRead(bytes: Long): Unit = _bytesRead += bytes + def bytesRead: Long = _bytesRead.localValue /** - * Total records read. + * Total number of records read. */ - private var _recordsRead: Long = _ - def recordsRead: Long = _recordsRead - def incRecordsRead(records: Long): Unit = _recordsRead += records + def recordsRead: Long = _recordsRead.localValue /** - * Invoke the bytesReadCallback and mutate bytesRead. + * The source from which this task reads its input. */ - def updateBytesRead() { - bytesReadCallback.foreach { c => - _bytesRead = c() - } + def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue) + + @deprecated("incrementing input metrics is for internal use only", "2.0.0") + def incBytesRead(v: Long): Unit = _bytesRead.add(v) + @deprecated("incrementing input metrics is for internal use only", "2.0.0") + def incRecordsRead(v: Long): Unit = _recordsRead.add(v) + private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v) + private[spark] def setReadMethod(v: DataReadMethod.Value): Unit = + _readMethod.setValue(v.toString) + +} + +/** + * Deprecated methods to preserve case class matching behavior before Spark 2.0. + */ +object InputMetrics { + + @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0") + def apply(readMethod: DataReadMethod.Value): InputMetrics = { + val im = new InputMetrics + im.setReadMethod(readMethod) + im } - /** - * Register a function that can be called to get up-to-date information on how many bytes the task - * has read from an input source. - */ - def setBytesReadCallback(f: Option[() => Long]) { - bytesReadCallback = f + @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0") + def unapply(input: InputMetrics): Option[DataReadMethod.Value] = { + Some(input.readMethod) } } diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index ad132d004c..0b37d559c7 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -17,12 +17,14 @@ package org.apache.spark.executor +import org.apache.spark.{Accumulator, InternalAccumulator} import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: * Method by which output data was written. + * Operations are not thread-safe. */ @DeveloperApi object DataWriteMethod extends Enumeration with Serializable { @@ -33,21 +35,70 @@ object DataWriteMethod extends Enumeration with Serializable { /** * :: DeveloperApi :: - * Metrics about writing output data. + * A collection of accumulators that represents metrics about writing data to external systems. 
*/ @DeveloperApi -case class OutputMetrics(writeMethod: DataWriteMethod.Value) { +class OutputMetrics private ( + _bytesWritten: Accumulator[Long], + _recordsWritten: Accumulator[Long], + _writeMethod: Accumulator[String]) + extends Serializable { + + private[executor] def this(accumMap: Map[String, Accumulator[_]]) { + this( + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.BYTES_WRITTEN), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.output.RECORDS_WRITTEN), + TaskMetrics.getAccum[String](accumMap, InternalAccumulator.output.WRITE_METHOD)) + } + + /** + * Create a new [[OutputMetrics]] that is not associated with any particular task. + * + * This is only used for preserving matching behavior on [[OutputMetrics]], which used to be + * a case class before Spark 2.0. Once we remove support for matching on [[OutputMetrics]] + * we can remove this constructor as well. + */ + private[executor] def this() { + this(InternalAccumulator.createOutputAccums() + .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]]) + } + + /** + * Total number of bytes written. + */ + def bytesWritten: Long = _bytesWritten.localValue + /** - * Total bytes written + * Total number of records written. */ - private var _bytesWritten: Long = _ - def bytesWritten: Long = _bytesWritten - private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value + def recordsWritten: Long = _recordsWritten.localValue /** - * Total records written + * The source to which this task writes its output. */ - private var _recordsWritten: Long = 0L - def recordsWritten: Long = _recordsWritten - private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value + def writeMethod: DataWriteMethod.Value = DataWriteMethod.withName(_writeMethod.localValue) + + private[spark] def setBytesWritten(v: Long): Unit = _bytesWritten.setValue(v) + private[spark] def setRecordsWritten(v: Long): Unit = _recordsWritten.setValue(v) + private[spark] def setWriteMethod(v: DataWriteMethod.Value): Unit = + _writeMethod.setValue(v.toString) + +} + +/** + * Deprecated methods to preserve case class matching behavior before Spark 2.0. + */ +object OutputMetrics { + + @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0") + def apply(writeMethod: DataWriteMethod.Value): OutputMetrics = { + val om = new OutputMetrics + om.setWriteMethod(writeMethod) + om + } + + @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0") + def unapply(output: OutputMetrics): Option[DataWriteMethod.Value] = { + Some(output.writeMethod) + } } diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index e985b35ace..50bb645d97 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -17,71 +17,103 @@ package org.apache.spark.executor +import org.apache.spark.{Accumulator, InternalAccumulator} import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: - * Metrics pertaining to shuffle data read in a given task. + * A collection of accumulators that represent metrics about reading shuffle data. + * Operations are not thread-safe. 
*/ @DeveloperApi -class ShuffleReadMetrics extends Serializable { +class ShuffleReadMetrics private ( + _remoteBlocksFetched: Accumulator[Int], + _localBlocksFetched: Accumulator[Int], + _remoteBytesRead: Accumulator[Long], + _localBytesRead: Accumulator[Long], + _fetchWaitTime: Accumulator[Long], + _recordsRead: Accumulator[Long]) + extends Serializable { + + private[executor] def this(accumMap: Map[String, Accumulator[_]]) { + this( + TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.REMOTE_BLOCKS_FETCHED), + TaskMetrics.getAccum[Int](accumMap, InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.REMOTE_BYTES_READ), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.LOCAL_BYTES_READ), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.FETCH_WAIT_TIME), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleRead.RECORDS_READ)) + } + /** - * Number of remote blocks fetched in this shuffle by this task + * Create a new [[ShuffleReadMetrics]] that is not associated with any particular task. + * + * This mainly exists for legacy reasons, because we use dummy [[ShuffleReadMetrics]] in + * many places only to merge their values together later. In the future, we should revisit + * whether this is needed. + * + * A better alternative is [[TaskMetrics.registerTempShuffleReadMetrics]] followed by + * [[TaskMetrics.mergeShuffleReadMetrics]]. */ - private var _remoteBlocksFetched: Int = _ - def remoteBlocksFetched: Int = _remoteBlocksFetched - private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value - private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value + private[spark] def this() { + this(InternalAccumulator.createShuffleReadAccums().map { a => (a.name.get, a) }.toMap) + } /** - * Number of local blocks fetched in this shuffle by this task + * Number of remote blocks fetched in this shuffle by this task. */ - private var _localBlocksFetched: Int = _ - def localBlocksFetched: Int = _localBlocksFetched - private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value - private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value + def remoteBlocksFetched: Int = _remoteBlocksFetched.localValue /** - * Time the task spent waiting for remote shuffle blocks. This only includes the time - * blocking on shuffle input data. For instance if block B is being fetched while the task is - * still not finished processing block A, it is not considered to be blocking on block B. + * Number of local blocks fetched in this shuffle by this task. */ - private var _fetchWaitTime: Long = _ - def fetchWaitTime: Long = _fetchWaitTime - private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value - private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value + def localBlocksFetched: Int = _localBlocksFetched.localValue /** - * Total number of remote bytes read from the shuffle by this task + * Total number of remote bytes read from the shuffle by this task. */ - private var _remoteBytesRead: Long = _ - def remoteBytesRead: Long = _remoteBytesRead - private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value - private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value + def remoteBytesRead: Long = _remoteBytesRead.localValue /** * Shuffle data that was read from the local disk (as opposed to from a remote executor). 
*/ - private var _localBytesRead: Long = _ - def localBytesRead: Long = _localBytesRead - private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value + def localBytesRead: Long = _localBytesRead.localValue /** - * Total bytes fetched in the shuffle by this task (both remote and local). + * Time the task spent waiting for remote shuffle blocks. This only includes the time + * blocking on shuffle input data. For instance if block B is being fetched while the task is + * still not finished processing block A, it is not considered to be blocking on block B. + */ + def fetchWaitTime: Long = _fetchWaitTime.localValue + + /** + * Total number of records read from the shuffle by this task. */ - def totalBytesRead: Long = _remoteBytesRead + _localBytesRead + def recordsRead: Long = _recordsRead.localValue /** - * Number of blocks fetched in this shuffle by this task (remote or local) + * Total bytes fetched in the shuffle by this task (both remote and local). */ - def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched + def totalBytesRead: Long = remoteBytesRead + localBytesRead /** - * Total number of records read from the shuffle by this task + * Number of blocks fetched in this shuffle by this task (remote or local). */ - private var _recordsRead: Long = _ - def recordsRead: Long = _recordsRead - private[spark] def incRecordsRead(value: Long) = _recordsRead += value - private[spark] def decRecordsRead(value: Long) = _recordsRead -= value + def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched + + private[spark] def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.add(v) + private[spark] def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.add(v) + private[spark] def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead.add(v) + private[spark] def incLocalBytesRead(v: Long): Unit = _localBytesRead.add(v) + private[spark] def incFetchWaitTime(v: Long): Unit = _fetchWaitTime.add(v) + private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v) + + private[spark] def setRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched.setValue(v) + private[spark] def setLocalBlocksFetched(v: Int): Unit = _localBlocksFetched.setValue(v) + private[spark] def setRemoteBytesRead(v: Long): Unit = _remoteBytesRead.setValue(v) + private[spark] def setLocalBytesRead(v: Long): Unit = _localBytesRead.setValue(v) + private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v) + private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v) + } diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 24795f8600..c7aaabb561 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,40 +17,66 @@ package org.apache.spark.executor +import org.apache.spark.{Accumulator, InternalAccumulator} import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: - * Metrics pertaining to shuffle data written in a given task. + * A collection of accumulators that represent metrics about writing shuffle data. + * Operations are not thread-safe. 
*/ @DeveloperApi -class ShuffleWriteMetrics extends Serializable { +class ShuffleWriteMetrics private ( + _bytesWritten: Accumulator[Long], + _recordsWritten: Accumulator[Long], + _writeTime: Accumulator[Long]) + extends Serializable { + + private[executor] def this(accumMap: Map[String, Accumulator[_]]) { + this( + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.BYTES_WRITTEN), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.RECORDS_WRITTEN), + TaskMetrics.getAccum[Long](accumMap, InternalAccumulator.shuffleWrite.WRITE_TIME)) + } /** - * Number of bytes written for the shuffle by this task + * Create a new [[ShuffleWriteMetrics]] that is not associated with any particular task. + * + * This mainly exists for legacy reasons, because we use dummy [[ShuffleWriteMetrics]] in + * many places only to merge their values together later. In the future, we should revisit + * whether this is needed. + * + * A better alternative is [[TaskMetrics.registerShuffleWriteMetrics]]. */ - @volatile private var _bytesWritten: Long = _ - def bytesWritten: Long = _bytesWritten - private[spark] def incBytesWritten(value: Long) = _bytesWritten += value - private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value + private[spark] def this() { + this(InternalAccumulator.createShuffleWriteAccums().map { a => (a.name.get, a) }.toMap) + } /** - * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds + * Number of bytes written for the shuffle by this task. */ - @volatile private var _writeTime: Long = _ - def writeTime: Long = _writeTime - private[spark] def incWriteTime(value: Long) = _writeTime += value - private[spark] def decWriteTime(value: Long) = _writeTime -= value + def bytesWritten: Long = _bytesWritten.localValue /** - * Total number of records written to the shuffle by this task + * Total number of records written to the shuffle by this task. */ - @volatile private var _recordsWritten: Long = _ - def recordsWritten: Long = _recordsWritten - private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value - private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value - private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value + def recordsWritten: Long = _recordsWritten.localValue + + /** + * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds. + */ + def writeTime: Long = _writeTime.localValue + + private[spark] def incBytesWritten(v: Long): Unit = _bytesWritten.add(v) + private[spark] def incRecordsWritten(v: Long): Unit = _recordsWritten.add(v) + private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v) + private[spark] def decBytesWritten(v: Long): Unit = { + _bytesWritten.setValue(bytesWritten - v) + } + private[spark] def decRecordsWritten(v: Long): Unit = { + _recordsWritten.setValue(recordsWritten - v) + } // Legacy methods for backward compatibility. // TODO: remove these once we make this class private. 
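Each metrics class rewritten above (InputMetrics, OutputMetrics, ShuffleReadMetrics, ShuffleWriteMetrics) follows the same shape: every field is an Accumulator fetched by its internal name, public getters read the task-local value, and mutation goes through private[spark] setters and incrementers. A reduced sketch of that shape, assuming it lives in an org.apache.spark subpackage; WriteTimeMetrics is a hypothetical stand-in, not a class from this diff:

    import org.apache.spark.{Accumulator, InternalAccumulator}

    // Hypothetical, cut-down illustration of the accumulator-backed metrics pattern.
    class WriteTimeMetrics private (_writeTime: Accumulator[Long]) extends Serializable {

      // Metrics objects are built from the task's map of named internal accumulators.
      private[spark] def this(accumMap: Map[String, Accumulator[_]]) {
        this(accumMap(InternalAccumulator.shuffleWrite.WRITE_TIME).asInstanceOf[Accumulator[Long]])
      }

      // Getters expose the local (executor-side) value of the accumulator ...
      def writeTime: Long = _writeTime.localValue

      // ... and only Spark-internal code may mutate it.
      private[spark] def incWriteTime(v: Long): Unit = _writeTime.add(v)
    }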
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 32ef5a9b56..8d10bf588e 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,90 +17,161 @@ package org.apache.spark.executor -import java.io.{IOException, ObjectInputStream} -import java.util.concurrent.ConcurrentHashMap - +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.DataReadMethod.DataReadMethod +import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.storage.{BlockId, BlockStatus} -import org.apache.spark.util.Utils /** * :: DeveloperApi :: * Metrics tracked during the execution of a task. * - * This class is used to house metrics both for in-progress and completed tasks. In executors, - * both the task thread and the heartbeat thread write to the TaskMetrics. The heartbeat thread - * reads it to send in-progress metrics, and the task thread reads it to send metrics along with - * the completed task. + * This class is wrapper around a collection of internal accumulators that represent metrics + * associated with a task. The local values of these accumulators are sent from the executor + * to the driver when the task completes. These values are then merged into the corresponding + * accumulator previously registered on the driver. + * + * The accumulator updates are also sent to the driver periodically (on executor heartbeat) + * and when the task failed with an exception. The [[TaskMetrics]] object itself should never + * be sent to the driver. * - * So, when adding new fields, take into consideration that the whole object can be serialized for - * shipping off at any time to consumers of the SparkListener interface. + * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on. + * Each accumulator in this initial set must be uniquely named and marked + * as internal. Additional accumulators registered later need not satisfy + * these requirements. */ @DeveloperApi -class TaskMetrics extends Serializable { +class TaskMetrics(initialAccums: Seq[Accumulator[_]]) extends Serializable { + + import InternalAccumulator._ + + // Needed for Java tests + def this() { + this(InternalAccumulator.create()) + } + + /** + * All accumulators registered with this task. + */ + private val accums = new ArrayBuffer[Accumulable[_, _]] + accums ++= initialAccums + + /** + * A map for quickly accessing the initial set of accumulators by name. 
+ */ + private val initialAccumsMap: Map[String, Accumulator[_]] = { + val map = new mutable.HashMap[String, Accumulator[_]] + initialAccums.foreach { a => + val name = a.name.getOrElse { + throw new IllegalArgumentException( + "initial accumulators passed to TaskMetrics must be named") + } + require(a.isInternal, + s"initial accumulator '$name' passed to TaskMetrics must be marked as internal") + require(!map.contains(name), + s"detected duplicate accumulator name '$name' when constructing TaskMetrics") + map(name) = a + } + map.toMap + } + + // Each metric is internally represented as an accumulator + private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME) + private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME) + private val _resultSize = getAccum(RESULT_SIZE) + private val _jvmGCTime = getAccum(JVM_GC_TIME) + private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME) + private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED) + private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED) + private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY) + private val _updatedBlockStatuses = + TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES) + /** - * Host's name the task runs on + * Time taken on the executor to deserialize this task. */ - private var _hostname: String = _ - def hostname: String = _hostname - private[spark] def setHostname(value: String) = _hostname = value + def executorDeserializeTime: Long = _executorDeserializeTime.localValue /** - * Time taken on the executor to deserialize this task + * Time the executor spends actually running the task (including fetching shuffle data). */ - private var _executorDeserializeTime: Long = _ - def executorDeserializeTime: Long = _executorDeserializeTime - private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value + def executorRunTime: Long = _executorRunTime.localValue + /** + * The number of bytes this task transmitted back to the driver as the TaskResult. + */ + def resultSize: Long = _resultSize.localValue /** - * Time the executor spends actually running the task (including fetching shuffle data) + * Amount of time the JVM spent in garbage collection while executing this task. */ - private var _executorRunTime: Long = _ - def executorRunTime: Long = _executorRunTime - private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value + def jvmGCTime: Long = _jvmGCTime.localValue /** - * The number of bytes this task transmitted back to the driver as the TaskResult + * Amount of time spent serializing the task result. */ - private var _resultSize: Long = _ - def resultSize: Long = _resultSize - private[spark] def setResultSize(value: Long) = _resultSize = value + def resultSerializationTime: Long = _resultSerializationTime.localValue + /** + * The number of in-memory bytes spilled by this task. + */ + def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue /** - * Amount of time the JVM spent in garbage collection while executing this task + * The number of on-disk bytes spilled by this task. */ - private var _jvmGCTime: Long = _ - def jvmGCTime: Long = _jvmGCTime - private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value + def diskBytesSpilled: Long = _diskBytesSpilled.localValue /** - * Amount of time spent serializing the task result + * Peak memory used by internal data structures created during shuffles, aggregations and + * joins. 
The value of this accumulator should be approximately the sum of the peak sizes + * across all such data structures created in this task. For SQL jobs, this only tracks all + * unsafe operators and ExternalSort. */ - private var _resultSerializationTime: Long = _ - def resultSerializationTime: Long = _resultSerializationTime - private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value + def peakExecutionMemory: Long = _peakExecutionMemory.localValue /** - * The number of in-memory bytes spilled by this task + * Storage statuses of any blocks that have been updated as a result of this task. */ - private var _memoryBytesSpilled: Long = _ - def memoryBytesSpilled: Long = _memoryBytesSpilled - private[spark] def incMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled += value - private[spark] def decMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled -= value + def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue + + @deprecated("use updatedBlockStatuses instead", "2.0.0") + def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = { + if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None + } + + // Setters and increment-ers + private[spark] def setExecutorDeserializeTime(v: Long): Unit = + _executorDeserializeTime.setValue(v) + private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v) + private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v) + private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v) + private[spark] def setResultSerializationTime(v: Long): Unit = + _resultSerializationTime.setValue(v) + private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v) + private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v) + private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v) + private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = + _updatedBlockStatuses.add(v) + private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = + _updatedBlockStatuses.setValue(v) /** - * The number of on-disk bytes spilled by this task + * Get a Long accumulator from the given map by name, assuming it exists. + * Note: this only searches the initial set of accumulators passed into the constructor. 
*/ - private var _diskBytesSpilled: Long = _ - def diskBytesSpilled: Long = _diskBytesSpilled - private[spark] def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value - private[spark] def decDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled -= value + private[spark] def getAccum(name: String): Accumulator[Long] = { + TaskMetrics.getAccum[Long](initialAccumsMap, name) + } + + + /* ========================== * + | INPUT METRICS | + * ========================== */ private var _inputMetrics: Option[InputMetrics] = None @@ -116,7 +187,8 @@ class TaskMetrics extends Serializable { private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = { synchronized { val metrics = _inputMetrics.getOrElse { - val metrics = new InputMetrics(readMethod) + val metrics = new InputMetrics(initialAccumsMap) + metrics.setReadMethod(readMethod) _inputMetrics = Some(metrics) metrics } @@ -128,18 +200,17 @@ class TaskMetrics extends Serializable { if (metrics.readMethod == readMethod) { metrics } else { - new InputMetrics(readMethod) + val m = new InputMetrics + m.setReadMethod(readMethod) + m } } } - /** - * This should only be used when recreating TaskMetrics, not when updating input metrics in - * executors - */ - private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) { - _inputMetrics = inputMetrics - } + + /* ============================ * + | OUTPUT METRICS | + * ============================ */ private var _outputMetrics: Option[OutputMetrics] = None @@ -149,23 +220,24 @@ class TaskMetrics extends Serializable { */ def outputMetrics: Option[OutputMetrics] = _outputMetrics - @deprecated("setting OutputMetrics is for internal use only", "2.0.0") - def outputMetrics_=(om: Option[OutputMetrics]): Unit = { - _outputMetrics = om - } - /** * Get or create a new [[OutputMetrics]] associated with this task. */ private[spark] def registerOutputMetrics( writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized { _outputMetrics.getOrElse { - val metrics = new OutputMetrics(writeMethod) + val metrics = new OutputMetrics(initialAccumsMap) + metrics.setWriteMethod(writeMethod) _outputMetrics = Some(metrics) metrics } } + + /* ================================== * + | SHUFFLE READ METRICS | + * ================================== */ + private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None /** @@ -175,20 +247,12 @@ class TaskMetrics extends Serializable { def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics /** - * This should only be used when recreating TaskMetrics, not when updating read metrics in - * executors. - */ - private[spark] def setShuffleReadMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics]) { - _shuffleReadMetrics = shuffleReadMetrics - } - - /** * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency. * * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for * each dependency and merge these metrics before reporting them to the driver. 
- */ + */ @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics] /** @@ -210,19 +274,21 @@ class TaskMetrics extends Serializable { */ private[spark] def mergeShuffleReadMetrics(): Unit = synchronized { if (tempShuffleReadMetrics.nonEmpty) { - val merged = new ShuffleReadMetrics - for (depMetrics <- tempShuffleReadMetrics) { - merged.incFetchWaitTime(depMetrics.fetchWaitTime) - merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) - merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) - merged.incRemoteBytesRead(depMetrics.remoteBytesRead) - merged.incLocalBytesRead(depMetrics.localBytesRead) - merged.incRecordsRead(depMetrics.recordsRead) - } - _shuffleReadMetrics = Some(merged) + val metrics = new ShuffleReadMetrics(initialAccumsMap) + metrics.setRemoteBlocksFetched(tempShuffleReadMetrics.map(_.remoteBlocksFetched).sum) + metrics.setLocalBlocksFetched(tempShuffleReadMetrics.map(_.localBlocksFetched).sum) + metrics.setFetchWaitTime(tempShuffleReadMetrics.map(_.fetchWaitTime).sum) + metrics.setRemoteBytesRead(tempShuffleReadMetrics.map(_.remoteBytesRead).sum) + metrics.setLocalBytesRead(tempShuffleReadMetrics.map(_.localBytesRead).sum) + metrics.setRecordsRead(tempShuffleReadMetrics.map(_.recordsRead).sum) + _shuffleReadMetrics = Some(metrics) } } + /* =================================== * + | SHUFFLE WRITE METRICS | + * =================================== */ + private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None /** @@ -230,86 +296,120 @@ class TaskMetrics extends Serializable { */ def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics - @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0") - def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = { - _shuffleWriteMetrics = swm - } - /** * Get or create a new [[ShuffleWriteMetrics]] associated with this task. */ private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized { _shuffleWriteMetrics.getOrElse { - val metrics = new ShuffleWriteMetrics + val metrics = new ShuffleWriteMetrics(initialAccumsMap) _shuffleWriteMetrics = Some(metrics) metrics } } - private var _updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = - Seq.empty[(BlockId, BlockStatus)] - /** - * Storage statuses of any blocks that have been updated as a result of this task. - */ - def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses + /* ========================== * + | OTHER THINGS | + * ========================== */ - @deprecated("setting updated blocks is for internal use only", "2.0.0") - def updatedBlocks_=(ub: Option[Seq[(BlockId, BlockStatus)]]): Unit = { - _updatedBlockStatuses = ub.getOrElse(Seq.empty[(BlockId, BlockStatus)]) + private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = { + accums += a } - private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = { - _updatedBlockStatuses ++= v + /** + * Return the latest updates of accumulators in this task. + * + * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]] + * field is always empty, since this represents the partial updates recorded in this task, + * not the aggregated value across multiple tasks. 
+ */ + def accumulatorUpdates(): Seq[AccumulableInfo] = accums.map { a => + new AccumulableInfo(a.id, a.name, Some(a.localValue), None, a.isInternal, a.countFailedValues) } - private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit = { - _updatedBlockStatuses = v + // If we are reconstructing this TaskMetrics on the driver, some metrics may already be set. + // If so, initialize all relevant metrics classes so listeners can access them downstream. + { + var (hasShuffleRead, hasShuffleWrite, hasInput, hasOutput) = (false, false, false, false) + initialAccums + .filter { a => a.localValue != a.zero } + .foreach { a => + a.name.get match { + case sr if sr.startsWith(SHUFFLE_READ_METRICS_PREFIX) => hasShuffleRead = true + case sw if sw.startsWith(SHUFFLE_WRITE_METRICS_PREFIX) => hasShuffleWrite = true + case in if in.startsWith(INPUT_METRICS_PREFIX) => hasInput = true + case out if out.startsWith(OUTPUT_METRICS_PREFIX) => hasOutput = true + case _ => + } + } + if (hasShuffleRead) { _shuffleReadMetrics = Some(new ShuffleReadMetrics(initialAccumsMap)) } + if (hasShuffleWrite) { _shuffleWriteMetrics = Some(new ShuffleWriteMetrics(initialAccumsMap)) } + if (hasInput) { _inputMetrics = Some(new InputMetrics(initialAccumsMap)) } + if (hasOutput) { _outputMetrics = Some(new OutputMetrics(initialAccumsMap)) } } - @deprecated("use updatedBlockStatuses instead", "2.0.0") - def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = { - if (_updatedBlockStatuses.nonEmpty) Some(_updatedBlockStatuses) else None - } +} - private[spark] def updateInputMetrics(): Unit = synchronized { - inputMetrics.foreach(_.updateBytesRead()) - } +private[spark] object TaskMetrics extends Logging { - @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { - in.defaultReadObject() - // Get the hostname from cached data, since hostname is the order of number of nodes in - // cluster, so using cached hostname will decrease the object number and alleviate the GC - // overhead. - _hostname = TaskMetrics.getCachedHostName(_hostname) - } - - private var _accumulatorUpdates: Map[Long, Any] = Map.empty - @transient private var _accumulatorsUpdater: () => Map[Long, Any] = null + def empty: TaskMetrics = new TaskMetrics - private[spark] def updateAccumulators(): Unit = synchronized { - _accumulatorUpdates = _accumulatorsUpdater() + /** + * Get an accumulator from the given map by name, assuming it exists. + */ + def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = { + require(accumMap.contains(name), s"metric '$name' is missing") + val accum = accumMap(name) + try { + // Note: we can't do pattern matching here because types are erased by compile time + accum.asInstanceOf[Accumulator[T]] + } catch { + case e: ClassCastException => + throw new SparkException(s"accumulator $name was of unexpected type", e) + } } /** - * Return the latest updates of accumulators in this task. + * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only. + * + * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we + * need the latter to post task end events to listeners, so we need to reconstruct the metrics + * on the driver. + * + * This assumes the provided updates contain the initial set of accumulators representing + * internal task level metrics. 
*/ - def accumulatorUpdates(): Map[Long, Any] = _accumulatorUpdates - - private[spark] def setAccumulatorsUpdater(accumulatorsUpdater: () => Map[Long, Any]): Unit = { - _accumulatorsUpdater = accumulatorsUpdater + def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = { + // Initial accumulators are passed into the TaskMetrics constructor first because these + // are required to be uniquely named. The rest of the accumulators from this task are + // registered later because they need not satisfy this requirement. + val (initialAccumInfos, otherAccumInfos) = accumUpdates + .filter { info => info.update.isDefined } + .partition { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) } + val initialAccums = initialAccumInfos.map { info => + val accum = InternalAccumulator.create(info.name.get) + accum.setValueAny(info.update.get) + accum + } + // We don't know the types of the rest of the accumulators, so we try to find the same ones + // that were previously registered here on the driver and make copies of them. It is important + // that we copy the accumulators here since they are used across many tasks and we want to + // maintain a snapshot of their local task values when we post them to listeners downstream. + val otherAccums = otherAccumInfos.flatMap { info => + val id = info.id + val acc = Accumulators.get(id).map { a => + val newAcc = a.copy() + newAcc.setValueAny(info.update.get) + newAcc + } + if (acc.isEmpty) { + logWarning(s"encountered unregistered accumulator $id when reconstructing task metrics.") + } + acc + } + val metrics = new TaskMetrics(initialAccums) + otherAccums.foreach(metrics.registerAccumulator) + metrics } -} - -private[spark] object TaskMetrics { - private val hostNameCache = new ConcurrentHashMap[String, String]() - - def empty: TaskMetrics = new TaskMetrics - - def getCachedHostName(host: String): String = { - val canonicalHost = hostNameCache.putIfAbsent(host, host) - if (canonicalHost != null) canonicalHost else host - } } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 3587e7eb1a..d9b0824b38 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -153,8 +153,7 @@ class CoGroupedRDD[K: ClassTag]( } context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) - context.internalMetricsToAccumulators( - InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes) + context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes) new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index a79ab86d49..3204e6adce 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -212,6 +212,8 @@ class HadoopRDD[K, V]( logInfo("Input split: " + split.inputSplit) val jobConf = getJobConf() + // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD + val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) // Sets the thread local variable for the file's name @@ -222,14 +224,17 @@ class HadoopRDD[K, V]( // Find a function that will return the FileSystem bytes read 
by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes - val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { - split.inputSplit.value match { - case _: FileSplit | _: CombineFileSplit => - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() - case _ => None + val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match { + case _: FileSplit | _: CombineFileSplit => + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + case _ => None + } + + def updateBytesRead(): Unit = { + getBytesReadCallback.foreach { getBytesRead => + inputMetrics.setBytesRead(getBytesRead()) } } - inputMetrics.setBytesReadCallback(bytesReadCallback) var reader: RecordReader[K, V] = null val inputFormat = getInputFormat(jobConf) @@ -252,6 +257,9 @@ class HadoopRDD[K, V]( if (!finished) { inputMetrics.incRecordsRead(1) } + if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { + updateBytesRead() + } (key, value) } @@ -272,8 +280,8 @@ class HadoopRDD[K, V]( } finally { reader = null } - if (bytesReadCallback.isDefined) { - inputMetrics.updateBytesRead() + if (getBytesReadCallback.isDefined) { + updateBytesRead() } else if (split.inputSplit.value.isInstanceOf[FileSplit] || split.inputSplit.value.isInstanceOf[CombineFileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 5cc9c81cc6..4d2816e335 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -133,14 +133,17 @@ class NewHadoopRDD[K, V]( // Find a function that will return the FileSystem bytes read by this thread. 
Do this before // creating RecordReader, because RecordReader's constructor might read some bytes - val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { - split.serializableHadoopSplit.value match { - case _: FileSplit | _: CombineFileSplit => - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() - case _ => None + val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match { + case _: FileSplit | _: CombineFileSplit => + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + case _ => None + } + + def updateBytesRead(): Unit = { + getBytesReadCallback.foreach { getBytesRead => + inputMetrics.setBytesRead(getBytesRead()) } } - inputMetrics.setBytesReadCallback(bytesReadCallback) val format = inputFormatClass.newInstance format match { @@ -182,6 +185,9 @@ class NewHadoopRDD[K, V]( if (!finished) { inputMetrics.incRecordsRead(1) } + if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) { + updateBytesRead() + } (reader.getCurrentKey, reader.getCurrentValue) } @@ -201,8 +207,8 @@ class NewHadoopRDD[K, V]( } finally { reader = null } - if (bytesReadCallback.isDefined) { - inputMetrics.updateBytesRead() + if (getBytesReadCallback.isDefined) { + updateBytesRead() } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] || split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) { // If we can't get the bytes read from the FS stats, fall back to the split size, diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala index 146cfb9ba8..9d45fff921 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala @@ -19,47 +19,58 @@ package org.apache.spark.scheduler import org.apache.spark.annotation.DeveloperApi + /** * :: DeveloperApi :: * Information about an [[org.apache.spark.Accumulable]] modified during a task or stage. + * + * Note: once this is JSON serialized the types of `update` and `value` will be lost and be + * cast to strings. This is because the user can define an accumulator of any type and it will + * be difficult to preserve the type in consumers of the event log. This does not apply to + * internal accumulators that represent task level metrics. 
+ * + * @param id accumulator ID + * @param name accumulator name + * @param update partial value from a task, may be None if used on driver to describe a stage + * @param value total accumulated value so far, may be None if used on executors to describe a task + * @param internal whether this accumulator was internal + * @param countFailedValues whether to count this accumulator's partial value if the task failed */ @DeveloperApi -class AccumulableInfo private[spark] ( - val id: Long, - val name: String, - val update: Option[String], // represents a partial update within a task - val value: String, - val internal: Boolean) { - - override def equals(other: Any): Boolean = other match { - case acc: AccumulableInfo => - this.id == acc.id && this.name == acc.name && - this.update == acc.update && this.value == acc.value && - this.internal == acc.internal - case _ => false - } +case class AccumulableInfo private[spark] ( + id: Long, + name: Option[String], + update: Option[Any], // represents a partial update within a task + value: Option[Any], + private[spark] val internal: Boolean, + private[spark] val countFailedValues: Boolean) - override def hashCode(): Int = { - val state = Seq(id, name, update, value, internal) - state.map(_.hashCode).reduceLeft(31 * _ + _) - } -} +/** + * A collection of deprecated constructors. This will be removed soon. + */ object AccumulableInfo { + + @deprecated("do not create AccumulableInfo", "2.0.0") def apply( id: Long, name: String, update: Option[String], value: String, internal: Boolean): AccumulableInfo = { - new AccumulableInfo(id, name, update, value, internal) + new AccumulableInfo( + id, Option(name), update, Option(value), internal, countFailedValues = false) } + @deprecated("do not create AccumulableInfo", "2.0.0") def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = { - new AccumulableInfo(id, name, update, value, internal = false) + new AccumulableInfo( + id, Option(name), update, Option(value), internal = false, countFailedValues = false) } + @deprecated("do not create AccumulableInfo", "2.0.0") def apply(id: Long, name: String, value: String): AccumulableInfo = { - new AccumulableInfo(id, name, None, value, internal = false) + new AccumulableInfo( + id, Option(name), None, Option(value), internal = false, countFailedValues = false) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 6b01a10fc1..897479b500 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -208,11 +208,10 @@ class DAGScheduler( task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Map[Long, Any], - taskInfo: TaskInfo, - taskMetrics: TaskMetrics): Unit = { + accumUpdates: Seq[AccumulableInfo], + taskInfo: TaskInfo): Unit = { eventProcessLoop.post( - CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)) + CompletionEvent(task, reason, result, accumUpdates, taskInfo)) } /** @@ -222,9 +221,10 @@ class DAGScheduler( */ def executorHeartbeatReceived( execId: String, - taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stateAttempt, metrics) + // (taskId, stageId, stageAttemptId, accumUpdates) + accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])], blockManagerId: BlockManagerId): Boolean = { - listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics)) + 
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates)) blockManagerMaster.driverEndpoint.askWithRetry[Boolean]( BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat")) } @@ -1074,39 +1074,43 @@ class DAGScheduler( } } - /** Merge updates from a task to our local accumulator values */ + /** + * Merge local values from a task into the corresponding accumulators previously registered + * here on the driver. + * + * Although accumulators themselves are not thread-safe, this method is called only from one + * thread, the one that runs the scheduling loop. This means we only handle one task + * completion event at a time so we don't need to worry about locking the accumulators. + * This still doesn't stop the caller from updating the accumulator outside the scheduler, + * but that's not our problem since there's nothing we can do about that. + */ private def updateAccumulators(event: CompletionEvent): Unit = { val task = event.task val stage = stageIdToStage(task.stageId) - if (event.accumUpdates != null) { - try { - Accumulators.add(event.accumUpdates) - - event.accumUpdates.foreach { case (id, partialValue) => - // In this instance, although the reference in Accumulators.originals is a WeakRef, - // it's guaranteed to exist since the event.accumUpdates Map exists - - val acc = Accumulators.originals(id).get match { - case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] - case None => throw new NullPointerException("Non-existent reference to Accumulator") - } - - // To avoid UI cruft, ignore cases where value wasn't updated - if (acc.name.isDefined && partialValue != acc.zero) { - val name = acc.name.get - val value = s"${acc.value}" - stage.latestInfo.accumulables(id) = - new AccumulableInfo(id, name, None, value, acc.isInternal) - event.taskInfo.accumulables += - new AccumulableInfo(id, name, Some(s"$partialValue"), value, acc.isInternal) - } + try { + event.accumUpdates.foreach { ainfo => + assert(ainfo.update.isDefined, "accumulator from task should have a partial value") + val id = ainfo.id + val partialValue = ainfo.update.get + // Find the corresponding accumulator on the driver and update it + val acc: Accumulable[Any, Any] = Accumulators.get(id) match { + case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] + case None => + throw new SparkException(s"attempted to access non-existent accumulator $id") + } + acc ++= partialValue + // To avoid UI cruft, ignore cases where value wasn't updated + if (acc.name.isDefined && partialValue != acc.zero) { + val name = acc.name + stage.latestInfo.accumulables(id) = new AccumulableInfo( + id, name, None, Some(acc.value), acc.isInternal, acc.countFailedValues) + event.taskInfo.accumulables += new AccumulableInfo( + id, name, Some(partialValue), Some(acc.value), acc.isInternal, acc.countFailedValues) } - } catch { - // If we see an exception during accumulator update, just log the - // error and move on. 
- case e: Exception => - logError(s"Failed to update accumulators for $task", e) } + } catch { + case NonFatal(e) => + logError(s"Failed to update accumulators for task ${task.partitionId}", e) } } @@ -1116,6 +1120,7 @@ class DAGScheduler( */ private[scheduler] def handleTaskCompletion(event: CompletionEvent) { val task = event.task + val taskId = event.taskInfo.id val stageId = task.stageId val taskType = Utils.getFormattedClassName(task) @@ -1125,12 +1130,26 @@ class DAGScheduler( event.taskInfo.attemptNumber, // this is a task attempt number event.reason) - // The success case is dealt with separately below, since we need to compute accumulator - // updates before posting. + // Reconstruct task metrics. Note: this may be null if the task has failed. + val taskMetrics: TaskMetrics = + if (event.accumUpdates.nonEmpty) { + try { + TaskMetrics.fromAccumulatorUpdates(event.accumUpdates) + } catch { + case NonFatal(e) => + logError(s"Error when attempting to reconstruct metrics for task $taskId", e) + null + } + } else { + null + } + + // The success case is dealt with separately below. + // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here. if (event.reason != Success) { val attemptId = task.stageAttemptId - listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason, - event.taskInfo, event.taskMetrics)) + listenerBus.post(SparkListenerTaskEnd( + stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics)) } if (!stageIdToStage.contains(task.stageId)) { @@ -1142,7 +1161,7 @@ class DAGScheduler( event.reason match { case Success => listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, - event.reason, event.taskInfo, event.taskMetrics)) + event.reason, event.taskInfo, taskMetrics)) stage.pendingPartitions -= task.partitionId task match { case rt: ResultTask[_, _] => @@ -1291,7 +1310,8 @@ class DAGScheduler( // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits case exceptionFailure: ExceptionFailure => - // Do nothing here, left up to the TaskScheduler to decide how to handle user failures + // Tasks failed with exceptions might still have accumulator updates. + updateAccumulators(event) case TaskResultLost => // Do nothing here; the TaskScheduler handles these failures and resubmits the task. 
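handleTaskCompletion above rebuilds a TaskMetrics on the driver purely from the accumulator updates carried by the completion event. A rough sketch of that round trip, using only spark-internal API touched by this change (the 2500L value is illustrative, and the sketch assumes the full initial set produced by InternalAccumulator.create()):

    // Build one fake task's worth of accumulator updates, then reconstruct its metrics,
    // mirroring what handleTaskCompletion does with event.accumUpdates.
    val updates: Seq[AccumulableInfo] = InternalAccumulator.create().map { a =>
      val update: Any =
        if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) 2500L else a.zero
      new AccumulableInfo(a.id, a.name, Some(update), None, a.isInternal, a.countFailedValues)
    }
    val metrics = TaskMetrics.fromAccumulatorUpdates(updates)
    assert(metrics.executorRunTime == 2500L)
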
@@ -1637,7 +1657,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) - case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => + case completion: CompletionEvent => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason, exception) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index dda3b6cc7f..d5cd2da7a1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -73,9 +73,8 @@ private[scheduler] case class CompletionEvent( task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Map[Long, Any], - taskInfo: TaskInfo, - taskMetrics: TaskMetrics) + accumUpdates: Seq[AccumulableInfo], + taskInfo: TaskInfo) extends DAGSchedulerEvent private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 6590cf6ffd..885f70e89f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD * See [[Task]] for more information. * * @param stageId id of the stage this task belongs to + * @param stageAttemptId attempt id of the stage this task belongs to * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each * partition of the given RDD. Once deserialized, the type should be * (RDD[T], (TaskContext, Iterator[T]) => U). @@ -37,6 +38,9 @@ import org.apache.spark.rdd.RDD * @param locs preferred task execution locations for locality scheduling * @param outputId index of the task in this job (a job can launch tasks on only a subset of the * input RDD's partitions). + * @param _initialAccums initial set of accumulators to be used in this task for tracking + * internal metrics. Other accumulators will be registered later when + * they are deserialized on the executors. */ private[spark] class ResultTask[T, U]( stageId: Int, @@ -45,8 +49,8 @@ private[spark] class ResultTask[T, U]( partition: Partition, locs: Seq[TaskLocation], val outputId: Int, - internalAccumulators: Seq[Accumulator[Long]]) - extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators) + _initialAccums: Seq[Accumulator[_]] = InternalAccumulator.create()) + extends Task[U](stageId, stageAttemptId, partition.index, _initialAccums) with Serializable { @transient private[this] val preferredLocs: Seq[TaskLocation] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index ea97ef0e74..89207dd175 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -33,10 +33,14 @@ import org.apache.spark.shuffle.ShuffleWriter * See [[org.apache.spark.scheduler.Task]] for more information. * * @param stageId id of the stage this task belongs to + * @param stageAttemptId attempt id of the stage this task belongs to * @param taskBinary broadcast version of the RDD and the ShuffleDependency. 
Once deserialized, * the type should be (RDD[_], ShuffleDependency[_, _, _]). * @param partition partition of the RDD this task is associated with * @param locs preferred task execution locations for locality scheduling + * @param _initialAccums initial set of accumulators to be used in this task for tracking + * internal metrics. Other accumulators will be registered later when + * they are deserialized on the executors. */ private[spark] class ShuffleMapTask( stageId: Int, @@ -44,8 +48,8 @@ private[spark] class ShuffleMapTask( taskBinary: Broadcast[Array[Byte]], partition: Partition, @transient private var locs: Seq[TaskLocation], - internalAccumulators: Seq[Accumulator[Long]]) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, internalAccumulators) + _initialAccums: Seq[Accumulator[_]]) + extends Task[MapStatus](stageId, stageAttemptId, partition.index, _initialAccums) with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 6c6883d703..ed3adbd81c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.util.Properties +import javax.annotation.Nullable import scala.collection.Map import scala.collection.mutable @@ -60,7 +61,7 @@ case class SparkListenerTaskEnd( taskType: String, reason: TaskEndReason, taskInfo: TaskInfo, - taskMetrics: TaskMetrics) + @Nullable taskMetrics: TaskMetrics) extends SparkListenerEvent @DeveloperApi @@ -111,12 +112,12 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends /** * Periodic updates from executors. * @param execId executor id - * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics) + * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, - taskMetrics: Seq[(Long, Int, Int, TaskMetrics)]) + accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) extends SparkListenerEvent @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 7ea24a217b..c1c8b47128 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -74,10 +74,10 @@ private[scheduler] abstract class Stage( val name: String = callSite.shortForm val details: String = callSite.longForm - private var _internalAccumulators: Seq[Accumulator[Long]] = Seq.empty + private var _internalAccumulators: Seq[Accumulator[_]] = Seq.empty /** Internal accumulators shared across all tasks in this stage. */ - def internalAccumulators: Seq[Accumulator[Long]] = _internalAccumulators + def internalAccumulators: Seq[Accumulator[_]] = _internalAccumulators /** * Re-initialize the internal accumulators associated with this stage. 
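For SparkListener implementations, the net effect of the changes above is that executor heartbeats now deliver raw accumulator updates instead of TaskMetrics, and the metrics attached to a task-end event may be null when driver-side reconstruction fails. A minimal sketch of a listener written against the new event shapes (the listener itself is illustrative, not part of this change):

    import org.apache.spark.scheduler._

    class AccumUpdateLogger extends SparkListener {
      override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate): Unit = {
        // Each entry is (taskId, stageId, stageAttemptId, accumulator updates).
        e.accumUpdates.foreach { case (taskId, stageId, stageAttemptId, infos) =>
          infos.foreach { info =>
            println(s"task $taskId (stage $stageId.$stageAttemptId): " +
              s"${info.name.getOrElse("unnamed")} += ${info.update.getOrElse("?")}")
          }
        }
      }

      override def onTaskEnd(e: SparkListenerTaskEnd): Unit = {
        // taskMetrics is now @Nullable, so guard before reading it.
        Option(e.taskMetrics).foreach { m =>
          println(s"peak execution memory: ${m.peakExecutionMemory}")
        }
      }
    }
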
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index fca57928ec..a49f3716e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.io.{DataInputStream, DataOutputStream} import java.nio.ByteBuffer import scala.collection.mutable.HashMap @@ -41,32 +41,29 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti * and divides the task output to multiple buckets (based on the task's partitioner). * * @param stageId id of the stage this task belongs to + * @param stageAttemptId attempt id of the stage this task belongs to * @param partitionId index of the number in the RDD + * @param initialAccumulators initial set of accumulators to be used in this task for tracking + * internal metrics. Other accumulators will be registered later when + * they are deserialized on the executors. */ private[spark] abstract class Task[T]( val stageId: Int, val stageAttemptId: Int, val partitionId: Int, - internalAccumulators: Seq[Accumulator[Long]]) extends Serializable { + val initialAccumulators: Seq[Accumulator[_]]) extends Serializable { /** - * The key of the Map is the accumulator id and the value of the Map is the latest accumulator - * local value. - */ - type AccumulatorUpdates = Map[Long, Any] - - /** - * Called by [[Executor]] to run this task. + * Called by [[org.apache.spark.executor.Executor]] to run this task. * * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext. * @param attemptNumber how many times this task has been attempted (0 for the first attempt) * @return the result of the task along with updates of Accumulators. */ final def run( - taskAttemptId: Long, - attemptNumber: Int, - metricsSystem: MetricsSystem) - : (T, AccumulatorUpdates) = { + taskAttemptId: Long, + attemptNumber: Int, + metricsSystem: MetricsSystem): T = { context = new TaskContextImpl( stageId, partitionId, @@ -74,16 +71,14 @@ private[spark] abstract class Task[T]( attemptNumber, taskMemoryManager, metricsSystem, - internalAccumulators) + initialAccumulators) TaskContext.setTaskContext(context) - context.taskMetrics.setHostname(Utils.localHostName()) - context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators) taskThread = Thread.currentThread() if (_killed) { kill(interruptThread = false) } try { - (runTask(context), context.collectAccumulators()) + runTask(context) } finally { context.markTaskCompleted() try { @@ -141,6 +136,18 @@ private[spark] abstract class Task[T]( def executorDeserializeTime: Long = _executorDeserializeTime /** + * Collect the latest values of accumulators used in this task. If the task failed, + * filter out the accumulators whose values should not be included on failures. + */ + def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo] = { + if (context != null) { + context.taskMetrics.accumulatorUpdates().filter { a => !taskFailed || a.countFailedValues } + } else { + Seq.empty[AccumulableInfo] + } + } + + /** * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark * code and user code to properly handle the flag. This function should be idempotent so it can * be called multiple times. 
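With run() returning only the task's value, the executor is expected to collect accumulator updates separately and ship them in the task result. The Executor changes are not part of this excerpt, but the call sequence implied by the new Task API is roughly the following (task, ser and the id arguments are placeholders):

    // Sketch of the intended executor-side flow, not the actual Executor code.
    val value = task.run(taskAttemptId, attemptNumber, metricsSystem)
    // On success all accumulators report; on failure only those with countFailedValues.
    val accumUpdates = task.collectAccumulatorUpdates(taskFailed = false)
    val directResult = new DirectTaskResult(ser.serialize(value), accumUpdates)
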
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index b82c7f3fa5..03135e63d7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -20,11 +20,9 @@ package org.apache.spark.scheduler import java.io._ import java.nio.ByteBuffer -import scala.collection.Map -import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkEnv -import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockId import org.apache.spark.util.Utils @@ -36,31 +34,24 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int) extends TaskResult[T] with Serializable /** A TaskResult that contains the task's return value and accumulator updates. */ -private[spark] -class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long, Any], - var metrics: TaskMetrics) +private[spark] class DirectTaskResult[T]( + var valueBytes: ByteBuffer, + var accumUpdates: Seq[AccumulableInfo]) extends TaskResult[T] with Externalizable { private var valueObjectDeserialized = false private var valueObject: T = _ - def this() = this(null.asInstanceOf[ByteBuffer], null, null) + def this() = this(null.asInstanceOf[ByteBuffer], null) override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - - out.writeInt(valueBytes.remaining); + out.writeInt(valueBytes.remaining) Utils.writeByteBuffer(valueBytes, out) out.writeInt(accumUpdates.size) - for ((key, value) <- accumUpdates) { - out.writeLong(key) - out.writeObject(value) - } - out.writeObject(metrics) + accumUpdates.foreach(out.writeObject) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - val blen = in.readInt() val byteVal = new Array[Byte](blen) in.readFully(byteVal) @@ -70,13 +61,12 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long if (numUpdates == 0) { accumUpdates = null } else { - val _accumUpdates = mutable.Map[Long, Any]() + val _accumUpdates = new ArrayBuffer[AccumulableInfo] for (i <- 0 until numUpdates) { - _accumUpdates(in.readLong()) = in.readObject() + _accumUpdates += in.readObject.asInstanceOf[AccumulableInfo] } accumUpdates = _accumUpdates } - metrics = in.readObject().asInstanceOf[TaskMetrics] valueObjectDeserialized = false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index f4965994d8..c94c4f55e9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.nio.ByteBuffer -import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.{ExecutorService, RejectedExecutionException} import scala.language.existentials import scala.util.control.NonFatal @@ -35,9 +35,12 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul extends Logging { private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4) - private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool( - THREADS, "task-result-getter") + // Exposed for testing. 
+ protected val getTaskResultExecutor: ExecutorService = + ThreadUtils.newDaemonFixedThreadPool(THREADS, "task-result-getter") + + // Exposed for testing. protected val serializer = new ThreadLocal[SerializerInstance] { override def initialValue(): SerializerInstance = { sparkEnv.closureSerializer.newInstance() @@ -45,7 +48,9 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul } def enqueueSuccessfulTask( - taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { + taskSetManager: TaskSetManager, + tid: Long, + serializedData: ByteBuffer): Unit = { getTaskResultExecutor.execute(new Runnable { override def run(): Unit = Utils.logUncaughtExceptions { try { @@ -82,7 +87,19 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul (deserializedResult, size) } - result.metrics.setResultSize(size) + // Set the task result size in the accumulator updates received from the executors. + // We need to do this here on the driver because if we did this on the executors then + // we would have to serialize the result again after updating the size. + result.accumUpdates = result.accumUpdates.map { a => + if (a.name == Some(InternalAccumulator.RESULT_SIZE)) { + assert(a.update == Some(0L), + "task result size should not have been set on the executors") + a.copy(update = Some(size.toLong)) + } else { + a + } + } + scheduler.handleSuccessfulTask(taskSetManager, tid, result) } catch { case cnf: ClassNotFoundException => diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 7c0b007db7..fccd6e0699 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -65,8 +65,10 @@ private[spark] trait TaskScheduler { * alive. Return true if the driver knows about the given block manager. Otherwise, return false, * indicating that the block manager should re-register. */ - def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], - blockManagerId: BlockManagerId): Boolean + def executorHeartbeatReceived( + execId: String, + accumUpdates: Array[(Long, Seq[AccumulableInfo])], + blockManagerId: BlockManagerId): Boolean /** * Get an application ID associated with the job. 
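The heartbeat path reuses the same AccumulableInfo records: each running task's id is paired with the latest partial values from its TaskMetrics, and TaskSchedulerImpl (next hunk) attaches stage information before forwarding them to the DAGScheduler. Roughly, one heartbeat payload looks like this (taskContext, taskId, scheduler, execId and blockManagerId are placeholders):

    // Illustrative heartbeat payload for a single running task.
    val infos: Seq[AccumulableInfo] = taskContext.taskMetrics().accumulatorUpdates()
    val accumUpdates: Array[(Long, Seq[AccumulableInfo])] = Array((taskId, infos))
    scheduler.executorHeartbeatReceived(execId, accumUpdates, blockManagerId)
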
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6e3ef0e54f..29341dfe30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -30,7 +30,6 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState -import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.BlockManagerId @@ -380,17 +379,17 @@ private[spark] class TaskSchedulerImpl( */ override def executorHeartbeatReceived( execId: String, - taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics + accumUpdates: Array[(Long, Seq[AccumulableInfo])], blockManagerId: BlockManagerId): Boolean = { - - val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized { - taskMetrics.flatMap { case (id, metrics) => + // (taskId, stageId, stageAttemptId, accumUpdates) + val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { + accumUpdates.flatMap { case (id, updates) => taskIdToTaskSetManager.get(id).map { taskSetMgr => - (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, metrics) + (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, updates) } } } - dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId) + dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId) } def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index aa39b59d8c..cf97877476 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -621,8 +621,7 @@ private[spark] class TaskSetManager( // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. // Note: "result.value()" only deserializes the value when it's called at the first time, so // here "result.value()" just returns the value and won't block other threads. - sched.dagScheduler.taskEnded( - tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics) + sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) if (!successful(index)) { tasksSuccessful += 1 logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( @@ -653,8 +652,7 @@ private[spark] class TaskSetManager( info.markFailed() val index = info.index copiesRunning(index) -= 1 - var taskMetrics : TaskMetrics = null - + var accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo] val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + reason.asInstanceOf[TaskFailedReason].toErrorString val failureException: Option[Throwable] = reason match { @@ -669,7 +667,8 @@ private[spark] class TaskSetManager( None case ef: ExceptionFailure => - taskMetrics = ef.metrics.orNull + // ExceptionFailure's might have accumulator updates + accumUpdates = ef.accumUpdates if (ef.className == classOf[NotSerializableException].getName) { // If the task result wasn't serializable, there's no point in trying to re-execute it. 
logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying" @@ -721,7 +720,7 @@ private[spark] class TaskSetManager( // always add to failed executors failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). put(info.executorId, clock.getTimeMillis()) - sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics) + sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) addPendingTask(index) if (!isZombie && state != TaskState.KILLED && reason.isInstanceOf[TaskFailedReason] @@ -793,7 +792,8 @@ private[spark] class TaskSetManager( addPendingTask(index) // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our // stage finishes when a total of tasks.size tasks finish. - sched.dagScheduler.taskEnded(tasks(index), Resubmitted, null, null, info, null) + sched.dagScheduler.taskEnded( + tasks(index), Resubmitted, null, Seq.empty[AccumulableInfo], info) } } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index a57e5b0bfb..acbe16001f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -103,8 +103,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( sorter.insertAll(aggregatedIter) context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) - context.internalMetricsToAccumulators( - InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes) + context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) case None => aggregatedIter diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 078718ba11..9c92a50150 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -237,7 +237,8 @@ private[v1] object AllStagesResource { } def convertAccumulableInfo(acc: InternalAccumulableInfo): AccumulableInfo = { - new AccumulableInfo(acc.id, acc.name, acc.update, acc.value) + new AccumulableInfo( + acc.id, acc.name.orNull, acc.update.map(_.toString), acc.value.map(_.toString).orNull) } def convertUiTaskMetrics(internal: InternalTaskMetrics): TaskMetrics = { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 4a9f8b3052..b2aa8bfbe7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -325,12 +325,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { val taskInfo = taskStart.taskInfo if (taskInfo != null) { + val metrics = new TaskMetrics val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), { logWarning("Task start for unknown stage " + taskStart.stageId) new StageUIData }) stageData.numActiveTasks += 1 - stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo)) + 
stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo, Some(metrics))) } for ( activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId); @@ -387,9 +388,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { (Some(e.toErrorString), None) } - if (!metrics.isEmpty) { + metrics.foreach { m => val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics) - updateAggregateMetrics(stageData, info.executorId, metrics.get, oldMetrics) + updateAggregateMetrics(stageData, info.executorId, m, oldMetrics) } val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info)) @@ -489,19 +490,18 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { } override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { - for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) { + for ((taskId, sid, sAttempt, accumUpdates) <- executorMetricsUpdate.accumUpdates) { val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), { logWarning("Metrics update for task in unknown stage " + sid) new StageUIData }) val taskData = stageData.taskData.get(taskId) - taskData.map { t => + val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates) + taskData.foreach { t => if (!t.taskInfo.finished) { - updateAggregateMetrics(stageData, executorMetricsUpdate.execId, taskMetrics, - t.taskMetrics) - + updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.taskMetrics) // Overwrite task metrics - t.taskMetrics = Some(taskMetrics) + t.taskMetrics = Some(metrics) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 914f6183cc..29c5ff0b5c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -271,8 +271,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value") - def accumulableRow(acc: AccumulableInfo): Elem = - <tr><td>{acc.name}</td><td>{acc.value}</td></tr> + def accumulableRow(acc: AccumulableInfo): Seq[Node] = { + (acc.name, acc.value) match { + case (Some(name), Some(value)) => <tr><td>{name}</td><td>{value}</td></tr> + case _ => Seq.empty[Node] + } + } val accumulableTable = UIUtils.listingTable( accumulableHeaders, accumulableRow, @@ -404,13 +408,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { </td> +: getFormattedTimeQuantiles(gettingResultTimes) - val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) => - info.accumulables - .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY } - .map { acc => acc.update.getOrElse("0").toLong } - .getOrElse(0L) - .toDouble - } + val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.peakExecutionMemory.toDouble + } val peakExecutionMemoryQuantiles = { <td> <span data-toggle="tooltip" @@ -891,15 +891,15 @@ private[ui] class TaskDataSource( val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) val gettingResultTime = getGettingResultTime(info, currentTime) - val (taskInternalAccumulables, taskExternalAccumulables) = - info.accumulables.partition(_.internal) - val externalAccumulableReadable = taskExternalAccumulables.map { acc => - StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}") - } 
- val peakExecutionMemoryUsed = taskInternalAccumulables - .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY } - .map { acc => acc.update.getOrElse("0").toLong } - .getOrElse(0L) + val externalAccumulableReadable = info.accumulables + .filterNot(_.internal) + .flatMap { a => + (a.name, a.update) match { + case (Some(name), Some(update)) => Some(StringEscapeUtils.escapeHtml4(s"$name: $update")) + case _ => None + } + } + val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L) val maybeInput = metrics.flatMap(_.inputMetrics) val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index efa22b9993..dc8070cf8a 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -233,14 +233,14 @@ private[spark] object JsonProtocol { def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = { val execId = metricsUpdate.execId - val taskMetrics = metricsUpdate.taskMetrics + val accumUpdates = metricsUpdate.accumUpdates ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~ ("Executor ID" -> execId) ~ - ("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) => + ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) => ("Task ID" -> taskId) ~ ("Stage ID" -> stageId) ~ ("Stage Attempt ID" -> stageAttemptId) ~ - ("Task Metrics" -> taskMetricsToJson(metrics)) + ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList)) }) } @@ -265,7 +265,7 @@ private[spark] object JsonProtocol { ("Completion Time" -> completionTime) ~ ("Failure Reason" -> failureReason) ~ ("Accumulables" -> JArray( - stageInfo.accumulables.values.map(accumulableInfoToJson).toList)) + stageInfo.accumulables.values.map(accumulableInfoToJson).toList)) } def taskInfoToJson(taskInfo: TaskInfo): JValue = { @@ -284,11 +284,44 @@ private[spark] object JsonProtocol { } def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = { + val name = accumulableInfo.name ("ID" -> accumulableInfo.id) ~ - ("Name" -> accumulableInfo.name) ~ - ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~ - ("Value" -> accumulableInfo.value) ~ - ("Internal" -> accumulableInfo.internal) + ("Name" -> name) ~ + ("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~ + ("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~ + ("Internal" -> accumulableInfo.internal) ~ + ("Count Failed Values" -> accumulableInfo.countFailedValues) + } + + /** + * Serialize the value of an accumulator to JSON. + * + * For accumulators representing internal task metrics, this looks up the relevant + * [[AccumulatorParam]] to serialize the value accordingly. For all other accumulators, + * this will simply serialize the value as a string. + * + * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing. 
+   */
+  private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
+    import AccumulatorParam._
+    if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
+      (value, InternalAccumulator.getParam(name.get)) match {
+        case (v: Int, IntAccumulatorParam) => JInt(v)
+        case (v: Long, LongAccumulatorParam) => JInt(v)
+        case (v: String, StringAccumulatorParam) => JString(v)
+        case (v, UpdatedBlockStatusesAccumulatorParam) =>
+          JArray(v.asInstanceOf[Seq[(BlockId, BlockStatus)]].toList.map { case (id, status) =>
+            ("Block ID" -> id.toString) ~
+            ("Status" -> blockStatusToJson(status))
+          })
+        case (v, p) =>
+          throw new IllegalArgumentException(s"unexpected combination of accumulator value " +
+            s"type (${v.getClass.getName}) and param (${p.getClass.getName}) in '${name.get}'")
+      }
+    } else {
+      // For all external accumulators, just use strings
+      JString(value.toString)
+    }
   }
 
   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -303,9 +336,9 @@
     }.getOrElse(JNothing)
     val shuffleWriteMetrics: JValue =
       taskMetrics.shuffleWriteMetrics.map { wm =>
-        ("Shuffle Bytes Written" -> wm.shuffleBytesWritten) ~
-        ("Shuffle Write Time" -> wm.shuffleWriteTime) ~
-        ("Shuffle Records Written" -> wm.shuffleRecordsWritten)
+        ("Shuffle Bytes Written" -> wm.bytesWritten) ~
+        ("Shuffle Write Time" -> wm.writeTime) ~
+        ("Shuffle Records Written" -> wm.recordsWritten)
       }.getOrElse(JNothing)
     val inputMetrics: JValue =
       taskMetrics.inputMetrics.map { im =>
@@ -324,7 +357,6 @@
         ("Block ID" -> id.toString) ~
         ("Status" -> blockStatusToJson(status))
       })
-    ("Host Name" -> taskMetrics.hostname) ~
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
     ("Result Size" -> taskMetrics.resultSize) ~
@@ -352,12 +384,12 @@
         ("Message" -> fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
         val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
-        val metrics = exceptionFailure.metrics.map(taskMetricsToJson).getOrElse(JNothing)
+        val accumUpdates = JArray(exceptionFailure.accumUpdates.map(accumulableInfoToJson).toList)
         ("Class Name" -> exceptionFailure.className) ~
         ("Description" -> exceptionFailure.description) ~
         ("Stack Trace" -> stackTrace) ~
         ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
-        ("Metrics" -> metrics)
+        ("Accumulator Updates" -> accumUpdates)
       case taskCommitDenied: TaskCommitDenied =>
         ("Job ID" -> taskCommitDenied.jobID) ~
         ("Partition ID" -> taskCommitDenied.partitionID) ~
@@ -619,14 +651,15 @@
 
   def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
     val execInfo = (json \ "Executor ID").extract[String]
-    val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
+    val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
       val taskId = (json \ "Task ID").extract[Long]
       val stageId = (json \ "Stage ID").extract[Int]
       val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
-      val metrics = taskMetricsFromJson(json \ "Task Metrics")
-      (taskId, stageId, stageAttemptId, metrics)
+      val updates =
+        (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
+      (taskId, stageId, stageAttemptId, updates)
     }
-    SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics)
+    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
   }
 
   /** --------------------------------------------------------------------- *
@@ -647,7 +680,7 @@ private[spark] object JsonProtocol {
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
     val accumulatedValues = (json \ "Accumulables").extractOpt[List[JValue]] match {
-      case Some(values) => values.map(accumulableInfoFromJson(_))
+      case Some(values) => values.map(accumulableInfoFromJson)
      case None => Seq[AccumulableInfo]()
     }
 
@@ -675,7 +708,7 @@ private[spark] object JsonProtocol {
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
     val accumulables = (json \ "Accumulables").extractOpt[Seq[JValue]] match {
-      case Some(values) => values.map(accumulableInfoFromJson(_))
+      case Some(values) => values.map(accumulableInfoFromJson)
       case None => Seq[AccumulableInfo]()
     }
 
@@ -690,11 +723,43 @@ private[spark] object JsonProtocol {
   def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
     val id = (json \ "ID").extract[Long]
-    val name = (json \ "Name").extract[String]
-    val update = Utils.jsonOption(json \ "Update").map(_.extract[String])
-    val value = (json \ "Value").extract[String]
+    val name = (json \ "Name").extractOpt[String]
+    val update = Utils.jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
+    val value = Utils.jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
     val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false)
-    AccumulableInfo(id, name, update, value, internal)
+    val countFailedValues = (json \ "Count Failed Values").extractOpt[Boolean].getOrElse(false)
+    new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+  }
+
+  /**
+   * Deserialize the value of an accumulator from JSON.
+   *
+   * For accumulators representing internal task metrics, this looks up the relevant
+   * [[AccumulatorParam]] to deserialize the value accordingly. For all other
+   * accumulators, this will simply deserialize the value as a string.
+   *
+   * The behavior here must match that of [[accumValueToJson]]. Exposed for testing.
+   */
+  private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = {
+    import AccumulatorParam._
+    if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
+      (value, InternalAccumulator.getParam(name.get)) match {
+        case (JInt(v), IntAccumulatorParam) => v.toInt
+        case (JInt(v), LongAccumulatorParam) => v.toLong
+        case (JString(v), StringAccumulatorParam) => v
+        case (JArray(v), UpdatedBlockStatusesAccumulatorParam) =>
+          v.map { blockJson =>
+            val id = BlockId((blockJson \ "Block ID").extract[String])
+            val status = blockStatusFromJson(blockJson \ "Status")
+            (id, status)
+          }
+        case (v, p) =>
+          throw new IllegalArgumentException(s"unexpected combination of accumulator " +
+            s"value in JSON ($v) and accumulator param (${p.getClass.getName}) in '${name.get}'")
+      }
+    } else {
+      value.extract[String]
+    }
   }
 
   def taskMetricsFromJson(json: JValue): TaskMetrics = {
@@ -702,7 +767,6 @@ private[spark] object JsonProtocol {
       return TaskMetrics.empty
     }
     val metrics = new TaskMetrics
-    metrics.setHostname((json \ "Host Name").extract[String])
     metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
     metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
     metrics.setResultSize((json \ "Result Size").extract[Long])
@@ -787,10 +851,12 @@ private[spark] object JsonProtocol {
         val className = (json \ "Class Name").extract[String]
         val description = (json \ "Description").extract[String]
         val stackTrace = stackTraceFromJson(json \ "Stack Trace")
-        val fullStackTrace = Utils.jsonOption(json \ "Full Stack Trace").
-          map(_.extract[String]).orNull
-        val metrics = Utils.jsonOption(json \ "Metrics").map(taskMetricsFromJson)
-        ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
+        val fullStackTrace = (json \ "Full Stack Trace").extractOpt[String].orNull
+        // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
+        val accumUpdates = Utils.jsonOption(json \ "Accumulator Updates")
+          .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
+          .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulatorUpdates())
+        ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
       case `taskCommitDenied` =>
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index df9e0502e7..5afd6d6e22 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -682,8 +682,7 @@ private[spark] class ExternalSorter[K, V, C](
     context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
     context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
 
-    context.internalMetricsToAccumulators(
-      InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemoryUsedBytes)
+    context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
 
     lengths
  }
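Note on the JsonProtocol changes above: accumValueToJson and accumValueFromJson are required to agree, so that accumulators whose names carry the internal-metrics prefix round-trip with their native JSON types while every other accumulator value is written and read back as a plain string. The following self-contained Scala sketch (not Spark code; it assumes only json4s on the classpath, and the names AccumJsonSketch, metricsPrefix, toJson and fromJson are illustrative, covering only the Int/Long/String cases) mirrors that convention:

import org.json4s._

// Minimal sketch of the round-trip convention used by accumValueToJson / accumValueFromJson.
// All names here are illustrative stand-ins, not Spark APIs.
object AccumJsonSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // Stand-in for InternalAccumulator.METRICS_PREFIX.
  val metricsPrefix = "internal.metrics."

  // Internal metrics keep their native JSON type; everything else is stringified.
  def toJson(name: Option[String], value: Any): JValue =
    if (name.exists(_.startsWith(metricsPrefix))) {
      value match {
        case v: Int => JInt(v)
        case v: Long => JInt(v)
        case v: String => JString(v)
        case v => throw new IllegalArgumentException(s"unexpected value type: ${v.getClass}")
      }
    } else {
      JString(value.toString)
    }

  // Must mirror toJson exactly, otherwise replaying an event log would corrupt metric values.
  def fromJson(name: Option[String], json: JValue): Any =
    if (name.exists(_.startsWith(metricsPrefix))) {
      json match {
        case JInt(v) => v.toLong
        case JString(v) => v
        case v => throw new IllegalArgumentException(s"unexpected JSON value: $v")
      }
    } else {
      json.extract[String]
    }

  // An internal metric round-trips as a number; a user-named accumulator as a string.
  val internalName = Some(metricsPrefix + "peakExecutionMemory")
  assert(fromJson(internalName, toJson(internalName, 1024L)) == 1024L)
  assert(fromJson(Some("my.counter"), toJson(Some("my.counter"), 42)) == "42")
}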