diff options
author | Reynold Xin <rxin@apache.org> | 2013-11-05 18:46:38 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2013-11-05 18:46:38 -0800 |
commit | a02eed68110f99c08d8ff379108c96546bbc16b0 (patch) | |
tree | f6e8e9ee00d1b1fe34f6aff6371fc5aba4e454e4 | |
parent | 9f7b9bb1cd157e8278d43a3908d5f778b54aed55 (diff) | |
download | spark-a02eed68110f99c08d8ff379108c96546bbc16b0.tar.gz spark-a02eed68110f99c08d8ff379108c96546bbc16b0.tar.bz2 spark-a02eed68110f99c08d8ff379108c96546bbc16b0.zip |
Ignore a task update status if the executor doesn't exist anymore.
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 70f3f88401..a45bee536c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -87,8 +87,14 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { - freeCores(executorId) += 1 - makeOffers(executorId) + if (executorActor.contains(executorId)) { + freeCores(executorId) += 1 + makeOffers(executorId) + } else { + // Ignoring the update since we don't know about the executor. + val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s" + logWarning(msg.format(taskId, state, sender, executorId)) + } } case ReviveOffers => @@ -175,7 +181,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) } - private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + private val timeout = { + Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + } def stopExecutors() { try { |