From 61ca7742d21dd66f5a7b3bb826e3aaca6f049b68 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 21 Oct 2014 11:22:25 -0700 Subject: [SPARK-4020] Do not rely on timeouts to remove failed block managers If an executor fails without being scheduled to run any tasks, then `DAGScheduler` won't notify `BlockManagerMasterActor` that the associated block manager should be removed. Instead, the associated block manager will be expired only after a few rounds of heartbeat timeouts. In terms of removal treatment, there should really be no distinction between executors that have been scheduled tasks and those that have not. The fix, then, is to add all known executors to `TaskSchedulerImpl`'s `activeExecutorIds` whether or not it has been scheduled a task. In fact, the existing comment above `activeExecutorIds` is ``` // Which executor IDs we have executors on val activeExecutorIds = new HashSet[String] ``` not "Which executors have been scheduled tasks thus far." Author: Andrew Or Closes #2865 from andrewor14/active-executors and squashes the following commits: ff3172b [Andrew Or] Add all known executors to `activeExecutorIds` --- core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'core/src/main/scala') diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6d697e3d00..2b39c7fc87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -221,6 +221,7 @@ private[spark] class TaskSchedulerImpl( var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host + activeExecutorIds += o.executorId if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) @@ -261,7 +262,6 @@ private[spark] class TaskSchedulerImpl( val tid = task.taskId taskIdToTaskSetId(tid) = taskSet.taskSet.id taskIdToExecutorId(tid) = execId - activeExecutorIds += execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) -- cgit v1.2.3