aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorImran Rashid <irashid@cloudera.com>2016-06-30 13:36:06 -0500
committerImran Rashid <irashid@cloudera.com>2016-06-30 13:36:06 -0500
commitfdf9f94f8c8861a00cd8415073f842b857c397f7 (patch)
tree43a498dc10b47c2355b5e71c994e9de5b611ff36 /core/src/main
parent07f46afc733b1718d528a6ea5c0d774f047024fa (diff)
downloadspark-fdf9f94f8c8861a00cd8415073f842b857c397f7.tar.gz
spark-fdf9f94f8c8861a00cd8415073f842b857c397f7.tar.bz2
spark-fdf9f94f8c8861a00cd8415073f842b857c397f7.zip
[SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors
## What changes were proposed in this pull request? Before this change, when you turn on blacklisting with `spark.scheduler.executorTaskBlacklistTime`, but you have fewer than `spark.task.maxFailures` executors, you can end with a job "hung" after some task failures. Whenever a taskset is unable to schedule anything on resourceOfferSingleTaskSet, we check whether the last pending task can be scheduled on *any* known executor. If not, the taskset (and any corresponding jobs) are failed. * Worst case, this is O(maxTaskFailures + numTasks). But unless many executors are bad, this should be small * This does not fail as fast as possible -- when a task becomes unschedulable, we keep scheduling other tasks. This is to avoid an O(numPendingTasks * numExecutors) operation * Also, it is conceivable this fails too quickly. You may be 1 millisecond away from unblacklisting a place for a task to run, or acquiring a new executor. ## How was this patch tested? Added unit test which failed before the change, ran new test 5k times manually, ran all scheduler tests manually, and the full suite via jenkins. Author: Imran Rashid <irashid@cloudera.com> Closes #13603 from squito/progress_w_few_execs_and_blacklist.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala54
3 files changed, 59 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 2d89232ba2..eeb7963c9e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -30,6 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class TaskInfo(
val taskId: Long,
+ /**
+ * The index of this task within its task set. Not necessarily the same as the ID of the RDD
+ * partition that the task is computing.
+ */
val index: Int,
val attemptNumber: Int,
val launchTime: Long,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 821e3ee7f1..2ce49ca134 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -279,6 +279,9 @@ private[spark] class TaskSchedulerImpl(
}
}
}
+ if (!launchedTask) {
+ taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+ }
return launchedTask
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2eedd201ca..2fef447b0a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSetManager(
val copiesRunning = new Array[Int](numTasks)
val successful = new Array[Boolean](numTasks)
private val numFailures = new Array[Int](numTasks)
- // key is taskId, value is a Map of executor id to when it failed
+ // key is taskId (aka TaskInfo.index), value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
@@ -270,7 +270,7 @@ private[spark] class TaskSetManager(
* Is this re-execution of a failed task on an executor it already failed in before
* EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
*/
- private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
+ private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
if (failedExecutors.contains(taskId)) {
val failed = failedExecutors.get(taskId).get
@@ -576,6 +576,56 @@ private[spark] class TaskSetManager(
}
/**
+ * Check whether the given task set has been blacklisted to the point that it can't run anywhere.
+ *
+ * It is possible that this taskset has become impossible to schedule *anywhere* due to the
+ * blacklist. The most common scenario would be if there are fewer executors than
+ * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
+ * will hang.
+ *
+ * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
+ * would add extra time to each iteration of the scheduling loop. Here, we take the approach of
+ * making sure at least one of the unscheduled tasks is schedulable. This means we may not detect
+ * the hang as quickly as we could have, but we'll always detect the hang eventually, and the
+ * method is faster in the typical case. In the worst case, this method can take
+ * O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any task
+ * failures (this is because the method picks on unscheduled task, and then iterates through each
+ * executor until it finds one that the task hasn't failed on already).
+ */
+ private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
+
+ val pendingTask: Option[Int] = {
+ // usually this will just take the last pending task, but because of the lazy removal
+ // from each list, we may need to go deeper in the list. We poll from the end because
+ // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
+ // an unschedulable task this way.
+ val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+ copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
+ }
+ if (indexOffset == -1) {
+ None
+ } else {
+ Some(allPendingTasks(indexOffset))
+ }
+ }
+
+ // If no executors have registered yet, don't abort the stage, just wait. We probably
+ // got here because a task set was added before the executors registered.
+ if (executors.nonEmpty) {
+ // take any task that needs to be scheduled, and see if we can find some executor it *could*
+ // run on
+ pendingTask.foreach { taskId =>
+ if (executors.forall(executorIsBlacklisted(_, taskId))) {
+ val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
+ val partition = tasks(taskId).partitionId
+ abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
+ s" has already failed on executors $execs, and no other executors are available.")
+ }
+ }
+ }
+ }
+
+ /**
* Marks the task as getting result and notifies the DAG Scheduler
*/
def handleTaskGettingResult(tid: Long): Unit = {