commit 7ab0ce6501c37f0fc3a49e3332573ae4e4def3e8 (patch)
Author:    Marcelo Vanzin <vanzin@cloudera.com>  2015-10-19 16:14:50 -0700
Committer: Andrew Or <andrew@databricks.com>     2015-10-19 16:14:50 -0700
tree       3948d7a1c4c5f3f650708005e493d7961d8306c0
parent     67582132bffbaaeaadc5cf8218f6239d03c39da0 (diff)
[SPARK-11131][CORE] Fix race in worker registration protocol.
Because the registration RPC was not really an RPC but a series of disconnected
messages, other messages could be delivered before the reply to the
registration arrived, and that would confuse the Worker. In local-cluster mode
especially, the worker was susceptible to receiving an executor request before
the master's message saying registration had succeeded.

On top of the above, the change also fixes a ClassCastException when
registration fails, which also affects the executor registration protocol.
Because the `ask` is issued with a specific return type, if an error message
of a different type was returned instead, the code would die with an
exception. This is fixed by giving these reply messages a common base trait.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9138 from vanzin/SPARK-11131.
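For readers outside the codebase, here is a minimal, self-contained sketch of
the reply pattern the fix introduces. It models `ask` with a plain Future
rather than Spark's RpcEndpointRef, and the names RegistrationSketch and
register are illustrative, not Spark's:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RegistrationSketch {
  // A sealed base trait gives every possible reply one static type, so a
  // typed ask[RegisterWorkerResponse] never needs an unsafe downcast.
  sealed trait RegisterWorkerResponse
  case class RegisteredWorker(masterUrl: String) extends RegisterWorkerResponse
  case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
  case object MasterInStandby extends RegisterWorkerResponse

  // Stand-in for the master's receiveAndReply: every request gets exactly
  // one answer, instead of a one-way message that may race with others.
  def register(id: String, known: Set[String]): Future[RegisterWorkerResponse] =
    Future {
      if (known.contains(id)) RegisterWorkerFailed("Duplicate worker ID")
      else RegisteredWorker("spark://master:7077")
    }

  def main(args: Array[String]): Unit = {
    val reply = Await.result(register("worker-1", Set.empty), 5.seconds)
    reply match {
      case RegisteredWorker(url)     => println(s"registered with $url")
      case RegisterWorkerFailed(msg) => println(s"registration failed: $msg")
      case MasterInStandby           => println("master in standby; retry later")
    }
  }
}

As in the patch below, the worker only flips its registered flag inside this
match, so no executor request can be acted on before the success reply is seen.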
 core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala                          |  7
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala                          | 50
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala                          | 73
 core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala         |  4
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala |  4
 core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala                        |  4
 6 files changed, 86 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index d8084a5765..3feb7cea59 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -69,9 +69,14 @@ private[deploy] object DeployMessages {

   // Master to Worker

+  sealed trait RegisterWorkerResponse
+
   case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
+    with RegisterWorkerResponse

-  case class RegisterWorkerFailed(message: String) extends DeployMessage
+  case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
+
+  case object MasterInStandby extends DeployMessage with RegisterWorkerResponse

   case class ReconnectWorker(masterUrl: String) extends DeployMessage
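To see why the old protocol could die with a ClassCastException, here is a toy
reproduction under invented names (none of these types are Spark's; they only
mirror the shape of the bug):

object CastFailureSketch {
  trait Message
  case object Registered extends Message
  case class Failed(reason: String) extends Message

  // Models a typed ask[T]: the caller picks T up front, and the erased cast
  // is only checked at the call site where the result is actually used.
  def ask[T](reply: Message): T = reply.asInstanceOf[T]

  def main(args: Array[String]): Unit = {
    try {
      // Caller expects the success object, but the error reply arrives:
      val ok: Registered.type = ask[Registered.type](Failed("duplicate worker ID"))
      println(ok)
    } catch {
      case e: ClassCastException => println(s"old protocol dies here: $e")
    }
  }
}

With the sealed RegisterWorkerResponse trait above, the ask is issued at the
common supertype and a failure reply becomes an ordinary pattern-match case
instead of a cast failure.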
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d518e92133..6715d6c70f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -233,31 +233,6 @@ private[deploy] class Master(
       System.exit(0)
     }

-    case RegisterWorker(
-        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
-      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == RecoveryState.STANDBY) {
-        // ignore, don't send response
-      } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
-      } else {
-        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerUiPort, publicAddress)
-        if (registerWorker(worker)) {
-          persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
-          schedule()
-        } else {
-          val workerAddress = worker.endpoint.address
-          logWarning("Worker registration failed. Attempted to re-register worker at same " +
-            "address: " + workerAddress)
-          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
-            + workerAddress))
-        }
-      }
-    }
-
    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
@@ -387,6 +362,31 @@ private[deploy] class Master(
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case RegisterWorker(
+        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
+      if (state == RecoveryState.STANDBY) {
+        context.reply(MasterInStandby)
+      } else if (idToWorker.contains(id)) {
+        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
+      } else {
+        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+          workerRef, workerUiPort, publicAddress)
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          context.reply(RegisteredWorker(self, masterWebUiUrl))
+          schedule()
+        } else {
+          val workerAddress = worker.endpoint.address
+          logWarning("Worker registration failed. Attempted to re-register worker at same " +
+            "address: " + workerAddress)
+          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+            + workerAddress))
+        }
+      }
+    }
+
    case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 93a1b3f310..a45867e768 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut

 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
-import scala.util.Random
+import scala.util.{Failure, Random, Success}
 import scala.util.control.NonFatal

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -213,8 +213,7 @@ private[deploy] class Worker(
            logInfo("Connecting to master " + masterAddress + "...")
            val masterEndpoint =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-           masterEndpoint.send(RegisterWorker(
-             workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+           registerWithMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -271,8 +270,7 @@ private[deploy] class Worker(
                logInfo("Connecting to master " + masterAddress + "...")
                val masterEndpoint =
                  rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-               masterEndpoint.send(RegisterWorker(
-                 workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+               registerWithMaster(masterEndpoint)
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -341,25 +339,54 @@ private[deploy] class Worker(
     }
   }

-  override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, masterWebUiUrl) =>
-      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
-      registered = true
-      changeMaster(masterRef, masterWebUiUrl)
-      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(SendHeartbeat)
-        }
-      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
-      if (CLEANUP_ENABLED) {
-        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
+    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
+      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+      .onComplete {
+        // This is a very fast action so we can use "ThreadUtils.sameThread"
+        case Success(msg) =>
+          Utils.tryLogNonFatalError {
+            handleRegisterResponse(msg)
+          }
+        case Failure(e) =>
+          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
+          System.exit(1)
+      }(ThreadUtils.sameThread)
+  }
+
+  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
+    msg match {
+      case RegisteredWorker(masterRef, masterWebUiUrl) =>
+        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+        registered = true
+        changeMaster(masterRef, masterWebUiUrl)
         forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(WorkDirCleanup)
+            self.send(SendHeartbeat)
           }
-        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
-      }
+        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+        if (CLEANUP_ENABLED) {
+          logInfo(
+            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              self.send(WorkDirCleanup)
+            }
+          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+        }
+      case RegisterWorkerFailed(message) =>
+        if (!registered) {
+          logError("Worker registration failed: " + message)
+          System.exit(1)
+        }
+
+      case MasterInStandby =>
+        // Ignore. Master not yet ready.
+    }
+  }
+
+  override def receive: PartialFunction[Any, Unit] = synchronized {
     case SendHeartbeat =>
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
@@ -399,12 +426,6 @@ private[deploy] class Worker(
         map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
       masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))

-    case RegisterWorkerFailed(message) =>
-      if (!registered) {
-        logError("Worker registration failed: " + message)
-        System.exit(1)
-      }
-
     case ReconnectWorker(masterUrl) =>
       logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
       registerWithMaster()
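The patch runs the registration completion callback on ThreadUtils.sameThread
because the handler is trivially fast. For readers unfamiliar with that
utility (Spark's own lives in org.apache.spark.util.ThreadUtils), a
hand-rolled equivalent under the invented name SameThreadSketch:

import scala.concurrent.{ExecutionContext, Promise}

object SameThreadSketch {
  // An ExecutionContext that runs each task synchronously on the thread
  // that submits it, instead of handing it to a pool.
  val sameThread: ExecutionContext = new ExecutionContext {
    override def execute(runnable: Runnable): Unit = runnable.run()
    override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  }

  def main(args: Array[String]): Unit = {
    val p = Promise[String]()
    p.future.onComplete { result =>
      // Runs on whichever thread completes the promise (main, here).
      // Fine for fast handlers; wrong for anything blocking or slow.
      println(s"completed on ${Thread.currentThread().getName}: $result")
    }(sameThread)
    p.success("registered")
  }
}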
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 49059de50b..a9c6a05ecd 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -59,12 +59,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisteredExecutor.type](
+      ref.ask[RegisterExecutorResponse](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
-        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
+        Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
       }
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e0d25dc50c..4652df32ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -36,9 +36,13 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage

+  sealed trait RegisterExecutorResponse
+
   case object RegisteredExecutor extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse

   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse

   // Executors to driver
   case class RegisterExecutor(
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18f2229fea..3cd80c0f7d 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -173,9 +173,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)