From 1b2ab1cd1b7cab9076f3c511188a610eda935701 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Tue, 25 Nov 2014 15:46:26 -0800 Subject: [SPARK-4592] Avoid duplicate worker registrations in standalone mode **Summary.** On failover, the Master may receive duplicate registrations from the same worker, causing the worker to exit. This is caused by this commit https://github.com/apache/spark/commit/4afe9a4852ebeb4cc77322a14225cd3dec165f3f, which adds logic for the worker to re-register with the master in case of failures. However, the following race condition may occur: (1) Master A fails and Worker attempts to reconnect to all masters (2) Master B takes over and notifies Worker (3) Worker responds by registering with Master B (4) Meanwhile, Worker's previous reconnection attempt reaches Master B, causing the same Worker to register with Master B twice **Fix.** Instead of attempting to register with all known masters, the worker should re-register with only the one that it has been communicating with. This is safe because the fact that a failover has occurred means the old master must have died. Then, when the worker is finally notified of a new master, it gives up on the old one in favor of the new one. **Caveat.** Even this fix is subject to more obscure race conditions. For instance, if Master B fails and Master A recovers immediately, then Master A may still observe duplicate worker registrations. However, this and other potential race conditions summarized in [SPARK-4592](https://issues.apache.org/jira/browse/SPARK-4592), are much, much less likely than the one described above, which is deterministically reproducible. Author: Andrew Or Closes #3447 from andrewor14/standalone-failover and squashes the following commits: 0d9716c [Andrew Or] Move re-registration logic to actor for thread-safety 79286dc [Andrew Or] Preserve old behavior for initial retries 83b321c [Andrew Or] Tweak wording 1fce6a9 [Andrew Or] Active master actor could be null in the beginning b6f269e [Andrew Or] Avoid duplicate worker registrations --- .../org/apache/spark/deploy/DeployMessage.scala | 2 + .../org/apache/spark/deploy/worker/Worker.scala | 52 +++++++++++++++++++--- 2 files changed, 47 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index b9dd8557ee..c46f84de84 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -92,6 +92,8 @@ private[deploy] object DeployMessages { case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders + case object ReregisterWithMaster // used when a worker attempts to reconnect to a master + // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index ca262de832..eb11163538 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -21,7 +21,6 @@ import java.io.File import java.io.IOException import java.text.SimpleDateFormat import java.util.{UUID, Date} -import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap @@ -177,6 +176,9 @@ private[spark] class Worker( throw new SparkException("Invalid spark URL: " + x) } connected = true + // Cancel any outstanding re-registration attempts because we found a new master + registrationRetryTimer.foreach(_.cancel()) + registrationRetryTimer = None } private def tryRegisterAllMasters() { @@ -187,7 +189,12 @@ private[spark] class Worker( } } - private def retryConnectToMaster() { + /** + * Re-register with the master because a network failure or a master failure has occurred. + * If the re-registration attempt threshold is exceeded, the worker exits with error. + * Note that for thread-safety this should only be called from the actor. + */ + private def reregisterWithMaster(): Unit = { Utils.tryOrExit { connectionAttemptCount += 1 if (registered) { @@ -195,12 +202,40 @@ private[spark] class Worker( registrationRetryTimer = None } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)") - tryRegisterAllMasters() + /** + * Re-register with the active master this worker has been communicating with. If there + * is none, then it means this worker is still bootstrapping and hasn't established a + * connection with a master yet, in which case we should re-register with all masters. + * + * It is important to re-register only with the active master during failures. Otherwise, + * if the worker unconditionally attempts to re-register with all masters, the following + * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592: + * + * (1) Master A fails and Worker attempts to reconnect to all masters + * (2) Master B takes over and notifies Worker + * (3) Worker responds by registering with Master B + * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B, + * causing the same Worker to register with Master B twice + * + * Instead, if we only register with the known active master, we can assume that the + * old master must have died because another master has taken over. Note that this is + * still not safe if the old master recovers within this interval, but this is a much + * less likely scenario. + */ + if (master != null) { + master ! RegisterWorker( + workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + } else { + // We are retrying the initial registration + tryRegisterAllMasters() + } + // We have exceeded the initial registration retry threshold + // All retries from now on should use a higher interval if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) { registrationRetryTimer.foreach(_.cancel()) registrationRetryTimer = Some { context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL, - PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) + PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster) } } } else { @@ -220,7 +255,7 @@ private[spark] class Worker( connectionAttemptCount = 0 registrationRetryTimer = Some { context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL, - INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster) + INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster) } case Some(_) => logInfo("Not spawning another attempt to register with the master, since there is an" + @@ -400,12 +435,15 @@ private[spark] class Worker( logInfo(s"$x Disassociated !") masterDisconnected() - case RequestWorkerState => { + case RequestWorkerState => sender ! WorkerStateResponse(host, port, workerId, executors.values.toList, finishedExecutors.values.toList, drivers.values.toList, finishedDrivers.values.toList, activeMasterUrl, cores, memory, coresUsed, memoryUsed, activeMasterWebUiUrl) - } + + case ReregisterWithMaster => + reregisterWithMaster() + } private def masterDisconnected() { -- cgit v1.2.3