diff options
Diffstat (limited to 'core/src/test/scala')
12 files changed, 157 insertions, 220 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 6063476936..5f97e58845 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -28,17 +28,17 @@ import scala.util.control.NonFatal import org.scalatest.Matchers import org.scalatest.exceptions.TestFailedException -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.AccumulatorParam.{ListAccumulatorParam, StringAccumulatorParam} import org.apache.spark.scheduler._ import org.apache.spark.serializer.JavaSerializer class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext { - import AccumulatorParam._ + import AccumulatorSuite.createLongAccum override def afterEach(): Unit = { try { - Accumulators.clear() + AccumulatorContext.clear() } finally { super.afterEach() } @@ -59,9 +59,30 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex } } + test("accumulator serialization") { + val ser = new JavaSerializer(new SparkConf).newInstance() + val acc = createLongAccum("x") + acc.add(5) + assert(acc.value == 5) + assert(acc.isAtDriverSide) + + // serialize and de-serialize it, to simulate sending accumulator to executor. + val acc2 = ser.deserialize[LongAccumulator](ser.serialize(acc)) + // value is reset on the executors + assert(acc2.localValue == 0) + assert(!acc2.isAtDriverSide) + + acc2.add(10) + // serialize and de-serialize it again, to simulate sending accumulator back to driver. + val acc3 = ser.deserialize[LongAccumulator](ser.serialize(acc2)) + // value is not reset on the driver + assert(acc3.value == 10) + assert(acc3.isAtDriverSide) + } + test ("basic accumulation") { sc = new SparkContext("local", "test") - val acc : Accumulator[Int] = sc.accumulator(0) + val acc: Accumulator[Int] = sc.accumulator(0) val d = sc.parallelize(1 to 20) d.foreach{x => acc += x} @@ -75,7 +96,7 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex test("value not assignable from tasks") { sc = new SparkContext("local", "test") - val acc : Accumulator[Int] = sc.accumulator(0) + val acc: Accumulator[Int] = sc.accumulator(0) val d = sc.parallelize(1 to 20) an [Exception] should be thrownBy {d.foreach{x => acc.value = x}} @@ -169,14 +190,13 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex System.gc() assert(ref.get.isEmpty) - Accumulators.remove(accId) - assert(!Accumulators.originals.get(accId).isDefined) + AccumulatorContext.remove(accId) + assert(!AccumulatorContext.originals.containsKey(accId)) } test("get accum") { - sc = new SparkContext("local", "test") // Don't register with SparkContext for cleanup - var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true) + var acc = createLongAccum("a") val accId = acc.id val ref = WeakReference(acc) assert(ref.get.isDefined) @@ -188,44 +208,16 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex // Getting a garbage collected accum should throw error intercept[IllegalAccessError] { - Accumulators.get(accId) + AccumulatorContext.get(accId) } // Getting a normal accumulator. Note: this has to be separate because referencing an // accumulator above in an `assert` would keep it from being garbage collected. - val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true) - assert(Accumulators.get(acc2.id) === Some(acc2)) + val acc2 = createLongAccum("b") + assert(AccumulatorContext.get(acc2.id) === Some(acc2)) // Getting an accumulator that does not exist should return None - assert(Accumulators.get(100000).isEmpty) - } - - test("copy") { - val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false) - val acc2 = acc1.copy() - assert(acc1.id === acc2.id) - assert(acc1.value === acc2.value) - assert(acc1.name === acc2.name) - assert(acc1.countFailedValues === acc2.countFailedValues) - assert(acc1 !== acc2) - // Modifying one does not affect the other - acc1.add(44L) - assert(acc1.value === 500L) - assert(acc2.value === 456L) - acc2.add(144L) - assert(acc1.value === 500L) - assert(acc2.value === 600L) - } - - test("register multiple accums with same ID") { - val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true) - // `copy` will create a new Accumulable and register it. - val acc2 = acc1.copy() - assert(acc1 !== acc2) - assert(acc1.id === acc2.id) - // The second one does not override the first one - assert(Accumulators.originals.size === 1) - assert(Accumulators.get(acc1.id) === Some(acc1)) + assert(AccumulatorContext.get(100000).isEmpty) } test("string accumulator param") { @@ -257,38 +249,33 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex acc.setValue(Seq(9, 10)) assert(acc.value === Seq(9, 10)) } - - test("value is reset on the executors") { - val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing")) - val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2")) - val externalAccums = Seq(acc1, acc2) - val taskMetrics = new TaskMetrics - // Set some values; these should not be observed later on the "executors" - acc1.setValue(10) - acc2.setValue(20L) - taskMetrics.testAccum.get.setValue(30L) - // Simulate the task being serialized and sent to the executors. - val dummyTask = new DummyTask(taskMetrics, externalAccums) - val serInstance = new JavaSerializer(new SparkConf).newInstance() - val taskSer = Task.serializeWithDependencies( - dummyTask, mutable.HashMap(), mutable.HashMap(), serInstance) - // Now we're on the executors. - // Deserialize the task and assert that its accumulators are zero'ed out. - val (_, _, _, taskBytes) = Task.deserializeWithDependencies(taskSer) - val taskDeser = serInstance.deserialize[DummyTask]( - taskBytes, Thread.currentThread.getContextClassLoader) - // Assert that executors see only zeros - taskDeser.externalAccums.foreach { a => assert(a.localValue == a.zero) } - taskDeser.metrics.internalAccums.foreach { a => assert(a.localValue == a.zero) } - } - } private[spark] object AccumulatorSuite { - import InternalAccumulator._ /** + * Create a long accumulator and register it to [[AccumulatorContext]]. + */ + def createLongAccum( + name: String, + countFailedValues: Boolean = false, + initValue: Long = 0, + id: Long = AccumulatorContext.newId()): LongAccumulator = { + val acc = new LongAccumulator + acc.setValue(initValue) + acc.metadata = AccumulatorMetadata(id, Some(name), countFailedValues) + AccumulatorContext.register(acc) + acc + } + + /** + * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the + * info as an accumulator update. + */ + def makeInfo(a: NewAccumulator[_, _]): AccumulableInfo = a.toInfo(Some(a.localValue), None) + + /** * Run one or more Spark jobs and verify that in at least one job the peak execution memory * accumulator is updated afterwards. */ @@ -340,7 +327,6 @@ private class SaveInfoListener extends SparkListener { if (jobCompletionCallback != null) { jobCompletionSem.acquire() if (exception != null) { - exception = null throw exception } } @@ -377,13 +363,3 @@ private class SaveInfoListener extends SparkListener { (taskEnd.stageId, taskEnd.stageAttemptId), new ArrayBuffer[TaskInfo]) += taskEnd.taskInfo } } - - -/** - * A dummy [[Task]] that contains internal and external [[Accumulator]]s. - */ -private[spark] class DummyTask( - metrics: TaskMetrics, - val externalAccums: Seq[Accumulator[_]]) extends Task[Int](0, 0, 0, metrics) { - override def runTask(c: TaskContext): Int = 1 -} diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 4d2b3e7f3b..1adc90ab1e 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -211,10 +211,10 @@ class HeartbeatReceiverSuite private def triggerHeartbeat( executorId: String, executorShouldReregister: Boolean): Unit = { - val metrics = new TaskMetrics + val metrics = TaskMetrics.empty val blockManagerId = BlockManagerId(executorId, "localhost", 12345) val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse]( - Heartbeat(executorId, Array(1L -> metrics.accumulatorUpdates()), blockManagerId)) + Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId)) if (executorShouldReregister) { assert(response.reregisterBlockManager) } else { @@ -222,7 +222,7 @@ class HeartbeatReceiverSuite // Additionally verify that the scheduler callback is called with the correct parameters verify(scheduler).executorHeartbeatReceived( Matchers.eq(executorId), - Matchers.eq(Array(1L -> metrics.accumulatorUpdates())), + Matchers.eq(Array(1L -> metrics.accumulators())), Matchers.eq(blockManagerId)) } } diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index b074b95424..e4474bb813 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics @@ -29,7 +30,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { override def afterEach(): Unit = { try { - Accumulators.clear() + AccumulatorContext.clear() } finally { super.afterEach() } @@ -37,9 +38,8 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { test("internal accumulators in TaskContext") { val taskContext = TaskContext.empty() - val accumUpdates = taskContext.taskMetrics.accumulatorUpdates() + val accumUpdates = taskContext.taskMetrics.accumulators() assert(accumUpdates.size > 0) - assert(accumUpdates.forall(_.internal)) val testAccum = taskContext.taskMetrics.testAccum.get assert(accumUpdates.exists(_.id == testAccum.id)) } @@ -51,7 +51,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { sc.addSparkListener(listener) // Have each task add 1 to the internal accumulator val rdd = sc.parallelize(1 to 100, numPartitions).mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 1 + TaskContext.get().taskMetrics().testAccum.get.add(1) iter } // Register asserts in job completion callback to avoid flakiness @@ -87,17 +87,17 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { val rdd = sc.parallelize(1 to 100, numPartitions) .map { i => (i, i) } .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 1 + TaskContext.get().taskMetrics().testAccum.get.add(1) iter } .reduceByKey { case (x, y) => x + y } .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 10 + TaskContext.get().taskMetrics().testAccum.get.add(10) iter } .repartition(numPartitions * 2) .mapPartitions { iter => - TaskContext.get().taskMetrics().testAccum.get += 100 + TaskContext.get().taskMetrics().testAccum.get.add(100) iter } // Register asserts in job completion callback to avoid flakiness @@ -127,7 +127,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { // This should retry both stages in the scheduler. Note that we only want to fail the // first stage attempt because we want the stage to eventually succeed. val x = sc.parallelize(1 to 100, numPartitions) - .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get += 1; iter } + .mapPartitions { iter => TaskContext.get().taskMetrics().testAccum.get.add(1); iter } .groupBy(identity) val sid = x.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle.shuffleId val rdd = x.mapPartitionsWithIndex { case (i, iter) => @@ -183,18 +183,18 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private val myCleaner = new SaveAccumContextCleaner(this) override def cleaner: Option[ContextCleaner] = Some(myCleaner) } - assert(Accumulators.originals.isEmpty) + assert(AccumulatorContext.originals.isEmpty) sc.parallelize(1 to 100).map { i => (i, i) }.reduceByKey { _ + _ }.count() val numInternalAccums = TaskMetrics.empty.internalAccums.length // We ran 2 stages, so we should have 2 sets of internal accumulators, 1 for each stage - assert(Accumulators.originals.size === numInternalAccums * 2) + assert(AccumulatorContext.originals.size === numInternalAccums * 2) val accumsRegistered = sc.cleaner match { case Some(cleaner: SaveAccumContextCleaner) => cleaner.accumsRegisteredForCleanup case _ => Seq.empty[Long] } // Make sure the same set of accumulators is registered for cleanup assert(accumsRegistered.size === numInternalAccums * 2) - assert(accumsRegistered.toSet === Accumulators.originals.keys.toSet) + assert(accumsRegistered.toSet === AccumulatorContext.originals.keySet().asScala) } /** @@ -212,7 +212,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { private class SaveAccumContextCleaner(sc: SparkContext) extends ContextCleaner(sc) { private val accumsRegistered = new ArrayBuffer[Long] - override def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = { + override def registerAccumulatorForCleanup(a: NewAccumulator[_, _]): Unit = { accumsRegistered += a.id super.registerAccumulatorForCleanup(a) } diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 3228752b96..4aae2c9b4a 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -34,7 +34,7 @@ private[spark] abstract class SparkFunSuite protected override def afterAll(): Unit = { try { // Avoid leaking map entries in tests that use accumulators without SparkContext - Accumulators.clear() + AccumulatorContext.clear() } finally { super.afterAll() } diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index ee70419727..94f6e1a3a7 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -20,14 +20,11 @@ package org.apache.spark.executor import org.scalatest.Assertions import org.apache.spark._ -import org.apache.spark.scheduler.AccumulableInfo -import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel, TestBlockId} +import org.apache.spark.storage.{BlockStatus, StorageLevel, TestBlockId} class TaskMetricsSuite extends SparkFunSuite { - import AccumulatorParam._ import StorageLevel._ - import TaskMetricsSuite._ test("mutating values") { val tm = new TaskMetrics @@ -59,8 +56,8 @@ class TaskMetricsSuite extends SparkFunSuite { tm.incPeakExecutionMemory(8L) val block1 = (TestBlockId("a"), BlockStatus(MEMORY_ONLY, 1L, 2L)) val block2 = (TestBlockId("b"), BlockStatus(MEMORY_ONLY, 3L, 4L)) - tm.incUpdatedBlockStatuses(Seq(block1)) - tm.incUpdatedBlockStatuses(Seq(block2)) + tm.incUpdatedBlockStatuses(block1) + tm.incUpdatedBlockStatuses(block2) // assert new values exist assert(tm.executorDeserializeTime == 1L) assert(tm.executorRunTime == 2L) @@ -194,18 +191,19 @@ class TaskMetricsSuite extends SparkFunSuite { } test("additional accumulables") { - val tm = new TaskMetrics - val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a")) - val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b")) - val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c")) - val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true) + val tm = TaskMetrics.empty + val acc1 = AccumulatorSuite.createLongAccum("a") + val acc2 = AccumulatorSuite.createLongAccum("b") + val acc3 = AccumulatorSuite.createLongAccum("c") + val acc4 = AccumulatorSuite.createLongAccum("d", true) tm.registerAccumulator(acc1) tm.registerAccumulator(acc2) tm.registerAccumulator(acc3) tm.registerAccumulator(acc4) - acc1 += 1 - acc2 += 2 - val newUpdates = tm.accumulatorUpdates().map { a => (a.id, a) }.toMap + acc1.add(1) + acc2.add(2) + val newUpdates = tm.accumulators() + .map(a => (a.id, a.asInstanceOf[NewAccumulator[Any, Any]])).toMap assert(newUpdates.contains(acc1.id)) assert(newUpdates.contains(acc2.id)) assert(newUpdates.contains(acc3.id)) @@ -214,46 +212,14 @@ class TaskMetricsSuite extends SparkFunSuite { assert(newUpdates(acc2.id).name === Some("b")) assert(newUpdates(acc3.id).name === Some("c")) assert(newUpdates(acc4.id).name === Some("d")) - assert(newUpdates(acc1.id).update === Some(1)) - assert(newUpdates(acc2.id).update === Some(2)) - assert(newUpdates(acc3.id).update === Some(0)) - assert(newUpdates(acc4.id).update === Some(0)) + assert(newUpdates(acc1.id).value === 1) + assert(newUpdates(acc2.id).value === 2) + assert(newUpdates(acc3.id).value === 0) + assert(newUpdates(acc4.id).value === 0) assert(!newUpdates(acc3.id).countFailedValues) assert(newUpdates(acc4.id).countFailedValues) - assert(newUpdates.values.map(_.update).forall(_.isDefined)) - assert(newUpdates.values.map(_.value).forall(_.isEmpty)) assert(newUpdates.size === tm.internalAccums.size + 4) } - - test("from accumulator updates") { - val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a => - AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues) - } - val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1) - assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1) - // Test this with additional accumulators to ensure that we do not crash when handling - // updates from unregistered accumulators. In practice, all accumulators created - // on the driver, internal or not, should be registered with `Accumulators` at some point. - val param = IntAccumulatorParam - val registeredAccums = Seq( - new Accumulator(0, param, Some("a"), countFailedValues = true), - new Accumulator(0, param, Some("b"), countFailedValues = false)) - val unregisteredAccums = Seq( - new Accumulator(0, param, Some("c"), countFailedValues = true), - new Accumulator(0, param, Some("d"), countFailedValues = false)) - registeredAccums.foreach(Accumulators.register) - registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id))) - unregisteredAccums.foreach(a => Accumulators.remove(a.id)) - unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id))) - // set some values in these accums - registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) } - unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) } - val registeredAccumInfos = registeredAccums.map(makeInfo) - val unregisteredAccumInfos = unregisteredAccums.map(makeInfo) - val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos - // Simply checking that this does not crash: - TaskMetrics.fromAccumulatorUpdates(accumUpdates2) - } } @@ -264,21 +230,14 @@ private[spark] object TaskMetricsSuite extends Assertions { * Note: this does NOT check accumulator ID equality. */ def assertUpdatesEquals( - updates1: Seq[AccumulableInfo], - updates2: Seq[AccumulableInfo]): Unit = { + updates1: Seq[NewAccumulator[_, _]], + updates2: Seq[NewAccumulator[_, _]]): Unit = { assert(updates1.size === updates2.size) - updates1.zip(updates2).foreach { case (info1, info2) => + updates1.zip(updates2).foreach { case (acc1, acc2) => // do not assert ID equals here - assert(info1.name === info2.name) - assert(info1.update === info2.update) - assert(info1.value === info2.value) - assert(info1.countFailedValues === info2.countFailedValues) + assert(acc1.name === acc2.name) + assert(acc1.countFailedValues === acc2.countFailedValues) + assert(acc1.value == acc2.value) } } - - /** - * Make an [[AccumulableInfo]] out of an [[Accumulable]] with the intent to use the - * info as an accumulator update. - */ - def makeInfo(a: Accumulable[_, _]): AccumulableInfo = a.toInfo(Some(a.value), None) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index b76c0a4bd1..9912d1f3bc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -112,7 +112,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def stop() = {} override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[AccumulableInfo])], + accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { // normally done by TaskSetManager @@ -277,8 +277,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou taskSet.tasks(i), result._1, result._2, - Seq(new AccumulableInfo( - accumId, Some(""), Some(1), None, internal = false, countFailedValues = false)))) + Seq(AccumulatorSuite.createLongAccum("", initValue = 1, id = accumId)))) } } } @@ -484,7 +483,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def defaultParallelism(): Int = 2 override def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[AccumulableInfo])], + accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {} override def applicationAttemptId(): Option[String] = None @@ -997,10 +996,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // complete two tasks runEvent(makeCompletionEvent( taskSets(0).tasks(0), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0))) + Seq.empty, createFakeTaskInfoWithId(0))) runEvent(makeCompletionEvent( taskSets(0).tasks(1), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1))) + Seq.empty, createFakeTaskInfoWithId(1))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // verify stage exists assert(scheduler.stageIdToStage.contains(0)) @@ -1009,10 +1008,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // finish other 2 tasks runEvent(makeCompletionEvent( taskSets(0).tasks(2), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2))) + Seq.empty, createFakeTaskInfoWithId(2))) runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3))) + Seq.empty, createFakeTaskInfoWithId(3))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 4) @@ -1023,14 +1022,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // with a speculative task and make sure the event is sent out runEvent(makeCompletionEvent( taskSets(0).tasks(3), Success, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5))) + Seq.empty, createFakeTaskInfoWithId(5))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 5) // make sure non successful tasks also send out event runEvent(makeCompletionEvent( taskSets(0).tasks(3), UnknownReason, 42, - Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6))) + Seq.empty, createFakeTaskInfoWithId(6))) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(sparkListener.endedTasks.size == 6) } @@ -1613,37 +1612,43 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou test("accumulator not calculated for resubmitted result stage") { // just for register - val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam) + val accum = AccumulatorSuite.createLongAccum("a") val finalRdd = new MyRDD(sc, 1, Nil) submit(finalRdd, Array(0)) completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) completeWithAccumulator(accum.id, taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) - val accVal = Accumulators.originals(accum.id).get.get.value - - assert(accVal === 1) - + assert(accum.value === 1) assertDataStructuresEmpty() } test("accumulators are updated on exception failures") { - val acc1 = sc.accumulator(0L, "ingenieur") - val acc2 = sc.accumulator(0L, "boulanger") - val acc3 = sc.accumulator(0L, "agriculteur") - assert(Accumulators.get(acc1.id).isDefined) - assert(Accumulators.get(acc2.id).isDefined) - assert(Accumulators.get(acc3.id).isDefined) - val accInfo1 = acc1.toInfo(Some(15L), None) - val accInfo2 = acc2.toInfo(Some(13L), None) - val accInfo3 = acc3.toInfo(Some(18L), None) - val accumUpdates = Seq(accInfo1, accInfo2, accInfo3) - val exceptionFailure = new ExceptionFailure(new SparkException("fondue?"), accumUpdates) + val acc1 = AccumulatorSuite.createLongAccum("ingenieur") + val acc2 = AccumulatorSuite.createLongAccum("boulanger") + val acc3 = AccumulatorSuite.createLongAccum("agriculteur") + assert(AccumulatorContext.get(acc1.id).isDefined) + assert(AccumulatorContext.get(acc2.id).isDefined) + assert(AccumulatorContext.get(acc3.id).isDefined) + val accUpdate1 = new LongAccumulator + accUpdate1.metadata = acc1.metadata + accUpdate1.setValue(15) + val accUpdate2 = new LongAccumulator + accUpdate2.metadata = acc2.metadata + accUpdate2.setValue(13) + val accUpdate3 = new LongAccumulator + accUpdate3.metadata = acc3.metadata + accUpdate3.setValue(18) + val accumUpdates = Seq(accUpdate1, accUpdate2, accUpdate3) + val accumInfo = accumUpdates.map(AccumulatorSuite.makeInfo) + val exceptionFailure = new ExceptionFailure( + new SparkException("fondue?"), + accumInfo).copy(accums = accumUpdates) submit(new MyRDD(sc, 1, Nil), Array(0)) runEvent(makeCompletionEvent(taskSets.head.tasks.head, exceptionFailure, "result")) - assert(Accumulators.get(acc1.id).get.value === 15L) - assert(Accumulators.get(acc2.id).get.value === 13L) - assert(Accumulators.get(acc3.id).get.value === 18L) + assert(AccumulatorContext.get(acc1.id).get.value === 15L) + assert(AccumulatorContext.get(acc2.id).get.value === 13L) + assert(AccumulatorContext.get(acc3.id).get.value === 18L) } test("reduce tasks should be placed locally with map output") { @@ -2007,12 +2012,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou task: Task[_], reason: TaskEndReason, result: Any, - extraAccumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo], + extraAccumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty, taskInfo: TaskInfo = createFakeTaskInfo()): CompletionEvent = { val accumUpdates = reason match { - case Success => task.metrics.accumulatorUpdates() - case ef: ExceptionFailure => ef.accumUpdates - case _ => Seq.empty[AccumulableInfo] + case Success => task.metrics.accumulators() + case ef: ExceptionFailure => ef.accums + case _ => Seq.empty } CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 9971d48a52..16027d944f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -17,12 +17,11 @@ package org.apache.spark.scheduler -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, NewAccumulator, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId -class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext -{ +class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext { test("launch of backend and scheduler") { val conf = new SparkConf().setMaster("myclusterManager"). setAppName("testcm").set("spark.driver.allowMultipleContexts", "true") @@ -68,6 +67,6 @@ private class DummyTaskScheduler extends TaskScheduler { override def applicationAttemptId(): Option[String] = None def executorHeartbeatReceived( execId: String, - accumUpdates: Array[(Long, Seq[AccumulableInfo])], + accumUpdates: Array[(Long, Seq[NewAccumulator[_, _]])], blockManagerId: BlockManagerId): Boolean = true } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index d55f6f60ec..9aca4dbc23 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -162,18 +162,17 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark }.count() // The one that counts failed values should be 4x the one that didn't, // since we ran each task 4 times - assert(Accumulators.get(acc1.id).get.value === 40L) - assert(Accumulators.get(acc2.id).get.value === 10L) + assert(AccumulatorContext.get(acc1.id).get.value === 40L) + assert(AccumulatorContext.get(acc2.id).get.value === 10L) } test("failed tasks collect only accumulators whose values count during failures") { sc = new SparkContext("local", "test") - val param = AccumulatorParam.LongAccumulatorParam - val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true) - val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false) + val acc1 = AccumulatorSuite.createLongAccum("x", true) + val acc2 = AccumulatorSuite.createLongAccum("y", false) // Create a dummy task. We won't end up running this; we just want to collect // accumulator updates from it. - val taskMetrics = new TaskMetrics + val taskMetrics = TaskMetrics.empty val task = new Task[Int](0, 0, 0) { context = new TaskContextImpl(0, 0, 0L, 0, new TaskMemoryManager(SparkEnv.get.memoryManager, 0L), @@ -186,12 +185,11 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark } // First, simulate task success. This should give us all the accumulators. val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false) - val accumUpdates2 = (taskMetrics.internalAccums ++ Seq(acc1, acc2)) - .map(TaskMetricsSuite.makeInfo) + val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2) TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2) // Now, simulate task failures. This should give us only the accums that count failed values. val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true) - val accumUpdates4 = (taskMetrics.internalAccums ++ Seq(acc1)).map(TaskMetricsSuite.makeInfo) + val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1) TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index b5385c11a9..9e472f900b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -241,8 +241,8 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local assert(resultGetter.taskResults.size === 1) val resBefore = resultGetter.taskResults.head val resAfter = captor.getValue - val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update) - val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).flatMap(_.update) + val resSizeBefore = resBefore.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value) + val resSizeAfter = resAfter.accumUpdates.find(_.name == Some(RESULT_SIZE)).map(_.value) assert(resSizeBefore.exists(_ == 0L)) assert(resSizeAfter.exists(_.toString.toLong > 0L)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index ecf4b76da5..339fc4254d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -37,7 +37,7 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) task: Task[_], reason: TaskEndReason, result: Any, - accumUpdates: Seq[AccumulableInfo], + accumUpdates: Seq[NewAccumulator[_, _]], taskInfo: TaskInfo) { taskScheduler.endedTasks(taskInfo.index) = reason } @@ -166,8 +166,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskSet = FakeTask.createTaskSet(1) val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) - val accumUpdates = - taskSet.tasks.head.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) } + val accumUpdates = taskSet.tasks.head.metrics.internalAccums // Offer a host with NO_PREF as the constraint, // we should get a nopref task immediately since that's what we only have @@ -185,8 +184,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = FakeTask.createTaskSet(3) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - val accumUpdatesByTask: Array[Seq[AccumulableInfo]] = taskSet.tasks.map { task => - task.metrics.internalAccums.map { a => a.toInfo(Some(0L), None) } + val accumUpdatesByTask: Array[Seq[NewAccumulator[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums } // First three offers should all find tasks @@ -792,7 +791,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg private def createTaskResult( id: Int, - accumUpdates: Seq[AccumulableInfo] = Seq.empty[AccumulableInfo]): DirectTaskResult[Int] = { + accumUpdates: Seq[NewAccumulator[_, _]] = Seq.empty): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 221124829f..ce7d51d1c3 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -183,7 +183,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with test("test executor id to summary") { val conf = new SparkConf() val listener = new JobProgressListener(conf) - val taskMetrics = new TaskMetrics() + val taskMetrics = TaskMetrics.empty val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics() assert(listener.stageIdToData.size === 0) @@ -230,7 +230,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with test("test task success vs failure counting for different task end reasons") { val conf = new SparkConf() val listener = new JobProgressListener(conf) - val metrics = new TaskMetrics() + val metrics = TaskMetrics.empty val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 val task = new ShuffleMapTask(0) @@ -269,7 +269,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with val execId = "exe-1" def makeTaskMetrics(base: Int): TaskMetrics = { - val taskMetrics = new TaskMetrics + val taskMetrics = TaskMetrics.empty val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics() val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics val inputMetrics = taskMetrics.inputMetrics @@ -300,9 +300,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L))) listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array( - (1234L, 0, 0, makeTaskMetrics(0).accumulatorUpdates()), - (1235L, 0, 0, makeTaskMetrics(100).accumulatorUpdates()), - (1236L, 1, 0, makeTaskMetrics(200).accumulatorUpdates())))) + (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)), + (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)), + (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo))))) var stage0Data = listener.stageIdToData.get((0, 0)).get var stage1Data = listener.stageIdToData.get((1, 0)).get diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index d3b6cdfe86..6fda7378e6 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -85,7 +85,8 @@ class JsonProtocolSuite extends SparkFunSuite { // Use custom accum ID for determinism val accumUpdates = makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true) - .accumulatorUpdates().zipWithIndex.map { case (a, i) => a.copy(id = i) } + .accumulators().map(AccumulatorSuite.makeInfo) + .zipWithIndex.map { case (a, i) => a.copy(id = i) } SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates))) } @@ -385,7 +386,7 @@ class JsonProtocolSuite extends SparkFunSuite { // "Task Metrics" field, if it exists. val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true) val tmJson = JsonProtocol.taskMetricsToJson(tm) - val accumUpdates = tm.accumulatorUpdates() + val accumUpdates = tm.accumulators().map(AccumulatorSuite.makeInfo) val exception = new SparkException("sentimental") val exceptionFailure = new ExceptionFailure(exception, accumUpdates) val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure) @@ -813,7 +814,7 @@ private[spark] object JsonProtocolSuite extends Assertions { hasHadoopInput: Boolean, hasOutput: Boolean, hasRecords: Boolean = true) = { - val t = new TaskMetrics + val t = TaskMetrics.empty t.setExecutorDeserializeTime(a) t.setExecutorRunTime(b) t.setResultSize(c) |