aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCodingCat <zhunansjtu@gmail.com>2014-03-05 14:00:28 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2014-03-05 14:00:28 -0800
commita3da5088195eea7d90b37feee5dd2a372fcd9ace (patch)
treeecc9dc4aaa413c021d9c0b357938d3316ba85584 /core
parent02836657cfec50bc6cc357541e40f8d36c90b352 (diff)
downloadspark-a3da5088195eea7d90b37feee5dd2a372fcd9ace.tar.gz
spark-a3da5088195eea7d90b37feee5dd2a372fcd9ace.tar.bz2
spark-a3da5088195eea7d90b37feee5dd2a372fcd9ace.zip
SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor
https://spark-project.atlassian.net/browse/SPARK-1171 When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores... Author: CodingCat <zhunansjtu@gmail.com> Author: Nan Zhu <CodingCat@users.noreply.github.com> Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits: f6bf93f [Nan Zhu] code clean 19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers 43c13e9 [CodingCat] keep WorkerOffer immutable af470d3 [CodingCat] style fix 0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala8
2 files changed, 7 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index ba6bab3f91..810b36cddf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
* Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+case class WorkerOffer(executorId: String, host: String, cores: Int)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 379e02eb9a..fad0373157 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -54,6 +54,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
+ private val totalCores = new HashMap[String, Int]
private val addressToExecutorId = new HashMap[Address, String]
override def preStart() {
@@ -76,6 +77,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
sender ! RegisteredExecutor(sparkProperties)
executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+ totalCores(executorId) = cores
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
@@ -147,10 +149,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
def removeExecutor(executorId: String, reason: String) {
if (executorActor.contains(executorId)) {
logInfo("Executor " + executorId + " disconnected, so removing it")
- val numCores = freeCores(executorId)
- addressToExecutorId -= executorAddress(executorId)
+ val numCores = totalCores(executorId)
executorActor -= executorId
executorHost -= executorId
+ addressToExecutorId -= executorAddress(executorId)
+ executorAddress -= executorId
+ totalCores -= executorId
freeCores -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))