Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/org/apache/spark/AccumulatorSuite.scala                      | 132
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala                |   6
-rw-r--r--  core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala              |  24
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkFunSuite.scala                         |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala             |  85
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala           |  71
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala            |  16
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala       |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala         |  11
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala      |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala                |   7
12 files changed, 157 insertions, 220 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 6063476936..5f97e58845 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -28,17 +28,17 @@ import scala.util.control.NonFatal
import org.scalatest.Matchers
import org.scalatest.exceptions.TestFailedException
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.AccumulatorParam.{ListAccumulatorParam, StringAccumulatorParam}
import org.apache.spark.scheduler._
import org.apache.spark.serializer.JavaSerializer
class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext {
- import AccumulatorParam._
+ import AccumulatorSuite.createLongAccum
override def afterEach(): Unit = {
try {
- Accumulators.clear()
+ AccumulatorContext.clear()
} finally {
super.afterEach()
}
@@ -59,9 +59,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
}
+ test("accumulator serialization") {
+ val ser = new JavaSerializer(new SparkConf).newInstance()
+ val acc = createLongAccum("x")
+ acc.add(5)
+ assert(acc.value == 5)
+ assert(acc.isAtDriverSide)
+
+ // serialize and de-serialize it, to simulate sending accumulator to executor.
+ val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc))
+ // value is reset on the executors
+ assert(acc2.localValue == 0)
+ assert(!acc2.isAtDriverSide)
+
+ acc2.add(10)
+ // serialize and de-serialize it again, to simulate sending accumulator back to driver.
+ val acc3 = ser.deserialize[LongAccumulator](ser.serialize(acc2))
+ // value is not reset on the driver
+ assert(acc3.value == 10)
+ assert(acc3.isAtDriverSide)
+ }
+
test ("basic accumulation") {
sc = new SparkContext("local", "test")
- val acc : Accumulator[Int] = sc.accumulator(0)
+ val acc: Accumulator[Int] = sc.accumulator(0)
val d = sc.parallelize(1 to 20)
d.foreach{x => acc += x}
@@ -75,7 +96,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
test("value not assignable from tasks") {
sc = new SparkContext("local", "test")
- val acc : Accumulator[Int] = sc.accumulator(0)
+ val acc: Accumulator[Int] = sc.accumulator(0)
val d = sc.parallelize(1 to 20)
an [Exception] should be thrownBy {d.foreach{x => acc.value = x}}
@@ -169,14 +190,13 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
System.gc()
assert(ref.get.isEmpty)
- Accumulators.remove(accId)
- assert(!Accumulators.originals.get(accId).isDefined)
+ AccumulatorContext.remove(accId)
+ assert(!AccumulatorContext.originals.containsKey(accId))
}
test("get accum") {
- sc = new SparkContext("local", "test")
// Don't register with SparkContext for cleanup
- var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
+ var acc = createLongAccum("a")
val accId = acc.id
val ref = WeakReference(acc)
assert(ref.get.isDefined)
@@ -188,44 +208,16 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
// Getting a garbage collected accum should throw error
intercept[IllegalAccessError] {
- Accumulators.get(accId)
+ AccumulatorContext.get(accId)
}
// Getting a normal accumulator. Note: this has to be separate because referencing an
// accumulator above in an `assert` would keep it from being garbage collected.
- val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true)
- assert(Accumulators.get(acc2.id) === Some(acc2))
+ val acc2 = createLongAccum("b")
+ assert(AccumulatorContext.get(acc2.id) === Some(acc2))
// Getting an accumulator that does not exist should return None
- assert(Accumulators.get(100000).isEmpty)
- }
-
- test("copy") {
- val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false)
- val acc2 = acc1.copy()
- assert(acc1.id === acc2.id)
- assert(acc1.value === acc2.value)
- assert(acc1.name === acc2.name)
- assert(acc1.countFailedValues === acc2.countFailedValues)
- assert(acc1 !== acc2)
- // Modifying one does not affect the other
- acc1.add(44L)
- assert(acc1.value === 500L)
- assert(acc2.value === 456L)
- acc2.add(144L)
- assert(acc1.value === 500L)
- assert(acc2.value === 600L)
- }
-
- test("register multiple accums with same ID") {
- val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
- // `copy` will create a new Accumulable and register it.
- val acc2 = acc1.copy()
- assert(acc1 !== acc2)
- assert(acc1.id === acc2.id)
- // The second one does not override the first one
- assert(Accumulators.originals.size === 1)
- assert(Accumulators.get(acc1.id) === Some(acc1))
+ assert(AccumulatorContext.get(100000).isEmpty)
}
test("string accumulator param") {
@@ -257,38 +249,33 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
acc.setValue(Seq(9, 10))
assert(acc.value === Seq(9, 10))
}
-
- test("value is reset on the executors") {
- val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"))
- val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"))
- val externalAccums = Seq(acc1, acc2)
- val taskMetrics = new TaskMetrics
- // Set some values; these should not be observed later on the "executors"
- acc1.setValue(10)
- acc2.setValue(20L)
- taskMetrics.testAccum.get.setValue(30L)
- // Simulate the task being serialized and sent to the executors.
- val dummyTask = new DummyTask(taskMetrics, externalAccums)
- val serInstance = new JavaSerializer(new SparkConf).newInstance()
- val taskSer = Task.serializeWithDependencies(
- dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance)
- // Now we're on the executors.
- // Deserialize the task and assert that its accumulators are zero'ed out.
- val (_, _, _, taskBytes) = Task.deserializeWithDependencies(taskSer)
- val taskDeser = serInstance.deserialize[DummyTask](
- taskBytes, Thread.currentThread.getContextClassLoader)
- // Assert that executors see only zeros
- taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) }
- taskDeser.metrics.internalAccums.foreach { a => assert(a.localValue == a.zero) }
- }
-
}
private[spark] object AccumulatorSuite {
-
import InternalAccumulator._
/**
+ * Create a long accumulator and register it to [[AccumulatorContext]].
+ */
+ def createLongAccum(
+ name: String,
+ countFailedValues: Boolean = false,
+ initValue: Long = 0,
+ id: Long = AccumulatorContext.newId()): LongAccumulator = {
+ val acc = new LongAccumulator
+ acc.setValue(initValue)
+ acc.metadata = AccumulatorMetadata(id, Some(name), countFailedValues)
+ AccumulatorContext.register(acc)
+ acc
+ }
+
+ /**
+ * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
+ * info as an accumulator update.
+ */
+ def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None)
+
+ /**
* Run one or more Spark jobs and verify that in at least one job the peak execution memory
* accumulator is updated afterwards.
*/
@@ -340,7 +327,6 @@ private class SaveInfoListener extends SparkListener {
if (jobCompletionCallback != null) {
jobCompletionSem.acquire()
if (exception != null) {
- exception = null
throw exception
}
}
@@ -377,13 +363,3 @@ private class SaveInfoListener extends SparkListener {
(taskEnd.stageId, taskEnd.stageAttemptId), new ArrayBuffer[TaskInfo]) += taskEnd.taskInfo
}
}
-
-
-/**
- * A dummy [[Task]] that contains internal and external [[Accumulator]]s.
- */
-private[spark] class DummyTask(
- metrics: TaskMetrics,
- val externalAccums: Seq[Accumulator[_]]) extends Task[Int](0, 0, 0, metrics) {
- override def runTask(c: TaskContext): Int = 1
-}
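
The hunks above move AccumulatorSuite from the old Accumulator/Accumulators pair to the new LongAccumulator registered through AccumulatorContext. As a reference while reading the remaining files, here is a minimal sketch of the registration pattern used by the new createLongAccum helper; it assumes the classes are importable from org.apache.spark, as the unqualified references in this test (which lives in that package) suggest, and the accumulator name is illustrative only.

    // Sketch only; mirrors the createLongAccum helper introduced in this diff.
    import org.apache.spark.{AccumulatorContext, AccumulatorMetadata, LongAccumulator}

    val acc = new LongAccumulator
    acc.setValue(0L)
    acc.metadata = AccumulatorMetadata(
      AccumulatorContext.newId(), Some("myAccum"), countFailedValues = false)  // "myAccum" is a placeholder name
    AccumulatorContext.register(acc)   // makes AccumulatorContext.get(acc.id) resolve this instance

    acc.add(5L)                        // replaces the old `acc += 5`
    assert(acc.value == 5L)            // driver-side read
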
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 4d2b3e7f3b..1adc90ab1e 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -211,10 +211,10 @@ class HeartbeatReceiverSuite
private def triggerHeartbeat(
executorId: String,
executorShouldReregister: Boolean): Unit = {
- val metrics = new TaskMetrics
+ val metrics = TaskMetrics.empty
val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
- Heartbeat(executorId, Array(1L -> metrics.accumulatorUpdates()), blockManagerId))
+ Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId))
if (executorShouldReregister) {
assert(response.reregisterBlockManager)
} else {
@@ -222,7 +222,7 @@ class HeartbeatReceiverSuite
// Additionally verify that the scheduler callback is called with the correct parameters
verify(scheduler).executorHeartbeatReceived(
Matchers.eq(executorId),
- Matchers.eq(Array(1L -> metrics.accumulatorUpdates())),
+ Matchers.eq(Array(1L -> metrics.accumulators())),
Matchers.eq(blockManagerId))
}
}
diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
index b074b95424..e4474bb813 100644
--- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.executor.TaskMetrics
@@ -29,7 +30,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
override def afterEach(): Unit = {
try {
- Accumulators.clear()
+ AccumulatorContext.clear()
} finally {
super.afterEach()
}
@@ -37,9 +38,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
test("internal accumulators in TaskContext") {
val taskContext = TaskContext.empty()
- val accumUpdates = taskContext.taskMetrics.accumulatorUpdates()
+ val accumUpdates = taskContext.taskMetrics.accumulators()
assert(accumUpdates.size > 0)
- assert(accumUpdates.forall(_.internal))
val testAccum = taskContext.taskMetrics.testAccum.get
assert(accumUpdates.exists(_.id == testAccum.id))
}
@@ -51,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
sc.addSparkListener(listener)
// Have each task add 1 to the internal accumulator
val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 1
+ TaskContext.get().taskMetrics().testAccum.get.add(1)
iter
}
// Register asserts in job completion callback to avoid flakiness
@@ -87,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
val rdd = sc.parallelize(1 to 100, numPartitions)
.map { i => (i, i) }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 1
+ TaskContext.get().taskMetrics().testAccum.get.add(1)
iter
}
.reduceByKey { case (x, y) => x + y }
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 10
+ TaskContext.get().taskMetrics().testAccum.get.add(10)
iter
}
.repartition(numPartitions * 2)
.mapPartitions { iter =>
- TaskContext.get().taskMetrics().testAccum.get += 100
+ TaskContext.get().taskMetrics().testAccum.get.add(100)
iter
}
// Register asserts in job completion callback to avoid flakiness
@@ -127,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
// This should retry both stages in the scheduler. Note that we only want to fail the
// first stage attempt because we want the stage to eventually succeed.
val x = sc.parallelize(1 to 100, numPartitions)
- .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter }
+ .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get.add(1); iter }
.groupBy(identity)
val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId
val rdd = x.mapPartitionsWithIndex { case (i, iter) =>
@@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private val myCleaner = new SaveAccumContextCleaner(this)
override def cleaner: Option[ContextCleaner] = Some(myCleaner)
}
- assert(Accumulators.originals.isEmpty)
+ assert(AccumulatorContext.originals.isEmpty)
sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count()
val numInternalAccums = TaskMetrics.empty.internalAccums.length
// We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage
- assert(Accumulators.originals.size === numInternalAccums * 2)
+ assert(AccumulatorContext.originals.size === numInternalAccums * 2)
val accumsRegistered = sc.cleaner match {
case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup
case _ => Seq.empty[Long]
}
// Make sure the same set of accumulators is registered for cleanup
assert(accumsRegistered.size === numInternalAccums * 2)
- assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet)
+ assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala)
}
/**
@@ -212,7 +212,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) {
private val accumsRegistered = new ArrayBuffer[Long]
- override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+ override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = {
accumsRegistered += a.id
super.registerAccumulatorForCleanup(a)
}
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3228752b96..4aae2c9b4a 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -34,7 +34,7 @@ private[spark] abstract class SparkFunSuite
protected override def afterAll(): Unit = {
try {
// Avoid leaking map entries in tests that use accumulators without SparkContext
- Accumulators.clear()
+ AccumulatorContext.clear()
} finally {
super.afterAll()
}
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index ee70419727..94f6e1a3a7 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -20,14 +20,11 @@ package org.apache.spark.executor
import org.scalatest.Assertions
import org.apache.spark._
-import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId}
+import org.apache.spark.storage.{BlockStatus, StorageLevel, TestBlockId}
class TaskMetricsSuite extends SparkFunSuite {
- import AccumulatorParam._
import StorageLevel._
- import TaskMetricsSuite._
test("mutating values") {
val tm = new TaskMetrics
@@ -59,8 +56,8 @@ class TaskMetricsSuite extends SparkFunSuite {
tm.incPeakExecutionMemory(8L)
val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L))
val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L))
- tm.incUpdatedBlockStatuses(Seq(block1))
- tm.incUpdatedBlockStatuses(Seq(block2))
+ tm.incUpdatedBlockStatuses(block1)
+ tm.incUpdatedBlockStatuses(block2)
// assert new values exist
assert(tm.executorDeserializeTime == 1L)
assert(tm.executorRunTime == 2L)
@@ -194,18 +191,19 @@ class TaskMetricsSuite extends SparkFunSuite {
}
test("additional accumulables") {
- val tm = new TaskMetrics
- val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
- val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
- val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
- val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true)
+ val tm = TaskMetrics.empty
+ val acc1 = AccumulatorSuite.createLongAccum("a")
+ val acc2 = AccumulatorSuite.createLongAccum("b")
+ val acc3 = AccumulatorSuite.createLongAccum("c")
+ val acc4 = AccumulatorSuite.createLongAccum("d", true)
tm.registerAccumulator(acc1)
tm.registerAccumulator(acc2)
tm.registerAccumulator(acc3)
tm.registerAccumulator(acc4)
- acc1 += 1
- acc2 += 2
- val newUpdates = tm.accumulatorUpdates().map { a => (a.id, a) }.toMap
+ acc1.add(1)
+ acc2.add(2)
+ val newUpdates = tm.accumulators()
+ .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap
assert(newUpdates.contains(acc1.id))
assert(newUpdates.contains(acc2.id))
assert(newUpdates.contains(acc3.id))
@@ -214,46 +212,14 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(newUpdates(acc2.id).name === Some("b"))
assert(newUpdates(acc3.id).name === Some("c"))
assert(newUpdates(acc4.id).name === Some("d"))
- assert(newUpdates(acc1.id).update === Some(1))
- assert(newUpdates(acc2.id).update === Some(2))
- assert(newUpdates(acc3.id).update === Some(0))
- assert(newUpdates(acc4.id).update === Some(0))
+ assert(newUpdates(acc1.id).value === 1)
+ assert(newUpdates(acc2.id).value === 2)
+ assert(newUpdates(acc3.id).value === 0)
+ assert(newUpdates(acc4.id).value === 0)
assert(!newUpdates(acc3.id).countFailedValues)
assert(newUpdates(acc4.id).countFailedValues)
- assert(newUpdates.values.map(_.update).forall(_.isDefined))
- assert(newUpdates.values.map(_.value).forall(_.isEmpty))
assert(newUpdates.size === tm.internalAccums.size + 4)
}
-
- test("from accumulator updates") {
- val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
- AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues)
- }
- val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
- assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
- // Test this with additional accumulators to ensure that we do not crash when handling
- // updates from unregistered accumulators. In practice, all accumulators created
- // on the driver, internal or not, should be registered with `Accumulators` at some point.
- val param = IntAccumulatorParam
- val registeredAccums = Seq(
- new Accumulator(0, param, Some("a"), countFailedValues = true),
- new Accumulator(0, param, Some("b"), countFailedValues = false))
- val unregisteredAccums = Seq(
- new Accumulator(0, param, Some("c"), countFailedValues = true),
- new Accumulator(0, param, Some("d"), countFailedValues = false))
- registeredAccums.foreach(Accumulators.register)
- registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id)))
- unregisteredAccums.foreach(a => Accumulators.remove(a.id))
- unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id)))
- // set some values in these accums
- registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
- unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
- val registeredAccumInfos = registeredAccums.map(makeInfo)
- val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
- val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
- // Simply checking that this does not crash:
- TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
- }
}
@@ -264,21 +230,14 @@ private[spark] object TaskMetricsSuite extends Assertions {
* Note: this does NOT check accumulator ID equality.
*/
def assertUpdatesEquals(
- updates1: Seq[AccumulableInfo],
- updates2: Seq[AccumulableInfo]): Unit = {
+ updates1: Seq[NewAccumulator[_, _]],
+ updates2: Seq[NewAccumulator[_, _]]): Unit = {
assert(updates1.size === updates2.size)
- updates1.zip(updates2).foreach { case (info1, info2) =>
+ updates1.zip(updates2).foreach { case (acc1, acc2) =>
// do not assert ID equals here
- assert(info1.name === info2.name)
- assert(info1.update === info2.update)
- assert(info1.value === info2.value)
- assert(info1.countFailedValues === info2.countFailedValues)
+ assert(acc1.name === acc2.name)
+ assert(acc1.countFailedValues === acc2.countFailedValues)
+ assert(acc1.value == acc2.value)
}
}
-
- /**
- * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the
- * info as an accumulator update.
- */
- def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None)
}
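
With TaskMetrics now exposing accumulators() (a Seq of NewAccumulator[_, _]) instead of accumulatorUpdates() (a Seq of AccumulableInfo), the makeInfo conversion helper has moved into AccumulatorSuite. A minimal sketch of producing AccumulableInfo-style updates under the new API, using only the calls visible in this diff; the val names are illustrative.

    // Sketch: convert live accumulators into AccumulableInfo updates.
    val tm = TaskMetrics.empty
    val updates: Seq[AccumulableInfo] =
      tm.accumulators().map(AccumulatorSuite.makeInfo)   // replaces tm.accumulatorUpdates()
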
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b76c0a4bd1..9912d1f3bc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def stop() = {}
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
@@ -277,8 +277,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
taskSet.tasks(i),
result._1,
result._2,
- Seq(new AccumulableInfo(
- accumId, Some(""), Some(1), None, internal = false, countFailedValues = false))))
+ Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accumId))))
}
}
}
@@ -484,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
override def defaultParallelism(): Int = 2
override def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
override def applicationAttemptId(): Option[String] = None
@@ -997,10 +996,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// complete two tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(0), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
+ Seq.empty, createFakeTaskInfoWithId(0)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(1), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
+ Seq.empty, createFakeTaskInfoWithId(1)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// verify stage exists
assert(scheduler.stageIdToStage.contains(0))
@@ -1009,10 +1008,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// finish other 2 tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(2), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
+ Seq.empty, createFakeTaskInfoWithId(2)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
+ Seq.empty, createFakeTaskInfoWithId(3)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 4)
@@ -1023,14 +1022,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// with a speculative task and make sure the event is sent out
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
+ Seq.empty, createFakeTaskInfoWithId(5)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 5)
// make sure non successful tasks also send out event
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), UnknownReason, 42,
- Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
+ Seq.empty, createFakeTaskInfoWithId(6)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 6)
}
@@ -1613,37 +1612,43 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
test("accumulator not calculated for resubmitted result stage") {
// just for register
- val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
+ val accum = AccumulatorSuite.createLongAccum("a")
val finalRdd = new MyRDD(sc, 1, Nil)
submit(finalRdd, Array(0))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
- val accVal = Accumulators.originals(accum.id).get.get.value
-
- assert(accVal === 1)
-
+ assert(accum.value === 1)
assertDataStructuresEmpty()
}
test("accumulators are updated on exception failures") {
- val acc1 = sc.accumulator(0L, "ingenieur")
- val acc2 = sc.accumulator(0L, "boulanger")
- val acc3 = sc.accumulator(0L, "agriculteur")
- assert(Accumulators.get(acc1.id).isDefined)
- assert(Accumulators.get(acc2.id).isDefined)
- assert(Accumulators.get(acc3.id).isDefined)
- val accInfo1 = acc1.toInfo(Some(15L), None)
- val accInfo2 = acc2.toInfo(Some(13L), None)
- val accInfo3 = acc3.toInfo(Some(18L), None)
- val accumUpdates = Seq(accInfo1, accInfo2, accInfo3)
- val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates)
+ val acc1 = AccumulatorSuite.createLongAccum("ingenieur")
+ val acc2 = AccumulatorSuite.createLongAccum("boulanger")
+ val acc3 = AccumulatorSuite.createLongAccum("agriculteur")
+ assert(AccumulatorContext.get(acc1.id).isDefined)
+ assert(AccumulatorContext.get(acc2.id).isDefined)
+ assert(AccumulatorContext.get(acc3.id).isDefined)
+ val accUpdate1 = new LongAccumulator
+ accUpdate1.metadata = acc1.metadata
+ accUpdate1.setValue(15)
+ val accUpdate2 = new LongAccumulator
+ accUpdate2.metadata = acc2.metadata
+ accUpdate2.setValue(13)
+ val accUpdate3 = new LongAccumulator
+ accUpdate3.metadata = acc3.metadata
+ accUpdate3.setValue(18)
+ val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3)
+ val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo)
+ val exceptionFailure = new ExceptionFailure(
+ new SparkException("fondue?"),
+ accumInfo).copy(accums = accumUpdates)
submit(new MyRDD(sc, 1, Nil), Array(0))
runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result"))
- assert(Accumulators.get(acc1.id).get.value === 15L)
- assert(Accumulators.get(acc2.id).get.value === 13L)
- assert(Accumulators.get(acc3.id).get.value === 18L)
+ assert(AccumulatorContext.get(acc1.id).get.value === 15L)
+ assert(AccumulatorContext.get(acc2.id).get.value === 13L)
+ assert(AccumulatorContext.get(acc3.id).get.value === 18L)
}
test("reduce tasks should be placed locally with map output") {
@@ -2007,12 +2012,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
task: Task[_],
reason: TaskEndReason,
result: Any,
- extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo],
+ extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty,
taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = {
val accumUpdates = reason match {
- case Success => task.metrics.accumulatorUpdates()
- case ef: ExceptionFailure => ef.accumUpdates
- case _ => Seq.empty[AccumulableInfo]
+ case Success => task.metrics.accumulators()
+ case ef: ExceptionFailure => ef.accums
+ case _ => Seq.empty
}
CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo)
}
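
The reworked exception-failure test above models a task-side accumulator update as a fresh LongAccumulator that shares the driver accumulator's metadata (and therefore its id), so the driver can match and merge it. A sketch of that pattern, assuming only the constructors and the copy call shown in the hunk; the values are illustrative.

    // Sketch: build an 'update' accumulator carrying the same metadata as the registered one.
    val driverAcc = AccumulatorSuite.createLongAccum("ingenieur")
    val update = new LongAccumulator
    update.metadata = driverAcc.metadata   // same id and name as the driver-side accumulator
    update.setValue(15)
    val failure = new ExceptionFailure(new SparkException("fondue?"), Seq(AccumulatorSuite.makeInfo(update)))
      .copy(accums = Seq(update))
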
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 9971d48a52..16027d944f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -17,12 +17,11 @@
package org.apache.spark.scheduler
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
-class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
-{
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext {
test("launch of backend and scheduler") {
val conf = new SparkConf().setMaster("myclusterManager").
setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
@@ -68,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler {
override def applicationAttemptId(): Option[String] = None
def executorHeartbeatReceived(
execId: String,
- accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+ accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index d55f6f60ec..9aca4dbc23 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -162,18 +162,17 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}.count()
// The one that counts failed values should be 4x the one that didn't,
// since we ran each task 4 times
- assert(Accumulators.get(acc1.id).get.value === 40L)
- assert(Accumulators.get(acc2.id).get.value === 10L)
+ assert(AccumulatorContext.get(acc1.id).get.value === 40L)
+ assert(AccumulatorContext.get(acc2.id).get.value === 10L)
}
test("failed tasks collect only accumulators whose values count during failures") {
sc = new SparkContext("local", "test")
- val param = AccumulatorParam.LongAccumulatorParam
- val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
+ val acc1 = AccumulatorSuite.createLongAccum("x", true)
+ val acc2 = AccumulatorSuite.createLongAccum("y", false)
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
- val taskMetrics = new TaskMetrics
+ val taskMetrics = TaskMetrics.empty
val task = new Task[Int](0, 0, 0) {
context = new TaskContextImpl(0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
@@ -186,12 +185,11 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
}
// First, simulate task success. This should give us all the accumulators.
val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
- val accumUpdates2 = (taskMetrics.internalAccums ++ Seq(acc1, acc2))
- .map(TaskMetricsSuite.makeInfo)
+ val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
// Now, simulate task failures. This should give us only the accums that count failed values.
val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
- val accumUpdates4 = (taskMetrics.internalAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo)
+ val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index b5385c11a9..9e472f900b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -241,8 +241,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
assert(resultGetter.taskResults.size === 1)
val resBefore = resultGetter.taskResults.head
val resAfter = captor.getValue
- val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
- val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update)
+ val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value)
+ val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value)
assert(resSizeBefore.exists(_ == 0L))
assert(resSizeAfter.exists(_.toString.toLong > 0L))
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ecf4b76da5..339fc4254d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
task: Task[_],
reason: TaskEndReason,
result: Any,
- accumUpdates: Seq[AccumulableInfo],
+ accumUpdates: Seq[NewAccumulator[_, _]],
taskInfo: TaskInfo) {
taskScheduler.endedTasks(taskInfo.index) = reason
}
@@ -166,8 +166,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
- val accumUpdates =
- taskSet.tasks.head.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
+ val accumUpdates = taskSet.tasks.head.metrics.internalAccums
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
@@ -185,8 +184,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(3)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
- val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task =>
- task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) }
+ val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
}
// First three offers should all find tasks
@@ -792,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
private def createTaskResult(
id: Int,
- accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]): DirectTaskResult[Int] = {
+ accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
}
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 221124829f..ce7d51d1c3 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -183,7 +183,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
test("test executor id to summary") {
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
- val taskMetrics = new TaskMetrics()
+ val taskMetrics = TaskMetrics.empty
val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
assert(listener.stageIdToData.size === 0)
@@ -230,7 +230,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
test("test task success vs failure counting for different task end reasons") {
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
- val metrics = new TaskMetrics()
+ val metrics = TaskMetrics.empty
val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
val task = new ShuffleMapTask(0)
@@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val execId = "exe-1"
def makeTaskMetrics(base: Int): TaskMetrics = {
- val taskMetrics = new TaskMetrics
+ val taskMetrics = TaskMetrics.empty
val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
val inputMetrics = taskMetrics.inputMetrics
@@ -300,9 +300,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
- (1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()),
- (1235L, 0, 0, makeTaskMetrics(100).accumulatorUpdates()),
- (1236L, 1, 0, makeTaskMetrics(200).accumulatorUpdates()))))
+ (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)),
+ (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)),
+ (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo)))))
var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d3b6cdfe86..6fda7378e6 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -85,7 +85,8 @@ class JsonProtocolSuite extends SparkFunSuite {
// Use custom accum ID for determinism
val accumUpdates =
makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
- .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) }
+ .accumulators().map(AccumulatorSuite.makeInfo)
+ .zipWithIndex.map { case (a, i) => a.copy(id = i) }
SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
}
@@ -385,7 +386,7 @@ class JsonProtocolSuite extends SparkFunSuite {
// "Task Metrics" field, if it exists.
val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
val tmJson = JsonProtocol.taskMetricsToJson(tm)
- val accumUpdates = tm.accumulatorUpdates()
+ val accumUpdates = tm.accumulators().map(AccumulatorSuite.makeInfo)
val exception = new SparkException("sentimental")
val exceptionFailure = new ExceptionFailure(exception, accumUpdates)
val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure)
@@ -813,7 +814,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
hasHadoopInput: Boolean,
hasOutput: Boolean,
hasRecords: Boolean = true) = {
- val t = new TaskMetrics
+ val t = TaskMetrics.empty
t.setExecutorDeserializeTime(a)
t.setExecutorRunTime(b)
t.setResultSize(c)