aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-04-21 01:06:22 -0700
committerReynold Xin <rxin@databricks.com>2016-04-21 01:06:22 -0700
commitcb51680d2213ef3443d1c02930c1e76fe6eb2e31 (patch)
treec80396f74f3ea04681932ca6de23c7b77c60f3b8 /core
parent228128ce2571c3058cb13ae502a5328745eeeef5 (diff)
downloadspark-cb51680d2213ef3443d1c02930c1e76fe6eb2e31.tar.gz
spark-cb51680d2213ef3443d1c02930c1e76fe6eb2e31.tar.bz2
spark-cb51680d2213ef3443d1c02930c1e76fe6eb2e31.zip
[SPARK-14753][CORE] remove internal flag in Accumulable
## What changes were proposed in this pull request? the `Accumulable.internal` flag is only used to avoid registering internal accumulators for 2 certain cases: 1. `TaskMetrics.createTempShuffleReadMetrics`: the accumulators in the temp shuffle read metrics should not be registered. 2. `TaskMetrics.fromAccumulatorUpdates`: the created task metrics is only used to post event, accumulators inside it should not be registered. For 1, we can create a `TempShuffleReadMetrics` that don't create accumulators, just keep the data and merge it at last. For 2, we can un-register these accumulators immediately. TODO: remove `internal` flag in `AccumulableInfo` with followup PR ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #12525 from cloud-fan/acc.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulable.scala43
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulator.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/TaskContextImpl.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala31
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Task.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/AccumulatorSuite.scala32
-rw-r--r--core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala33
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala8
13 files changed, 96 insertions, 119 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index 601b503d12..e8f053c150 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
/**
- * A data type that can be accumulated, ie has an commutative and associative "add" operation,
+ * A data type that can be accumulated, i.e. has an commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
*
* You must define how to add data, and how to merge two of these together. For some data types,
@@ -36,19 +36,12 @@ import org.apache.spark.util.Utils
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
- * All accumulators created on the driver to be used on the executors must be registered with
- * [[Accumulators]]. This is already done automatically for accumulators created by the user.
- * Internal accumulators must be explicitly registered by the caller.
- *
* Operations are not thread-safe.
*
* @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
- * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
- * to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
- * thread safe so that they can be reported correctly.
* @param countFailedValues whether to accumulate values from failed tasks. This is set to true
* for system and time metrics like serialization time or bytes spilled,
* and false for things with absolute values like number of input rows.
@@ -62,7 +55,6 @@ class Accumulable[R, T] private (
@transient private val initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
- internal: Boolean,
private[spark] val countFailedValues: Boolean)
extends Serializable {
@@ -70,41 +62,21 @@ class Accumulable[R, T] private (
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
- internal: Boolean,
countFailedValues: Boolean) = {
- this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
+ this(Accumulators.newId(), initialValue, param, name, countFailedValues)
}
- private[spark] def this(
- initialValue: R,
- param: AccumulableParam[R, T],
- name: Option[String],
- internal: Boolean) = {
- this(initialValue, param, name, internal, false /* countFailedValues */)
+ private[spark] def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) = {
+ this(initialValue, param, name, false /* countFailedValues */)
}
- def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
- this(initialValue, param, name, false /* internal */)
-
def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)
@volatile @transient private var value_ : R = initialValue // Current value on driver
val zero = param.zero(initialValue) // Zero value to be passed to executors
private var deserialized = false
- // In many places we create internal accumulators without access to the active context cleaner,
- // so if we register them here then we may never unregister these accumulators. To avoid memory
- // leaks, we require the caller to explicitly register internal accumulators elsewhere.
- if (!internal) {
- Accumulators.register(this)
- }
-
- /**
- * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
- * via heartbeats. For internal [[Accumulable]]s, `R` must be thread safe so that they can be
- * reported correctly.
- */
- private[spark] def isInternal: Boolean = internal
+ Accumulators.register(this)
/**
* Return a copy of this [[Accumulable]].
@@ -114,7 +86,7 @@ class Accumulable[R, T] private (
* same mutable instance around.
*/
private[spark] def copy(): Accumulable[R, T] = {
- new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
+ new Accumulable[R, T](id, initialValue, param, name, countFailedValues)
}
/**
@@ -192,7 +164,8 @@ class Accumulable[R, T] private (
* Create an [[AccumulableInfo]] representation of this [[Accumulable]] with the provided values.
*/
private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value, internal, countFailedValues)
+ val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))
+ new AccumulableInfo(id, name, update, value, isInternal, countFailedValues)
}
// Called by Java when deserializing an object
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 985752933a..0c17f014c9 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -56,7 +56,6 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @param name human-readable name associated with this accumulator
- * @param internal whether this accumulator is used internally within Spark only
* @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
*/
@@ -64,19 +63,9 @@ class Accumulator[T] private[spark] (
// SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@transient private val initialValue: T,
param: AccumulatorParam[T],
- name: Option[String],
- internal: Boolean,
- private[spark] override val countFailedValues: Boolean = false)
- extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {
-
- def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
- this(initialValue, param, name, false /* internal */)
- }
-
- def this(initialValue: T, param: AccumulatorParam[T]) = {
- this(initialValue, param, None, false /* internal */)
- }
-}
+ name: Option[String] = None,
+ countFailedValues: Boolean = false)
+ extends Accumulable[T, T](initialValue, param, name, countFailedValues)
// TODO: The multi-thread support in accumulators is kind of lame; check
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index e8f83c6d14..43e555670d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -36,7 +36,8 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
localProperties: Properties,
@transient private val metricsSystem: MetricsSystem,
- override val taskMetrics: TaskMetrics = new TaskMetrics)
+ // The default value is only used in tests.
+ override val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
with Logging {
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 8e9a332b7c..f012a74db6 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -101,9 +101,9 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
/**
* Resets the value of the current metrics (`this`) and and merges all the independent
- * [[ShuffleReadMetrics]] into `this`.
+ * [[TempShuffleReadMetrics]] into `this`.
*/
- private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+ private[spark] def setMergeValues(metrics: Seq[TempShuffleReadMetrics]): Unit = {
_remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
_localBlocksFetched.setValue(_localBlocksFetched.zero)
_remoteBytesRead.setValue(_remoteBytesRead.zero)
@@ -119,5 +119,32 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
_recordsRead.add(metric.recordsRead)
}
}
+}
+/**
+ * A temporary shuffle read metrics holder that is used to collect shuffle read metrics for each
+ * shuffle dependency, and all temporary metrics will be merged into the [[ShuffleReadMetrics]] at
+ * last.
+ */
+private[spark] class TempShuffleReadMetrics {
+ private[this] var _remoteBlocksFetched = 0
+ private[this] var _localBlocksFetched = 0
+ private[this] var _remoteBytesRead = 0L
+ private[this] var _localBytesRead = 0L
+ private[this] var _fetchWaitTime = 0L
+ private[this] var _recordsRead = 0L
+
+ def incRemoteBlocksFetched(v: Int): Unit = _remoteBlocksFetched += v
+ def incLocalBlocksFetched(v: Int): Unit = _localBlocksFetched += v
+ def incRemoteBytesRead(v: Long): Unit = _remoteBytesRead += v
+ def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
+ def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
+ def incRecordsRead(v: Long): Unit = _recordsRead += v
+
+ def remoteBlocksFetched: Int = _remoteBlocksFetched
+ def localBlocksFetched: Int = _localBlocksFetched
+ def remoteBytesRead: Long = _remoteBytesRead
+ def localBytesRead: Long = _localBytesRead
+ def fetchWaitTime: Long = _fetchWaitTime
+ def recordsRead: Long = _recordsRead
}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 4558fbb4d9..8513d053f2 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -143,23 +143,23 @@ class TaskMetrics private[spark] () extends Serializable {
val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics()
/**
- * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
+ * A list of [[TempShuffleReadMetrics]], one per shuffle dependency.
*
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
- * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
- * each dependency and merge these metrics before reporting them to the driver.
+ * issues from readers in different threads, in-progress tasks use a [[TempShuffleReadMetrics]]
+ * for each dependency and merge these metrics before reporting them to the driver.
*/
- @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]
+ @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[TempShuffleReadMetrics]
/**
- * Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency.
+ * Create a [[TempShuffleReadMetrics]] for a particular shuffle dependency.
*
* All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which
* merges the temporary values synchronously. Otherwise, all temporary data collected will
* be lost.
*/
- private[spark] def createTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
- val readMetrics = new ShuffleReadMetrics
+ private[spark] def createTempShuffleReadMetrics(): TempShuffleReadMetrics = synchronized {
+ val readMetrics = new TempShuffleReadMetrics
tempShuffleReadMetrics += readMetrics
readMetrics
}
@@ -195,9 +195,8 @@ class TaskMetrics private[spark] () extends Serializable {
| OTHER THINGS |
* ========================== */
- private[spark] def registerAccums(sc: SparkContext): Unit = {
+ private[spark] def registerForCleanup(sc: SparkContext): Unit = {
internalAccums.foreach { accum =>
- Accumulators.register(accum)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
}
}
@@ -244,7 +243,14 @@ private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) ext
private[spark] object TaskMetrics extends Logging {
- def empty: TaskMetrics = new TaskMetrics
+ /**
+ * Create an empty task metrics that doesn't register its accumulators.
+ */
+ def empty: TaskMetrics = {
+ val metrics = new TaskMetrics
+ metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
+ metrics
+ }
/**
* Create a new accumulator representing an internal task metric.
@@ -253,7 +259,7 @@ private[spark] object TaskMetrics extends Logging {
initialValue: T,
name: String,
param: AccumulatorParam[T]): Accumulator[T] = {
- new Accumulator[T](initialValue, param, Some(name), internal = true, countFailedValues = true)
+ new Accumulator[T](initialValue, param, Some(name), countFailedValues = true)
}
def createLongAccum(name: String): Accumulator[Long] = {
@@ -281,6 +287,9 @@ private[spark] object TaskMetrics extends Logging {
def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
val definedAccumUpdates = accumUpdates.filter(_.update.isDefined)
val metrics = new ListenerTaskMetrics(definedAccumUpdates)
+ // We don't register this [[ListenerTaskMetrics]] for cleanup, and this is only used to post
+ // event, we should un-register all accumulators immediately.
+ metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
definedAccumUpdates.filter(_.internal).foreach { accum =>
metrics.internalAccums.find(_.name == accum.name).foreach(_.setValueAny(accum.update.get))
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index d5cf6b82e8..02185bf631 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -112,7 +112,7 @@ private[scheduler] abstract class Stage(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
- metrics.registerAccums(rdd.sparkContext)
+ metrics.registerForCleanup(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
nextAttemptId += 1
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 58349fe250..c513ed36d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -36,7 +36,7 @@ class StageInfo(
val rddInfos: Seq[RDDInfo],
val parentIds: Seq[Int],
val details: String,
- val taskMetrics: TaskMetrics = new TaskMetrics,
+ val taskMetrics: TaskMetrics = null,
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -81,7 +81,7 @@ private[spark] object StageInfo {
stage: Stage,
attemptId: Int,
numTasks: Option[Int] = None,
- taskMetrics: TaskMetrics = new TaskMetrics,
+ taskMetrics: TaskMetrics = null,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 9f2fa02c69..eb10f3e69b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -46,14 +46,13 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
* @param partitionId index of the number in the RDD
* @param metrics a [[TaskMetrics]] that is created at driver side and sent to executor side.
* @param localProperties copy of thread-local properties set by the user on the driver side.
- *
- * The default values for `metrics` and `localProperties` are used by tests only.
*/
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
val partitionId: Int,
- val metrics: TaskMetrics = new TaskMetrics,
+ // The default value is only used in tests.
+ val metrics: TaskMetrics = TaskMetrics.empty,
@transient var localProperties: Properties = new Properties) extends Serializable {
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 9e4771ce4a..9ab7d96e29 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -326,7 +326,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
- val metrics = new TaskMetrics
+ val metrics = TaskMetrics.empty
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6c50c72a91..a613fbc5cc 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -750,10 +750,10 @@ private[spark] object JsonProtocol {
}
def taskMetricsFromJson(json: JValue): TaskMetrics = {
+ val metrics = TaskMetrics.empty
if (json == JNothing) {
- return TaskMetrics.empty
+ return metrics
}
- val metrics = new TaskMetrics
metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
metrics.setResultSize((json \ "Result Size").extract[Long])
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 454c42517c..6063476936 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -176,11 +176,10 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
test("get accum") {
sc = new SparkContext("local", "test")
// Don't register with SparkContext for cleanup
- var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
val accId = acc.id
val ref = WeakReference(acc)
assert(ref.get.isDefined)
- Accumulators.register(ref.get.get)
// Remove the explicit reference to it and allow weak reference to get garbage collected
acc = null
@@ -194,30 +193,19 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
// Getting a normal accumulator. Note: this has to be separate because referencing an
// accumulator above in an `assert` would keep it from being garbage collected.
- val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true)
- Accumulators.register(acc2)
+ val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true)
assert(Accumulators.get(acc2.id) === Some(acc2))
// Getting an accumulator that does not exist should return None
assert(Accumulators.get(100000).isEmpty)
}
- test("only external accums are automatically registered") {
- val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false)
- val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true)
- assert(!accEx.isInternal)
- assert(accIn.isInternal)
- assert(Accumulators.get(accEx.id).isDefined)
- assert(Accumulators.get(accIn.id).isEmpty)
- }
-
test("copy") {
- val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false)
+ val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false)
val acc2 = acc1.copy()
assert(acc1.id === acc2.id)
assert(acc1.value === acc2.value)
assert(acc1.name === acc2.name)
- assert(acc1.isInternal === acc2.isInternal)
assert(acc1.countFailedValues === acc2.countFailedValues)
assert(acc1 !== acc2)
// Modifying one does not affect the other
@@ -230,15 +218,11 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("register multiple accums with same ID") {
- // Make sure these are internal accums so we don't automatically register them already
- val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
+ // `copy` will create a new Accumulable and register it.
val acc2 = acc1.copy()
assert(acc1 !== acc2)
assert(acc1.id === acc2.id)
- assert(Accumulators.originals.isEmpty)
- assert(Accumulators.get(acc1.id).isEmpty)
- Accumulators.register(acc1)
- Accumulators.register(acc2)
// The second one does not override the first one
assert(Accumulators.originals.size === 1)
assert(Accumulators.get(acc1.id) === Some(acc1))
@@ -275,14 +259,14 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("value is reset on the executors") {
- val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
- val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
+ val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"))
+ val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"))
val externalAccums = Seq(acc1, acc2)
val taskMetrics = new TaskMetrics
// Set some values; these should not be observed later on the "executors"
acc1.setValue(10)
acc2.setValue(20L)
- taskMetrics.testAccum.get.asInstanceOf[Accumulator[Long]].setValue(30L)
+ taskMetrics.testAccum.get.setValue(30L)
// Simulate the task being serialized and sent to the executors.
val dummyTask = new DummyTask(taskMetrics, externalAccums)
val serInstance = new JavaSerializer(new SparkConf).newInstance()
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index fbc2fae08d..ee70419727 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -178,11 +178,11 @@ class TaskMetricsSuite extends SparkFunSuite {
val sr1 = tm.createTempShuffleReadMetrics()
val sr2 = tm.createTempShuffleReadMetrics()
val sr3 = tm.createTempShuffleReadMetrics()
- sr1.setRecordsRead(10L)
- sr2.setRecordsRead(10L)
- sr1.setFetchWaitTime(1L)
- sr2.setFetchWaitTime(2L)
- sr3.setFetchWaitTime(3L)
+ sr1.incRecordsRead(10L)
+ sr2.incRecordsRead(10L)
+ sr1.incFetchWaitTime(1L)
+ sr2.incFetchWaitTime(2L)
+ sr3.incFetchWaitTime(3L)
tm.mergeShuffleReadMetrics()
assert(tm.shuffleReadMetrics.remoteBlocksFetched === 0L)
assert(tm.shuffleReadMetrics.recordsRead === 20L)
@@ -198,8 +198,7 @@ class TaskMetricsSuite extends SparkFunSuite {
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
- val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"),
- internal = true, countFailedValues = true)
+ val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true)
tm.registerAccumulator(acc1)
tm.registerAccumulator(acc2)
tm.registerAccumulator(acc3)
@@ -219,9 +218,7 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(newUpdates(acc2.id).update === Some(2))
assert(newUpdates(acc3.id).update === Some(0))
assert(newUpdates(acc4.id).update === Some(0))
- assert(!newUpdates(acc3.id).internal)
assert(!newUpdates(acc3.id).countFailedValues)
- assert(newUpdates(acc4.id).internal)
assert(newUpdates(acc4.id).countFailedValues)
assert(newUpdates.values.map(_.update).forall(_.isDefined))
assert(newUpdates.values.map(_.value).forall(_.isEmpty))
@@ -230,7 +227,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("from accumulator updates") {
val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
- AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
+ AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues)
}
val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
@@ -239,16 +236,15 @@ class TaskMetricsSuite extends SparkFunSuite {
// on the driver, internal or not, should be registered with `Accumulators` at some point.
val param = IntAccumulatorParam
val registeredAccums = Seq(
- new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
- new Accumulator(0, param, Some("b"), internal = true, countFailedValues = false),
- new Accumulator(0, param, Some("c"), internal = false, countFailedValues = true),
- new Accumulator(0, param, Some("d"), internal = false, countFailedValues = false))
+ new Accumulator(0, param, Some("a"), countFailedValues = true),
+ new Accumulator(0, param, Some("b"), countFailedValues = false))
val unregisteredAccums = Seq(
- new Accumulator(0, param, Some("e"), internal = true, countFailedValues = true),
- new Accumulator(0, param, Some("f"), internal = true, countFailedValues = false))
+ new Accumulator(0, param, Some("c"), countFailedValues = true),
+ new Accumulator(0, param, Some("d"), countFailedValues = false))
registeredAccums.foreach(Accumulators.register)
- registeredAccums.foreach { a => assert(Accumulators.originals.contains(a.id)) }
- unregisteredAccums.foreach { a => assert(!Accumulators.originals.contains(a.id)) }
+ registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id)))
+ unregisteredAccums.foreach(a => Accumulators.remove(a.id))
+ unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id)))
// set some values in these accums
registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
@@ -276,7 +272,6 @@ private[spark] object TaskMetricsSuite extends Assertions {
assert(info1.name === info2.name)
assert(info1.update === info2.update)
assert(info1.value === info2.value)
- assert(info1.internal === info2.internal)
assert(info1.countFailedValues === info2.countFailedValues)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index bda4c996b2..d55f6f60ec 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -148,8 +148,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
sc = new SparkContext("local[1,4]", "test")
val param = AccumulatorParam.LongAccumulatorParam
// Create 2 accumulators, one that counts failed values and another that doesn't
- val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
// Fail first 3 attempts of every task. This means each task should be run 4 times.
sc.parallelize(1 to 10, 10).map { i =>
acc1 += 1
@@ -169,8 +169,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("failed tasks collect only accumulators whose values count during failures") {
sc = new SparkContext("local", "test")
val param = AccumulatorParam.LongAccumulatorParam
- val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val taskMetrics = new TaskMetrics