diff options
Diffstat (limited to 'core')
5 files changed, 107 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala index 33edf25043..47a5cbff49 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala @@ -40,6 +40,15 @@ private[spark] object ExecutorExited { } } +/** + * A loss reason that means we don't yet know why the executor exited. + * + * This is used by the task scheduler to remove state associated with the executor, but + * not yet fail any tasks that were running in the executor before the real loss reason + * is known. + */ +private [spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.") + private[spark] case class SlaveLost(_message: String = "Slave lost") extends ExecutorLossReason(_message) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 1c7bfe89c0..43d7d80b7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -468,11 +468,20 @@ private[spark] class TaskSchedulerImpl( removeExecutor(executorId, reason) failedExecutor = Some(executorId) } else { - // We may get multiple executorLost() calls with different loss reasons. For example, one - // may be triggered by a dropped connection from the slave while another may be a report - // of executor termination from Mesos. We produce log messages for both so we eventually - // report the termination reason. - logError("Lost an executor " + executorId + " (already removed): " + reason) + executorIdToHost.get(executorId) match { + case Some(_) => + // If the host mapping still exists, it means we don't know the loss reason for the + // executor. So call removeExecutor() to update tasks running on that executor when + // the real loss reason is finally known. + removeExecutor(executorId, reason) + + case None => + // We may get multiple executorLost() calls with different loss reasons. For example, + // one may be triggered by a dropped connection from the slave while another may be a + // report of executor termination from Mesos. We produce log messages for both so we + // eventually report the termination reason. + logError("Lost an executor " + executorId + " (already removed): " + reason) + } } } // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock @@ -482,7 +491,11 @@ private[spark] class TaskSchedulerImpl( } } - /** Remove an executor from all our data structures and mark it as lost */ + /** + * Remove an executor from all our data structures and mark it as lost. If the executor's loss + * reason is not yet known, do not yet remove its association with its host nor update the status + * of any running tasks, since the loss reason defines whether we'll fail those tasks. + */ private def removeExecutor(executorId: String, reason: ExecutorLossReason) { activeExecutorIds -= executorId val host = executorIdToHost(executorId) @@ -497,8 +510,11 @@ private[spark] class TaskSchedulerImpl( } } } - executorIdToHost -= executorId - rootPool.executorLost(executorId, host, reason) + + if (reason != LossReasonPending) { + executorIdToHost -= executorId + rootPool.executorLost(executorId, host, reason) + } } def executorAdded(execId: String, host: String) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ebce5021b1..f71d98feac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -73,6 +73,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The number of pending tasks which is locality required protected var localityAwareTasks = 0 + // Executors that have been lost, but for which we don't yet know the real exit reason. + protected val executorsPendingLossReason = new HashSet[String] + class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -184,7 +187,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Make fake resource offers on all executors private def makeOffers() { // Filter out executors under killing - val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_)) + val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq @@ -202,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Make fake resource offers on just one executor private def makeOffers(executorId: String) { // Filter out executors under killing - if (!executorsPendingToRemove.contains(executorId)) { + if (executorIsAlive(executorId)) { val executorData = executorDataMap(executorId) val workOffers = Seq( new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) @@ -210,6 +213,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } + private def executorIsAlive(executorId: String): Boolean = synchronized { + !executorsPendingToRemove.contains(executorId) && + !executorsPendingLossReason.contains(executorId) + } + // Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { @@ -246,6 +254,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp addressToExecutorId -= executorInfo.executorAddress executorDataMap -= executorId executorsPendingToRemove -= executorId + executorsPendingLossReason -= executorId } totalCoreCount.addAndGet(-executorInfo.totalCores) totalRegisteredExecutors.addAndGet(-1) @@ -256,6 +265,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } + /** + * Stop making resource offers for the given executor. The executor is marked as lost with + * the loss reason still pending. + * + * @return Whether executor was alive. + */ + protected def disableExecutor(executorId: String): Boolean = { + val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { + if (executorIsAlive(executorId)) { + executorsPendingLossReason += executorId + true + } else { + false + } + } + + if (shouldDisable) { + logInfo(s"Disabling executor $executorId.") + scheduler.executorLost(executorId, LossReasonPending) + } + + shouldDisable + } + override def onStop() { reviveThread.shutdownNow() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index d75d6f673e..80da37b09b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -115,15 +115,12 @@ private[spark] abstract class YarnSchedulerBackend( * (e.g., preemption), according to the application master, then we pass that information down * to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should * not count towards a job failure. - * - * TODO there's a race condition where while we are querying the ApplicationMaster for - * the executor loss reason, there is the potential that tasks will be scheduled on - * the executor that failed. We should fix this by having this onDisconnected event - * also "blacklist" executors so that tasks are not assigned to them. */ override def onDisconnected(rpcAddress: RpcAddress): Unit = { addressToExecutorId.get(rpcAddress).foreach { executorId => - yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress) + if (disableExecutor(executorId)) { + yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress) + } } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index c2edd4c317..2afb595e6f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -237,4 +237,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L } } + test("tasks are not re-scheduled while executor loss reason is pending") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1)) + val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1)) + val attempt1 = FakeTask.createTaskSet(1) + + // submit attempt 1, offer resources, task gets scheduled + taskScheduler.submitTasks(attempt1) + val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten + assert(1 === taskDescriptions.length) + + // mark executor0 as dead but pending fail reason + taskScheduler.executorLost("executor0", LossReasonPending) + + // offer some more resources on a different executor, nothing should change + val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten + assert(0 === taskDescriptions2.length) + + // provide the actual loss reason for executor0 + taskScheduler.executorLost("executor0", SlaveLost("oops")) + + // executor0's tasks should have failed now that the loss reason is known, so offering more + // resources should make them be scheduled on the new executor. + val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten + assert(1 === taskDescriptions3.length) + assert("executor1" === taskDescriptions3(0).executorId) + } + } |