aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-08-04 09:13:41 -0700
committerMark Hamstra <markhamstra@gmail.com>2013-08-05 13:13:56 -0700
commit35d8f5ee521dc1873548a978d27b10644076a0c0 (patch)
tree91a9a411372bd7439d736a4089c97f291bc1b614 /core
parent37ccf9301a427095d5bf6a35447f5871e12cdb35 (diff)
downloadspark-35d8f5ee521dc1873548a978d27b10644076a0c0.tar.gz
spark-35d8f5ee521dc1873548a978d27b10644076a0c0.tar.bz2
spark-35d8f5ee521dc1873548a978d27b10644076a0c0.zip
Moved handling of timed out workers within the Master actor
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/deploy/DeployMessage.scala6
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala6
2 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 7c37a16615..31861f3ac2 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -109,6 +109,7 @@ private[deploy] object DeployMessages {
}
// WorkerWebUI to Worker
+
case object RequestWorkerState
// Worker to WorkerWebUI
@@ -120,4 +121,9 @@ private[deploy] object DeployMessages {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
}
+
+ // Actor System to Master
+
+ case object CheckForWorkerTimeOut
+
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index bd7924c71d..4a4d9908a0 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -80,7 +80,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
- context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+ context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
@@ -176,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
}
+
+ case CheckForWorkerTimeOut => {
+ timeOutDeadWorkers()
+ }
}
/**