aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-02-06 13:30:35 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-02-10 22:43:38 -0800
commitda8afbc77e5796d45686034db5560f18c057d3c9 (patch)
treebfcfa5f78cdf0996b3c868a6035992944ae2a2ed
parent1b47fa275236657bea358f5c95d89f568c439395 (diff)
downloadspark-da8afbc77e5796d45686034db5560f18c057d3c9.tar.gz
spark-da8afbc77e5796d45686034db5560f18c057d3c9.tar.bz2
spark-da8afbc77e5796d45686034db5560f18c057d3c9.zip
Some bug and formatting fixes to FT
Conflicts: core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala2
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala1
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala24
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala6
5 files changed, 21 insertions, 16 deletions
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index a63eee1233..e01181d1b2 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -107,7 +107,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = 1.seconds
+ val timeout = 5.seconds
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index d985261600..a5de23261c 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -117,8 +117,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
case Heartbeat(workerId) => {
idToWorker.get(workerId) match {
case Some(workerInfo) =>
- workerInfo.lastHeartbeat = System.currentTimeMillis()
- case None =>
+ workerInfo.lastHeartbeat = System.currentTimeMillis()
+ case None =>
logWarning("Got heartbeat from unregistered worker " + workerId)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 3c3e83b138..e77355c6cd 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -68,6 +68,5 @@ private[spark] class SparkDeploySchedulerBackend(
}
logInfo("Executor %s removed: %s".format(executorId, message))
removeExecutor(executorId, reason.toString)
- scheduler.executorLost(executorId, reason)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 4213eb8719..d606432572 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -104,16 +104,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Remove a disconnected slave from the cluster
def removeExecutor(executorId: String, reason: String) {
- logInfo("Executor " + executorId + " disconnected, so removing it")
- val numCores = freeCores(executorId)
- actorToExecutorId -= executorActor(executorId)
- addressToExecutorId -= executorAddress(executorId)
- executorActor -= executorId
- executorHost -= executorId
- freeCores -= executorId
- executorHost -= executorId
- totalCoreCount.addAndGet(-numCores)
- scheduler.executorLost(executorId, SlaveLost(reason))
+ if (executorActor.contains(executorId)) {
+ logInfo("Executor " + executorId + " disconnected, so removing it")
+ val numCores = freeCores(executorId)
+ actorToExecutorId -= executorActor(executorId)
+ addressToExecutorId -= executorAddress(executorId)
+ executorActor -= executorId
+ executorHost -= executorId
+ freeCores -= executorId
+ executorHost -= executorId
+ totalCoreCount.addAndGet(-numCores)
+ scheduler.executorLost(executorId, SlaveLost(reason))
+ }
}
}
@@ -153,7 +155,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
- // Called by backends
+ // Called by subclasses when notified of a lost worker
def removeExecutor(executorId: String, reason: String) {
try {
val timeout = 5.seconds
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index b481ec0a72..7caf06e917 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -239,7 +239,11 @@ private[spark] class CoarseMesosSchedulerBackend(
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
+ if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
+ // Note that the slave ID corresponds to the executor ID on that slave
+ slaveIdsWithExecutors -= slaveId.getValue
+ removeExecutor(slaveId.getValue, "Mesos slave lost")
+ }
}
}