author     root <root@ip-10-8-17-6.ec2.internal>       2013-02-06 09:11:17 +0000
committer  Matei Zaharia <matei@eecs.berkeley.edu>     2013-02-10 22:28:28 -0800
commit     1b47fa275236657bea358f5c95d89f568c439395 (patch)
tree       e1aa459fc7067bad67b34d44c4dbd87c2ffc11cd
parent     05d2e94838d5b728df203d87708beaf3f4aa4c81 (diff)
download   spark-1b47fa275236657bea358f5c95d89f568c439395.tar.gz
           spark-1b47fa275236657bea358f5c95d89f568c439395.tar.bz2
           spark-1b47fa275236657bea358f5c95d89f568c439395.zip
Detect hard crashes of workers using a heartbeat mechanism.
Also fixes some issues in the rest of the code with detecting workers this way.

Conflicts:
	core/src/main/scala/spark/deploy/master/Master.scala
	core/src/main/scala/spark/deploy/worker/Worker.scala
	core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
	core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
	core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala                           3
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                           29
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala                       2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala                           7
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala           4
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  1
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala     3
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala   20

8 files changed, 62 insertions, 7 deletions
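
The commit follows the classic heartbeat pattern: every worker pings the master on a fixed schedule, and the master periodically sweeps out any worker whose last ping is older than the timeout. A minimal sketch of that bookkeeping in plain Scala (HeartbeatMonitor and WorkerRecord are illustrative names, not Spark's actual types; the real logic lives in the actor handlers in the diff below):

    import scala.collection.mutable

    // Illustrative sketch of the heartbeat bookkeeping added in this commit.
    case class WorkerRecord(id: String, var lastHeartbeat: Long)

    class HeartbeatMonitor(timeoutMs: Long = 60 * 1000) {
      private val workers = mutable.HashMap.empty[String, WorkerRecord]

      def register(id: String): Unit =
        workers(id) = WorkerRecord(id, System.currentTimeMillis())

      // Mirrors the master's Heartbeat(workerId) handler: refresh the
      // timestamp, or warn if the worker never registered.
      def onHeartbeat(id: String): Unit = workers.get(id) match {
        case Some(w) => w.lastHeartbeat = System.currentTimeMillis()
        case None    => println(s"Got heartbeat from unregistered worker $id")
      }

      // Mirrors timeOutDeadWorkers(): snapshot the candidates first so the
      // collection is never mutated while being iterated over.
      def timeOutDeadWorkers(): Seq[String] = {
        val expirationTime = System.currentTimeMillis() - timeoutMs
        val toRemove = workers.values.filter(_.lastHeartbeat < expirationTime).toArray
        toRemove.foreach(w => workers.remove(w.id))
        toRemove.map(_.id).toSeq
      }
    }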
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 9f05cb4f35..1d88d4bc84 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -30,6 +30,8 @@ case class ExecutorStateChanged(
exitStatus: Option[Int])
extends DeployMessage
+private[spark] case class Heartbeat(workerId: String) extends DeployMessage
+
// Master to Worker
private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
@@ -45,7 +47,6 @@ private[spark] case class LaunchExecutor(
sparkHome: String)
extends DeployMessage
-
// Client to Master
private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 5986281d97..d985261600 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -3,6 +3,7 @@ package spark.deploy.master
import akka.actor._
import akka.actor.Terminated
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
import java.text.SimpleDateFormat
import java.util.Date
@@ -16,6 +17,7 @@ import spark.util.AkkaUtils
private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
+ val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
var nextJobNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -46,6 +48,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi()
+ context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
}
def startWebUi() {
@@ -111,6 +114,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
}
+ case Heartbeat(workerId) => {
+ idToWorker.get(workerId) match {
+ case Some(workerInfo) =>
+ workerInfo.lastHeartbeat = System.currentTimeMillis()
+ case None =>
+ logWarning("Got heartbeat from unregistered worker " + workerId)
+ }
+ }
+
case Terminated(actor) => {
// The disconnected actor could've been either a worker or a job; remove whichever of
// those we have an entry for in the corresponding actor hashmap
@@ -219,8 +231,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
- exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
- exec.job.executors -= exec.id
+ logInfo("Telling job of lost executor: " + exec.id)
+ exec.job.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
+ exec.job.removeExecutor(exec)
}
}
@@ -259,6 +272,18 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
nextJobNumber += 1
jobId
}
+
+ /** Check for, and remove, any timed-out workers */
+ def timeOutDeadWorkers() {
+ // Copy the workers into an array so we don't modify the hashset while iterating through it
+ val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
+ val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+ for (worker <- toRemove) {
+ logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+ worker.id, WORKER_TIMEOUT))
+ removeWorker(worker)
+ }
+ }
}
private[spark] object Master {
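
Both daemons key their schedules off the same spark.worker.timeout system property, given in seconds and multiplied by 1000 internally. Note that WORKER_TIMEOUT is therefore a millisecond value, so the "no heartbeat in %d seconds" warning above actually prints milliseconds; logging WORKER_TIMEOUT / 1000 would match the wording. A small sketch of tuning the timeout (the SPARK_JAVA_OPTS hook is an assumption about the deployment; any mechanism that sets the system property before the JVM starts would do):

    // Hypothetical deployment tweak, e.g. in conf/spark-env.sh:
    //   export SPARK_JAVA_OPTS="-Dspark.worker.timeout=120"
    // Master and worker then derive their intervals from the same value:
    val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 // ms
    println(s"sweep every ${WORKER_TIMEOUT} ms; heartbeat every ${WORKER_TIMEOUT / 4} ms")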
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 5a7f5fef8a..2e467007a0 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -18,6 +18,8 @@ private[spark] class WorkerInfo(
var coresUsed = 0
var memoryUsed = 0
+ var lastHeartbeat = System.currentTimeMillis()
+
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 62f01776a9..924935a5fd 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,6 +2,7 @@ package spark.deploy.worker
import scala.collection.mutable.{ArrayBuffer, HashMap}
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.util.duration._
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
@@ -26,6 +27,9 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
+ // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
+ val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+
var master: ActorRef = null
var masterWebUiUrl : String = ""
val workerId = generateWorkerId()
@@ -97,6 +101,9 @@ private[spark] class Worker(
case RegisteredWorker(url) =>
masterWebUiUrl = url
logInfo("Successfully registered with master")
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
+ master ! Heartbeat(workerId)
+ }
case RegisterWorkerFailed(message) =>
logError("Worker registration failed: " + message)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
index bba7de6a65..8bf838209f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
@@ -12,10 +12,10 @@ class ExecutorLossReason(val message: String) {
private[spark]
case class ExecutorExited(val exitCode: Int)
- extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+ extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
}
private[spark]
case class SlaveLost(_message: String = "Slave lost")
- extends ExecutorLossReason(_message) {
+ extends ExecutorLossReason(_message) {
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 59ff8bcb90..3c3e83b138 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -67,6 +67,7 @@ private[spark] class SparkDeploySchedulerBackend(
case None => SlaveLost(message)
}
logInfo("Executor %s removed: %s".format(executorId, message))
+ removeExecutor(executorId, reason.toString)
scheduler.executorLost(executorId, reason)
}
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index da7dcf4b6b..d766067824 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -37,3 +37,6 @@ object StatusUpdate {
// Internal messages in driver
private[spark] case object ReviveOffers extends StandaloneClusterMessage
private[spark] case object StopDriver extends StandaloneClusterMessage
+
+private[spark] case class RemoveExecutor(executorId: String, reason: String)
+ extends StandaloneClusterMessage
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 082022be1c..4213eb8719 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -68,6 +68,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
sender ! true
context.stop(self)
+ case RemoveExecutor(executorId, reason) =>
+ removeExecutor(executorId, reason)
+ sender ! true
+
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
@@ -100,7 +104,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Remove a disconnected slave from the cluster
def removeExecutor(executorId: String, reason: String) {
- logInfo("Slave " + executorId + " disconnected, so removing it")
+ logInfo("Executor " + executorId + " disconnected, so removing it")
val numCores = freeCores(executorId)
actorToExecutorId -= executorActor(executorId)
addressToExecutorId -= executorAddress(executorId)
@@ -139,7 +143,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
} catch {
case e: Exception =>
- throw new SparkException("Error stopping standalone scheduler's master actor", e)
+ throw new SparkException("Error stopping standalone scheduler's driver actor", e)
}
}
@@ -148,6 +152,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+
+ // Called by backends
+ def removeExecutor(executorId: String, reason: String) {
+ try {
+ val timeout = 5.seconds
+ val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+ Await.result(future, timeout)
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+ }
+ }
}
private[spark] object StandaloneSchedulerBackend {
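
Unlike the fire-and-forget messages elsewhere in this diff, removeExecutor blocks on the driver actor's acknowledgement (the sender ! true reply above), so a cleanup failure surfaces immediately as a SparkException instead of being silently dropped. A minimal sketch of that ask-and-await shape with plain scala.concurrent (the Promise stands in for the actor's reply channel):

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    object AskAndAwait {
      def main(args: Array[String]): Unit = {
        val ack = Promise[Boolean]()   // the actor's reply channel
        // ... RemoveExecutor(executorId, reason) would be sent here ...
        ack.success(true)              // simulates the driver actor's `sender ! true`
        val ok = Await.result(ack.future, 5.seconds) // throws on timeout, like the real code
        println(s"driver acknowledged: $ok")
      }
    }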