diff options
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 13 |
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7aba7324ab..8f8ae9f409 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -19,6 +19,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
 
   val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+  val executorIdToSlaveId = new HashMap[String, String]
 
   // Memory used by each executor (in megabytes)
   val executorMemory = {
@@ -65,9 +66,19 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+    executorIdToSlaveId += id -> workerId
     logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
       id, host, cores, Utils.memoryMegabytesToString(memory)))
   }
 
-  def executorRemoved(id: String, message: String) {}
+  def executorRemoved(id: String, message: String) {
+    logInfo("Executor %s removed: %s".format(id, message))
+    executorIdToSlaveId.get(id) match {
+      case Some(slaveId) =>
+        executorIdToSlaveId.remove(id)
+        scheduler.slaveLost(slaveId)
+      case None =>
+        logInfo("No slave ID known for executor %s".format(id))
+    }
+  }
 }