diff options
author | root <root@ip-10-8-17-6.ec2.internal> | 2013-02-06 09:11:17 +0000 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-10 22:28:28 -0800 |
commit | 1b47fa275236657bea358f5c95d89f568c439395 (patch) | |
tree | e1aa459fc7067bad67b34d44c4dbd87c2ffc11cd /core/src | |
parent | 05d2e94838d5b728df203d87708beaf3f4aa4c81 (diff) | |
download | spark-1b47fa275236657bea358f5c95d89f568c439395.tar.gz spark-1b47fa275236657bea358f5c95d89f568c439395.tar.bz2 spark-1b47fa275236657bea358f5c95d89f568c439395.zip |
Detect hard crashes of workers using a heartbeat mechanism.
Also fixes some issues in the rest of the code with detecting workers this way.
Conflicts:
core/src/main/scala/spark/deploy/master/Master.scala
core/src/main/scala/spark/deploy/worker/Worker.scala
core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
Diffstat (limited to 'core/src')
8 files changed, 62 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index 9f05cb4f35..1d88d4bc84 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -30,6 +30,8 @@ case class ExecutorStateChanged( exitStatus: Option[Int]) extends DeployMessage +private[spark] case class Heartbeat(workerId: String) extends DeployMessage + // Master to Worker private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage @@ -45,7 +47,6 @@ private[spark] case class LaunchExecutor( sparkHome: String) extends DeployMessage - // Client to Master private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 5986281d97..d985261600 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -3,6 +3,7 @@ package spark.deploy.master import akka.actor._ import akka.actor.Terminated import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} +import akka.util.duration._ import java.text.SimpleDateFormat import java.util.Date @@ -16,6 +17,7 @@ import spark.util.AkkaUtils private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs + val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 var nextJobNumber = 0 val workers = new HashSet[WorkerInfo] @@ -46,6 +48,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) startWebUi() + context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) } def startWebUi() { @@ -111,6 +114,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } } + case Heartbeat(workerId) => { + idToWorker.get(workerId) match { + case Some(workerInfo) => + workerInfo.lastHeartbeat = System.currentTimeMillis() + case None => + logWarning("Got heartbeat from unregistered worker " + workerId) + } + } + case Terminated(actor) => { // The disconnected actor could've been either a worker or a job; remove whichever of // those we have an entry for in the corresponding actor hashmap @@ -219,8 +231,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor actorToWorker -= worker.actor addressToWorker -= worker.actor.path.address for (exec <- worker.executors.values) { - exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None) - exec.job.executors -= exec.id + logInfo("Telling job of lost executor: " + exec.id) + exec.job.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None) + exec.job.removeExecutor(exec) } } @@ -259,6 +272,18 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor nextJobNumber += 1 jobId } + + /** Check for, and remove, any timed-out workers */ + def timeOutDeadWorkers() { + // Copy the workers into an array so we don't modify the hashset while iterating through it + val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT + val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray + for (worker <- toRemove) { + logWarning("Removing %s because we got no heartbeat in %d seconds".format( + worker.id, WORKER_TIMEOUT)) + removeWorker(worker) + } + } } private[spark] object Master { diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala index 5a7f5fef8a..2e467007a0 100644 --- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala @@ -18,6 +18,8 @@ private[spark] class WorkerInfo( var coresUsed = 0 var memoryUsed = 0 + var lastHeartbeat = System.currentTimeMillis() + def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 62f01776a9..924935a5fd 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -2,6 +2,7 @@ package spark.deploy.worker import scala.collection.mutable.{ArrayBuffer, HashMap} import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} +import akka.util.duration._ import spark.{Logging, Utils} import spark.util.AkkaUtils import spark.deploy._ @@ -26,6 +27,9 @@ private[spark] class Worker( val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs + // Send a heartbeat every (heartbeat timeout) / 4 milliseconds + val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4 + var master: ActorRef = null var masterWebUiUrl : String = "" val workerId = generateWorkerId() @@ -97,6 +101,9 @@ private[spark] class Worker( case RegisteredWorker(url) => masterWebUiUrl = url logInfo("Successfully registered with master") + context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { + master ! Heartbeat(workerId) + } case RegisterWorkerFailed(message) => logError("Worker registration failed: " + message) diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala index bba7de6a65..8bf838209f 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala @@ -12,10 +12,10 @@ class ExecutorLossReason(val message: String) { private[spark] case class ExecutorExited(val exitCode: Int) - extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) { + extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) { } private[spark] case class SlaveLost(_message: String = "Slave lost") - extends ExecutorLossReason(_message) { + extends ExecutorLossReason(_message) { } diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 59ff8bcb90..3c3e83b138 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -67,6 +67,7 @@ private[spark] class SparkDeploySchedulerBackend( case None => SlaveLost(message) } logInfo("Executor %s removed: %s".format(executorId, message)) + removeExecutor(executorId, reason.toString) scheduler.executorLost(executorId, reason) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index da7dcf4b6b..d766067824 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -37,3 +37,6 @@ object StatusUpdate { // Internal messages in driver private[spark] case object ReviveOffers extends StandaloneClusterMessage private[spark] case object StopDriver extends StandaloneClusterMessage + +private[spark] case class RemoveExecutor(executorId: String, reason: String) + extends StandaloneClusterMessage diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 082022be1c..4213eb8719 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -68,6 +68,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor sender ! true context.stop(self) + case RemoveExecutor(executorId, reason) => + removeExecutor(executorId, reason) + sender ! true + case Terminated(actor) => actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated")) @@ -100,7 +104,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor // Remove a disconnected slave from the cluster def removeExecutor(executorId: String, reason: String) { - logInfo("Slave " + executorId + " disconnected, so removing it") + logInfo("Executor " + executorId + " disconnected, so removing it") val numCores = freeCores(executorId) actorToExecutorId -= executorActor(executorId) addressToExecutorId -= executorAddress(executorId) @@ -139,7 +143,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } } catch { case e: Exception => - throw new SparkException("Error stopping standalone scheduler's master actor", e) + throw new SparkException("Error stopping standalone scheduler's driver actor", e) } } @@ -148,6 +152,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor } override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2) + + // Called by backends + def removeExecutor(executorId: String, reason: String) { + try { + val timeout = 5.seconds + val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout) + Await.result(future, timeout) + } catch { + case e: Exception => + throw new SparkException("Error notifying standalone scheduler's driver actor", e) + } + } } private[spark] object StandaloneSchedulerBackend { |