author     Wenchen Fan <wenchen@databricks.com>     2016-04-28 00:26:39 -0700
committer  Reynold Xin <rxin@databricks.com>        2016-04-28 00:26:39 -0700
commit     bf5496dbdac75ea69081c95a92a29771e635ea98 (patch)
tree       b6151d946b25171b5bbcd3252aa77f7f7a69f60c /core/src/test
parent     be317d4a90b3ca906fefeb438f89a09b1c7da5a8 (diff)
download   spark-bf5496dbdac75ea69081c95a92a29771e635ea98.tar.gz
           spark-bf5496dbdac75ea69081c95a92a29771e635ea98.tar.bz2
           spark-bf5496dbdac75ea69081c95a92a29771e635ea98.zip
[SPARK-14654][CORE] New accumulator API
## What changes were proposed in this pull request?

This PR introduces a new accumulator API which is much simpler than before:

1. The type hierarchy is simplified; now we only have an `Accumulator` class.
2. The `initialValue` and `zeroValue` concepts are combined into a single concept: `zeroValue`.
3. There is only one `register` method; accumulator registration and cleanup registration are combined.
4. The `id`, `name` and `countFailedValues` fields are combined into an `AccumulatorMetadata`, which is provided during registration.

`SQLMetric` is a good example of the simplicity of this new API.

What we break:

1. No `setValue` anymore. In the new API the intermediate type can differ from the result type, so it is very hard to implement a general `setValue`.
2. An accumulator can't be serialized before it is registered.

Problems to be addressed in follow-ups:

1. With this new API, `AccumulableInfo` doesn't make a lot of sense: the partial output is not partial updates, so we need to expose the intermediate value.
2. `ExceptionFailure` should not carry the accumulator updates. Why do users care about accumulator updates for failed cases? It looks like we only use this feature to update the internal metrics; how about sending a heartbeat to update internal metrics after the failure event?
3. The public event `SparkListenerTaskEnd` carries a `TaskMetrics`. Ideally this `TaskMetrics` wouldn't need to carry external accumulators, as the only method of `TaskMetrics` that can access external accumulators is `private[spark]`. However, `SQLListener` uses it to retrieve SQL metrics.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12612 from cloud-fan/acc.
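For illustration, a rough sketch of the new registration flow, mirroring the `createLongAccum` test helper added to `AccumulatorSuite` in this patch. It assumes the snippet lives inside the `org.apache.spark` package (as the test suites do), since `metadata` and `AccumulatorContext.register` may be package-private; the accumulator name `"myCounter"` is only illustrative:

```scala
import org.apache.spark._

// Sketch only: build a LongAccumulator, attach its metadata, and register it.
// With the new API, id, name and countFailedValues travel together as
// AccumulatorMetadata, and a single register call replaces the old separate
// accumulator + cleanup registrations.
def createLongAccum(name: String, countFailedValues: Boolean = false): LongAccumulator = {
  val acc = new LongAccumulator
  acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), countFailedValues)
  AccumulatorContext.register(acc)
  acc
}

val acc = createLongAccum("myCounter")
acc.add(5)                 // accumulate on the driver side
assert(acc.value == 5)     // value is readable on the driver
```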
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala                       | 132
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala                 |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala               |  24
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkFunSuite.scala                          |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala              |  85
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala            |  71
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala  |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala             |  16
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala        |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala          |  11
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala       |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                 |   7
12 files changed, 157 insertions, 220 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 6063476936..5f97e58845 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -28,17 +28,17 @@ import scala.util.control.NonFatal
import org.scalatest.Matchers
import org.scalatest.exceptions.TestFailedException
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.AccumulatorParam.{ListAccumulatorParam, StringAccumulatorParam}
import org.apache.spark.scheduler._
import org.apache.spark.serializer.JavaSerializer
class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
- import AccumulatorParam._
+ import AccumulatorSuite.createLongAccum
override def afterEach(): Unit = {
try {
- Accumulators.clear()
+ AccumulatorContext.clear()
} finally {
super.afterEach()
}
@@ -59,9 +59,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
}
+ test("accumulator serialization") {
+ val ser = new JavaSerializer(new SparkConf).newInstance()
+ val acc = createLongAccum("x")
+ acc.add(5)
+ assert(acc.value == 5)
+ assert(acc.isAtDriverSide)
+
+ // serialize and de-serialize it, to simulate sending accumulator to executor.
+ val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc))
+ // value is reset on the executors
+ assert(acc2.localValue == 0)
+ assert(!acc2.isAtDriverSide)
+
+ acc2.add(10)
+ // serialize and de-serialize it again, to simulate sending accumulator back to driver.
+ val acc3 = ser.deserialize[LongAccumulator](ser.serialize(acc2))
+ // value is not reset on the driver
+ assert(acc3.value == 10)
+ assert(acc3.isAtDriverSide)
+ }
+
test ("basic accumulation") {
sc = new SparkContext("local", "test")
- val acc : Accumulator[Int] = sc.accumulator(0)
+ val acc: Accumulator[Int] = sc.accumulator(0)
val d = sc.parallelize(1 to 20)
d.foreach{x => acc += x}
@@ -75,7 +96,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
test("value not assignable from tasks") {
sc = new SparkContext("local", "test")
- val acc : Accumulator[Int] = sc.accumulator(0)
+ val acc: Accumulator[Int] = sc.accumulator(0)
val d = sc.parallelize(1 to 20)
an [Exception] should be thrownBy {d.foreach{x => acc.value = x}}
@@ -169,14 +190,13 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
System.gc()
assert(ref.get.isEmpty)
- Accumulators.remove(accId)
- assert(!Accumulators.originals.get(accId).isDefined)
+ AccumulatorContext.remove(accId)
+ assert(!AccumulatorContext.originals.containsKey(accId))
}
test("get accum") {
- sc = new SparkContext("local", "test")
// Don't register with SparkContext for cleanup
- var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
+ var acc = createLongAccum("a")
val accId = acc.id
val ref = WeakReference(acc)
assert(ref.get.isDefined)
@@ -188,44 +208,16 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
// Getting a garbage collected accum should throw error
intercept[IllegalAccessError] {
- Accumulators.get(accId)
+ AccumulatorContext.get(accId)
}
// Getting a normal accumulator. Note: this has to be separate because referencing an
// accumulator above in an `assert` would keep it from being garbage collected.
- val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true)
- assert(Accumulators.get(acc2.id) === Some(acc2))
+ val acc2 = createLongAccum("b")
+ assert(AccumulatorContext.get(acc2.id) === Some(acc2))
// Getting an accumulator that does not exist should return None
- assert(Accumulators.get(100000).isEmpty)
- }
-
- test("copy") {
- val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false)
- val acc2 = acc1.copy()
- assert(acc1.id === acc2.id)
- assert(acc1.value === acc2.value)
- assert(acc1.name === acc2.name)
- assert(acc1.countFailedValues === acc2.countFailedValues)
- assert(acc1 !== acc2)
- // Modifying one does not affect the other
- acc1.add(44L)
- assert(acc1.value === 500L)
- assert(acc2.value === 456L)
- acc2.add(144L)
- assert(acc1.value === 500L)
- assert(acc2.value === 600L)
- }
-
- test("register multiple accums with same ID") {
- val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
- // `copy` will create a new Accumulable and register it.
- val acc2 = acc1.copy()
- assert(acc1 !== acc2)
- assert(acc1.id === acc2.id)
- // The second one does not override the first one
- assert(Accumulators.originals.size === 1)
- assert(Accumulators.get(acc1.id) === Some(acc1))
+ assert(AccumulatorContext.get(100000).isEmpty)
}
test("string accumulator param") {
@@ -257,38 +249,33 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
acc.setValue(Seq(9, 10))
assert(acc.value === Seq(9, 10))
}
-
- test("value is reset on the executors") {
- val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"))
- val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"))
- val externalAccums = Seq(acc1, acc2)
- val taskMetrics = new TaskMetrics
- // Set some values; these should not be observed later on the "executors"
- acc1.setValue(10)
- acc2.setValue(20L)
- taskMetrics.testAccum.get.setValue(30L)
- // Simulate the task being serialized and sent to the executors.
- val dummyTask = new DummyTask(taskMetrics, externalAccums)
- val serInstance = new JavaSerializer(new SparkConf).newInstance()
- val taskSer = Task.serializeWithDependencies(
- dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
- // Now we're on the executors.
- // Deserialize the task and assert that its accumulators are zero'ed out.
- val (_, _, _, taskBytes) = Task.deserializeWithDependencies(taskSer)
- val taskDeser = serInstance.deserialize[DummyTask](
- taskBytes, Thread.currentThread.getContextClassLoader)
- // Assert that executors see only zeros
- taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
- taskDeser.metrics.internalAccums.foreach { a => assert(a.localValue == a.zero) }
- }
-
}
private[spark] object AccumulatorSuite {
-
import InternalAccumulator._
/**
+ * Create a long accumulator and register it to [[AccumulatorContext]].
+ */
+ def createLongAccum(
+ name: String,
+ countFailedValues: Boolean = false,
+ initValue: Long = 0,
+ id: Long = AccumulatorContext.newId()): LongAccumulator = {
+ val acc = new LongAccumulator
+ acc.setValue(initValue)
+ acc.metadata = AccumulatorMetadata(id, Some(name), countFailedValues)
+ AccumulatorContext.register(acc)
+ acc
+ }
+
+ /**
+ * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
+ * info as an accumulator update.
+ */
+ def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
+
+ /**
* Run one or more Spark jobs and verify that in at least one job the peak execution memory
* accumulator is updated afterwards.
*/
@@ -340,7 +327,6 @@ private class SaveInfoListener extends SparkListener {
if (jobCompletionCallback != null) {
jobCompletionSem.acquire()
if (exception != null) {
- exception = null
throw exception
}
}
@@ -377,13 +363,3 @@ private class SaveInfoListener extends SparkListener {
(taskEnd.stageId, taskEnd.stageAttemptId), new ArrayBuffer[TaskInfo]) += taskEnd.taskInfo
}
}
-
-
-/**
- * A dummy [[Task]] that contains internal and external [[Accumulator]]s.
- */
-private[spark] class DummyTask(
- metrics: TaskMetrics,
- val externalAccums: Seq[Accumulator[_]]) extends Task[Int](0, 0, 0, metrics) {
- override def runTask(c: TaskContext): Int = 1
-}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 4d2b3e7f3b..1adc90ab1e 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -211,10 +211,10 @@ class HeartbeatReceiverSuite
private def triggerHeartbeat(
executorId: String,
executorShouldReregister: Boolean): Unit = {
- val metrics = new TaskMetrics
+ val metrics = TaskMetrics.empty
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
- Heartbeat(executorId, Array(1L -> metrics.accumulatorUpdates()), blockManagerId))
+ Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId))
if (executorShouldReregister) {
assert(response.reregisterBlockManager)
} else {
@@ -222,7 +222,7 @@ class HeartbeatReceiverSuite
// Additionally verify that the scheduler callback is called with the correct parameters
verify(scheduler).executorHeartbeatReceived(
Matchers.eq(executorId),
- Matchers.eq(Array(1L -> metrics.accumulatorUpdates())),
+ Matchers.eq(Array(1L -> metrics.accumulators())),
Matchers.eq(blockManagerId))
}
}
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index b074b95424..e4474bb813 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.executor.TaskMetrics
@@ -29,7 +30,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
override def afterEach(): Unit = {
try {
- Accumulators.clear()
+ AccumulatorContext.clear()
} finally {
super.afterEach()
}
@@ -37,9 +38,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
test("internal accumulators in TaskContext") {
val taskContext = TaskContext.empty()
- val accumUpdates = taskContext.taskMetrics.accumulatorUpdates()
+ val accumUpdates = taskContext.taskMetrics.accumulators()
assert(accumUpdates.size > 0)
- assert(accumUpdates.forall(_.internal))
val testAccum = taskContext.taskMetrics.testAccum.get
assert(accumUpdates.exists(_.id == testAccum.id))
}
@@ -51,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
sc.addSparkListener(listener)
// Have each task add 1 to the internal accumulator
val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 1
+ TaskContext.get().taskMetrics().testAccum.get.add(1)
iter
}
// Register asserts in job completion callback to avoid flakiness
@@ -87,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
val rdd = sc.parallelize(1 to 100, numPartitions)
.map { i => (i, i) }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 1
+ TaskContext.get().taskMetrics().testAccum.get.add(1)
iter
}
.reduceByKey { case (x, y) => x + y }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 10
+ TaskContext.get().taskMetrics().testAccum.get.add(10)
iter
}
.repartition(numPartitions * 2)
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 100
+ TaskContext.get().taskMetrics().testAccum.get.add(100)
iter
}
// Register asserts in job completion callback to avoid flakiness
@@ -127,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
// This should retry both stages in the scheduler. Note that we only want to fail the
// first stage attempt because we want the stage to eventually succeed.
val x = sc.parallelize(1 to 100, numPartitions)
- .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter }
+ .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get.add(1); iter }
.groupBy(identity)
val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
@@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private val myCleaner = new SaveAccumContextCleaner(this)
override def cleaner: Option[ContextCleaner] = Some(myCleaner)
}
- assert(Accumulators.originals.isEmpty)
+ assert(AccumulatorContext.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
val numInternalAccums = TaskMetrics.empty.internalAccums.length
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
- assert(Accumulators.originals.size === numInternalAccums * 2)
+ assert(AccumulatorContext.originals.size === numInternalAccums * 2)
val accumsRegistered = sc.cleaner match {
case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
case _ => Seq.empty[Long]
}
// Make sure the same set of accumulators is registered for cleanup
assert(accumsRegistered.size === numInternalAccums * 2)
- assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet)
+ assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala)
}
/**
@@ -212,7 +212,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
private val accumsRegistered = new ArrayBuffer[Long]
- override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+ override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
accumsRegistered += a.id
super.registerAccumulatorForCleanup(a)
}
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3228752b96..4aae2c9b4a 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -34,7 +34,7 @@ private[spark] abstract class SparkFunSuite
protected override def afterAll(): Unit = {
try {
// Avoid leaking map entries in tests that use accumulators without SparkContext
- Accumulators.clear()
+ AccumulatorContext.clear()
} finally {
super.afterAll()
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index ee70419727..94f6e1a3a7 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -20,14 +20,11 @@ package org.apache.spark.executor
import org.scalatest.Assertions
import org.apache.spark._
-import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId}
+import org.apache.spark.storage.{BlockStatus, StorageLevel, TestBlockId}
class TaskMetricsSuite extends SparkFunSuite {
- import AccumulatorParam._
import StorageLevel._
- import TaskMetricsSuite._
test("mutating values") {
val tm = new TaskMetrics
@@ -59,8 +56,8 @@ class TaskMetricsSuite extends SparkFunSuite {
tm.incPeakExecutionMemory(8L)
val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L))
val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L))
- tm.incUpdatedBlockStatuses(Seq(block1))
- tm.incUpdatedBlockStatuses(Seq(block2))
+ tm.incUpdatedBlockStatuses(block1)
+ tm.incUpdatedBlockStatuses(block2)
// assert new values exist
assert(tm.executorDeserializeTime == 1L)
assert(tm.executorRunTime == 2L)
@@ -194,18 +191,19 @@ class TaskMetricsSuite extends SparkFunSuite {
}
test("additional accumulables") {
- val tm = new TaskMetrics
- val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
- val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
- val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
- val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true)
+ val tm = TaskMetrics.empty
+ val acc1 = AccumulatorSuite.createLongAccum("a")
+ val acc2 = AccumulatorSuite.createLongAccum("b")
+ val acc3 = AccumulatorSuite.createLongAccum("c")
+ val acc4 = AccumulatorSuite.createLongAccum("d", true)
tm.registerAccumulator(acc1)
tm.registerAccumulator(acc2)
tm.registerAccumulator(acc3)
tm.registerAccumulator(acc4)
- acc1 += 1
- acc2 += 2
- val newUpdates = tm.accumulatorUpdates().map { a => (a.id, a) }.toMap
+ acc1.add(1)
+ acc2.add(2)
+ val newUpdates = tm.accumulators()
+ .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap
assert(newUpdates.contains(acc1.id))
assert(newUpdates.contains(acc2.id))
assert(newUpdates.contains(acc3.id))
@@ -214,46 +212,14 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(newUpdates(acc2.id).name === Some("b"))
assert(newUpdates(acc3.id).name === Some("c"))
assert(newUpdates(acc4.id).name === Some("d"))
- assert(newUpdates(acc1.id).update === Some(1))
- assert(newUpdates(acc2.id).update === Some(2))
- assert(newUpdates(acc3.id).update === Some(0))
- assert(newUpdates(acc4.id).update === Some(0))
+ assert(newUpdates(acc1.id).value === 1)
+ assert(newUpdates(acc2.id).value === 2)
+ assert(newUpdates(acc3.id).value === 0)
+ assert(newUpdates(acc4.id).value === 0)
assert(!newUpdates(acc3.id).countFailedValues)
assert(newUpdates(acc4.id).countFailedValues)
- assert(newUpdates.values.map(_.update).forall(_.isDefined))
- assert(newUpdates.values.map(_.value).forall(_.isEmpty))
assert(newUpdates.size === tm.internalAccums.size + 4)
}
-
- test("from accumulator updates") {
- val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
- AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues)
- }
- val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
- assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
- // Test this with additional accumulators to ensure that we do not crash when handling
- // updates from unregistered accumulators. In practice, all accumulators created
- // on the driver, internal or not, should be registered with `Accumulators` at some point.
- val param = IntAccumulatorParam
- val registeredAccums = Seq(
- new Accumulator(0, param, Some("a"), countFailedValues = true),
- new Accumulator(0, param, Some("b"), countFailedValues = false))
- val unregisteredAccums = Seq(
- new Accumulator(0, param, Some("c"), countFailedValues = true),
- new Accumulator(0, param, Some("d"), countFailedValues = false))
- registeredAccums.foreach(Accumulators.register)
- registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id)))
- unregisteredAccums.foreach(a => Accumulators.remove(a.id))
- unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id)))
- // set some values in these accums
- registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
- unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
- val registeredAccumInfos = registeredAccums.map(makeInfo)
- val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
- val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
- // Simply checking that this does not crash:
- TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
- }
}
@@ -264,21 +230,14 @@ private[spark] object TaskMetricsSuite extends Assertions {
* Note: this does NOT check accumulator ID equality.
*/
def assertUpdatesEquals(
- updates1: Seq[AccumulableInfo],
- updates2: Seq[AccumulableInfo]): Unit = {
+ updates1: Seq[NewAccumulator[_, _]],
+ updates2: Seq[NewAccumulator[_, _]]): Unit = {
assert(updates1.size === updates2.size)
- updates1.zip(updates2).foreach { case (info1, info2) =>
+ updates1.zip(updates2).foreach { case (acc1, acc2) =>
// do not assert ID equals here
- assert(info1.name === info2.name)
- assert(info1.update === info2.update)
- assert(info1.value === info2.value)
- assert(info1.countFailedValues === info2.countFailedValues)
+ assert(acc1.name === acc2.name)
+ assert(acc1.countFailedValues === acc2.countFailedValues)
+ assert(acc1.value == acc2.value)
}
}
-
- /**
- * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
- * info as an accumulator update.
- */
- def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b76c0a4bd1..9912d1f3bc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def stop() = {}
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
@@ -277,8 +277,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
taskSet.tasks(i),
result._1,
result._2,
- Seq(new AccumulableInfo(
- accumId, Some(""), Some(1), None, internal = false, countFailedValues = false))))
+ Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accumId))))
}
}
}
@@ -484,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def applicationAttemptId(): Option[String] = None
@@ -997,10 +996,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// complete two tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(0), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
+ Seq.empty, createFakeTaskInfoWithId(0)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(1), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
+ Seq.empty, createFakeTaskInfoWithId(1)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// verify stage exists
assert(scheduler.stageIdToStage.contains(0))
@@ -1009,10 +1008,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// finish other 2 tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(2), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
+ Seq.empty, createFakeTaskInfoWithId(2)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
+ Seq.empty, createFakeTaskInfoWithId(3)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 4)
@@ -1023,14 +1022,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// with a speculative task and make sure the event is sent out
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
+ Seq.empty, createFakeTaskInfoWithId(5)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 5)
// make sure non successful tasks also send out event
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), UnknownReason, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
+ Seq.empty, createFakeTaskInfoWithId(6)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 6)
}
@@ -1613,37 +1612,43 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
test("accumulator not calculated for resubmitted result stage") {
// just for register
- val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
+ val accum = AccumulatorSuite.createLongAccum("a")
val finalRdd = new MyRDD(sc, 1, Nil)
submit(finalRdd, Array(0))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- val accVal = Accumulators.originals(accum.id).get.get.value
-
- assert(accVal === 1)
-
+ assert(accum.value === 1)
assertDataStructuresEmpty()
}
test("accumulators are updated on exception failures") {
- val acc1 = sc.accumulator(0L, "ingenieur")
- val acc2 = sc.accumulator(0L, "boulanger")
- val acc3 = sc.accumulator(0L, "agriculteur")
- assert(Accumulators.get(acc1.id).isDefined)
- assert(Accumulators.get(acc2.id).isDefined)
- assert(Accumulators.get(acc3.id).isDefined)
- val accInfo1 = acc1.toInfo(Some(15L), None)
- val accInfo2 = acc2.toInfo(Some(13L), None)
- val accInfo3 = acc3.toInfo(Some(18L), None)
- val accumUpdates = Seq(accInfo1, accInfo2, accInfo3)
- val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates)
+ val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
+ val acc2 = AccumulatorSuite.createLongAccum("boulanger")
+ val acc3 = AccumulatorSuite.createLongAccum("agriculteur")
+ assert(AccumulatorContext.get(acc1.id).isDefined)
+ assert(AccumulatorContext.get(acc2.id).isDefined)
+ assert(AccumulatorContext.get(acc3.id).isDefined)
+ val accUpdate1 = new LongAccumulator
+ accUpdate1.metadata = acc1.metadata
+ accUpdate1.setValue(15)
+ val accUpdate2 = new LongAccumulator
+ accUpdate2.metadata = acc2.metadata
+ accUpdate2.setValue(13)
+ val accUpdate3 = new LongAccumulator
+ accUpdate3.metadata = acc3.metadata
+ accUpdate3.setValue(18)
+ val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
+ val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+ val exceptionFailure = new ExceptionFailure(
+ new SparkException("fondue?"),
+ accumInfo).copy(accums = accumUpdates)
submit(new MyRDD(sc, 1, Nil), Array(0))
runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
- assert(Accumulators.get(acc1.id).get.value === 15L)
- assert(Accumulators.get(acc2.id).get.value === 13L)
- assert(Accumulators.get(acc3.id).get.value === 18L)
+ assert(AccumulatorContext.get(acc1.id).get.value === 15L)
+ assert(AccumulatorContext.get(acc2.id).get.value === 13L)
+ assert(AccumulatorContext.get(acc3.id).get.value === 18L)
}
test("reduce tasks should be placed locally with map output") {
@@ -2007,12 +2012,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
task: Task[_],
reason: TaskEndReason,
result: Any,
- extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
+ extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty,
taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
val accumUpdates = reason match {
- case Success => task.metrics.accumulatorUpdates()
- case ef: ExceptionFailure => ef.accumUpdates
- case _ => Seq.empty[AccumulableInfo]
+ case Success => task.metrics.accumulators()
+ case ef: ExceptionFailure => ef.accums
+ case _ => Seq.empty
}
CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 9971d48a52..16027d944f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -17,12 +17,11 @@
package org.apache.spark.scheduler
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
-class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
-{
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
test("launch of backend and scheduler") {
val conf = new SparkConf().setMaster("myclusterManager").
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
@@ -68,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler {
override def applicationAttemptId(): Option[String] = None
def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index d55f6f60ec..9aca4dbc23 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -162,18 +162,17 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}.count()
// The one that counts failed values should be 4x the one that didn't,
// since we ran each task 4 times
- assert(Accumulators.get(acc1.id).get.value === 40L)
- assert(Accumulators.get(acc2.id).get.value === 10L)
+ assert(AccumulatorContext.get(acc1.id).get.value === 40L)
+ assert(AccumulatorContext.get(acc2.id).get.value === 10L)
}
test("failed tasks collect only accumulators whose values count during failures") {
sc = new SparkContext("local", "test")
- val param = AccumulatorParam.LongAccumulatorParam
- val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+ val acc1 = AccumulatorSuite.createLongAccum("x", true)
+ val acc2 = AccumulatorSuite.createLongAccum("y", false)
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
- val taskMetrics = new TaskMetrics
+ val taskMetrics = TaskMetrics.empty
val task = new Task[Int](0, 0, 0) {
context = new TaskContextImpl(0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
@@ -186,12 +185,11 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}
// First, simulate task success. This should give us all the accumulators.
val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
- val accumUpdates2 = (taskMetrics.internalAccums ++ Seq(acc1, acc2))
- .map(TaskMetricsSuite.makeInfo)
+ val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
// Now, simulate task failures. This should give us only the accums that count failed values.
val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
- val accumUpdates4 = (taskMetrics.internalAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
+ val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index b5385c11a9..9e472f900b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -241,8 +241,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
assert(resultGetter.taskResults.size === 1)
val resBefore = resultGetter.taskResults.head
val resAfter = captor.getValue
- val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
- val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
+ val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value)
+ val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value)
assert(resSizeBefore.exists(_ == 0L))
assert(resSizeAfter.exists(_.toString.toLong > 0L))
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ecf4b76da5..339fc4254d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Seq[AccumulableInfo],
+ accumUpdates: Seq[NewAccumulator[_, _]],
taskInfo: TaskInfo) {
taskScheduler.endedTasks(taskInfo.index) = reason
}
@@ -166,8 +166,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- val accumUpdates =
- taskSet.tasks.head.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
+ val accumUpdates = taskSet.tasks.head.metrics.internalAccums
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
@@ -185,8 +184,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
- val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
- task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
+ val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
}
// First three offers should all find tasks
@@ -792,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
private def createTaskResult(
id: Int,
- accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]): DirectTaskResult[Int] = {
+ accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 221124829f..ce7d51d1c3 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -183,7 +183,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
test("test executor id to summary") {
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
- val taskMetrics = new TaskMetrics()
+ val taskMetrics = TaskMetrics.empty
val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
assert(listener.stageIdToData.size === 0)
@@ -230,7 +230,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
test("test task success vs failure counting for different task end reasons") {
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
- val metrics = new TaskMetrics()
+ val metrics = TaskMetrics.empty
val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
val task = new ShuffleMapTask(0)
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"
def makeTaskMetrics(base: Int): TaskMetrics = {
- val taskMetrics = new TaskMetrics
+ val taskMetrics = TaskMetrics.empty
val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
val inputMetrics = taskMetrics.inputMetrics
@@ -300,9 +300,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
- (1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()),
- (1235L, 0, 0, makeTaskMetrics(100).accumulatorUpdates()),
- (1236L, 1, 0, makeTaskMetrics(200).accumulatorUpdates()))))
+ (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)),
+ (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)),
+ (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo)))))
var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d3b6cdfe86..6fda7378e6 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -85,7 +85,8 @@ class JsonProtocolSuite extends SparkFunSuite {
// Use custom accum ID for determinism
val accumUpdates =
makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
- .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) }
+ .accumulators().map(AccumulatorSuite.makeInfo)
+ .zipWithIndex.map { case (a, i) => a.copy(id = i) }
SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
}
@@ -385,7 +386,7 @@ class JsonProtocolSuite extends SparkFunSuite {
// "Task Metrics" field, if it exists.
val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
val tmJson = JsonProtocol.taskMetricsToJson(tm)
- val accumUpdates = tm.accumulatorUpdates()
+ val accumUpdates = tm.accumulators().map(AccumulatorSuite.makeInfo)
val exception = new SparkException("sentimental")
val exceptionFailure = new ExceptionFailure(exception, accumUpdates)
val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure)
@@ -813,7 +814,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
hasHadoopInput: Boolean,
hasOutput: Boolean,
hasRecords: Boolean = true) = {
- val t = new TaskMetrics
+ val t = TaskMetrics.empty
t.setExecutorDeserializeTime(a)
t.setExecutorRunTime(b)
t.setResultSize(c)