diff options
Diffstat (limited to 'core/src/test')
3 files changed, 26 insertions, 47 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 454c42517c..6063476936 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -176,11 +176,10 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex test("get accum") { sc = new SparkContext("local", "test") // Don't register with SparkContext for cleanup - var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true) + var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true) val accId = acc.id val ref = WeakReference(acc) assert(ref.get.isDefined) - Accumulators.register(ref.get.get) // Remove the explicit reference to it and allow weak reference to get garbage collected acc = null @@ -194,30 +193,19 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex // Getting a normal accumulator. Note: this has to be separate because referencing an // accumulator above in an `assert` would keep it from being garbage collected. - val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true) - Accumulators.register(acc2) + val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true) assert(Accumulators.get(acc2.id) === Some(acc2)) // Getting an accumulator that does not exist should return None assert(Accumulators.get(100000).isEmpty) } - test("only external accums are automatically registered") { - val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false) - val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true) - assert(!accEx.isInternal) - assert(accIn.isInternal) - assert(Accumulators.get(accEx.id).isDefined) - assert(Accumulators.get(accIn.id).isEmpty) - } - test("copy") { - val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false) + val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false) val acc2 = acc1.copy() assert(acc1.id === acc2.id) assert(acc1.value === acc2.value) assert(acc1.name === acc2.name) - assert(acc1.isInternal === acc2.isInternal) assert(acc1.countFailedValues === acc2.countFailedValues) assert(acc1 !== acc2) // Modifying one does not affect the other @@ -230,15 +218,11 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex } test("register multiple accums with same ID") { - // Make sure these are internal accums so we don't automatically register them already - val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true) + val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true) + // `copy` will create a new Accumulable and register it. val acc2 = acc1.copy() assert(acc1 !== acc2) assert(acc1.id === acc2.id) - assert(Accumulators.originals.isEmpty) - assert(Accumulators.get(acc1.id).isEmpty) - Accumulators.register(acc1) - Accumulators.register(acc2) // The second one does not override the first one assert(Accumulators.originals.size === 1) assert(Accumulators.get(acc1.id) === Some(acc1)) @@ -275,14 +259,14 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex } test("value is reset on the executors") { - val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false) - val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false) + val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing")) + val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2")) val externalAccums = Seq(acc1, acc2) val taskMetrics = new TaskMetrics // Set some values; these should not be observed later on the "executors" acc1.setValue(10) acc2.setValue(20L) - taskMetrics.testAccum.get.asInstanceOf[Accumulator[Long]].setValue(30L) + taskMetrics.testAccum.get.setValue(30L) // Simulate the task being serialized and sent to the executors. val dummyTask = new DummyTask(taskMetrics, externalAccums) val serInstance = new JavaSerializer(new SparkConf).newInstance() diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index fbc2fae08d..ee70419727 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -178,11 +178,11 @@ class TaskMetricsSuite extends SparkFunSuite { val sr1 = tm.createTempShuffleReadMetrics() val sr2 = tm.createTempShuffleReadMetrics() val sr3 = tm.createTempShuffleReadMetrics() - sr1.setRecordsRead(10L) - sr2.setRecordsRead(10L) - sr1.setFetchWaitTime(1L) - sr2.setFetchWaitTime(2L) - sr3.setFetchWaitTime(3L) + sr1.incRecordsRead(10L) + sr2.incRecordsRead(10L) + sr1.incFetchWaitTime(1L) + sr2.incFetchWaitTime(2L) + sr3.incFetchWaitTime(3L) tm.mergeShuffleReadMetrics() assert(tm.shuffleReadMetrics.remoteBlocksFetched === 0L) assert(tm.shuffleReadMetrics.recordsRead === 20L) @@ -198,8 +198,7 @@ class TaskMetricsSuite extends SparkFunSuite { val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a")) val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b")) val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c")) - val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), - internal = true, countFailedValues = true) + val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true) tm.registerAccumulator(acc1) tm.registerAccumulator(acc2) tm.registerAccumulator(acc3) @@ -219,9 +218,7 @@ class TaskMetricsSuite extends SparkFunSuite { assert(newUpdates(acc2.id).update === Some(2)) assert(newUpdates(acc3.id).update === Some(0)) assert(newUpdates(acc4.id).update === Some(0)) - assert(!newUpdates(acc3.id).internal) assert(!newUpdates(acc3.id).countFailedValues) - assert(newUpdates(acc4.id).internal) assert(newUpdates(acc4.id).countFailedValues) assert(newUpdates.values.map(_.update).forall(_.isDefined)) assert(newUpdates.values.map(_.value).forall(_.isEmpty)) @@ -230,7 +227,7 @@ class TaskMetricsSuite extends SparkFunSuite { test("from accumulator updates") { val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a => - AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues) + AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues) } val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1) assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1) @@ -239,16 +236,15 @@ class TaskMetricsSuite extends SparkFunSuite { // on the driver, internal or not, should be registered with `Accumulators` at some point. val param = IntAccumulatorParam val registeredAccums = Seq( - new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true), - new Accumulator(0, param, Some("b"), internal = true, countFailedValues = false), - new Accumulator(0, param, Some("c"), internal = false, countFailedValues = true), - new Accumulator(0, param, Some("d"), internal = false, countFailedValues = false)) + new Accumulator(0, param, Some("a"), countFailedValues = true), + new Accumulator(0, param, Some("b"), countFailedValues = false)) val unregisteredAccums = Seq( - new Accumulator(0, param, Some("e"), internal = true, countFailedValues = true), - new Accumulator(0, param, Some("f"), internal = true, countFailedValues = false)) + new Accumulator(0, param, Some("c"), countFailedValues = true), + new Accumulator(0, param, Some("d"), countFailedValues = false)) registeredAccums.foreach(Accumulators.register) - registeredAccums.foreach { a => assert(Accumulators.originals.contains(a.id)) } - unregisteredAccums.foreach { a => assert(!Accumulators.originals.contains(a.id)) } + registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id))) + unregisteredAccums.foreach(a => Accumulators.remove(a.id)) + unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id))) // set some values in these accums registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) } unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) } @@ -276,7 +272,6 @@ private[spark] object TaskMetricsSuite extends Assertions { assert(info1.name === info2.name) assert(info1.update === info2.update) assert(info1.value === info2.value) - assert(info1.internal === info2.internal) assert(info1.countFailedValues === info2.countFailedValues) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index bda4c996b2..d55f6f60ec 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -148,8 +148,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark sc = new SparkContext("local[1,4]", "test") val param = AccumulatorParam.LongAccumulatorParam // Create 2 accumulators, one that counts failed values and another that doesn't - val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true) - val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false) + val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true) + val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false) // Fail first 3 attempts of every task. This means each task should be run 4 times. sc.parallelize(1 to 10, 10).map { i => acc1 += 1 @@ -169,8 +169,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark test("failed tasks collect only accumulators whose values count during failures") { sc = new SparkContext("local", "test") val param = AccumulatorParam.LongAccumulatorParam - val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true) - val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false) + val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true) + val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false) // Create a dummy task. We won't end up running this; we just want to collect // accumulator updates from it. val taskMetrics = new TaskMetrics |