From 7ab0ce6501c37f0fc3a49e3332573ae4e4def3e8 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Mon, 19 Oct 2015 16:14:50 -0700
Subject: [SPARK-11131][CORE] Fix race in worker registration protocol.

Because the registration RPC was not really an RPC, but a bunch of
disconnected messages, it was possible for other messages to be sent
before the reply to the registration arrived, and that would confuse the
Worker. Especially in local-cluster mode, the worker was susceptible to
receiving an executor request before it received a message from the
master saying registration succeeded.

On top of the above, the change also fixes a ClassCastException when the
registration fails, which also affects the executor registration
protocol. Because the `ask` is issued with a specific return type, if
the error message (of a different type) was returned instead, the code
would just die with an exception. This is fixed by having a common base
trait for these reply messages.

Author: Marcelo Vanzin

Closes #9138 from vanzin/SPARK-11131.
---
 .../org/apache/spark/deploy/DeployMessage.scala   |  7 ++-
 .../org/apache/spark/deploy/master/Master.scala   | 50 +++++++--------
 .../org/apache/spark/deploy/worker/Worker.scala   | 73 ++++++++++++++--------
 .../executor/CoarseGrainedExecutorBackend.scala   |  4 +-
 .../cluster/CoarseGrainedClusterMessage.scala     |  4 ++
 .../org/apache/spark/HeartbeatReceiverSuite.scala |  4 +-
 6 files changed, 86 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index d8084a5765..3feb7cea59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -69,9 +69,14 @@ private[deploy] object DeployMessages {
 
   // Master to Worker
 
+  sealed trait RegisterWorkerResponse
+
   case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
+    with RegisterWorkerResponse
+
+  case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
 
-  case class RegisterWorkerFailed(message: String) extends DeployMessage
+  case object MasterInStandby extends DeployMessage with RegisterWorkerResponse
 
   case class ReconnectWorker(masterUrl: String) extends DeployMessage
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d518e92133..6715d6c70f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -233,31 +233,6 @@ private[deploy] class Master(
       System.exit(0)
     }
 
-    case RegisterWorker(
-        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
-      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == RecoveryState.STANDBY) {
-        // ignore, don't send response
-      } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
-      } else {
-        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerUiPort, publicAddress)
-        if (registerWorker(worker)) {
-          persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
-          schedule()
-        } else {
-          val workerAddress = worker.endpoint.address
-          logWarning("Worker registration failed. Attempted to re-register worker at same " +
-            "address: " + workerAddress)
-          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
-            + workerAddress))
-        }
-      }
-    }
-
     case RegisterApplication(description, driver) => {
       // TODO Prevent repeated registrations from some driver
       if (state == RecoveryState.STANDBY) {
@@ -387,6 +362,31 @@ private[deploy] class Master(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case RegisterWorker(
+        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
+      if (state == RecoveryState.STANDBY) {
+        context.reply(MasterInStandby)
+      } else if (idToWorker.contains(id)) {
+        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
+      } else {
+        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+          workerRef, workerUiPort, publicAddress)
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          context.reply(RegisteredWorker(self, masterWebUiUrl))
+          schedule()
+        } else {
+          val workerAddress = worker.endpoint.address
+          logWarning("Worker registration failed. Attempted to re-register worker at same " +
+            "address: " + workerAddress)
+          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+            + workerAddress))
+        }
+      }
+    }
+
     case RequestSubmitDriver(description) => {
       if (state != RecoveryState.ALIVE) {
         val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 93a1b3f310..a45867e768 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut
 
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
-import scala.util.Random
+import scala.util.{Failure, Random, Success}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -213,8 +213,7 @@ private[deploy] class Worker(
             logInfo("Connecting to master " + masterAddress + "...")
             val masterEndpoint =
               rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-            masterEndpoint.send(RegisterWorker(
-              workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+            registerWithMaster(masterEndpoint)
           } catch {
             case ie: InterruptedException => // Cancelled
             case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -271,8 +270,7 @@ private[deploy] class Worker(
               logInfo("Connecting to master " + masterAddress + "...")
               val masterEndpoint =
                 rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-              masterEndpoint.send(RegisterWorker(
-                workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+              registerWithMaster(masterEndpoint)
             } catch {
               case ie: InterruptedException => // Cancelled
               case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -341,25 +339,54 @@ private[deploy] class Worker(
     }
   }
 
-  override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, masterWebUiUrl) =>
-      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
-      registered = true
-      changeMaster(masterRef, masterWebUiUrl)
-      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(SendHeartbeat)
-        }
-      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
-      if (CLEANUP_ENABLED) {
-        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
+    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
+      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+      .onComplete {
+        // This is a very fast action so we can use "ThreadUtils.sameThread"
+        case Success(msg) =>
+          Utils.tryLogNonFatalError {
+            handleRegisterResponse(msg)
+          }
+        case Failure(e) =>
+          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
+          System.exit(1)
+      }(ThreadUtils.sameThread)
+  }
+
+  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
+    msg match {
+      case RegisteredWorker(masterRef, masterWebUiUrl) =>
+        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+        registered = true
+        changeMaster(masterRef, masterWebUiUrl)
         forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(WorkDirCleanup)
+            self.send(SendHeartbeat)
           }
-        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
-      }
+        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+        if (CLEANUP_ENABLED) {
+          logInfo(
+            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              self.send(WorkDirCleanup)
+            }
+          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+        }
+      case RegisterWorkerFailed(message) =>
+        if (!registered) {
+          logError("Worker registration failed: " + message)
+          System.exit(1)
+        }
+
+      case MasterInStandby =>
+        // Ignore. Master not yet ready.
+    }
+  }
+
+  override def receive: PartialFunction[Any, Unit] = synchronized {
     case SendHeartbeat =>
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
@@ -399,12 +426,6 @@ private[deploy] class Worker(
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
       masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
 
-    case RegisterWorkerFailed(message) =>
-      if (!registered) {
-        logError("Worker registration failed: " + message)
-        System.exit(1)
-      }
-
     case ReconnectWorker(masterUrl) =>
       logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
       registerWithMaster()
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 49059de50b..a9c6a05ecd 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -59,12 +59,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisteredExecutor.type](
+      ref.ask[RegisterExecutorResponse](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
-        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
+        Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
       }
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e0d25dc50c..4652df32ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -36,9 +36,13 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
+  sealed trait RegisterExecutorResponse
+
   case object RegisteredExecutor extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse
 
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse
 
   // Executors to driver
   case class RegisterExecutor(
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18f2229fea..3cd80c0f7d 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -173,9 +173,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
      RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
-- 
cgit v1.2.3
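
For illustration, outside the patch itself: the fix hinges on giving every
possible reply to the registration `ask` a common sealed base trait, so the
caller gets back one well-typed future instead of racing loose one-way
messages. The sketch below models that pattern with plain Scala futures; it
is a minimal standalone sketch, not Spark code. `RegistrationSketch`, its
`ask` helper, and the simplified message types are hypothetical stand-ins
that mirror the `RegisterWorkerResponse` hierarchy above, not Spark's actual
RpcEnv API.

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}

// Simplified stand-ins for the patch's reply messages (hypothetical, not
// Spark's classes): one sealed trait covers every shape the master may send.
sealed trait RegisterWorkerResponse
case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse
case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
case object MasterInStandby extends RegisterWorkerResponse

object RegistrationSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Toy "master" side: picks a reply, standing in for
  // RpcEndpointRef.ask[RegisterWorkerResponse](RegisterWorker(...)).
  def ask(standby: Boolean, duplicateId: Boolean): Future[RegisterWorkerResponse] = Future {
    if (standby) MasterInStandby
    else if (duplicateId) RegisterWorkerFailed("Duplicate worker ID")
    else RegisteredWorker("http://master:8080")
  }

  def main(args: Array[String]): Unit = {
    val done = ask(standby = false, duplicateId = false).andThen {
      // The reply is typed as the sealed trait, so every shape the master can
      // send is a compile-checked case here rather than a ClassCastException.
      case Success(RegisteredWorker(url))     => println(s"registered; master web UI at $url")
      case Success(RegisterWorkerFailed(msg)) => println(s"registration failed: $msg")
      case Success(MasterInStandby)           => println("master in standby; keep waiting")
      case Failure(e)                         => println(s"could not reach master: $e")
    }
    Await.ready(done, 5.seconds) // toy example: wait for the callback to run
  }
}

This mirrors why the Worker's handleRegisterResponse above handles
RegisterWorkerFailed and MasterInStandby explicitly: a failure or standby
reply is now an ordinary value of the response trait, not a message of an
unexpected class arriving on a different code path.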