From cb51680d2213ef3443d1c02930c1e76fe6eb2e31 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 21 Apr 2016 01:06:22 -0700 Subject: [SPARK-14753][CORE] remove internal flag in Accumulable ## What changes were proposed in this pull request? the `Accumulable.internal` flag is only used to avoid registering internal accumulators for 2 certain cases: 1. `TaskMetrics.createTempShuffleReadMetrics`: the accumulators in the temp shuffle read metrics should not be registered. 2. `TaskMetrics.fromAccumulatorUpdates`: the created task metrics is only used to post event, accumulators inside it should not be registered. For 1, we can create a `TempShuffleReadMetrics` that don't create accumulators, just keep the data and merge it at last. For 2, we can un-register these accumulators immediately. TODO: remove `internal` flag in `AccumulableInfo` with followup PR ## How was this patch tested? existing tests. Author: Wenchen Fan Closes #12525 from cloud-fan/acc. --- .../main/scala/org/apache/spark/Accumulable.scala | 43 ++++------------------ .../main/scala/org/apache/spark/Accumulator.scala | 17 ++------- .../scala/org/apache/spark/TaskContextImpl.scala | 3 +- .../apache/spark/executor/ShuffleReadMetrics.scala | 31 +++++++++++++++- .../org/apache/spark/executor/TaskMetrics.scala | 31 ++++++++++------ .../scala/org/apache/spark/scheduler/Stage.scala | 2 +- .../org/apache/spark/scheduler/StageInfo.scala | 4 +- .../scala/org/apache/spark/scheduler/Task.scala | 5 +-- .../apache/spark/ui/jobs/JobProgressListener.scala | 2 +- .../scala/org/apache/spark/util/JsonProtocol.scala | 4 +- 10 files changed, 70 insertions(+), 72 deletions(-) (limited to 'core/src/main/scala/org/apache') diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala index 601b503d12..e8f053c150 100644 --- a/core/src/main/scala/org/apache/spark/Accumulable.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -28,7 +28,7 @@ import org.apache.spark.util.Utils /** - * A data type that can be accumulated, ie has an commutative and associative "add" operation, + * A data type that can be accumulated, i.e. has an commutative and associative "add" operation, * but where the result type, `R`, may be different from the element type being added, `T`. * * You must define how to add data, and how to merge two of these together. For some data types, @@ -36,19 +36,12 @@ import org.apache.spark.util.Utils * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are * accumulating a set. You will add items to the set, and you will union two sets together. * - * All accumulators created on the driver to be used on the executors must be registered with - * [[Accumulators]]. This is already done automatically for accumulators created by the user. - * Internal accumulators must be explicitly registered by the caller. - * * Operations are not thread-safe. * * @param id ID of this accumulator; for internal use only. * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `R` and `T` * @param name human-readable name for use in Spark's web UI - * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported - * to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be - * thread safe so that they can be reported correctly. * @param countFailedValues whether to accumulate values from failed tasks. This is set to true * for system and time metrics like serialization time or bytes spilled, * and false for things with absolute values like number of input rows. @@ -62,7 +55,6 @@ class Accumulable[R, T] private ( @transient private val initialValue: R, param: AccumulableParam[R, T], val name: Option[String], - internal: Boolean, private[spark] val countFailedValues: Boolean) extends Serializable { @@ -70,41 +62,21 @@ class Accumulable[R, T] private ( initialValue: R, param: AccumulableParam[R, T], name: Option[String], - internal: Boolean, countFailedValues: Boolean) = { - this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues) + this(Accumulators.newId(), initialValue, param, name, countFailedValues) } - private[spark] def this( - initialValue: R, - param: AccumulableParam[R, T], - name: Option[String], - internal: Boolean) = { - this(initialValue, param, name, internal, false /* countFailedValues */) + private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = { + this(initialValue, param, name, false /* countFailedValues */) } - def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = - this(initialValue, param, name, false /* internal */) - def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None) @volatile @transient private var value_ : R = initialValue // Current value on driver val zero = param.zero(initialValue) // Zero value to be passed to executors private var deserialized = false - // In many places we create internal accumulators without access to the active context cleaner, - // so if we register them here then we may never unregister these accumulators. To avoid memory - // leaks, we require the caller to explicitly register internal accumulators elsewhere. - if (!internal) { - Accumulators.register(this) - } - - /** - * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver - * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be - * reported correctly. - */ - private[spark] def isInternal: Boolean = internal + Accumulators.register(this) /** * Return a copy of this [[Accumulable]]. @@ -114,7 +86,7 @@ class Accumulable[R, T] private ( * same mutable instance around. */ private[spark] def copy(): Accumulable[R, T] = { - new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues) + new Accumulable[R, T](id, initialValue, param, name, countFailedValues) } /** @@ -192,7 +164,8 @@ class Accumulable[R, T] private ( * Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values. */ private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - new AccumulableInfo(id, name, update, value, internal, countFailedValues) + val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) + new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) } // Called by Java when deserializing an object diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala index 985752933a..0c17f014c9 100644 --- a/core/src/main/scala/org/apache/spark/Accumulator.scala +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -56,7 +56,6 @@ import org.apache.spark.storage.{BlockId, BlockStatus} * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `T` * @param name human-readable name associated with this accumulator - * @param internal whether this accumulator is used internally within Spark only * @param countFailedValues whether to accumulate values from failed tasks * @tparam T result type */ @@ -64,19 +63,9 @@ class Accumulator[T] private[spark] ( // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile @transient private val initialValue: T, param: AccumulatorParam[T], - name: Option[String], - internal: Boolean, - private[spark] override val countFailedValues: Boolean = false) - extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) { - - def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { - this(initialValue, param, name, false /* internal */) - } - - def this(initialValue: T, param: AccumulatorParam[T]) = { - this(initialValue, param, None, false /* internal */) - } -} + name: Option[String] = None, + countFailedValues: Boolean = false) + extends Accumulable[T, T](initialValue, param, name, countFailedValues) // TODO: The multi-thread support in accumulators is kind of lame; check diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index e8f83c6d14..43e555670d 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -36,7 +36,8 @@ private[spark] class TaskContextImpl( override val taskMemoryManager: TaskMemoryManager, localProperties: Properties, @transient private val metricsSystem: MetricsSystem, - override val taskMetrics: TaskMetrics = new TaskMetrics) + // The default value is only used in tests. + override val taskMetrics: TaskMetrics = TaskMetrics.empty) extends TaskContext with Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index 8e9a332b7c..f012a74db6 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -101,9 +101,9 @@ class ShuffleReadMetrics private[spark] () extends Serializable { /** * Resets the value of the current metrics (`this`) and and merges all the independent - * [[ShuffleReadMetrics]] into `this`. + * [[TempShuffleReadMetrics]] into `this`. */ - private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = { + private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = { _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero) _localBlocksFetched.setValue(_localBlocksFetched.zero) _remoteBytesRead.setValue(_remoteBytesRead.zero) @@ -119,5 +119,32 @@ class ShuffleReadMetrics private[spark] () extends Serializable { _recordsRead.add(metric.recordsRead) } } +} +/** + * A temporary shuffle read metrics holder that is used to collect shuffle read metrics for each + * shuffle dependency, and all temporary metrics will be merged into the [[ShuffleReadMetrics]] at + * last. + */ +private[spark] class TempShuffleReadMetrics { + private[this] var _remoteBlocksFetched = 0 + private[this] var _localBlocksFetched = 0 + private[this] var _remoteBytesRead = 0L + private[this] var _localBytesRead = 0L + private[this] var _fetchWaitTime = 0L + private[this] var _recordsRead = 0L + + def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v + def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v + def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v + def incLocalBytesRead(v: Long): Unit = _localBytesRead += v + def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v + def incRecordsRead(v: Long): Unit = _recordsRead += v + + def remoteBlocksFetched: Int = _remoteBlocksFetched + def localBlocksFetched: Int = _localBlocksFetched + def remoteBytesRead: Long = _remoteBytesRead + def localBytesRead: Long = _localBytesRead + def fetchWaitTime: Long = _fetchWaitTime + def recordsRead: Long = _recordsRead } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 4558fbb4d9..8513d053f2 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -143,23 +143,23 @@ class TaskMetrics private[spark] () extends Serializable { val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics() /** - * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency. + * A list of [[TempShuffleReadMetrics]], one per shuffle dependency. * * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization - * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for - * each dependency and merge these metrics before reporting them to the driver. + * issues from readers in different threads, in-progress tasks use a [[TempShuffleReadMetrics]] + * for each dependency and merge these metrics before reporting them to the driver. */ - @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics] + @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[TempShuffleReadMetrics] /** - * Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency. + * Create a [[TempShuffleReadMetrics]] for a particular shuffle dependency. * * All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which * merges the temporary values synchronously. Otherwise, all temporary data collected will * be lost. */ - private[spark] def createTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized { - val readMetrics = new ShuffleReadMetrics + private[spark] def createTempShuffleReadMetrics(): TempShuffleReadMetrics = synchronized { + val readMetrics = new TempShuffleReadMetrics tempShuffleReadMetrics += readMetrics readMetrics } @@ -195,9 +195,8 @@ class TaskMetrics private[spark] () extends Serializable { | OTHER THINGS | * ========================== */ - private[spark] def registerAccums(sc: SparkContext): Unit = { + private[spark] def registerForCleanup(sc: SparkContext): Unit = { internalAccums.foreach { accum => - Accumulators.register(accum) sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum)) } } @@ -244,7 +243,14 @@ private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) ext private[spark] object TaskMetrics extends Logging { - def empty: TaskMetrics = new TaskMetrics + /** + * Create an empty task metrics that doesn't register its accumulators. + */ + def empty: TaskMetrics = { + val metrics = new TaskMetrics + metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id)) + metrics + } /** * Create a new accumulator representing an internal task metric. @@ -253,7 +259,7 @@ private[spark] object TaskMetrics extends Logging { initialValue: T, name: String, param: AccumulatorParam[T]): Accumulator[T] = { - new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true) + new Accumulator[T](initialValue, param, Some(name), countFailedValues = true) } def createLongAccum(name: String): Accumulator[Long] = { @@ -281,6 +287,9 @@ private[spark] object TaskMetrics extends Logging { def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = { val definedAccumUpdates = accumUpdates.filter(_.update.isDefined) val metrics = new ListenerTaskMetrics(definedAccumUpdates) + // We don't register this [[ListenerTaskMetrics]] for cleanup, and this is only used to post + // event, we should un-register all accumulators immediately. + metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id)) definedAccumUpdates.filter(_.internal).foreach { accum => metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index d5cf6b82e8..02185bf631 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -112,7 +112,7 @@ private[scheduler] abstract class Stage( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { val metrics = new TaskMetrics - metrics.registerAccums(rdd.sparkContext) + metrics.registerForCleanup(rdd.sparkContext) _latestInfo = StageInfo.fromStage( this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences) nextAttemptId += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala index 58349fe250..c513ed36d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala @@ -36,7 +36,7 @@ class StageInfo( val rddInfos: Seq[RDDInfo], val parentIds: Seq[Int], val details: String, - val taskMetrics: TaskMetrics = new TaskMetrics, + val taskMetrics: TaskMetrics = null, private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) { /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */ var submissionTime: Option[Long] = None @@ -81,7 +81,7 @@ private[spark] object StageInfo { stage: Stage, attemptId: Int, numTasks: Option[Int] = None, - taskMetrics: TaskMetrics = new TaskMetrics, + taskMetrics: TaskMetrics = null, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd) diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 9f2fa02c69..eb10f3e69b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -46,14 +46,13 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti * @param partitionId index of the number in the RDD * @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side. * @param localProperties copy of thread-local properties set by the user on the driver side. - * - * The default values for `metrics` and `localProperties` are used by tests only. */ private[spark] abstract class Task[T]( val stageId: Int, val stageAttemptId: Int, val partitionId: Int, - val metrics: TaskMetrics = new TaskMetrics, + // The default value is only used in tests. + val metrics: TaskMetrics = TaskMetrics.empty, @transient var localProperties: Properties = new Properties) extends Serializable { /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 9e4771ce4a..9ab7d96e29 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -326,7 +326,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized { val taskInfo = taskStart.taskInfo if (taskInfo != null) { - val metrics = new TaskMetrics + val metrics = TaskMetrics.empty val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), { logWarning("Task start for unknown stage " + taskStart.stageId) new StageUIData diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 6c50c72a91..a613fbc5cc 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -750,10 +750,10 @@ private[spark] object JsonProtocol { } def taskMetricsFromJson(json: JValue): TaskMetrics = { + val metrics = TaskMetrics.empty if (json == JNothing) { - return TaskMetrics.empty + return metrics } - val metrics = new TaskMetrics metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long]) metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long]) metrics.setResultSize((json \ "Result Size").extract[Long]) -- cgit v1.2.3