-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala       |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala  | 64
2 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index e63feb8893..19ebaf817e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -874,7 +874,8 @@ private[spark] class TaskSetManager(
     // and we are not using an external shuffle server which could serve the shuffle outputs.
     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
     // so we would need to rerun these tasks on other executors.
-    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
+    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
+        && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         if (successful(index)) {
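
As context for the hunk above: a finished ShuffleMapTask normally has to be rerun when its executor is lost, because its map output lived only on that executor; the change adds a third condition so that a task set that is already a zombie (finished or aborted) no longer emits Resubmitted task-end events. Below is a minimal standalone sketch of the resulting decision; the object, method, and parameter names are illustrative only and are not Spark APIs.

    // Illustrative-only sketch of the resubmission decision in
    // TaskSetManager.executorLost after this change; the names here are
    // made up for the example and are not Spark APIs.
    object ResubmitDecisionSketch {
      sealed trait TaskKind
      case object ShuffleMap extends TaskKind
      case object Result extends TaskKind

      // Resubmit only shuffle-map work, only when no external shuffle service
      // can still serve the lost executor's map output, and only while the
      // task set is still live (not a zombie).
      def shouldResubmit(
          kind: TaskKind,
          externalShuffleServiceEnabled: Boolean,
          isZombie: Boolean): Boolean = {
        kind == ShuffleMap && !externalShuffleServiceEnabled && !isZombie
      }

      def main(args: Array[String]): Unit = {
        println(shouldResubmit(ShuffleMap, externalShuffleServiceEnabled = false, isZombie = false)) // true
        println(shouldResubmit(ShuffleMap, externalShuffleServiceEnabled = false, isZombie = true))  // false: the SPARK-13931 case
        println(shouldResubmit(ShuffleMap, externalShuffleServiceEnabled = true, isZombie = false))  // false: shuffle service still serves the output
      }
    }
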
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d03a0c990a..2c2cda9f31 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
-import java.util.Random
+import java.util.{Properties, Random}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, never, spy, verify, when}
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -664,6 +665,67 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
   }
+ test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") {
+ val conf = new SparkConf().set("spark.speculation", "true")
+ sc = new SparkContext("local", "test", conf)
+
+ val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+ sched.initialize(new FakeSchedulerBackend() {
+ override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {}
+ })
+
+ // Keep track of the number of tasks that are resubmitted,
+ // so that the test can check that no tasks were resubmitted.
+ var resubmittedTasks = 0
+ val dagScheduler = new FakeDAGScheduler(sc, sched) {
+ override def taskEnded(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: Seq[AccumulatorV2[_, _]],
+ taskInfo: TaskInfo): Unit = {
+ super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+ reason match {
+ case Resubmitted => resubmittedTasks += 1
+ case _ =>
+ }
+ }
+ }
+ sched.setDAGScheduler(dagScheduler)
+
+ val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
+ override def index: Int = 0
+ }, Seq(TaskLocation("host1", "execA")), new Properties, null)
+ val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+ // Offer host1, which should be accepted as a PROCESS_LOCAL location
+ // by the one task in the task set
+ val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get
+
+ // Mark the task as available for speculation, and then offer another resource,
+ // which should be used to launch a speculative copy of the task.
+ manager.speculatableTasks += singleTask.partitionId
+ val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
+
+ assert(manager.runningTasks === 2)
+ assert(manager.isZombie === false)
+
+ val directTaskResult = new DirectTaskResult[String](null, Seq()) {
+ override def value(resultSer: SerializerInstance): String = ""
+ }
+ // Complete one copy of the task, which should result in the task set manager
+ // being marked as a zombie, because at least one copy of its only task has completed.
+ manager.handleSuccessfulTask(task1.taskId, directTaskResult)
+ assert(manager.isZombie === true)
+ assert(resubmittedTasks === 0)
+ assert(manager.runningTasks === 1)
+
+ manager.executorLost("execB", "host2", new SlaveLost())
+ assert(manager.runningTasks === 0)
+ assert(resubmittedTasks === 0)
+ }
+
test("speculative and noPref task should be scheduled after node-local") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(