author | Wenchen Fan <wenchen@databricks.com> | 2016-05-10 11:16:31 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-10 11:16:56 -0700 |
commit | bcfee153b1cacfe617e602f3b72c0877e0bdf1f7 (patch) | |
tree | 7d8aa24d2213d098901547d3b81390fb67bef2be /core/src/test | |
parent | 0b9cae42426e14060bc6182c037fd715f35a2d23 (diff) | |
[SPARK-12837][CORE] reduce network IO for accumulators
Sending un-updated accumulators back to the driver makes no sense, since merging a zero-value accumulator into the driver-side copy is a no-op. We should send back only the accumulators that were actually updated, to save network IO.
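To see why dropping un-updated accumulators is safe, note that merging a zero-value executor copy leaves the driver-side value unchanged. A minimal sketch, assuming the `AccumulatorV2`-style `org.apache.spark.util.LongAccumulator` API (`add`/`merge`/`isZero`/`value`):

```scala
import org.apache.spark.util.LongAccumulator

// Driver-side copy that has already received some updates.
val driverAcc = new LongAccumulator
driverAcc.add(5)

// Executor-side copy that the task never touched: still at its zero value.
val executorAcc = new LongAccumulator
assert(executorAcc.isZero)

// Merging the zero-value copy leaves the driver value unchanged,
// so shipping it back over the network is pure overhead.
driverAcc.merge(executorAcc)
assert(driverAcc.value == 5L)
```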
Tested with a new test in `TaskContextSuite`.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12899 from cloud-fan/acc.
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala | 37 |
1 file changed, 30 insertions(+), 7 deletions(-)
```diff
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 9aca4dbc23..368668bc7e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -168,8 +168,10 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
 
   test("failed tasks collect only accumulators whose values count during failures") {
     sc = new SparkContext("local", "test")
-    val acc1 = AccumulatorSuite.createLongAccum("x", true)
-    val acc2 = AccumulatorSuite.createLongAccum("y", false)
+    val acc1 = AccumulatorSuite.createLongAccum("x", false)
+    val acc2 = AccumulatorSuite.createLongAccum("y", true)
+    acc1.add(1)
+    acc2.add(1)
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
     val taskMetrics = TaskMetrics.empty
@@ -185,12 +187,33 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     }
     // First, simulate task success. This should give us all the accumulators.
     val accumUpdates1 = task.collectAccumulatorUpdates(taskFailed = false)
-    val accumUpdates2 = taskMetrics.internalAccums ++ Seq(acc1, acc2)
-    TaskMetricsSuite.assertUpdatesEquals(accumUpdates1, accumUpdates2)
+    TaskMetricsSuite.assertUpdatesEquals(accumUpdates1.takeRight(2), Seq(acc1, acc2))
     // Now, simulate task failures. This should give us only the accums that count failed values.
-    val accumUpdates3 = task.collectAccumulatorUpdates(taskFailed = true)
-    val accumUpdates4 = taskMetrics.internalAccums ++ Seq(acc1)
-    TaskMetricsSuite.assertUpdatesEquals(accumUpdates3, accumUpdates4)
+    val accumUpdates2 = task.collectAccumulatorUpdates(taskFailed = true)
+    TaskMetricsSuite.assertUpdatesEquals(accumUpdates2.takeRight(1), Seq(acc2))
+  }
+
+  test("only updated internal accumulators will be sent back to driver") {
+    sc = new SparkContext("local", "test")
+    // Create a dummy task. We won't end up running this; we just want to collect
+    // accumulator updates from it.
+    val taskMetrics = TaskMetrics.empty
+    val task = new Task[Int](0, 0, 0) {
+      context = new TaskContextImpl(0, 0, 0L, 0,
+        new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
+        new Properties,
+        SparkEnv.get.metricsSystem,
+        taskMetrics)
+      taskMetrics.incMemoryBytesSpilled(10)
+      override def runTask(tc: TaskContext): Int = 0
+    }
+    val updatedAccums = task.collectAccumulatorUpdates()
+    assert(updatedAccums.length == 2)
+    // the RESULT_SIZE accumulator will be sent back anyway.
+    assert(updatedAccums(0).name == Some(InternalAccumulator.RESULT_SIZE))
+    assert(updatedAccums(0).value == 0)
+    assert(updatedAccums(1).name == Some(InternalAccumulator.MEMORY_BYTES_SPILLED))
+    assert(updatedAccums(1).value == 10)
   }
 
   test("localProperties are propagated to executors correctly") {
```
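For context, the behavior the new test pins down can be summarized as a filter over the task's accumulators: anything still at its zero value is dropped, except `RESULT_SIZE`, which is always reported so the driver learns the serialized result size. The sketch below is illustrative only, not the actual `Task.collectAccumulatorUpdates` body; `collectUpdated` is a hypothetical helper, and `InternalAccumulator` is `private[spark]`, so this would only compile inside the Spark source tree:

```scala
import org.apache.spark.InternalAccumulator
import org.apache.spark.util.AccumulatorV2

// Hypothetical helper mirroring the patch's behavior: skip accumulators that
// were never updated (still zero), but always keep RESULT_SIZE so the driver
// learns the result size even when no other accumulator changed.
def collectUpdated(accums: Seq[AccumulatorV2[_, _]]): Seq[AccumulatorV2[_, _]] =
  accums.filter { acc =>
    !acc.isZero || acc.name.contains(InternalAccumulator.RESULT_SIZE)
  }
```

This matches the new test's assertions: with only `MEMORY_BYTES_SPILLED` updated, exactly two accumulators come back, and `RESULT_SIZE` is present with its zero value.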