author     Wenchen Fan <wenchen@databricks.com>  2016-04-21 01:06:22 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-04-21 01:06:22 -0700
commit     cb51680d2213ef3443d1c02930c1e76fe6eb2e31 (patch)
tree       c80396f74f3ea04681932ca6de23c7b77c60f3b8 /core/src/main/scala/org/apache
parent     228128ce2571c3058cb13ae502a5328745eeeef5 (diff)
[SPARK-14753][CORE] remove internal flag in Accumulable
## What changes were proposed in this pull request?

The `Accumulable.internal` flag is only used to avoid registering internal accumulators in two specific cases:

1. `TaskMetrics.createTempShuffleReadMetrics`: the accumulators in the temp shuffle read metrics should not be registered.
2. `TaskMetrics.fromAccumulatorUpdates`: the created task metrics are only used to post an event, so the accumulators inside them should not be registered.

For 1, we can create a `TempShuffleReadMetrics` that doesn't create accumulators at all, just keeps the data and merges it at the end. For 2, we can un-register these accumulators immediately.

TODO: remove the `internal` flag in `AccumulableInfo` in a follow-up PR.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12525 from cloud-fan/acc.
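For orientation, here is a minimal, self-contained sketch of the two fixes described above. `Registry`, `RegisteredMetric`, and `TempMetric` are illustrative stand-ins only, not Spark classes; the real code operates on `Accumulators` and the metrics classes changed in the diff below.

```scala
import scala.collection.mutable

// Stand-in for the global accumulator registry (Accumulators in Spark).
object Registry {
  private val ids = mutable.Set[Long]()
  private var next = 0L
  def register(): Long = { val id = next; next += 1; ids += id; id }
  def remove(id: Long): Unit = ids -= id
  def size: Int = ids.size
}

// After this change, construction always registers; there is no `internal` flag.
class RegisteredMetric { val id: Long = Registry.register() }

// Fix 1: a temporary holder keeps plain values and never touches the registry.
class TempMetric { var recordsRead = 0L }

object Sketch extends App {
  // Fix 2: metrics that exist only to post an event are un-registered right away.
  val eventOnly = new RegisteredMetric
  Registry.remove(eventOnly.id)

  // Per-dependency temp holders are summed into one value at the end of the task.
  val temps = Seq.fill(3)(new TempMetric)
  temps.zipWithIndex.foreach { case (t, i) => t.recordsRead += i + 1 }
  val merged = temps.map(_.recordsRead).sum
  println((Registry.size, merged)) // (0, 6): nothing leaked, values merged
}
```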
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulable.scala | 43
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulator.scala | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4
10 files changed, 70 insertions, 72 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 601b503d12..e8f053c150 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
/**
- * A data type that can be accumulated, ie has an commutative and associative "add" operation,
+ * A data type that can be accumulated, i.e. has an commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
*
* You must define how to add data, and how to merge two of these together. For some data types,
@@ -36,19 +36,12 @@ import org.apache.spark.util.Utils
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
- * All accumulators created on the driver to be used on the executors must be registered with
- * [[Accumulators]]. This is already done automatically for accumulators created by the user.
- * Internal accumulators must be explicitly registered by the caller.
- *
* Operations are not thread-safe.
*
* @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
- * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
- * to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
- * thread safe so that they can be reported correctly.
* @param countFailedValues whether to accumulate values from failed tasks. This is set to true
* for system and time metrics like serialization time or bytes spilled,
* and false for things with absolute values like number of input rows.
@@ -62,7 +55,6 @@ class Accumulable[R, T] private (
@transient private val initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
- internal: Boolean,
private[spark] val countFailedValues: Boolean)
extends Serializable {
@@ -70,41 +62,21 @@ class Accumulable[R, T] private (
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
- internal: Boolean,
countFailedValues: Boolean) = {
- this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
+ this(Accumulators.newId(), initialValue, param, name, countFailedValues)
}
- private[spark] def this(
- initialValue: R,
- param: AccumulableParam[R, T],
- name: Option[String],
- internal: Boolean) = {
- this(initialValue, param, name, internal, false /* countFailedValues */)
+ private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
+ this(initialValue, param, name, false /* countFailedValues */)
}
- def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
- this(initialValue, param, name, false /* internal */)
-
def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
@volatile @transient private var value_ : R = initialValue // Current value on driver
val zero = param.zero(initialValue) // Zero value to be passed to executors
private var deserialized = false
- // In many places we create internal accumulators without access to the active context cleaner,
- // so if we register them here then we may never unregister these accumulators. To avoid memory
- // leaks, we require the caller to explicitly register internal accumulators elsewhere.
- if (!internal) {
- Accumulators.register(this)
- }
-
- /**
- * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
- * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
- * reported correctly.
- */
- private[spark] def isInternal: Boolean = internal
+ Accumulators.register(this)
/**
* Return a copy of this [[Accumulable]].
@@ -114,7 +86,7 @@ class Accumulable[R, T] private (
* same mutable instance around.
*/
private[spark] def copy(): Accumulable[R, T] = {
- new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
+ new Accumulable[R, T](id, initialValue, param, name, countFailedValues)
}
/**
@@ -192,7 +164,8 @@ class Accumulable[R, T] private (
* Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
*/
private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+ val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+ new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
}
// Called by Java when deserializing an object
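With the flag gone, whether an accumulator is internal is derived from its name alone, as in the new `toInfo` above. A minimal sketch of that check, assuming `InternalAccumulator.METRICS_PREFIX` is `"internal.metrics."`:

```scala
object NamePrefixCheck extends App {
  // Assumed value of InternalAccumulator.METRICS_PREFIX, copied here for illustration.
  val METRICS_PREFIX = "internal.metrics."

  def isInternal(name: Option[String]): Boolean =
    name.exists(_.startsWith(METRICS_PREFIX))

  println(isInternal(Some("internal.metrics.shuffle.read.recordsRead"))) // true
  println(isInternal(Some("my.counter")))                                // false
  println(isInternal(None))                                              // false
}
```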
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 985752933a..0c17f014c9 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -56,7 +56,6 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @param name human-readable name associated with this accumulator
- * @param internal whether this accumulator is used internally within Spark only
* @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
*/
@@ -64,19 +63,9 @@ class Accumulator[T] private[spark] (
// SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@transient private val initialValue: T,
param: AccumulatorParam[T],
- name: Option[String],
- internal: Boolean,
- private[spark] override val countFailedValues: Boolean = false)
- extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
-
- def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
- this(initialValue, param, name, false /* internal */)
- }
-
- def this(initialValue: T, param: AccumulatorParam[T]) = {
- this(initialValue, param, None, false /* internal */)
- }
-}
+ name: Option[String] = None,
+ countFailedValues: Boolean = false)
+ extends Accumulable[T, T](initialValue, param, name, countFailedValues)
// TODO: The multi-thread support in accumulators is kind of lame; check
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index e8f83c6d14..43e555670d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -36,7 +36,8 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
- override val taskMetrics: TaskMetrics = new TaskMetrics)
+ // The default value is only used in tests.
+ override val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
with Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 8e9a332b7c..f012a74db6 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -101,9 +101,9 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
/**
* Resets the value of the current metrics (`this`) and and merges all the independent
- * [[ShuffleReadMetrics]] into `this`.
+ * [[TempShuffleReadMetrics]] into `this`.
*/
- private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+ private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
_remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
_localBlocksFetched.setValue(_localBlocksFetched.zero)
_remoteBytesRead.setValue(_remoteBytesRead.zero)
@@ -119,5 +119,32 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
_recordsRead.add(metric.recordsRead)
}
}
+}
+/**
+ * A temporary shuffle read metrics holder that is used to collect shuffle read metrics for each
+ * shuffle dependency, and all temporary metrics will be merged into the [[ShuffleReadMetrics]] at
+ * last.
+ */
+private[spark] class TempShuffleReadMetrics {
+ private[this] var _remoteBlocksFetched = 0
+ private[this] var _localBlocksFetched = 0
+ private[this] var _remoteBytesRead = 0L
+ private[this] var _localBytesRead = 0L
+ private[this] var _fetchWaitTime = 0L
+ private[this] var _recordsRead = 0L
+
+ def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v
+ def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v
+ def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
+ def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
+ def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
+ def incRecordsRead(v: Long): Unit = _recordsRead += v
+
+ def remoteBlocksFetched: Int = _remoteBlocksFetched
+ def localBlocksFetched: Int = _localBlocksFetched
+ def remoteBytesRead: Long = _remoteBytesRead
+ def localBytesRead: Long = _localBytesRead
+ def fetchWaitTime: Long = _fetchWaitTime
+ def recordsRead: Long = _recordsRead
}
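The merge in `setMergeValues` resets the target before adding, so repeating the merge does not double-count. A rough sketch of that reset-then-add semantics with plain `Long`s instead of accumulators; `TempSketch` and `ShuffleReadMetricsSketch` are illustrative stand-ins, and only two of the six fields are shown:

```scala
class TempSketch(val remoteBytesRead: Long, val recordsRead: Long)

class ShuffleReadMetricsSketch {
  var remoteBytesRead = 0L
  var recordsRead = 0L

  def setMergeValues(metrics: Seq[TempSketch]): Unit = {
    remoteBytesRead = 0L // reset to the zero value first
    recordsRead = 0L
    for (m <- metrics) {
      remoteBytesRead += m.remoteBytesRead
      recordsRead += m.recordsRead
    }
  }
}

object MergeTwiceSketch extends App {
  val agg = new ShuffleReadMetricsSketch
  val perDependency = Seq(new TempSketch(1024L, 10L), new TempSketch(512L, 5L))
  agg.setMergeValues(perDependency)
  agg.setMergeValues(perDependency) // idempotent thanks to the reset
  println((agg.remoteBytesRead, agg.recordsRead)) // (1536, 15)
}
```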
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 4558fbb4d9..8513d053f2 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -143,23 +143,23 @@ class TaskMetrics private[spark] () extends Serializable {
val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics()
/**
- * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
+ * A list of [[TempShuffleReadMetrics]], one per shuffle dependency.
*
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
- * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
- * each dependency and merge these metrics before reporting them to the driver.
+ * issues from readers in different threads, in-progress tasks use a [[TempShuffleReadMetrics]]
+ * for each dependency and merge these metrics before reporting them to the driver.
*/
- @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]
+ @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[TempShuffleReadMetrics]
/**
- * Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency.
+ * Create a [[TempShuffleReadMetrics]] for a particular shuffle dependency.
*
* All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which
* merges the temporary values synchronously. Otherwise, all temporary data collected will
* be lost.
*/
- private[spark] def createTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
- val readMetrics = new ShuffleReadMetrics
+ private[spark] def createTempShuffleReadMetrics(): TempShuffleReadMetrics = synchronized {
+ val readMetrics = new TempShuffleReadMetrics
tempShuffleReadMetrics += readMetrics
readMetrics
}
@@ -195,9 +195,8 @@ class TaskMetrics private[spark] () extends Serializable {
| OTHER THINGS |
* ========================== */
- private[spark] def registerAccums(sc: SparkContext): Unit = {
+ private[spark] def registerForCleanup(sc: SparkContext): Unit = {
internalAccums.foreach { accum =>
- Accumulators.register(accum)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
}
}
@@ -244,7 +243,14 @@ private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) ext
private[spark] object TaskMetrics extends Logging {
- def empty: TaskMetrics = new TaskMetrics
+ /**
+ * Create an empty task metrics that doesn't register its accumulators.
+ */
+ def empty: TaskMetrics = {
+ val metrics = new TaskMetrics
+ metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
+ metrics
+ }
/**
* Create a new accumulator representing an internal task metric.
@@ -253,7 +259,7 @@ private[spark] object TaskMetrics extends Logging {
initialValue: T,
name: String,
param: AccumulatorParam[T]): Accumulator[T] = {
- new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
+ new Accumulator[T](initialValue, param, Some(name), countFailedValues = true)
}
def createLongAccum(name: String): Accumulator[Long] = {
@@ -281,6 +287,9 @@ private[spark] object TaskMetrics extends Logging {
def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
val metrics = new ListenerTaskMetrics(definedAccumUpdates)
+ // We don't register this [[ListenerTaskMetrics]] for cleanup, and this is only used to post
+ // event, we should un-register all accumulators immediately.
+ metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
definedAccumUpdates.filter(_.internal).foreach { accum =>
metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
}
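A rough sketch of what `fromAccumulatorUpdates` does with the updates it receives: keep only internal updates that actually carry a value, match them to the metrics' accumulators by name, and copy the values across, with the resulting object used solely for posting an event. The types below are simplified stand-ins, not Spark classes:

```scala
case class AccumInfo(name: Option[String], update: Option[Any], internal: Boolean)

class EventOnlyMetrics {
  // A name -> value map standing in for the internal accumulators.
  private val values = scala.collection.mutable.Map[String, Long]()
  def set(name: String, v: Long): Unit = values(name) = v
  def get(name: String): Option[Long] = values.get(name)
}

object FromUpdatesSketch extends App {
  def fromAccumulatorUpdates(updates: Seq[AccumInfo]): EventOnlyMetrics = {
    val metrics = new EventOnlyMetrics
    updates
      .filter(u => u.internal && u.update.isDefined)
      .foreach(u => u.name.foreach(n => metrics.set(n, u.update.get.asInstanceOf[Long])))
    metrics
  }

  val updates = Seq(
    AccumInfo(Some("internal.metrics.executorRunTime"), Some(42L), internal = true),
    AccumInfo(Some("user.counter"), Some(7L), internal = false),
    AccumInfo(Some("internal.metrics.resultSize"), None, internal = true))

  val m = fromAccumulatorUpdates(updates)
  println(m.get("internal.metrics.executorRunTime")) // Some(42)
  println(m.get("internal.metrics.resultSize"))      // None (no defined update)
}
```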
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index d5cf6b82e8..02185bf631 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -112,7 +112,7 @@ private[scheduler] abstract class Stage(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
- metrics.registerAccums(rdd.sparkContext)
+ metrics.registerForCleanup(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 58349fe250..c513ed36d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -36,7 +36,7 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
- val taskMetrics: TaskMetrics = new TaskMetrics,
+ val taskMetrics: TaskMetrics = null,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -81,7 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
- taskMetrics: TaskMetrics = new TaskMetrics,
+ taskMetrics: TaskMetrics = null,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 9f2fa02c69..eb10f3e69b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -46,14 +46,13 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
* @param partitionId index of the number in the RDD
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
- *
- * The default values for `metrics` and `localProperties` are used by tests only.
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
- val metrics: TaskMetrics = new TaskMetrics,
+ // The default value is only used in tests.
+ val metrics: TaskMetrics = TaskMetrics.empty,
@transient var localProperties: Properties = new Properties) extends Serializable {
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 9e4771ce4a..9ab7d96e29 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -326,7 +326,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
- val metrics = new TaskMetrics
+ val metrics = TaskMetrics.empty
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6c50c72a91..a613fbc5cc 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -750,10 +750,10 @@ private[spark] object JsonProtocol {
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
+ val metrics = TaskMetrics.empty
if (json == JNothing) {
- return TaskMetrics.empty
+ return metrics
}
- val metrics = new TaskMetrics
metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
metrics.setResultSize((json \ "Result Size").extract[Long])