diff options
Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 9b7b945bf3..1d7c8f4a61 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -22,6 +22,8 @@ import java.util.Random import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.mockito.Mockito.{mock, verify} + import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.util.{AccumulatorV2, ManualClock} @@ -789,6 +791,54 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3")) } + test("Kill other task attempts when one attempt belonging to the same task succeeds") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set("spark.speculation.multiplier", "0.0") + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 4 tasks to start + for ((k, v) <- List( + "exec1" -> "host1", + "exec1" -> "host1", + "exec2" -> "host2", + "exec2" -> "host2")) { + val taskOption = manager.resourceOffer(k, v, NO_PREF) + assert(taskOption.isDefined) + val task = taskOption.get + assert(task.executorId === k) + } + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + // Complete the 3 tasks and leave 1 task in running + for (id <- Set(0, 1, 2)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + + assert(manager.checkSpeculatableTasks(0)) + // Offer resource to start the speculative attempt for the running task + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + assert(taskOption5.isDefined) + val task5 = taskOption5.get + assert(task5.index === 3) + assert(task5.taskId === 4) + assert(task5.executorId === "exec1") + assert(task5.attemptNumber === 1) + sched.backend = mock(classOf[SchedulerBackend]) + // Complete the speculative attempt for the running task + manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3))) + // Verify that it kills other running attempt + verify(sched.backend).killTask(3, "exec2", true) + // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be + // killed, so the FakeTaskScheduler is only told about the successful completion + // of the speculated task. + assert(sched.endedTasks(3) === Success) + } + private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { |