author     mcheah <mcheah@palantir.com>  2014-10-20 11:35:18 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2014-10-20 11:35:18 -0700
commit     4afe9a4852ebeb4cc77322a14225cd3dec165f3f (patch)
tree       dde032f9763a07cff5ca59b92730b8cc7def4e8a
parent     ea054e1fc70e09e0babcdae2a37f6f7aa6a035f2 (diff)
[SPARK-3736] Workers reconnect when disassociated from the master.
Before, if the master node was killed and restarted, the worker nodes would not attempt to reconnect to it. Whenever the master was restarted, the worker nodes therefore had to be restarted as well. Now, when a worker is disconnected from the master, it continuously pings the master in an attempt to reconnect. Once the master restarts, it detects one of the registration requests from its former workers, and the cluster re-enters a healthy state.

In addition, when the master did not receive a heartbeat from a worker, it removed that worker; however, if the removed worker later sent a heartbeat, the master simply ignored it. Now, a master that receives a heartbeat from a worker it no longer tracks asks that worker to re-attempt registration, at which point the worker sends a RegisterWorker request and is re-connected accordingly.

Re-connection attempts per worker are submitted every N seconds, where N is configured by the property spark.worker.reconnect.interval - this has a default of 60 seconds right now.

Author: mcheah <mcheah@palantir.com>

Closes #2828 from mccheah/reconnect-dead-workers and squashes the following commits:

83f8bc9 [mcheah] [SPARK-3736] More informative log message, and fixing some indentation.
fe0e02f [mcheah] [SPARK-3736] Moving reconnection logic to registerWithMaster().
94ddeca [mcheah] [SPARK-3736] Changing a log warning to a log info.
a698e35 [mcheah] [SPARK-3736] Addressing PR comment to make some defs private.
b9a3077 [mcheah] [SPARK-3736] Addressing PR comments related to reconnection.
2ad5ed5 [mcheah] [SPARK-3736] Cancel attempts to reconnect if the master changes.
b5b34af [mcheah] [SPARK-3736] Workers reconnect when disassociated from the master.
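Editor's note: the property spark.worker.reconnect.interval is the name given in the commit message above; the Worker diff below derives its retry intervals from hardcoded constants plus a random fuzz factor, so treat the property as illustrative rather than confirmed by the patch. A minimal Scala sketch, under that assumption, of how such a setting would be read with the 60-second default the message describes:

import org.apache.spark.SparkConf

object ReconnectIntervalExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical: property name taken from the commit message, not from the diff.
    val conf = new SparkConf()
    val reconnectIntervalSeconds = conf.getLong("spark.worker.reconnect.interval", 60)
    println(s"Worker would retry registration every $reconnectIntervalSeconds seconds")
  }
}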
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala |  9
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 81
3 files changed, 72 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index a7368f9f3d..b9dd8557ee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -71,6 +71,8 @@ private[deploy] object DeployMessages {
case class RegisterWorkerFailed(message: String) extends DeployMessage
+ case class ReconnectWorker(masterUrl: String) extends DeployMessage
+
case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
case class LaunchExecutor(
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f98b531316..3b6bb9fe12 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -341,7 +341,14 @@ private[spark] class Master(
case Some(workerInfo) =>
workerInfo.lastHeartbeat = System.currentTimeMillis()
case None =>
- logWarning("Got heartbeat from unregistered worker " + workerId)
+ if (workers.map(_.id).contains(workerId)) {
+ logWarning(s"Got heartbeat from unregistered worker $workerId." +
+ " Asking it to re-register.")
+ sender ! ReconnectWorker(masterUrl)
+ } else {
+ logWarning(s"Got heartbeat from unregistered worker $workerId." +
+ " This worker was never registered, so ignoring the heartbeat.")
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 9b52cb06fb..c4a8ec2e5e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -20,12 +20,14 @@ package org.apache.spark.deploy.worker
import java.io.File
import java.io.IOException
import java.text.SimpleDateFormat
-import java.util.Date
+import java.util.{UUID, Date}
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import scala.language.postfixOps
+import scala.util.Random
import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
@@ -64,8 +66,22 @@ private[spark] class Worker(
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
- val REGISTRATION_TIMEOUT = 20.seconds
- val REGISTRATION_RETRIES = 3
+ // Model retries to connect to the master, after Hadoop's model.
+ // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
+ // Afterwards, the next 10 attempts are between 30 and 90 seconds.
+ // A bit of randomness is introduced so that not all of the workers attempt to reconnect at
+ // the same time.
+ val INITIAL_REGISTRATION_RETRIES = 6
+ val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
+ val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
+ val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
+ val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
+ randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
+ }
+ val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 *
+ REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
+ val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60
+ * REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds
val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often worker will clean up old app folders
@@ -103,6 +119,7 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
+ var connectionAttemptCount = 0
val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
val workerSource = new WorkerSource(this)
@@ -158,7 +175,7 @@ private[spark] class Worker(
connected = true
}
- def tryRegisterAllMasters() {
+ private def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
@@ -166,26 +183,47 @@ private[spark] class Worker(
}
}
- def registerWithMaster() {
- tryRegisterAllMasters()
- var retries = 0
- registrationRetryTimer = Some {
- context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
- Utils.tryOrExit {
- retries += 1
- if (registered) {
- registrationRetryTimer.foreach(_.cancel())
- } else if (retries >= REGISTRATION_RETRIES) {
- logError("All masters are unresponsive! Giving up.")
- System.exit(1)
- } else {
- tryRegisterAllMasters()
+ private def retryConnectToMaster() {
+ Utils.tryOrExit {
+ connectionAttemptCount += 1
+ logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount")
+ if (registered) {
+ registrationRetryTimer.foreach(_.cancel())
+ registrationRetryTimer = None
+ } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
+ tryRegisterAllMasters()
+ if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
+ registrationRetryTimer.foreach(_.cancel())
+ registrationRetryTimer = Some {
+ context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
+ PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
}
}
+ } else {
+ logError("All masters are unresponsive! Giving up.")
+ System.exit(1)
}
}
}
+ def registerWithMaster() {
+ // DisassociatedEvent may be triggered multiple times, so don't attempt registration
+ // if there are outstanding registration attempts scheduled.
+ registrationRetryTimer match {
+ case None =>
+ registered = false
+ tryRegisterAllMasters()
+ connectionAttemptCount = 0
+ registrationRetryTimer = Some {
+ context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
+ INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+ }
+ case Some(_) =>
+ logInfo("Not spawning another attempt to register with the master, since there is an" +
+ " attempt scheduled already.")
+ }
+ }
+
override def receiveWithLogging = {
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl)
@@ -243,6 +281,10 @@ private[spark] class Worker(
System.exit(1)
}
+ case ReconnectWorker(masterUrl) =>
+ logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
+ registerWithMaster()
+
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
@@ -362,9 +404,10 @@ private[spark] class Worker(
}
}
- def masterDisconnected() {
+ private def masterDisconnected() {
logError("Connection to master failed! Waiting for master to reconnect...")
connected = false
+ registerWithMaster()
}
def generateWorkerId(): String = {
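
Editor's note: the retry intervals introduced in the Worker diff above come from a random fuzz multiplier rather than a fixed schedule. Below is a standalone Scala sketch (not part of the patch) that reproduces that computation, assuming the same constants as Worker.scala, to show why the comment quotes "between 5 and 15 seconds" and "between 30 and 90 seconds":

import java.util.UUID
import scala.util.Random

object RetryIntervalSketch {
  def main(args: Array[String]): Unit = {
    // Multiplier is drawn uniformly from [0.5, 1.5), as in Worker.scala above.
    val fuzz = new Random(UUID.randomUUID.getMostSignificantBits).nextDouble + 0.500
    val initialSeconds = math.round(10 * fuzz)   // first 6 attempts: roughly 5-15 seconds apart
    val prolongedSeconds = math.round(60 * fuzz) // next 10 attempts: roughly 30-90 seconds apart
    println(s"Initial registration retry interval:   $initialSeconds s")
    println(s"Prolonged registration retry interval: $prolongedSeconds s")
  }
}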