Diffstat (limited to 'core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala')
 core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 105
 1 file changed, 60 insertions(+), 45 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 20f6e65020..1e4fbdb874 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -27,19 +27,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
- val taskIdToSlaveId = new HashMap[Long, String]
+ val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
// Incrementing Mesos task IDs
val nextTaskId = new AtomicLong(0)
- // Which hosts in the cluster are alive (contains hostnames)
- val hostsAlive = new HashSet[String]
+ // Which executor IDs we have executors on
+ val activeExecutorIds = new HashSet[String]
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
+ // The set of executors we have on each host; this is used to compute hostsAlive, which
+ // in turn is used to decide when we can attain data locality on a given host
+ val executorsByHost = new HashMap[String, HashSet[String]]
- val slaveIdToHost = new HashMap[String, String]
+ val executorIdToHost = new HashMap[String, String]
// JAR server, if any JARs were added by the user to the SparkContext
var jarServer: HttpServer = null
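As an aside (not part of the patch), a minimal Scala sketch of how the renamed structures fit together: executorIdToHost maps each executor to its host, executorsByHost groups executors per host, and activeExecutorIds tracks which executors are live. The registerExecutor helper below is hypothetical and only mirrors the bookkeeping done later in this diff.

    import scala.collection.mutable.{HashMap, HashSet}

    val executorIdToHost = new HashMap[String, String]
    val executorsByHost = new HashMap[String, HashSet[String]]
    val activeExecutorIds = new HashSet[String]

    // Hypothetical helper mirroring the bookkeeping done when an offer is accepted.
    def registerExecutor(execId: String, host: String) {
      executorIdToHost(execId) = host
      activeExecutorIds += execId
      executorsByHost.getOrElseUpdate(host, new HashSet[String]) += execId
    }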
@@ -85,7 +86,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- def submitTasks(taskSet: TaskSet) {
+ override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
@@ -102,7 +103,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
activeTaskSets -= manager.taskSet.id
activeTaskSetsQueue -= manager
taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
- taskIdToSlaveId --= taskSetTaskIds(manager.taskSet.id)
+ taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
taskSetTaskIds.remove(manager.taskSet.id)
}
}
@@ -117,8 +118,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname
for (o <- offers) {
- slaveIdToHost(o.slaveId) = o.hostname
- hostsAlive += o.hostname
+ executorIdToHost(o.executorId) = o.hostname
}
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
@@ -128,16 +128,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
do {
launchedTask = false
for (i <- 0 until offers.size) {
- val sid = offers(i).slaveId
+ val execId = offers(i).executorId
val host = offers(i).hostname
- manager.slaveOffer(sid, host, availableCpus(i)) match {
+ manager.slaveOffer(execId, host, availableCpus(i)) match {
case Some(task) =>
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = manager.taskSet.id
taskSetTaskIds(manager.taskSet.id) += tid
- taskIdToSlaveId(tid) = sid
- slaveIdsWithExecutors += sid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ if (!executorsByHost.contains(host)) {
+ executorsByHost(host) = new HashSet()
+ }
+ executorsByHost(host) += execId
availableCpus(i) -= 1
launchedTask = true
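The loop in this hunk keeps cycling over the offers until a full pass launches no task. Below is a standalone sketch of that round-robin pattern, with slaveOffer abstracted into a caller-supplied offerOne function; the names and the explicit CPU guard are illustrative, not taken from the patch.

    import scala.collection.mutable.ArrayBuffer

    // Illustrative round-robin assignment: hand out one CPU per offer per pass,
    // and stop once a complete pass over the offers launches nothing.
    def assignTasks(offers: Seq[(String, String, Int)],              // (execId, host, cores)
                    offerOne: (String, String, Int) => Option[Long]  // returns a launched task id, if any
                   ): Seq[Seq[Long]] = {
      val assigned = offers.map(_ => new ArrayBuffer[Long])
      val availableCpus = offers.map(_._3).toArray
      var launchedTask = true
      while (launchedTask) {
        launchedTask = false
        for (i <- 0 until offers.size) {
          val (execId, host, _) = offers(i)
          if (availableCpus(i) > 0) {
            offerOne(execId, host, availableCpus(i)).foreach { tid =>
              assigned(i) += tid
              availableCpus(i) -= 1
              launchedTask = true
            }
          }
        }
      }
      assigned.map(_.toSeq)
    }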
@@ -152,25 +156,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var taskSetToUpdate: Option[TaskSetManager] = None
- var failedHost: Option[String] = None
+ var failedExecutor: Option[String] = None
var taskFailed = false
synchronized {
try {
- if (state == TaskState.LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- val slaveId = taskIdToSlaveId(tid)
- val host = slaveIdToHost(slaveId)
- if (hostsAlive.contains(host)) {
- slaveIdsWithExecutors -= slaveId
- hostsAlive -= host
- activeTaskSetsQueue.foreach(_.hostLost(host))
- failedHost = Some(host)
+ if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
+ // We lost this entire executor, so remember that it's gone
+ val execId = taskIdToExecutorId(tid)
+ if (activeExecutorIds.contains(execId)) {
+ removeExecutor(execId)
+ failedExecutor = Some(execId)
}
}
taskIdToTaskSetId.get(tid) match {
case Some(taskSetId) =>
if (activeTaskSets.contains(taskSetId)) {
- //activeTaskSets(taskSetId).statusUpdate(status)
taskSetToUpdate = Some(activeTaskSets(taskSetId))
}
if (TaskState.isFinished(state)) {
@@ -178,7 +178,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (taskSetTaskIds.contains(taskSetId)) {
taskSetTaskIds(taskSetId) -= tid
}
- taskIdToSlaveId.remove(tid)
+ taskIdToExecutorId.remove(tid)
}
if (state == TaskState.FAILED) {
taskFailed = true
@@ -190,12 +190,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
case e: Exception => logError("Exception in statusUpdate", e)
}
}
- // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock
+ // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock
if (taskSetToUpdate != None) {
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
}
- if (failedHost != None) {
- listener.hostLost(failedHost.get)
+ if (failedExecutor != None) {
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
if (taskFailed) {
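Note the pattern in statusUpdate above (and in executorLost below): the decision about what failed is recorded while synchronized on the scheduler, but the listener callback runs only after the lock is released, because calling back into the DAGScheduler under the lock can deadlock. A generic sketch of that pattern, with a hypothetical class and names:

    import scala.collection.mutable.HashSet

    // Illustration of "decide under the lock, notify outside it".
    class LossNotifier(listener: String => Unit) {
      private val activeExecutors = new HashSet[String]

      def executorLost(execId: String) {
        var failed: Option[String] = None
        synchronized {
          if (activeExecutors.remove(execId)) {
            failed = Some(execId)      // record the decision while holding the lock
          }
        }
        failed.foreach(listener)       // call back without holding the lock, avoiding deadlock
      }
    }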
@@ -249,27 +249,42 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- def slaveLost(slaveId: String, reason: ExecutorLossReason) {
- var failedHost: Option[String] = None
+ def executorLost(executorId: String, reason: ExecutorLossReason) {
+ var failedExecutor: Option[String] = None
synchronized {
- val host = slaveIdToHost(slaveId)
- if (hostsAlive.contains(host)) {
- logError("Lost an executor on " + host + ": " + reason)
- slaveIdsWithExecutors -= slaveId
- hostsAlive -= host
- activeTaskSetsQueue.foreach(_.hostLost(host))
- failedHost = Some(host)
+ if (activeExecutorIds.contains(executorId)) {
+ val host = executorIdToHost(executorId)
+ logError("Lost executor %s on %s: %s".format(executorId, host, reason))
+ removeExecutor(executorId)
+ failedExecutor = Some(executorId)
} else {
- // We may get multiple slaveLost() calls with different loss reasons. For example, one
- // may be triggered by a dropped connection from the slave while another may be a report
- // of executor termination from Mesos. We produce log messages for both so we eventually
- // report the termination reason.
- logError("Lost an executor on " + host + " (already removed): " + reason)
+ // We may get multiple executorLost() calls with different loss reasons. For example, one
+ // may be triggered by a dropped connection from the slave while another may be a report
+ // of executor termination from Mesos. We produce log messages for both so we eventually
+ // report the termination reason.
+ logError("Lost an executor " + executorId + " (already removed): " + reason)
}
}
- if (failedHost != None) {
- listener.hostLost(failedHost.get)
+ // Call listener.executorLost without holding the lock on this to prevent deadlock
+ if (failedExecutor != None) {
+ listener.executorLost(failedExecutor.get)
backend.reviveOffers()
}
}
+
+ /** Get a list of hosts that currently have executors */
+ def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet
+
+ /** Remove an executor from all our data structures and mark it as lost */
+ private def removeExecutor(executorId: String) {
+ activeExecutorIds -= executorId
+ val host = executorIdToHost(executorId)
+ val execs = executorsByHost.getOrElse(host, new HashSet)
+ execs -= executorId
+ if (execs.isEmpty) {
+ executorsByHost -= host
+ }
+ executorIdToHost -= executorId
+ activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
+ }
}
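Finally, a REPL-style sketch of the removal bookkeeping and the derived hostsAlive set: a host stays "alive" only while executorsByHost still has an entry for it. The executor and host names here are made up for illustration.

    import scala.collection.mutable.{HashMap, HashSet}

    val executorIdToHost = HashMap("exec-1" -> "hostA", "exec-2" -> "hostA")
    val executorsByHost = HashMap("hostA" -> HashSet("exec-1", "exec-2"))
    val activeExecutorIds = HashSet("exec-1", "exec-2")

    def removeExecutor(executorId: String) {
      activeExecutorIds -= executorId
      val host = executorIdToHost(executorId)
      val execs = executorsByHost.getOrElse(host, new HashSet[String])
      execs -= executorId
      if (execs.isEmpty) {
        executorsByHost -= host        // last executor gone: host is no longer alive
      }
      executorIdToHost -= executorId
    }

    def hostsAlive: scala.collection.Set[String] = executorsByHost.keySet

    removeExecutor("exec-1")
    assert(hostsAlive.contains("hostA"))     // hostA still has exec-2
    removeExecutor("exec-2")
    assert(!hostsAlive.contains("hostA"))    // hostA dropped once empty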