diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-08-04 09:13:41 -0700 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-08-05 13:13:56 -0700 |
commit | 35d8f5ee521dc1873548a978d27b10644076a0c0 (patch) | |
tree | 91a9a411372bd7439d736a4089c97f291bc1b614 /core | |
parent | 37ccf9301a427095d5bf6a35447f5871e12cdb35 (diff) | |
download | spark-35d8f5ee521dc1873548a978d27b10644076a0c0.tar.gz spark-35d8f5ee521dc1873548a978d27b10644076a0c0.tar.bz2 spark-35d8f5ee521dc1873548a978d27b10644076a0c0.zip |
Moved handling of timed out workers within the Master actor
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/deploy/DeployMessage.scala | 6 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/master/Master.scala | 6 |
2 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index 7c37a16615..31861f3ac2 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -109,6 +109,7 @@ private[deploy] object DeployMessages { } // WorkerWebUI to Worker + case object RequestWorkerState // Worker to WorkerWebUI @@ -120,4 +121,9 @@ private[deploy] object DeployMessages { Utils.checkHost(host, "Required hostname") assert (port > 0) } + + // Actor System to Master + + case object CheckForWorkerTimeOut + } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index bd7924c71d..4a4d9908a0 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -80,7 +80,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) webUi.start() - context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) + context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() @@ -176,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case RequestMasterState => { sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray) } + + case CheckForWorkerTimeOut => { + timeOutDeadWorkers() + } } /** |