-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 13 +++++++++----
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala    | 10 ++++++----
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 57085fc337..896913d796 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -60,6 +60,7 @@ private[spark] class AppClient(
     var master: ActorSelection = null
     var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
     var alreadyDead = false // To avoid calling listener.dead() multiple times
+    var registrationRetryTimer: Option[Cancellable] = None

     override def preStart() {
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -83,21 +84,20 @@ private[spark] class AppClient(
     def registerWithMaster() {
       tryRegisterAllMasters()
-
       import context.dispatcher
       var retries = 0
-      lazy val retryTimer: Cancellable =
+      registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
           retries += 1
           if (registered) {
-            retryTimer.cancel()
+            registrationRetryTimer.foreach(_.cancel())
           } else if (retries >= REGISTRATION_RETRIES) {
             markDead("All masters are unresponsive! Giving up.")
           } else {
             tryRegisterAllMasters()
           }
         }
-      retryTimer // start timer
+      }
     }

     def changeMaster(url: String) {
@@ -177,6 +177,11 @@ private[spark] class AppClient(
         alreadyDead = true
       }
     }
+
+    override def postStop() {
+      registrationRetryTimer.foreach(_.cancel())
+    }
+
   }

   def start() {
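
Taken together, the AppClient hunks above move the timer handle out of a method-local lazy val and into a field that postStop() can reach. Below is a minimal, self-contained sketch of that pattern, assuming Akka's classic actor API; the name RegistrationActor, the 10-second interval, the retry cap of 3, and the string messages are hypothetical stand-ins rather than Spark code.

import akka.actor.{Actor, Cancellable}
import scala.concurrent.duration._

class RegistrationActor extends Actor {
  // The handle lives in a field, not inside registerWithMaster(),
  // so the lifecycle hooks below can reach it.
  var registrationRetryTimer: Option[Cancellable] = None
  var registered = false

  def registerWithMaster(): Unit = {
    import context.dispatcher // ExecutionContext the scheduler runs on
    var retries = 0
    registrationRetryTimer = Some {
      context.system.scheduler.schedule(10.seconds, 10.seconds) {
        retries += 1
        if (registered) registrationRetryTimer.foreach(_.cancel())
        else if (retries >= 3) self ! "giveUp"
        else self ! "retry" // stand-in for tryRegisterAllMasters()
      }
    }
  }

  override def preStart(): Unit = registerWithMaster()

  // Akka invokes postStop() on shutdown and as part of a restart, so a
  // restarted actor no longer leaves the old incarnation's timer running.
  override def postStop(): Unit = registrationRetryTimer.foreach(_.cancel())

  def receive = {
    case "registered" => registered = true
    case "retry"      => // re-attempt registration here
    case "giveUp"     => context.stop(self)
  }
}

Cancelling in postStop() rather than only inside the scheduled closure is the essence of the fix: the closure's self-cancel still handles the happy path, while the lifecycle hook covers shutdown and restart.
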
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index cd6bd2cd29..85d25dc7db 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -100,6 +100,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
   val workerSource = new WorkerSource(this)

+  var registrationRetryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
@@ -161,13 +163,12 @@ private[spark] class Worker(
   def registerWithMaster() {
     tryRegisterAllMasters()
-
     var retries = 0
-    lazy val retryTimer: Cancellable =
+    registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
         retries += 1
         if (registered) {
-          retryTimer.cancel()
+          registrationRetryTimer.foreach(_.cancel())
         } else if (retries >= REGISTRATION_RETRIES) {
           logError("All masters are unresponsive! Giving up.")
           System.exit(1)
@@ -175,7 +176,7 @@ private[spark] class Worker(
           tryRegisterAllMasters()
         }
       }
-    retryTimer // start timer
+    }
   }

   override def receive = {
@@ -344,6 +345,7 @@ private[spark] class Worker(
   }

   override def postStop() {
+    registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
     webUi.stop()
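
For contrast, here is a hedged reconstruction of the shape this commit removes (LeakyActor is a hypothetical name, not Spark code). The only handle on the running timer was a lazy val local to registerWithMaster(), so the scheduled closure was the one place that could cancel it; once an actor restart re-ran preStart(), the old incarnation's timer kept firing with nothing able to stop it.

import akka.actor.{Actor, Cancellable}
import scala.concurrent.duration._

class LeakyActor extends Actor {
  var registered = false

  def registerWithMaster(): Unit = {
    import context.dispatcher
    // Self-referential lazy val: legal, because the closure only runs
    // after initialization, but the handle is trapped in this scope.
    lazy val retryTimer: Cancellable =
      context.system.scheduler.schedule(10.seconds, 10.seconds) {
        if (registered) retryTimer.cancel() // the sole cancellation point
        else self ! "retry"
      }
    retryTimer // force the lazy val so the timer actually starts
  }

  override def preStart(): Unit = registerWithMaster()
  // No postStop() cleanup is possible: nothing outside the closure
  // holds a reference to the Cancellable.

  def receive = {
    case "registered" => registered = true
    case "retry"      => // re-attempt registration
  }
}

Storing the Cancellable in the registrationRetryTimer field, as both files now do, is the minimal change that lets postStop() break this leak.
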