aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorAkshat Aranya <aaranya@quantcast.com>2014-11-19 17:20:20 -0800
committerAndrew Or <andrew@databricks.com>2014-11-19 17:20:32 -0800
commitd68b40bfca70cfbcee052dd6fea4f39602bf9dcf (patch)
tree936c15f9588cd36acbc674ce762e6c439b413b59 /core/src
parent8786ddd48166a3c7da20bf37ab894053d882e078 (diff)
downloadspark-d68b40bfca70cfbcee052dd6fea4f39602bf9dcf.tar.gz
spark-d68b40bfca70cfbcee052dd6fea4f39602bf9dcf.tar.bz2
spark-d68b40bfca70cfbcee052dd6fea4f39602bf9dcf.zip
[SPARK-4478] Keep totalRegisteredExecutors up-to-date
This rebases PR 3368. This commit fixes totalRegisteredExecutors update [SPARK-4478], so that we can correctly keep track of number of registered executors. Author: Akshat Aranya <aaranya@quantcast.com> Closes #3373 from coolfrood/topic/SPARK-4478 and squashes the following commits: 8a4d1e4 [Akshat Aranya] Added comment 150ae93 [Akshat Aranya] [SPARK-4478] Keep totalRegisteredExecutors up-to-date (cherry picked from commit 9ccc53c72c5bcffcc121291710754e1e2d659341) Signed-off-by: Andrew Or <andrew@databricks.com>
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala2
1 files changed, 2 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7a6ee56f81..047fae104b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -46,6 +46,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ // Total number of executors that are currently registered
var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
@@ -204,6 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
executorsPendingToRemove -= executorId
}
totalCoreCount.addAndGet(-executorInfo.totalCores)
+ totalRegisteredExecutors.addAndGet(-1)
scheduler.executorLost(executorId, SlaveLost(reason))
case None => logError(s"Asked to remove non-existent executor $executorId")
}