author     Wenchen Fan <wenchen@databricks.com>    2016-05-10 11:16:31 -0700
committer  Andrew Or <andrew@databricks.com>       2016-05-10 11:16:56 -0700
commit     bcfee153b1cacfe617e602f3b72c0877e0bdf1f7 (patch)
tree       7d8aa24d2213d098901547d3b81390fb67bef2be /core/src/test
parent     0b9cae42426e14060bc6182c037fd715f35a2d23 (diff)
[SPARK-12837][CORE] reduce network IO for accumulators
Sending un-updated accumulators back to the driver makes no sense, as merging a zero-value accumulator is a no-op. We should only send back updated accumulators, to save network IO.

Tested with a new test in `TaskContextSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12899 from cloud-fan/acc.
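The idea, as a minimal sketch: an accumulator that is still at its zero value carries no information for the driver, so the executor can filter such accumulators out before serializing the task result. The `LongAccum` class and `collectUpdates` helper below are hypothetical stand-ins for illustration only, not Spark's real `AccumulatorV2` or `Task.collectAccumulatorUpdates` API.

```scala
// Hypothetical sketch (not Spark's actual classes): merging a zero-value
// accumulator on the driver is a no-op, so there is no point shipping it
// back over the network.
final class LongAccum(val name: String) {
  private var sum = 0L
  def add(v: Long): Unit = { sum += v }
  def value: Long = sum
  def isZero: Boolean = sum == 0L // "never updated" marker
}

object AccumUpdateSketch {
  // Keep only the accumulators whose value changed on the executor.
  def collectUpdates(accums: Seq[LongAccum]): Seq[LongAccum] =
    accums.filterNot(_.isZero)

  def main(args: Array[String]): Unit = {
    val a = new LongAccum("updated")
    val b = new LongAccum("untouched")
    a.add(10)
    // Only `a` needs to travel back to the driver.
    println(collectUpdates(Seq(a, b)).map(_.name)) // List(updated)
  }
}
```

In the test below, `RESULT_SIZE` is the exception: as the diff's comment notes, that internal accumulator is always sent back even when its value is zero.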
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala  37
1 file changed, 30 insertions(+), 7 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 9aca4dbc23..368668bc7e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -168,8 +168,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
test("failed tasks collect only accumulators whose values count during failures") {
sc = new SparkContext("local", "test")
- val acc1 = AccumulatorSuite.createLongAccum("x", true)
- val acc2 = AccumulatorSuite.createLongAccum("y", false)
+ val acc1 = AccumulatorSuite.createLongAccum("x", false)
+ val acc2 = AccumulatorSuite.createLongAccum("y", true)
+ acc1.add(1)
+ acc2.add(1)
// Create a dummy task. We won't end up running this; we just want to collect
// accumulator updates from it.
val taskMetrics = TaskMetrics.empty
@@ -185,12 +187,33 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
}
// First, simulate task success. This should give us all the accumulators.
val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
- val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
- TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates1.takeRight(2), Seq(acc1, acc2))
// Now, simulate task failures. This should give us only the accums that count failed values.
- val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
- val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
- TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
+ val accumUpdates2 = task.collectAccumulatorUpdates(taskFailed = true)
+ TaskMetricsSuite.assertUpdatesEquals(accumUpdates2.takeRight(1), Seq(acc2))
+ }
+
+ test("only updated internal accumulators will be sent back to driver") {
+ sc = new SparkContext("local", "test")
+ // Create a dummy task. We won't end up running this; we just want to collect
+ // accumulator updates from it.
+ val taskMetrics = TaskMetrics.empty
+ val task = new Task[Int](0, 0, 0) {
+ context = new TaskContextImpl(0, 0, 0L, 0,
+ new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+ new Properties,
+ SparkEnv.get.metricsSystem,
+ taskMetrics)
+ taskMetrics.incMemoryBytesSpilled(10)
+ override def runTask(tc: TaskContext): Int = 0
+ }
+ val updatedAccums = task.collectAccumulatorUpdates()
+ assert(updatedAccums.length == 2)
+ // the RESULT_SIZE accumulator will be sent back anyway.
+ assert(updatedAccums(0).name == Some(InternalAccumulator.RESULT_SIZE))
+ assert(updatedAccums(0).value == 0)
+ assert(updatedAccums(1).name == Some(InternalAccumulator.MEMORY_BYTES_SPILLED))
+ assert(updatedAccums(1).value == 10)
}
test("localProperties are propagated to executors correctly") {