about summary refs log tree commit diff
path: root/core/src/test/scala/org
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--core/src/test/scala/org/apache/spark/AccumulatorSuite.scala32
-rw-r--r--core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala33
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala8
3 files changed, 26 insertions, 47 deletions
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 454c42517c..6063476936 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -176,11 +176,10 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
test("get accum") {
sc = new SparkContext("local", "test")
// Don't register with SparkContext for cleanup
- var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ var acc = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
val accId = acc.id
val ref = WeakReference(acc)
assert(ref.get.isDefined)
- Accumulators.register(ref.get.get)
// Remove the explicit reference to it and allow weak reference to get garbage collected
acc = null
@@ -194,30 +193,19 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
// Getting a normal accumulator. Note: this has to be separate because referencing an
// accumulator above in an `assert` would keep it from being garbage collected.
- val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true, true)
- Accumulators.register(acc2)
+ val acc2 = new Accumulable[Long, Long](0L, LongAccumulatorParam, None, true)
assert(Accumulators.get(acc2.id) === Some(acc2))
// Getting an accumulator that does not exist should return None
assert(Accumulators.get(100000).isEmpty)
}
- test("only external accums are automatically registered") {
- val accEx = new Accumulator(0, IntAccumulatorParam, Some("external"), internal = false)
- val accIn = new Accumulator(0, IntAccumulatorParam, Some("internal"), internal = true)
- assert(!accEx.isInternal)
- assert(accIn.isInternal)
- assert(Accumulators.get(accEx.id).isDefined)
- assert(Accumulators.get(accIn.id).isEmpty)
- }
-
test("copy") {
- val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), true, false)
+ val acc1 = new Accumulable[Long, Long](456L, LongAccumulatorParam, Some("x"), false)
val acc2 = acc1.copy()
assert(acc1.id === acc2.id)
assert(acc1.value === acc2.value)
assert(acc1.name === acc2.name)
- assert(acc1.isInternal === acc2.isInternal)
assert(acc1.countFailedValues === acc2.countFailedValues)
assert(acc1 !== acc2)
// Modifying one does not affect the other
@@ -230,15 +218,11 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("register multiple accums with same ID") {
- // Make sure these are internal accums so we don't automatically register them already
- val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true, true)
+ val acc1 = new Accumulable[Int, Int](0, IntAccumulatorParam, None, true)
+ // `copy` will create a new Accumulable and register it.
val acc2 = acc1.copy()
assert(acc1 !== acc2)
assert(acc1.id === acc2.id)
- assert(Accumulators.originals.isEmpty)
- assert(Accumulators.get(acc1.id).isEmpty)
- Accumulators.register(acc1)
- Accumulators.register(acc2)
// The second one does not override the first one
assert(Accumulators.originals.size === 1)
assert(Accumulators.get(acc1.id) === Some(acc1))
@@ -275,14 +259,14 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("value is reset on the executors") {
- val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"), internal = false)
- val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"), internal = false)
+ val acc1 = new Accumulator(0, IntAccumulatorParam, Some("thing"))
+ val acc2 = new Accumulator(0L, LongAccumulatorParam, Some("thing2"))
val externalAccums = Seq(acc1, acc2)
val taskMetrics = new TaskMetrics
// Set some values; these should not be observed later on the "executors"
acc1.setValue(10)
acc2.setValue(20L)
- taskMetrics.testAccum.get.asInstanceOf[Accumulator[Long]].setValue(30L)
+ taskMetrics.testAccum.get.setValue(30L)
// Simulate the task being serialized and sent to the executors.
val dummyTask = new DummyTask(taskMetrics, externalAccums)
val serInstance = new JavaSerializer(new SparkConf).newInstance()
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index fbc2fae08d..ee70419727 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -178,11 +178,11 @@ class TaskMetricsSuite extends SparkFunSuite {
val sr1 = tm.createTempShuffleReadMetrics()
val sr2 = tm.createTempShuffleReadMetrics()
val sr3 = tm.createTempShuffleReadMetrics()
- sr1.setRecordsRead(10L)
- sr2.setRecordsRead(10L)
- sr1.setFetchWaitTime(1L)
- sr2.setFetchWaitTime(2L)
- sr3.setFetchWaitTime(3L)
+ sr1.incRecordsRead(10L)
+ sr2.incRecordsRead(10L)
+ sr1.incFetchWaitTime(1L)
+ sr2.incFetchWaitTime(2L)
+ sr3.incFetchWaitTime(3L)
tm.mergeShuffleReadMetrics()
assert(tm.shuffleReadMetrics.remoteBlocksFetched === 0L)
assert(tm.shuffleReadMetrics.recordsRead === 20L)
@@ -198,8 +198,7 @@ class TaskMetricsSuite extends SparkFunSuite {
val acc1 = new Accumulator(0, IntAccumulatorParam, Some("a"))
val acc2 = new Accumulator(0, IntAccumulatorParam, Some("b"))
val acc3 = new Accumulator(0, IntAccumulatorParam, Some("c"))
- val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"),
- internal = true, countFailedValues = true)
+ val acc4 = new Accumulator(0, IntAccumulatorParam, Some("d"), countFailedValues = true)
tm.registerAccumulator(acc1)
tm.registerAccumulator(acc2)
tm.registerAccumulator(acc3)
@@ -219,9 +218,7 @@ class TaskMetricsSuite extends SparkFunSuite {
assert(newUpdates(acc2.id).update === Some(2))
assert(newUpdates(acc3.id).update === Some(0))
assert(newUpdates(acc4.id).update === Some(0))
- assert(!newUpdates(acc3.id).internal)
assert(!newUpdates(acc3.id).countFailedValues)
- assert(newUpdates(acc4.id).internal)
assert(newUpdates(acc4.id).countFailedValues)
assert(newUpdates.values.map(_.update).forall(_.isDefined))
assert(newUpdates.values.map(_.value).forall(_.isEmpty))
@@ -230,7 +227,7 @@ class TaskMetricsSuite extends SparkFunSuite {
test("from accumulator updates") {
val accumUpdates1 = TaskMetrics.empty.internalAccums.map { a =>
- AccumulableInfo(a.id, a.name, Some(3L), None, a.isInternal, a.countFailedValues)
+ AccumulableInfo(a.id, a.name, Some(3L), None, true, a.countFailedValues)
}
val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
@@ -239,16 +236,15 @@ class TaskMetricsSuite extends SparkFunSuite {
// on the driver, internal or not, should be registered with `Accumulators` at some point.
val param = IntAccumulatorParam
val registeredAccums = Seq(
- new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
- new Accumulator(0, param, Some("b"), internal = true, countFailedValues = false),
- new Accumulator(0, param, Some("c"), internal = false, countFailedValues = true),
- new Accumulator(0, param, Some("d"), internal = false, countFailedValues = false))
+ new Accumulator(0, param, Some("a"), countFailedValues = true),
+ new Accumulator(0, param, Some("b"), countFailedValues = false))
val unregisteredAccums = Seq(
- new Accumulator(0, param, Some("e"), internal = true, countFailedValues = true),
- new Accumulator(0, param, Some("f"), internal = true, countFailedValues = false))
+ new Accumulator(0, param, Some("c"), countFailedValues = true),
+ new Accumulator(0, param, Some("d"), countFailedValues = false))
registeredAccums.foreach(Accumulators.register)
- registeredAccums.foreach { a => assert(Accumulators.originals.contains(a.id)) }
- unregisteredAccums.foreach { a => assert(!Accumulators.originals.contains(a.id)) }
+ registeredAccums.foreach(a => assert(Accumulators.originals.contains(a.id)))
+ unregisteredAccums.foreach(a => Accumulators.remove(a.id))
+ unregisteredAccums.foreach(a => assert(!Accumulators.originals.contains(a.id)))
// set some values in these accums
registeredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
unregisteredAccums.zipWithIndex.foreach { case (a, i) => a.setValue(i) }
@@ -276,7 +272,6 @@ private[spark] object TaskMetricsSuite extends Assertions {
assert(info1.name === info2.name)
assert(info1.update === info2.update)
assert(info1.value === info2.value)
- assert(info1.internal === info2.internal)
assert(info1.countFailedValues === info2.countFailedValues)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index bda4c996b2..d55f6f60ec 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -148,8 +148,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
sc = new SparkContext("local[1,4]", "test")
val param = AccumulatorParam.LongAccumulatorParam
// Create 2 accumulators, one that counts failed values and another that doesn't
- val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
// Fail first 3 attempts of every task. This means each task should be run 4 times.
sc.parallelize(1 to 10, 10).map { i =>
acc1 += 1
@@ -169,8 +169,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
test("failed tasks collect only accumulators whose values count during failures") {
sc = new SparkContext("local", "test")
val param = AccumulatorParam.LongAccumulatorParam
- val acc1 = new Accumulator(0L, param, Some("x"), internal = false, countFailedValues = true)
- val acc2 = new Accumulator(0L, param, Some("y"), internal = false, countFailedValues = false)
+ val acc1 = new Accumulator(0L, param, Some("x"), countFailedValues = true)
+ val acc2 = new Accumulator(0L, param, Some("y"), countFailedValues = false)
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val taskMetrics = new TaskMetrics