author     Reynold Xin <rxin@databricks.com>    2016-04-14 10:54:57 -0700
committer  Andrew Or <andrew@databricks.com>    2016-04-14 10:54:57 -0700
commit     dac40b68dc52d5ab855dfde63f0872064aa3d999 (patch)
tree       c9192854e5f40d52d3a2ce1096da370b79888560 /core/src/test
parent     9fa43a33b91c3a9b6be39bf3e00febf61a4b5b59 (diff)
[SPARK-14619] Track internal accumulators (metrics) by stage attempt
## What changes were proposed in this pull request?

When there are multiple attempts for a stage, we currently reset internal accumulator values only if all of the tasks are resubmitted. It would make more sense to reset the accumulator values for each stage attempt. This will allow us to eventually get rid of the internal flag in the Accumulator class. This is part of my bigger effort to simplify accumulators and task metrics.

## How was this patch tested?

Covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12378 from rxin/SPARK-14619.
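The core idea described above is that a stage mints a brand-new set of internal accumulators every time it starts an attempt, rather than zeroing shared ones only on a full resubmission. Below is a minimal, self-contained Scala sketch of that idea; `StageSketch`, `Accum`, and the metric names are illustrative stand-ins, not Spark's actual classes.

```scala
// Minimal sketch of per-attempt accumulator tracking. Stand-in types only:
// these are not Spark's Stage/Accumulator classes.
final case class Accum(name: String, var value: Long = 0L)

final class StageSketch(val stageId: Int) {
  private var nextAttemptId = 0
  private var currentAccums: Seq[Accum] = Seq.empty

  // Every attempt gets fresh accumulator instances, so a retried attempt
  // never observes values recorded by an earlier attempt.
  def makeNewStageAttempt(): Int = {
    currentAccums = Seq(Accum("internal.metrics.peakMemory"),
                        Accum("internal.metrics.resultSize"))
    val id = nextAttemptId
    nextAttemptId += 1
    id
  }

  def internalAccumulators: Seq[Accum] = currentAccums
}

object PerAttemptDemo extends App {
  val stage = new StageSketch(stageId = 0)
  stage.makeNewStageAttempt()
  stage.internalAccumulators.head.value = 42L // attempt 0 records a metric
  stage.makeNewStageAttempt()                 // e.g. retried after a fetch failure
  assert(stage.internalAccumulators.head.value == 0L) // attempt 1 starts clean
}
```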
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                   | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala    | 2
3 files changed, 6 insertions, 6 deletions
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 80a1de6065..ee6b991461 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -928,8 +928,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
numTasks: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
): StageInfo = {
- new StageInfo(
- stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", taskLocalityPreferences)
+ new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details",
+ Seq.empty, taskLocalityPreferences)
}
private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
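The call-site change above shows `StageInfo` gaining an eighth argument (a `Seq.empty`) just before `taskLocalityPreferences`. The following is a hedged reconstruction of the constructor shape the test now targets; the parameter name `internalAccumulators` and the stub element types are assumptions inferred from the rest of this diff, not copied from the patch itself.

```scala
// Stub types so the sketch compiles on its own; Spark's real RDDInfo,
// Accumulator, and TaskLocation are not reproduced here.
final class RDDInfoStub
final class AccumStub
final class TaskLocationStub

// Reconstructed shape: the new eighth parameter (name assumed) carries the
// per-attempt internal accumulators.
class StageInfoSketch(
    val stageId: Int,
    val attemptId: Int,
    val name: String,
    val numTasks: Int,
    val rddInfos: Seq[RDDInfoStub],
    val parentIds: Seq[Int],
    val details: String,
    val internalAccumulators: Seq[AccumStub] = Seq.empty,
    val taskLocalityPreferences: Seq[Seq[TaskLocationStub]] = Seq.empty)

// Mirrors the updated test call:
//   new StageInfoSketch(stageId, 0, "name", numTasks, Seq.empty, Seq.empty,
//     "no details", Seq.empty, taskLocalityPreferences)
```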
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 00f3f15c45..cd7d2e1570 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -337,7 +337,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// first attempt -- its successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val data1 = (1 to 10).map { x => x -> x}
// second attempt -- also successful. We'll write out different data,
@@ -345,7 +345,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// depending on what gets spilled, what gets combined, etc.
val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val data2 = (11 to 20).map { x => x -> x}
// interleave writes of both attempts -- we want to test that both attempts can occur
@@ -374,7 +374,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem,
- InternalAccumulator.create(sc)))
+ InternalAccumulator.createAll(sc)))
val readData = reader.read().toIndexedSeq
assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
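This hunk is a pure rename: the factory that builds the complete set of internal metric accumulators is now `createAll` rather than `create`, making it explicit that every call produces a full, fresh set. A small illustrative sketch follows; the object and metric names are stand-ins, not Spark's real ones.

```scala
// Stand-in for the renamed factory; metric names are illustrative.
object InternalAccumulatorSketch {
  final case class Accum(name: String, var value: Long = 0L)

  // Each call returns brand-new instances of every internal accumulator,
  // which is what lets each task/stage attempt start its metrics at zero.
  def createAll(): Seq[Accum] =
    Seq("executorRunTime", "resultSize", "peakExecutionMemory").map(Accum(_))
}

object CreateAllDemo extends App {
  val first  = InternalAccumulatorSketch.createAll()
  val second = InternalAccumulatorSketch.createAll()
  first.head.value = 7L
  assert(second.head.value == 0L)      // no state shared between calls
  assert(!(first.head eq second.head)) // distinct instances, not resets
}
```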
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2293c11dad..fd96fb04f8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1144,7 +1144,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// SPARK-9809 -- this stage is submitted without a task for each partition (because some of
// the shuffle map output is still available from stage 0); make sure we've still got internal
// accumulators setup
- assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+ assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty)
completeShuffleMapStageSuccessfully(2, 0, 2)
completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
assert(results === Map(0 -> 1234, 1 -> 1235))
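The assertion now reaches the accumulators through `latestInfo`, reflecting that they live on the per-attempt `StageInfo` rather than on the `Stage` itself. Here is a self-contained sketch of that ownership move; the types are stand-ins, and the `Stage`/`StageInfo` internals are assumptions based only on this diff.

```scala
// Stand-in types sketching why the assertion goes through latestInfo.
final case class AccumSk(name: String)
final case class StageInfoSk(attemptId: Int, internalAccumulators: Seq[AccumSk])

final class StageSk {
  private var nextAttemptId = 0
  // The per-attempt info owns the accumulators; the Stage only points at the
  // most recent attempt.
  var latestInfo: StageInfoSk = StageInfoSk(-1, Seq.empty)

  def makeNewStageAttempt(): Unit = {
    latestInfo = StageInfoSk(nextAttemptId, Seq(AccumSk("internal.metrics.x")))
    nextAttemptId += 1
  }
}

object LatestInfoDemo extends App {
  val stage = new StageSk
  stage.makeNewStageAttempt() // even a partial resubmission mints a new attempt
  // Mirrors: assert(scheduler.stageIdToStage(2).latestInfo.internalAccumulators.nonEmpty)
  assert(stage.latestInfo.internalAccumulators.nonEmpty)
}
```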