diff options
author | Reynold Xin <rxin@databricks.com> | 2016-04-14 10:54:57 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-04-14 10:54:57 -0700 |
commit | dac40b68dc52d5ab855dfde63f0872064aa3d999 (patch) | |
tree | c9192854e5f40d52d3a2ce1096da370b79888560 /core/src/test | |
parent | 9fa43a33b91c3a9b6be39bf3e00febf61a4b5b59 (diff) | |
download | spark-dac40b68dc52d5ab855dfde63f0872064aa3d999.tar.gz spark-dac40b68dc52d5ab855dfde63f0872064aa3d999.tar.bz2 spark-dac40b68dc52d5ab855dfde63f0872064aa3d999.zip |
[SPARK-14619] Track internal accumulators (metrics) by stage attempt
## What changes were proposed in this pull request?
When there are multiple attempts for a stage, we currently only reset internal accumulator values if all the tasks are resubmitted. It would make more sense to reset the accumulator values for each stage attempt. This will allow us to eventually get rid of the internal flag in the Accumulator class. This is part of my bigger effort to simplify accumulators and task metrics.
## How was this patch tested?
Covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #12378 from rxin/SPARK-14619.
Diffstat (limited to 'core/src/test')
3 files changed, 6 insertions, 6 deletions
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 80a1de6065..ee6b991461 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -928,8 +928,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { numTasks: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty ): StageInfo = { - new StageInfo( - stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", taskLocalityPreferences) + new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", + Seq.empty, taskLocalityPreferences) } private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = { diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 00f3f15c45..cd7d2e1570 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -337,7 +337,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC // first attempt -- its successful val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0, new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem, - InternalAccumulator.create(sc))) + InternalAccumulator.createAll(sc))) val data1 = (1 to 10).map { x => x -> x} // second attempt -- also successful. We'll write out different data, @@ -345,7 +345,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC // depending on what gets spilled, what gets combined, etc. val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0, new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem, - InternalAccumulator.create(sc))) + InternalAccumulator.createAll(sc))) val data2 = (11 to 20).map { x => x -> x} // interleave writes of both attempts -- we want to test that both attempts can occur @@ -374,7 +374,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1, new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem, - InternalAccumulator.create(sc))) + InternalAccumulator.createAll(sc))) val readData = reader.read().toIndexedSeq assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 2293c11dad..fd96fb04f8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou // SPARK-9809 -- this stage is submitted without a task for each partition (because some of // the shuffle map output is still available from stage 0); make sure we've still got internal // accumulators setup - assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty) + assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty) completeShuffleMapStageSuccessfully(2, 0, 2) completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) assert(results === Map(0 -> 1234, 1 -> 1235)) |