diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-12 18:21:52 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-12 18:21:52 -0700 |
commit | e2a67a802447c9a778d57b687dc6321f5fb14283 (patch) | |
tree | 8bb5df00a407ac5c150cf159184891e0d88e3417 | |
parent | be622cf867fc6465b3b8bf81838a7deeb7296e23 (diff) | |
download | spark-e2a67a802447c9a778d57b687dc6321f5fb14283.tar.gz spark-e2a67a802447c9a778d57b687dc6321f5fb14283.tar.bz2 spark-e2a67a802447c9a778d57b687dc6321f5fb14283.zip |
Fixes to coarse-grained Mesos scheduler in dealing with failed nodes
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 27 |
2 files changed, 25 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index c3132abd7a..013671c1c8 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -112,7 +112,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor masterActor ! ReviveOffers } - def defaultParallelism(): Int = totalCoreCount.get() + def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) } object StandaloneSchedulerBackend { diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 62a0c5589c..31784985dc 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -41,6 +41,8 @@ class CoarseMesosSchedulerBackend( "SPARK_JAVA_OPTS" ) + val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures + // Memory used by each executor (in megabytes) val executorMemory = { if (System.getenv("SPARK_MEM") != null) { @@ -67,6 +69,9 @@ class CoarseMesosSchedulerBackend( val slaveIdsWithExecutors = new HashSet[String] + val taskIdToSlaveId = new HashMap[Int, String] + val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed + val sparkHome = sc.getSparkHome() match { case Some(path) => path @@ -161,10 +166,14 @@ class CoarseMesosSchedulerBackend( val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && !slaveIdsWithExecutors.contains(slaveId)) { // Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) val taskId = newMesosTaskId() + taskIdToSlaveId(taskId) = slaveId + slaveIdsWithExecutors += slaveId + coresByTaskId(taskId) = cpusToUse val task = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) @@ -210,15 +219,27 @@ class CoarseMesosSchedulerBackend( override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { val taskId = status.getTaskId.getValue.toInt - logInfo("Mesos task " + taskId + " is now " + status.getState) + val state = status.getState + logInfo("Mesos task " + taskId + " is now " + state) synchronized { - if (isFinished(status.getState)) { + if (isFinished(state)) { + val slaveId = taskIdToSlaveId(taskId) + slaveIdsWithExecutors -= slaveId + taskIdToSlaveId -= taskId // Remove the cores we have remembered for this task, if it's in the hashmap for (cores <- coresByTaskId.get(taskId)) { totalCoresAcquired -= cores coresByTaskId -= taskId - driver.reviveOffers() // In case we'd rejected everything before but have now lost a node } + // If it was a failure, mark the slave as failed for blacklisting purposes + if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) { + failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 + if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { + logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + + "is Spark installed on it?") + } + } + driver.reviveOffers() // In case we'd rejected everything before but have now lost a node } } } |