diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2014-05-06 12:53:39 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-05-06 12:53:39 -0700 |
commit | fbfe69de69aa6767d95167711622ec34c59a1f6d (patch) | |
tree | 5a57a06c21070e4f672664d9157342ddf4e284d5 | |
parent | 7b978c1ac59718b85e512c46105b6af641afc3dc (diff) | |
download | spark-fbfe69de69aa6767d95167711622ec34c59a1f6d.tar.gz spark-fbfe69de69aa6767d95167711622ec34c59a1f6d.tar.bz2 spark-fbfe69de69aa6767d95167711622ec34c59a1f6d.zip |
[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient
See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete description, but in essence: if the Worker or AppClient actor restarts before successfully registering with the Master, multiple retryTimers will be running at once, which leads to fewer than the full number of registration retries being attempted before the new actor is forced to give up.
Author: Mark Hamstra <markhamstra@gmail.com>
Closes #602 from markhamstra/SPARK-1685 and squashes the following commits:
11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer
69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 13 | ||||
-rwxr-xr-x | core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 10 |
2 files changed, 15 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 57085fc337..896913d796 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -60,6 +60,7 @@ private[spark] class AppClient( var master: ActorSelection = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times + var registrationRetryTimer: Option[Cancellable] = None override def preStart() { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -83,21 +84,20 @@ private[spark] class AppClient( def registerWithMaster() { tryRegisterAllMasters() - import context.dispatcher var retries = 0 - lazy val retryTimer: Cancellable = + registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + registrationRetryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { markDead("All masters are unresponsive! 
Giving up.") } else { tryRegisterAllMasters() } } - retryTimer // start timer + } } def changeMaster(url: String) { @@ -177,6 +177,11 @@ private[spark] class AppClient( alreadyDead = true } } + + override def postStop() { + registrationRetryTimer.foreach(_.cancel()) + } + } def start() { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index cd6bd2cd29..85d25dc7db 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -100,6 +100,8 @@ private[spark] class Worker( val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) val workerSource = new WorkerSource(this) + var registrationRetryTimer: Option[Cancellable] = None + def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed @@ -161,13 +163,12 @@ private[spark] class Worker( def registerWithMaster() { tryRegisterAllMasters() - var retries = 0 - lazy val retryTimer: Cancellable = + registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { retries += 1 if (registered) { - retryTimer.cancel() + registrationRetryTimer.foreach(_.cancel()) } else if (retries >= REGISTRATION_RETRIES) { logError("All masters are unresponsive! Giving up.") System.exit(1) @@ -175,7 +176,7 @@ private[spark] class Worker( tryRegisterAllMasters() } } - retryTimer // start timer + } } override def receive = { @@ -344,6 +345,7 @@ private[spark] class Worker( } override def postStop() { + registrationRetryTimer.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) webUi.stop() |