author    Andrew Or <andrew@databricks.com>  2014-11-25 15:46:26 -0800
committer Andrew Or <andrew@databricks.com>  2014-11-25 15:46:26 -0800
commit    1b2ab1cd1b7cab9076f3c511188a610eda935701 (patch)
tree      1a4503dd21aeec0976670dc89a51c11881e923c3 /core
parent    8838ad7c135a585cde015dc38b5cb23314502dd9 (diff)
[SPARK-4592] Avoid duplicate worker registrations in standalone mode
**Summary.** On failover, the Master may receive duplicate registrations from the same worker, causing the worker to exit. This is caused by commit https://github.com/apache/spark/commit/4afe9a4852ebeb4cc77322a14225cd3dec165f3f, which adds logic for the worker to re-register with the master in case of failures. However, the following race condition may occur:

(1) Master A fails and Worker attempts to reconnect to all masters
(2) Master B takes over and notifies Worker
(3) Worker responds by registering with Master B
(4) Meanwhile, Worker's previous reconnection attempt reaches Master B, causing the same Worker to register with Master B twice

**Fix.** Instead of attempting to register with all known masters, the worker should re-register only with the one it has been communicating with. This is safe because the fact that a failover has occurred means the old master must have died. Then, when the worker is finally notified of a new master, it gives up on the old one in favor of the new one.

**Caveat.** Even this fix is subject to more obscure race conditions. For instance, if Master B fails and Master A recovers immediately, then Master A may still observe duplicate worker registrations. However, this and other potential race conditions summarized in [SPARK-4592](https://issues.apache.org/jira/browse/SPARK-4592) are much, much less likely than the one described above, which is deterministically reproducible.

Author: Andrew Or <andrew@databricks.com>

Closes #3447 from andrewor14/standalone-failover and squashes the following commits:

0d9716c [Andrew Or] Move re-registration logic to actor for thread-safety
79286dc [Andrew Or] Preserve old behavior for initial retries
83b321c [Andrew Or] Tweak wording
1fce6a9 [Andrew Or] Active master actor could be null in the beginning
b6f269e [Andrew Or] Avoid duplicate worker registrations
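For readers skimming the patch, here is a minimal, self-contained sketch of the decision the fix introduces. The `Master` type, `register`, `activeMaster`, and `allMasters` names are hypothetical stand-ins for illustration only; the real code below uses Akka actor refs and the `RegisterWorker` message.

```scala
// Sketch only, not the actual Worker code: models the re-registration choice.
object ReregistrationSketch {
  final case class Master(url: String)

  // All masters this worker knows about, plus the one it last heard from.
  val allMasters: Seq[Master] =
    Seq(Master("spark://masterA:7077"), Master("spark://masterB:7077"))
  var activeMaster: Option[Master] = None // None while still bootstrapping

  def register(m: Master): Unit =
    println(s"RegisterWorker -> ${m.url}")

  // The fix: once we know which master we were talking to, re-register only
  // with it; blasting a registration to every master is what lets a stale
  // attempt reach the new master twice (SPARK-4592).
  def reregisterWithMaster(): Unit = activeMaster match {
    case Some(m) => register(m)                  // targeted re-registration
    case None    => allMasters.foreach(register) // initial bootstrap only
  }

  def main(args: Array[String]): Unit = {
    reregisterWithMaster()               // bootstrapping: tries all masters
    activeMaster = Some(allMasters.head) // later, one master acknowledged us
    reregisterWithMaster()               // now only the active master is contacted
  }
}
```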
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala    2
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala   52
2 files changed, 47 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index b9dd8557ee..c46f84de84 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -92,6 +92,8 @@ private[deploy] object DeployMessages {
case object WorkDirCleanup // Sent to Worker actor periodically for cleaning up app folders
+ case object ReregisterWithMaster // used when a worker attempts to reconnect to a master
+
// AppClient to Master
case class RegisterApplication(appDescription: ApplicationDescription)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ca262de832..eb11163538 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.io.IOException
import java.text.SimpleDateFormat
import java.util.{UUID, Date}
-import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
@@ -177,6 +176,9 @@ private[spark] class Worker(
throw new SparkException("Invalid spark URL: " + x)
}
connected = true
+ // Cancel any outstanding re-registration attempts because we found a new master
+ registrationRetryTimer.foreach(_.cancel())
+ registrationRetryTimer = None
}
private def tryRegisterAllMasters() {
@@ -187,7 +189,12 @@ private[spark] class Worker(
}
}
- private def retryConnectToMaster() {
+ /**
+ * Re-register with the master because a network failure or a master failure has occurred.
+ * If the re-registration attempt threshold is exceeded, the worker exits with error.
+ * Note that for thread-safety this should only be called from the actor.
+ */
+ private def reregisterWithMaster(): Unit = {
Utils.tryOrExit {
connectionAttemptCount += 1
if (registered) {
@@ -195,12 +202,40 @@ private[spark] class Worker(
registrationRetryTimer = None
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
- tryRegisterAllMasters()
+ /**
+ * Re-register with the active master this worker has been communicating with. If there
+ * is none, then it means this worker is still bootstrapping and hasn't established a
+ * connection with a master yet, in which case we should re-register with all masters.
+ *
+ * It is important to re-register only with the active master during failures. Otherwise,
+ * if the worker unconditionally attempts to re-register with all masters, the following
+ * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
+ *
+ * (1) Master A fails and Worker attempts to reconnect to all masters
+ * (2) Master B takes over and notifies Worker
+ * (3) Worker responds by registering with Master B
+ * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
+ * causing the same Worker to register with Master B twice
+ *
+ * Instead, if we only register with the known active master, we can assume that the
+ * old master must have died because another master has taken over. Note that this is
+ * still not safe if the old master recovers within this interval, but this is a much
+ * less likely scenario.
+ */
+ if (master != null) {
+ master ! RegisterWorker(
+ workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
+ } else {
+ // We are retrying the initial registration
+ tryRegisterAllMasters()
+ }
+ // We have exceeded the initial registration retry threshold
+ // All retries from now on should use a higher interval
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel())
registrationRetryTimer = Some {
context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
- PROLONGED_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+ PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
}
} else {
@@ -220,7 +255,7 @@ private[spark] class Worker(
connectionAttemptCount = 0
registrationRetryTimer = Some {
context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
- INITIAL_REGISTRATION_RETRY_INTERVAL)(retryConnectToMaster)
+ INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
}
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
@@ -400,12 +435,15 @@ private[spark] class Worker(
logInfo(s"$x Disassociated !")
masterDisconnected()
- case RequestWorkerState => {
+ case RequestWorkerState =>
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
finishedExecutors.values.toList, drivers.values.toList,
finishedDrivers.values.toList, activeMasterUrl, cores, memory,
coresUsed, memoryUsed, activeMasterWebUiUrl)
- }
+
+ case ReregisterWithMaster =>
+ reregisterWithMaster()
+
}
private def masterDisconnected() {