author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-07-12 18:21:52 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-07-12 18:21:52 -0700
commit     e2a67a802447c9a778d57b687dc6321f5fb14283 (patch)
tree       8bb5df00a407ac5c150cf159184891e0d88e3417 /core/src
parent     be622cf867fc6465b3b8bf81838a7deeb7296e23 (diff)
Fixes to coarse-grained Mesos scheduler in dealing with failed nodes
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala    2
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala    27
2 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index c3132abd7a..013671c1c8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -112,7 +112,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     masterActor ! ReviveOffers
   }
-  def defaultParallelism(): Int = totalCoreCount.get()
+  def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
 }
 object StandaloneSchedulerBackend {
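
The defaultParallelism change above guards against the window before any executors have registered: totalCoreCount starts at 0, so the old code could report a default parallelism of zero, and operations that fall back on the default could end up with zero partitions. A minimal standalone sketch of the new behaviour, assuming the counter is an AtomicInteger like the backend's totalCoreCount field (the object and main method here are illustrative only, not part of the patch):

import java.util.concurrent.atomic.AtomicInteger

object DefaultParallelismSketch {
  // Stands in for the backend's totalCoreCount, bumped as executors register their cores.
  val totalCoreCount = new AtomicInteger(0)

  // Same expression as the patched method: never report fewer than 2.
  def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)

  def main(args: Array[String]) {
    println(defaultParallelism())  // 2 -- nothing registered yet
    totalCoreCount.addAndGet(8)
    println(defaultParallelism())  // 8 -- once executors have reported 8 cores
  }
}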
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 62a0c5589c..31784985dc 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -41,6 +41,8 @@ class CoarseMesosSchedulerBackend(
"SPARK_JAVA_OPTS"
)
+ val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
+
// Memory used by each executor (in megabytes)
val executorMemory = {
if (System.getenv("SPARK_MEM") != null) {
@@ -67,6 +69,9 @@ class CoarseMesosSchedulerBackend(
   val slaveIdsWithExecutors = new HashSet[String]
+  val taskIdToSlaveId = new HashMap[Int, String]
+  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
+
   val sparkHome = sc.getSparkHome() match {
     case Some(path) =>
       path
@@ -161,10 +166,14 @@ class CoarseMesosSchedulerBackend(
       val mem = getResource(offer.getResourcesList, "mem")
       val cpus = getResource(offer.getResourcesList, "cpus").toInt
       if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
+          failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
           !slaveIdsWithExecutors.contains(slaveId)) {
         // Launch an executor on the slave
         val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
         val taskId = newMesosTaskId()
+        taskIdToSlaveId(taskId) = slaveId
+        slaveIdsWithExecutors += slaveId
+        coresByTaskId(taskId) = cpusToUse
         val task = MesosTaskInfo.newBuilder()
           .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
           .setSlaveId(offer.getSlaveId)
@@ -210,15 +219,27 @@ class CoarseMesosSchedulerBackend(
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
-    logInfo("Mesos task " + taskId + " is now " + status.getState)
+    val state = status.getState
+    logInfo("Mesos task " + taskId + " is now " + state)
     synchronized {
-      if (isFinished(status.getState)) {
+      if (isFinished(state)) {
+        val slaveId = taskIdToSlaveId(taskId)
+        slaveIdsWithExecutors -= slaveId
+        taskIdToSlaveId -= taskId
         // Remove the cores we have remembered for this task, if it's in the hashmap
         for (cores <- coresByTaskId.get(taskId)) {
           totalCoresAcquired -= cores
           coresByTaskId -= taskId
-          driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
         }
+        // If it was a failure, mark the slave as failed for blacklisting purposes
+        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
+          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
+          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
+            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+              "is Spark installed on it?")
+          }
+        }
+        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
       }
     }
   }
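
Taken together, the CoarseMesosSchedulerBackend changes add per-slave failure accounting: each launched task remembers which slave it ran on, a terminal TASK_FAILED or TASK_LOST bumps that slave's failure count, and a slave that reaches MAX_SLAVE_FAILURES no longer receives executors. The sketch below isolates just that bookkeeping; the field and constant names mirror the patch, but the object itself is hypothetical and the Mesos driver/offer plumbing is left out:

import scala.collection.mutable.{HashMap, HashSet}

object SlaveBlacklistSketch {
  val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures

  val slaveIdsWithExecutors = new HashSet[String]
  val taskIdToSlaveId = new HashMap[Int, String]
  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed

  // Mirrors the extra condition added to the offer handling: only launch on slaves
  // below the failure threshold that do not already run an executor.
  def canLaunchOn(slaveId: String): Boolean =
    failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
      !slaveIdsWithExecutors.contains(slaveId)

  // Mirrors the launch path: remember which slave a task went to.
  def recordLaunch(taskId: Int, slaveId: String) {
    taskIdToSlaveId(taskId) = slaveId
    slaveIdsWithExecutors += slaveId
  }

  // Mirrors the statusUpdate path when a task ends in TASK_FAILED or TASK_LOST.
  def recordFailure(taskId: Int) {
    for (slaveId <- taskIdToSlaveId.remove(taskId)) {
      slaveIdsWithExecutors -= slaveId
      failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
      if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
        println("Blacklisting Mesos slave " + slaveId + " due to too many failures")
      }
    }
  }

  def main(args: Array[String]) {
    recordLaunch(taskId = 0, slaveId = "slave-1")
    recordFailure(0)                 // first failure: slave-1 is still usable
    recordLaunch(taskId = 1, slaveId = "slave-1")
    recordFailure(1)                 // second failure: slave-1 is now blacklisted
    println(canLaunchOn("slave-1"))  // false
    println(canLaunchOn("slave-2"))  // true
  }
}

Note that the real backend does this bookkeeping inside the synchronized block shown in the statusUpdate hunk, since Mesos invokes scheduler callbacks from its own driver threads.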