diff options
author | CodingCat <zhunansjtu@gmail.com> | 2014-03-05 14:00:28 -0800 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2014-03-05 14:00:28 -0800 |
commit | a3da5088195eea7d90b37feee5dd2a372fcd9ace (patch) | |
tree | ecc9dc4aaa413c021d9c0b357938d3316ba85584 | |
parent | 02836657cfec50bc6cc357541e40f8d36c90b352 (diff) | |
download | spark-a3da5088195eea7d90b37feee5dd2a372fcd9ace.tar.gz spark-a3da5088195eea7d90b37feee5dd2a372fcd9ace.tar.bz2 spark-a3da5088195eea7d90b37feee5dd2a372fcd9ace.zip |
SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor
https://spark-project.atlassian.net/browse/SPARK-1171
When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...
Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>
Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits:
f6bf93f [Nan Zhu] code clean
19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers
43c13e9 [CodingCat] keep WorkerOffer immutable
af470d3 [CodingCat] style fix
0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 8 |
2 files changed, 7 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index ba6bab3f91..810b36cddf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -21,4 +21,4 @@ package org.apache.spark.scheduler * Represents free resources available on an executor. */ private[spark] -class WorkerOffer(val executorId: String, val host: String, val cores: Int) +case class WorkerOffer(executorId: String, host: String, cores: Int) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 379e02eb9a..fad0373157 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -54,6 +54,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A private val executorAddress = new HashMap[String, Address] private val executorHost = new HashMap[String, String] private val freeCores = new HashMap[String, Int] + private val totalCores = new HashMap[String, Int] private val addressToExecutorId = new HashMap[Address, String] override def preStart() { @@ -76,6 +77,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A sender ! RegisteredExecutor(sparkProperties) executorActor(executorId) = sender executorHost(executorId) = Utils.parseHostPort(hostPort)._1 + totalCores(executorId) = cores freeCores(executorId) = cores executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId @@ -147,10 +149,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A def removeExecutor(executorId: String, reason: String) { if (executorActor.contains(executorId)) { logInfo("Executor " + executorId + " disconnected, so removing it") - val numCores = freeCores(executorId) - addressToExecutorId -= executorAddress(executorId) + val numCores = totalCores(executorId) executorActor -= executorId executorHost -= executorId + addressToExecutorId -= executorAddress(executorId) + executorAddress -= executorId + totalCores -= executorId freeCores -= executorId totalCoreCount.addAndGet(-numCores) scheduler.executorLost(executorId, SlaveLost(reason)) |