diff options
author | mcheah <mcheah@palantir.com> | 2014-10-20 11:35:18 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-10-20 11:35:18 -0700 |
commit | 4afe9a4852ebeb4cc77322a14225cd3dec165f3f (patch) | |
tree | dde032f9763a07cff5ca59b92730b8cc7def4e8a /core/src/main | |
parent | ea054e1fc70e09e0babcdae2a37f6f7aa6a035f2 (diff) | |
download | spark-4afe9a4852ebeb4cc77322a14225cd3dec165f3f.tar.gz spark-4afe9a4852ebeb4cc77322a14225cd3dec165f3f.tar.bz2 spark-4afe9a4852ebeb4cc77322a14225cd3dec165f3f.zip |
[SPARK-3736] Workers reconnect when disassociated from the master.
Before, if the master node is killed and restarted, the worker nodes
would not attempt to reconnect to the Master. Therefore, when the Master
node was restarted, the worker nodes needed to be restarted as well.
Now, when the Master node is disconnected, the worker nodes will
continuously ping the master node in attempts to reconnect to it. Once
the master node restarts, it will detect one of the registration
requests from its former workers. The result is that the cluster
re-enters a healthy state.
In addition, when the master does not receive a heartbeat from the
worker, the worker was removed; however, when the worker sent a
heartbeat to the master, the master used to ignore the heartbeat. Now,
a master that receives a heartbeat from a worker that had been
disconnected will request the worker to re-attempt the registration
process, at which point the worker will send a RegisterWorker request
and be re-connected accordingly.
Re-connection attempts per worker are submitted every N seconds, where N
is configured by the property spark.worker.reconnect.interval - this has
a default of 60 seconds right now.
Author: mcheah <mcheah@palantir.com>
Closes #2828 from mccheah/reconnect-dead-workers and squashes the following commits:
83f8bc9 [mcheah] [SPARK-3736] More informative log message, and fixing some indentation.
fe0e02f [mcheah] [SPARK-3736] Moving reconnection logic to registerWithMaster().
94ddeca [mcheah] [SPARK-3736] Changing a log warning to a log info.
a698e35 [mcheah] [SPARK-3736] Addressing PR comment to make some defs private.
b9a3077 [mcheah] [SPARK-3736] Addressing PR comments related to reconnection.
2ad5ed5 [mcheah] [SPARK-3736] Cancel attempts to reconnect if the master changes.
b5b34af [mcheah] [SPARK-3736] Workers reconnect when disassociated from the master.
Diffstat (limited to 'core/src/main')
3 files changed, 72 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index a7368f9f3d..b9dd8557ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -71,6 +71,8 @@ private[deploy] object DeployMessages { case class RegisterWorkerFailed(message: String) extends DeployMessage + case class ReconnectWorker(masterUrl: String) extends DeployMessage + case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage case class LaunchExecutor( diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f98b531316..3b6bb9fe12 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -341,7 +341,14 @@ private[spark] class Master( case Some(workerInfo) => workerInfo.lastHeartbeat = System.currentTimeMillis() case None => - logWarning("Got heartbeat from unregistered worker " + workerId) + if (workers.map(_.id).contains(workerId)) { + logWarning(s"Got heartbeat from unregistered worker $workerId." + + " Asking it to re-register.") + sender ! ReconnectWorker(masterUrl) + } else { + logWarning(s"Got heartbeat from unregistered worker $workerId." + + " This worker was never registered, so ignoring the heartbeat.") + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 9b52cb06fb..c4a8ec2e5e 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -20,12 +20,14 @@ package org.apache.spark.deploy.worker import java.io.File import java.io.IOException import java.text.SimpleDateFormat -import java.util.Date +import java.util.{UUID, Date} +import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.concurrent.duration._ import scala.language.postfixOps +import scala.util.Random import akka.actor._ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} @@ -64,8 +66,22 @@ private[spark] class Worker( // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4 - val REGISTRATION_TIMEOUT = 20.seconds - val REGISTRATION_RETRIES = 3 + // Model retries to connect to the master, after Hadoop's model. + // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds) + // Afterwards, the next 10 attempts are between 30 and 90 seconds. + // A bit of randomness is introduced so that not all of the workers attempt to reconnect at + // the same time. + val INITIAL_REGISTRATION_RETRIES = 6 + val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10 + val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500 + val REGISTRATION_RETRY_FUZZ_MULTIPLIER = { + val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits) + randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND + } + val INITIAL_REGISTRATION_RETRY_INTERVAL = (math.round(10 * + REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds + val PROLONGED_REGISTRATION_RETRY_INTERVAL = (math.round(60 + * REGISTRATION_RETRY_FUZZ_MULTIPLIER)).seconds val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false) // How often worker will clean up old app folders @@ -103,6 +119,7 @@ private[spark] class Worker( var coresUsed = 0 var memoryUsed = 0 + var connectionAttemptCount = 0 val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr) val workerSource = new WorkerSource(this) @@ -158,7 +175,7 @@ private[spark] class Worker( connected = true } - def tryRegisterAllMasters() { + private def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) @@ -166,26 +183,47 @@ private[spark] class Worker( } } - def registerWithMaster() { - tryRegisterAllMasters() - var retries = 0 - registrationRetryTimer = Some { - context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { - Utils.tryOrExit { - retries += 1 - if (registered) { - registrationRetryTimer.foreach(_.cancel()) - } else if (retries >= REGISTRATION_RETRIES) { - logError("All masters are unresponsive! Giving up.") - System.exit(1) - } else { - tryRegisterAllMasters() + private def retryConnectToMaster() { + Utils.tryOrExit { + connectionAttemptCount += 1 + logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount") + if (registered) { + registrationRetryTimer.foreach(_.cancel()) + registrationRetryTimer = None + } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { + tryRegisterAllMasters() + if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { + registrationRetryTimer.foreach(_.cancel()) + registrationRetryTimer = Some { + context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL, + PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) } } + } else { + logError("All masters are unresponsive! Giving up.") + System.exit(1) } } } + def registerWithMaster() { + // DisassociatedEvent may be triggered multiple times, so don't attempt registration + // if there are outstanding registration attempts scheduled. + registrationRetryTimer match { + case None => + registered = false + tryRegisterAllMasters() + connectionAttemptCount = 0 + registrationRetryTimer = Some { + context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL, + INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) + } + case Some(_) => + logInfo("Not spawning another attempt to register with the master, since there is an" + + " attempt scheduled already.") + } + } + override def receiveWithLogging = { case RegisteredWorker(masterUrl, masterWebUiUrl) => logInfo("Successfully registered with master " + masterUrl) @@ -243,6 +281,10 @@ private[spark] class Worker( System.exit(1) } + case ReconnectWorker(masterUrl) => + logInfo(s"Master with url $masterUrl requested this worker to reconnect.") + registerWithMaster() + case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") @@ -362,9 +404,10 @@ private[spark] class Worker( } } - def masterDisconnected() { + private def masterDisconnected() { logError("Connection to master failed! Waiting for master to reconnect...") connected = false + registerWithMaster() } def generateWorkerId(): String = { |