diff options
Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala')
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 105 |
1 files changed, 60 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 20f6e65020..1e4fbdb874 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -27,19 +27,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext) var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String] - val taskIdToSlaveId = new HashMap[Long, String] + val taskIdToExecutorId = new HashMap[Long, String] val taskSetTaskIds = new HashMap[String, HashSet[Long]] // Incrementing Mesos task IDs val nextTaskId = new AtomicLong(0) - // Which hosts in the cluster are alive (contains hostnames) - val hostsAlive = new HashSet[String] + // Which executor IDs we have executors on + val activeExecutorIds = new HashSet[String] - // Which slave IDs we have executors on - val slaveIdsWithExecutors = new HashSet[String] + // The set of executors we have on each host; this is used to compute hostsAlive, which + // in turn is used to decide when we can attain data locality on a given host + val executorsByHost = new HashMap[String, HashSet[String]] - val slaveIdToHost = new HashMap[String, String] + val executorIdToHost = new HashMap[String, String] // JAR server, if any JARs were added by the user to the SparkContext var jarServer: HttpServer = null @@ -85,7 +86,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } - def submitTasks(taskSet: TaskSet) { + override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { @@ -102,7 +103,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) activeTaskSets -= manager.taskSet.id activeTaskSetsQueue -= manager taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id) - taskIdToSlaveId --= taskSetTaskIds(manager.taskSet.id) + taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id) taskSetTaskIds.remove(manager.taskSet.id) } } @@ -117,8 +118,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) SparkEnv.set(sc.env) // Mark each slave as alive and remember its hostname for (o <- offers) { - slaveIdToHost(o.slaveId) = o.hostname - hostsAlive += o.hostname + executorIdToHost(o.executorId) = o.hostname } // Build a list of tasks to assign to each slave val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) @@ -128,16 +128,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext) do { launchedTask = false for (i <- 0 until offers.size) { - val sid = offers(i).slaveId + val execId = offers(i).executorId val host = offers(i).hostname - manager.slaveOffer(sid, host, availableCpus(i)) match { + manager.slaveOffer(execId, host, availableCpus(i)) match { case Some(task) => tasks(i) += task val tid = task.taskId taskIdToTaskSetId(tid) = manager.taskSet.id taskSetTaskIds(manager.taskSet.id) += tid - taskIdToSlaveId(tid) = sid - slaveIdsWithExecutors += sid + taskIdToExecutorId(tid) = execId + activeExecutorIds += execId + if (!executorsByHost.contains(host)) { + executorsByHost(host) = new HashSet() + } + executorsByHost(host) += execId availableCpus(i) -= 1 launchedTask = true @@ -152,25 +156,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var taskSetToUpdate: Option[TaskSetManager] = None - var failedHost: Option[String] = None + var failedExecutor: Option[String] = None var taskFailed = false synchronized { try { - if (state == TaskState.LOST && taskIdToSlaveId.contains(tid)) { - // We lost the executor on this slave, so remember that it's gone - val slaveId = taskIdToSlaveId(tid) - val host = slaveIdToHost(slaveId) - if (hostsAlive.contains(host)) { - slaveIdsWithExecutors -= slaveId - hostsAlive -= host - activeTaskSetsQueue.foreach(_.hostLost(host)) - failedHost = Some(host) + if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { + // We lost this entire executor, so remember that it's gone + val execId = taskIdToExecutorId(tid) + if (activeExecutorIds.contains(execId)) { + removeExecutor(execId) + failedExecutor = Some(execId) } } taskIdToTaskSetId.get(tid) match { case Some(taskSetId) => if (activeTaskSets.contains(taskSetId)) { - //activeTaskSets(taskSetId).statusUpdate(status) taskSetToUpdate = Some(activeTaskSets(taskSetId)) } if (TaskState.isFinished(state)) { @@ -178,7 +178,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) if (taskSetTaskIds.contains(taskSetId)) { taskSetTaskIds(taskSetId) -= tid } - taskIdToSlaveId.remove(tid) + taskIdToExecutorId.remove(tid) } if (state == TaskState.FAILED) { taskFailed = true @@ -190,12 +190,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext) case e: Exception => logError("Exception in statusUpdate", e) } } - // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock + // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock if (taskSetToUpdate != None) { taskSetToUpdate.get.statusUpdate(tid, state, serializedData) } - if (failedHost != None) { - listener.hostLost(failedHost.get) + if (failedExecutor != None) { + listener.executorLost(failedExecutor.get) backend.reviveOffers() } if (taskFailed) { @@ -249,27 +249,42 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } - def slaveLost(slaveId: String, reason: ExecutorLossReason) { - var failedHost: Option[String] = None + def executorLost(executorId: String, reason: ExecutorLossReason) { + var failedExecutor: Option[String] = None synchronized { - val host = slaveIdToHost(slaveId) - if (hostsAlive.contains(host)) { - logError("Lost an executor on " + host + ": " + reason) - slaveIdsWithExecutors -= slaveId - hostsAlive -= host - activeTaskSetsQueue.foreach(_.hostLost(host)) - failedHost = Some(host) + if (activeExecutorIds.contains(executorId)) { + val host = executorIdToHost(executorId) + logError("Lost executor %s on %s: %s".format(executorId, host, reason)) + removeExecutor(executorId) + failedExecutor = Some(executorId) } else { - // We may get multiple slaveLost() calls with different loss reasons. For example, one - // may be triggered by a dropped connection from the slave while another may be a report - // of executor termination from Mesos. We produce log messages for both so we eventually - // report the termination reason. - logError("Lost an executor on " + host + " (already removed): " + reason) + // We may get multiple executorLost() calls with different loss reasons. For example, one + // may be triggered by a dropped connection from the slave while another may be a report + // of executor termination from Mesos. We produce log messages for both so we eventually + // report the termination reason. + logError("Lost an executor " + executorId + " (already removed): " + reason) } } - if (failedHost != None) { - listener.hostLost(failedHost.get) + // Call listener.executorLost without holding the lock on this to prevent deadlock + if (failedExecutor != None) { + listener.executorLost(failedExecutor.get) backend.reviveOffers() } } + + /** Get a list of hosts that currently have executors */ + def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet + + /** Remove an executor from all our data structures and mark it as lost */ + private def removeExecutor(executorId: String) { + activeExecutorIds -= executorId + val host = executorIdToHost(executorId) + val execs = executorsByHost.getOrElse(host, new HashSet) + execs -= executorId + if (execs.isEmpty) { + executorsByHost -= host + } + executorIdToHost -= executorId + activeTaskSetsQueue.foreach(_.executorLost(executorId, host)) + } } |