diff options
5 files changed, 21 insertions, 16 deletions
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index a63eee1233..e01181d1b2 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -107,7 +107,7 @@ private[spark] class Client( def stop() { if (actor != null) { try { - val timeout = 1.seconds + val timeout = 5.seconds val future = actor.ask(StopClient)(timeout) Await.result(future, timeout) } catch { diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index d985261600..a5de23261c 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -117,8 +117,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor case Heartbeat(workerId) => { idToWorker.get(workerId) match { case Some(workerInfo) => - workerInfo.lastHeartbeat = System.currentTimeMillis() - case None => + workerInfo.lastHeartbeat = System.currentTimeMillis() + case None => logWarning("Got heartbeat from unregistered worker " + workerId) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 3c3e83b138..e77355c6cd 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -68,6 +68,5 @@ private[spark] class SparkDeploySchedulerBackend( } logInfo("Executor %s removed: %s".format(executorId, message)) removeExecutor(executorId, reason.toString) - scheduler.executorLost(executorId, reason) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 4213eb8719..d606432572 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -104,16 +104,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor // Remove a disconnected slave from the cluster def removeExecutor(executorId: String, reason: String) { - logInfo("Executor " + executorId + " disconnected, so removing it") - val numCores = freeCores(executorId) - actorToExecutorId -= executorActor(executorId) - addressToExecutorId -= executorAddress(executorId) - executorActor -= executorId - executorHost -= executorId - freeCores -= executorId - executorHost -= executorId - totalCoreCount.addAndGet(-numCores) - scheduler.executorLost(executorId, SlaveLost(reason)) + if (executorActor.contains(executorId)) { + logInfo("Executor " + executorId + " disconnected, so removing it") + val numCores = freeCores(executorId) + actorToExecutorId -= executorActor(executorId) + addressToExecutorId -= executorAddress(executorId) + executorActor -= executorId + executorHost -= executorId + freeCores -= executorId + executorHost -= executorId + totalCoreCount.addAndGet(-numCores) + scheduler.executorLost(executorId, SlaveLost(reason)) + } } } @@ -153,7 +155,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) - // Called by backends + // Called by subclasses when notified of a lost worker def removeExecutor(executorId: String, reason: String) { try { val timeout = 5.seconds diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index b481ec0a72..7caf06e917 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -239,7 +239,11 @@ private[spark] class CoarseMesosSchedulerBackend( override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { logInfo("Mesos slave lost: " + slaveId.getValue) synchronized { - slaveIdsWithExecutors -= slaveId.getValue + if (slaveIdsWithExecutors.contains(slaveId.getValue)) { + // Note that the slave ID corresponds to the executor ID on that slave + slaveIdsWithExecutors -= slaveId.getValue + removeExecutor(slaveId.getValue, "Mesos slave lost") + } } } |