-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala                  |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala         |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala            | 54
-rw-r--r--  core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala              |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala | 33
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala                  |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala                 |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala    | 92
9 files changed, 194 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 2d89232ba2..eeb7963c9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -30,6 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class TaskInfo(
val taskId: Long,
+ /**
+ * The index of this task within its task set. Not necessarily the same as the ID of the RDD
+ * partition that the task is computing.
+ */
val index: Int,
val attemptNumber: Int,
val launchTime: Long,
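
The comment above draws a distinction that matters when a stage is resubmitted for only a subset of its partitions: the retry's tasks are indexed 0..n-1 within their task set even though they compute arbitrary partition ids. A minimal sketch of the distinction, using toy classes rather than Spark's own:

    case class SketchTaskInfo(taskId: Long, index: Int, partitionId: Int)

    object IndexVsPartition extends App {
      // Suppose partitions 5 and 7 of an RDD were lost and the stage is retried:
      // the retry's task set uses indices 0 and 1, but computes partitions 5 and 7.
      val lostPartitions = Seq(5, 7)
      val retryTasks = lostPartitions.zipWithIndex.map { case (partition, index) =>
        SketchTaskInfo(taskId = 100L + index, index = index, partitionId = partition)
      }
      retryTasks.foreach { t =>
        println(s"task ${t.taskId}: index=${t.index}, partition=${t.partitionId}")
      }
    }
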
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 821e3ee7f1..2ce49ca134 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -279,6 +279,9 @@ private[spark] class TaskSchedulerImpl(
}
}
}
+ if (!launchedTask) {
+ taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+ }
return launchedTask
}
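
This hunk is the only call site of the new check: it runs after a full round of offers has launched nothing for the task set. A simplified sketch of the surrounding control flow (the trait and names here are illustrative stand-ins, not Spark's internals):

    object SchedulingSketch {
      trait SketchTaskSet {
        def tryLaunch(execId: String, host: String): Boolean
        def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit
      }

      // Offer every executor to the task set; if the whole round launches nothing,
      // check whether the blacklist has made the set unschedulable everywhere, so
      // the job fails fast instead of hanging.
      def offerSingleTaskSet(
          taskSet: SketchTaskSet,
          offers: Seq[(String, String)], // (executorId, host)
          aliveExecutors: Iterable[String]): Boolean = {
        var launchedTask = false
        for ((execId, host) <- offers) {
          if (taskSet.tryLaunch(execId, host)) {
            launchedTask = true
          }
        }
        if (!launchedTask) {
          taskSet.abortIfCompletelyBlacklisted(aliveExecutors)
        }
        launchedTask
      }
    }
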
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2eedd201ca..2fef447b0a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSetManager(
val copiesRunning = new Array[Int](numTasks)
val successful = new Array[Boolean](numTasks)
private val numFailures = new Array[Int](numTasks)
- // key is taskId, value is a Map of executor id to when it failed
+ // key is the task's index in the task set (aka TaskInfo.index), value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
@@ -270,7 +270,7 @@ private[spark] class TaskSetManager(
* Is this re-execution of a failed task on an executor it already failed in before
* EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
*/
- private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
+ private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
if (failedExecutors.contains(taskId)) {
val failed = failedExecutors.get(taskId).get
@@ -576,6 +576,56 @@ private[spark] class TaskSetManager(
}
/**
+ * Check whether this task set has been blacklisted to the point that it can't run anywhere.
+ *
+ * It is possible that this taskset has become impossible to schedule *anywhere* due to the
+ * blacklist. The most common scenario would be if there are fewer executors than
+ * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
+ * will hang.
+ *
+ * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
+ * would add extra time to each iteration of the scheduling loop. Here, we take the approach of
+ * making sure at least one of the unscheduled tasks is schedulable. This means we may not detect
+ * the hang as quickly as we could have, but we'll always detect the hang eventually, and the
+ * method is faster in the typical case. In the worst case, this method can take
+ * O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any task
+ * failures (this is because the method picks one unscheduled task, and then iterates through each
+ * executor until it finds one that the task hasn't failed on already).
+ */
+ private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
+
+ val pendingTask: Option[Int] = {
+ // usually this will just take the last pending task, but because of the lazy removal
+ // from each list, we may need to go deeper in the list. We poll from the end because
+ // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
+ // an unschedulable task this way.
+ val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+ copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
+ }
+ if (indexOffset == -1) {
+ None
+ } else {
+ Some(allPendingTasks(indexOffset))
+ }
+ }
+
+ // If no executors have registered yet, don't abort the stage, just wait. We probably
+ // got here because a task set was added before the executors registered.
+ if (executors.nonEmpty) {
+ // take any task that needs to be scheduled, and see if we can find some executor it *could*
+ // run on
+ pendingTask.foreach { taskId =>
+ if (executors.forall(executorIsBlacklisted(_, taskId))) {
+ val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
+ val partition = tasks(taskId).partitionId
+ abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
+ s" has already failed on executors $execs, and no other executors are available.")
+ }
+ }
+ }
+ }
+
+ /**
* Marks the task as getting result and notifies the DAG Scheduler
*/
def handleTaskGettingResult(tid: Long): Unit = {
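
The state abortIfCompletelyBlacklisted consults boils down to per-task failure timestamps plus a timeout. A standalone toy model of the check, simplified from the real TaskSetManager:

    import scala.collection.mutable

    class SketchBlacklist(timeoutMs: Long) {
      // task index -> (executor id -> time of the most recent failure)
      private val failedExecutors = new mutable.HashMap[Int, mutable.HashMap[String, Long]]()

      def recordFailure(taskIndex: Int, execId: String, now: Long): Unit = {
        failedExecutors.getOrElseUpdate(taskIndex, new mutable.HashMap[String, Long]()) +=
          (execId -> now)
      }

      def executorIsBlacklisted(execId: String, taskIndex: Int, now: Long): Boolean = {
        failedExecutors.get(taskIndex)
          .flatMap(_.get(execId))
          .exists(failTime => now - failTime < timeoutMs)
      }

      // The condition the new method detects for one pending task: every live
      // executor is blacklisted for it, so it can never be scheduled.
      def completelyBlacklisted(taskIndex: Int, executors: Iterable[String], now: Long): Boolean = {
        executors.nonEmpty && executors.forall(executorIsBlacklisted(_, taskIndex, now))
      }
    }

    // For example, with one live executor and a long timeout, a single failure
    // leaves the task with nowhere to run:
    //   val b = new SketchBlacklist(timeoutMs = 10000000L)
    //   b.recordFailure(taskIndex = 0, execId = "executor0", now = 0L)
    //   b.completelyBlacklisted(0, Seq("executor0"), now = 5L) // true
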
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 3e69894b10..683eeeeb6d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -53,7 +53,7 @@ class ExecutorSuite extends SparkFunSuite {
when(mockEnv.closureSerializer).thenReturn(serializer)
val serializedTask =
Task.serializeWithDependencies(
- new FakeTask(0),
+ new FakeTask(0, 0),
HashMap[String, Long](),
HashMap[String, Long](),
serializer.newInstance())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 8ba2697dd9..14c8b664d4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -57,7 +57,8 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
- // just set this to something much longer than the test duration
+ // set this to something much longer than the test duration so that executors don't get
+ // removed from the blacklist during the test
("spark.scheduler.executorTaskBlacklistTime", "10000000")
)
) {
@@ -74,7 +75,8 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
- // just set this to something much longer than the test duration
+ // set this to something much longer than the test duration so that executors don't get
+ // removed from the blacklist during the test
("spark.scheduler.executorTaskBlacklistTime", "10000000"),
// this has to be higher than the number of executors on the bad host
("spark.task.maxFailures", "5"),
@@ -91,6 +93,33 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
assertDataStructuresEmpty(noFailure = true)
}
+ // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
+ // doesn't hang
+ testScheduler(
+ "SPARK-15865 Progress with fewer executors than maxTaskFailures",
+ extraConfs = Seq(
+ // set this to something much longer than the test duration so that executors don't get
+ // removed from the blacklist during the test
+ "spark.scheduler.executorTaskBlacklistTime" -> "10000000",
+ "spark.testing.nHosts" -> "2",
+ "spark.testing.nExecutorsPerHost" -> "1",
+ "spark.testing.nCoresPerExecutor" -> "1"
+ )
+ ) {
+ def runBackend(): Unit = {
+ val (taskDescription, _) = backend.beginTask()
+ backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
+ }
+ withBackend(runBackend _) {
+ val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+ Await.ready(jobFuture, duration)
+ val pattern = ("Aborting TaskSet 0.0 because task .* " +
+ "already failed on executors \\(.*\\), and no other executors are available").r
+ assert(pattern.findFirstIn(failure.getMessage).isDefined,
+ s"Couldn't find $pattern in ${failure.getMessage()}")
+ }
+ assertDataStructuresEmpty(noFailure = false)
+ }
}
class MultiExecutorMockBackend(
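
The new test's cluster shape is what reproduces the hang: fewer executors than spark.task.maxFailures (which defaults to 4), so every task can fail once on each executor and still sit below the abort threshold. A worked check of that arithmetic, with the test's values hard-coded for illustration:

    object WhyItWouldHang extends App {
      val nHosts = 2
      val nExecutorsPerHost = 1
      val maxTaskFailures = 4                    // Spark's default for spark.task.maxFailures
      val executors = nHosts * nExecutorsPerHost // 2
      // one recent failure per executor fully blacklists a task for this task set
      assert(executors < maxTaskFailures)
      println(s"task unschedulable after $executors failures, but the abort on failure " +
        s"count would only trigger at $maxTaskFailures failures")
    }
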
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 4fe705b201..87600fe504 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -21,7 +21,8 @@ import org.apache.spark.TaskContext
class FakeTask(
stageId: Int,
- prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 0) {
+ partitionId: Int,
+ prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, partitionId) {
override def runTask(context: TaskContext): Int = 0
override def preferredLocations: Seq[TaskLocation] = prefLocs
}
@@ -40,7 +41,7 @@ object FakeTask {
throw new IllegalArgumentException("Wrong number of task locations")
}
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
- new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+ new FakeTask(0, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
}
new TaskSet(tasks, 0, stageAttemptId, 0, null)
}
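
With the extra constructor parameter, tests can give each fake task a distinct partition id, so a task's index within the set and the partition it computes are no longer conflated. A sketch of typical usage, assuming the Spark test classpath:

    // four fake tasks in stage 0, where task i computes partition i
    val tasks = Array.tabulate[Task[_]](4) { i =>
      new FakeTask(0, i)
    }
    // args are tasks, stageId, stageAttemptId, priority, properties -- mirroring createTaskSet
    val taskSet = new TaskSet(tasks, 0, 0, 0, null)
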
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index 467796d7c2..00e1c447cc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -30,7 +30,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
: TaskSetManager = {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
- new FakeTask(i, Nil)
+ new FakeTask(stageId, i, Nil)
}
new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 634b94f4c3..14f52a6be9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -279,12 +279,6 @@ private[spark] abstract class MockBackend(
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")
- reviveThread.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- reviveOffers()
- }
- }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
-
/**
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
@@ -348,7 +342,13 @@ private[spark] abstract class MockBackend(
assignedTasksWaitingToRun.nonEmpty
}
- override def start(): Unit = {}
+ override def start(): Unit = {
+ reviveThread.scheduleAtFixedRate(new Runnable {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ reviveOffers()
+ }
+ }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
+ }
override def stop(): Unit = {
reviveThread.shutdown()
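
Moving the revive timer from construction time into start() keeps the periodic reviveOffers calls from firing before the backend (and the test harness around it) is fully set up, and pairs the timer's lifetime with stop(). A generic sketch of that lifecycle pattern, using a toy class rather than the suite's:

    import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

    class PeriodicReviver(intervalMs: Long)(revive: () => Unit) {
      private val timer: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor()

      // schedule in start(), not the constructor, so nothing runs before the
      // component is fully initialized
      def start(): Unit = {
        timer.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = revive()
        }, 0, intervalMs, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = timer.shutdown()
    }
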
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 34b8d55339..100b15740c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -281,6 +281,98 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!failedTaskSet)
}
+ test("abort stage if executor loss results in unschedulability from previously failed tasks") {
+ // Make sure we can detect when a taskset becomes unschedulable from a blacklisting. This
+ // test explores a particular corner case -- you may have one task fail, but still be
+ // schedulable on another executor. However, that executor may fail later on, leaving the
+ // first task with no place to run.
+ val taskScheduler = setupScheduler(
+ // set this to something much longer than the test duration so that executors don't get
+ // removed from the blacklist during the test
+ "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+ )
+
+ val taskSet = FakeTask.createTaskSet(2)
+ taskScheduler.submitTasks(taskSet)
+ val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
+
+ val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+ new WorkerOffer("executor0", "host0", 1),
+ new WorkerOffer("executor1", "host1", 1)
+ )).flatten
+ assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)
+
+ // fail one of the tasks, but leave the other running
+ val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+ taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
+ // at this point, our failed task could run on the other executor, so don't give up the task
+ // set yet.
+ assert(!failedTaskSet)
+
+ // Now we fail our second executor. The other task can still run on executor0, so make an offer
+ // on that executor, and make sure that the other task (not the failed one) is assigned there
+ taskScheduler.executorLost("executor1", SlaveLost("oops"))
+ val nextTaskAttempts =
+ taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
+ // Note: It's OK if some future change makes the scheduler realize at this point that the
+ // taskset has become unschedulable (though in the current implementation, we're sure it will not)
+ assert(nextTaskAttempts.size === 1)
+ assert(nextTaskAttempts.head.executorId === "executor0")
+ assert(nextTaskAttempts.head.attemptNumber === 1)
+ assert(nextTaskAttempts.head.index != failedTask.index)
+
+ // now we should definitely realize that our task set is unschedulable, because the only
+ // task left can't be scheduled on any executors due to the blacklist
+ taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+ sc.listenerBus.waitUntilEmpty(100000)
+ assert(tsm.isZombie)
+ assert(failedTaskSet)
+ val idx = failedTask.index
+ assert(failedTaskSetReason == s"Aborting TaskSet 0.0 because task $idx (partition $idx) has " +
+ s"already failed on executors (executor0), and no other executors are available.")
+ }
+
+ test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
+ // interaction of SPARK-15865 & SPARK-16106
+ // if we have a small number of tasks, we might be able to schedule them all on the first
+ // executor. But if those tasks fail, we should still realize there is another executor
+ // available and not bail on the job
+
+ val taskScheduler = setupScheduler(
+ // set this to something much longer than the test duration so that executors don't get
+ // removed from the blacklist during the test
+ "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+ )
+
+ val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
+ taskScheduler.submitTasks(taskSet)
+ val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
+
+ val offers = Seq(
+ // each offer has more than enough free cores for the entire task set, so when combined
+ // with the locality preferences, we schedule all tasks on one executor
+ new WorkerOffer("executor0", "host0", 4),
+ new WorkerOffer("executor1", "host1", 4)
+ )
+ val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ assert(firstTaskAttempts.size == 2)
+ firstTaskAttempts.foreach { taskAttempt => assert("executor0" === taskAttempt.executorId) }
+
+ // fail all the tasks on the bad executor
+ firstTaskAttempts.foreach { taskAttempt =>
+ taskScheduler.handleFailedTask(tsm, taskAttempt.taskId, TaskState.FAILED, TaskResultLost)
+ }
+
+ // Here is the main check of this test -- we make the same offers again, and the tasks are
+ // scheduled successfully. Because the scheduler first tries to schedule with locality in mind,
+ // at first it won't schedule anything on executor1. But despite that, we don't abort the job.
+ // Then the scheduler falls back to ANY locality, and successfully schedules tasks on executor1.
+ val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+ assert(secondTaskAttempts.size == 2)
+ secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) }
+ assert(!failedTaskSet)
+ }
+
test("SPARK-16106 locality levels updated if executor added to existing host") {
val taskScheduler = setupScheduler()
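
For reference, the two settings these tests lean on can be set the same way in application code; a sketch with the tests' values (illustrative, not recommended defaults):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("blacklist-example")
      // how long (ms) an executor stays blacklisted for a task that failed on it
      .set("spark.scheduler.executorTaskBlacklistTime", "10000000")
      // per-task failure count at which the task set is aborted
      .set("spark.task.maxFailures", "5")
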