From 1d8e2e6cffdd63b736f26054d4657c399293913e Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Mon, 10 Dec 2012 21:40:09 -0800 Subject: Call slaveLost on executor death for standalone clusters. --- .../scheduler/cluster/SparkDeploySchedulerBackend.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 7aba7324ab..8f8ae9f409 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -19,6 +19,7 @@ private[spark] class SparkDeploySchedulerBackend( var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt + val executorIdToSlaveId = new HashMap[String, String] // Memory used by each executor (in megabytes) val executorMemory = { @@ -65,9 +66,19 @@ private[spark] class SparkDeploySchedulerBackend( } def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) { + executorIdToSlaveId += id -> workerId logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format( id, host, cores, Utils.memoryMegabytesToString(memory))) } - def executorRemoved(id: String, message: String) {} + def executorRemoved(id: String, message: String) { + logInfo("Executor %s removed: %s".format(id, message)) + executorIdToSlaveId.get(id) match { + case Some(slaveId) => + executorIdToSlaveId.remove(id) + scheduler.slaveLost(slaveId) + case None => + logInfo("No slave ID known for executor %s".format(id)) + } + } } -- cgit v1.2.3