diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2013-09-30 23:26:15 -0700 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2013-09-30 23:26:15 -0700 |
commit | 0dcad2edcbcc1f3f12a339110e85c8b1a48af156 (patch) | |
tree | d27aeff8b668af2a4613648c3a87e1e491200630 /core | |
parent | dea4677c887a515e7b2a3ef52dd65e69b15c60c3 (diff) | |
download | spark-0dcad2edcbcc1f3f12a339110e85c8b1a48af156.tar.gz spark-0dcad2edcbcc1f3f12a339110e85c8b1a48af156.tar.bz2 spark-0dcad2edcbcc1f3f12a339110e85c8b1a48af156.zip |
Added additional unit test for repeated task failures
Diffstat (limited to 'core')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala | 29 |
1 file changed, 28 insertions, 1 deletion
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala index 58cc1ef185..80d0c5a5e9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala @@ -40,6 +40,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* val startedTasks = new ArrayBuffer[Long] val endedTasks = new mutable.HashMap[Long, TaskEndReason] val finishedManagers = new ArrayBuffer[TaskSetManager] + val taskSetsFailed = new ArrayBuffer[String] val executors = new mutable.HashMap[String, String] ++ liveExecutors @@ -63,7 +64,9 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* def executorLost(execId: String) {} - def taskSetFailed(taskSet: TaskSet, reason: String) {} + def taskSetFailed(taskSet: TaskSet, reason: String) { + taskSetsFailed += taskSet.id + } } def removeExecutor(execId: String): Unit = executors -= execId @@ -270,6 +273,30 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) } + test("repeated failures lead to task set abortion") { + sc = new SparkContext("local", "test") + val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val taskSet = createTaskSet(1) + val clock = new FakeClock + val manager = new ClusterTaskSetManager(sched, taskSet, clock) + + // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted + // after the last failure. 
+ (0 until manager.MAX_TASK_FAILURES).foreach { index => + val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY) + assert(offerResult != None, + "Expect resource offer on iteration %s to return a task".format(index)) + assert(offerResult.get.index === 0) + manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost)) + if (index < manager.MAX_TASK_FAILURES) { + assert(!sched.taskSetsFailed.contains(taskSet.id)) + } else { + assert(sched.taskSetsFailed.contains(taskSet.id)) + } + } + } + + /** * Utility method to create a TaskSet, potentially setting a particular sequence of preferred * locations for each task (given as varargs) if this sequence is not empty. |