author    Mark Hamstra <markhamstra@gmail.com>    2014-05-06 12:53:39 -0700
committer Matei Zaharia <matei@databricks.com>    2014-05-06 12:53:39 -0700
commit    fbfe69de69aa6767d95167711622ec34c59a1f6d (patch)
tree      5a57a06c21070e4f672664d9157342ddf4e284d5
parent    7b978c1ac59718b85e512c46105b6af641afc3dc (diff)
[SPARK-1685] Cancel retryTimer on restart of Worker or AppClient
See https://issues.apache.org/jira/browse/SPARK-1685 for a more complete description, but in essence: if the Worker or AppClient actor restarts before successfully registering with the Master, multiple retryTimers will be running, which leads to fewer than the full number of registration retries being attempted before the new actor is forced to give up.

Author: Mark Hamstra <markhamstra@gmail.com>

Closes #602 from markhamstra/SPARK-1685 and squashes the following commits:

11cc088 [Mark Hamstra] retryTimer -> registrationRetryTimer
69c348c [Mark Hamstra] Cancel retryTimer on restart of Worker or AppClient
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 13
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala    | 10
2 files changed, 15 insertions, 8 deletions
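
Before the diffs, a minimal sketch of the pattern the patch adopts, written as a standalone Akka actor. Everything here is illustrative: the RegistrationActor name, the 10-second interval, and the 3-retry limit are assumptions standing in for Spark's REGISTRATION_TIMEOUT and REGISTRATION_RETRIES, and tryRegister() is a placeholder for the real registration messages. The point it demonstrates is holding the scheduled task in an Option[Cancellable] field so that postStop(), which Akka also runs on the old instance during a restart, can cancel any timer that is still pending.

import akka.actor.{Actor, Cancellable}
import scala.concurrent.duration._

// Minimal sketch of the pattern applied in this patch; the actor name, the
// 10-second interval, and the 3-retry limit are illustrative assumptions,
// not Spark code.
class RegistrationActor extends Actor {
  import context.dispatcher  // ExecutionContext for the scheduler

  var registered = false
  var registrationRetryTimer: Option[Cancellable] = None

  def tryRegister(): Unit = { /* send a registration message to the master */ }

  def registerWithMaster(): Unit = {
    tryRegister()
    var retries = 0
    // Keep the Cancellable in a field rather than a method-local lazy val,
    // so it is still reachable from postStop() and can be cancelled there.
    registrationRetryTimer = Some {
      context.system.scheduler.schedule(10.seconds, 10.seconds) {
        retries += 1
        if (registered) {
          registrationRetryTimer.foreach(_.cancel())
        } else if (retries >= 3) {
          context.stop(self)  // give up; stands in for markDead()/System.exit(1) in Spark
        } else {
          tryRegister()
        }
      }
    }
  }

  override def preStart(): Unit = registerWithMaster()

  // The fix: on a restart Akka calls postStop() on the failed instance, so the
  // stale timer is cancelled before the fresh instance schedules its own.
  override def postStop(): Unit = registrationRetryTimer.foreach(_.cancel())

  def receive = {
    case "registered" => registered = true
  }
}

Storing the timer in a field rather than a method-local lazy val is what makes it reachable from postStop(); that is the substance of the retryTimer -> registrationRetryTimer rename in the commits squashed above.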
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 57085fc337..896913d796 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -60,6 +60,7 @@ private[spark] class AppClient(
     var master: ActorSelection = null
     var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
     var alreadyDead = false // To avoid calling listener.dead() multiple times
+    var registrationRetryTimer: Option[Cancellable] = None
 
     override def preStart() {
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -83,21 +84,20 @@ private[spark] class AppClient(
 
     def registerWithMaster() {
       tryRegisterAllMasters()
-
       import context.dispatcher
       var retries = 0
-      lazy val retryTimer: Cancellable =
+      registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
           retries += 1
           if (registered) {
-            retryTimer.cancel()
+            registrationRetryTimer.foreach(_.cancel())
           } else if (retries >= REGISTRATION_RETRIES) {
             markDead("All masters are unresponsive! Giving up.")
           } else {
             tryRegisterAllMasters()
           }
         }
-      retryTimer // start timer
+      }
     }
 
     def changeMaster(url: String) {
@@ -177,6 +177,11 @@ private[spark] class AppClient(
         alreadyDead = true
       }
     }
+
+    override def postStop() {
+      registrationRetryTimer.foreach(_.cancel())
+    }
+
   }
 
   def start() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cd6bd2cd29..85d25dc7db 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -100,6 +100,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
   val workerSource = new WorkerSource(this)
 
+  var registrationRetryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
@@ -161,13 +163,12 @@ private[spark] class Worker(
 
   def registerWithMaster() {
     tryRegisterAllMasters()
-
     var retries = 0
-    lazy val retryTimer: Cancellable =
+    registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
         retries += 1
         if (registered) {
-          retryTimer.cancel()
+          registrationRetryTimer.foreach(_.cancel())
         } else if (retries >= REGISTRATION_RETRIES) {
           logError("All masters are unresponsive! Giving up.")
           System.exit(1)
@@ -175,7 +176,7 @@ private[spark] class Worker(
         tryRegisterAllMasters()
       }
     }
-    retryTimer // start timer
+    }
   }
 
   override def receive = {
@@ -344,6 +345,7 @@ private[spark] class Worker(
   }
 
   override def postStop() {
+    registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
     webUi.stop()