author    Shixiong Zhu <shixiong@databricks.com>    2016-12-25 23:48:14 -0800
committer Shixiong Zhu <shixiong@databricks.com>    2016-12-25 23:48:14 -0800
commit    7026ee23e0a684e13f9d7dfbb8f85e810106d022 (patch)
tree      97a43e03d3e9ac291b9be0ea2b50158fb7f93ae7 /core/src
parent    d6cbec7598b7aea33f588849e6e2e324b8820340 (diff)
[SPARK-17755][CORE] Use workerRef to send RegisterWorkerResponse to avoid the race condition
## What changes were proposed in this pull request?

The root cause of this issue is that RegisterWorkerResponse and LaunchExecutor are sent via two different channels (TCP connections), so their relative order is not guaranteed. This PR changes the master and worker code to use `workerRef` to send RegisterWorkerResponse, so that RegisterWorkerResponse and LaunchExecutor are sent via the same connection. Hence `LaunchExecutor` always arrives after `RegisterWorkerResponse` and is never ignored.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16345 from zsxwing/SPARK-17755.
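The fix leans on an ordering property of the transport: messages sent over a single connection are delivered in the order they were sent, while deliveries on two separate connections are unordered relative to each other. Below is a minimal, self-contained sketch of that distinction in plain Scala (standard-library queues and threads only, no Spark APIs; all names are illustrative, not Spark's):

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object ChannelOrderingSketch {
  def main(args: Array[String]): Unit = {
    // One connection behaves like one FIFO queue: the master enqueues
    // RegisteredWorker before LaunchExecutor, so the worker must dequeue
    // them in that order.
    val oneChannel = new LinkedBlockingQueue[String]()
    oneChannel.put("RegisteredWorker")
    oneChannel.put("LaunchExecutor")
    assert(oneChannel.take() == "RegisteredWorker") // guaranteed
    assert(oneChannel.take() == "LaunchExecutor")   // guaranteed

    // Two connections behave like two independent queues drained by
    // separate threads: nothing orders one delivery against the other,
    // so the worker may observe LaunchExecutor before the registration
    // response and drop it.
    val askReplyChannel = new LinkedBlockingQueue[String]()
    val commandChannel  = new LinkedBlockingQueue[String]()
    askReplyChannel.put("RegisteredWorker")
    commandChannel.put("LaunchExecutor")
    val pool = Executors.newFixedThreadPool(2)
    pool.execute(() => println(s"worker saw: ${askReplyChannel.take()}"))
    pool.execute(() => println(s"worker saw: ${commandChannel.take()}"))
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.SECONDS)
    // The two println calls may run in either order -- this is the race.
  }
}
```

With the patch, RegisteredWorker and LaunchExecutor both travel over the worker's inbound connection, which corresponds to the single-queue case above.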
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala      | 47
-rwxr-xr-x  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala      | 24
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala |  2
3 files changed, 32 insertions(+), 41 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4618e6117a..c5f7c077fe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -231,6 +231,29 @@ private[deploy] class Master(
logError("Leadership has been revoked -- master shutting down.")
System.exit(0)
+ case RegisterWorker(id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
+ logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+ workerHost, workerPort, cores, Utils.megabytesToString(memory)))
+ if (state == RecoveryState.STANDBY) {
+ workerRef.send(MasterInStandby)
+ } else if (idToWorker.contains(id)) {
+ workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
+ } else {
+ val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+ workerRef, workerWebUiUrl)
+ if (registerWorker(worker)) {
+ persistenceEngine.addWorker(worker)
+ workerRef.send(RegisteredWorker(self, masterWebUiUrl))
+ schedule()
+ } else {
+ val workerAddress = worker.endpoint.address
+ logWarning("Worker registration failed. Attempted to re-register worker at same " +
+ "address: " + workerAddress)
+ workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ + workerAddress))
+ }
+ }
+
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
@@ -386,30 +409,6 @@ private[deploy] class Master(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterWorker(
- id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
- logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
- workerHost, workerPort, cores, Utils.megabytesToString(memory)))
- if (state == RecoveryState.STANDBY) {
- context.reply(MasterInStandby)
- } else if (idToWorker.contains(id)) {
- context.reply(RegisterWorkerFailed("Duplicate worker ID"))
- } else {
- val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
- workerRef, workerWebUiUrl)
- if (registerWorker(worker)) {
- persistenceEngine.addWorker(worker)
- context.reply(RegisteredWorker(self, masterWebUiUrl))
- schedule()
- } else {
- val workerAddress = worker.endpoint.address
- logWarning("Worker registration failed. Attempted to re-register worker at same " +
- "address: " + workerAddress)
- context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
- + workerAddress))
- }
- }
-
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0940f3c558..f963a46060 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
import scala.concurrent.ExecutionContext
-import scala.util.{Failure, Random, Success}
+import scala.util.Random
import scala.util.control.NonFatal
import org.apache.spark.{SecurityManager, SparkConf}
@@ -216,7 +216,7 @@ private[deploy] class Worker(
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
- registerWithMaster(masterEndpoint)
+ sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -272,7 +272,7 @@ private[deploy] class Worker(
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
- registerWithMaster(masterEndpoint)
+ sendRegisterMessageToMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -341,19 +341,8 @@ private[deploy] class Worker(
}
}
- private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
- masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
- workerId, host, port, self, cores, memory, workerWebUiUrl))
- .onComplete {
- // This is a very fast action so we can use "ThreadUtils.sameThread"
- case Success(msg) =>
- Utils.tryLogNonFatalError {
- handleRegisterResponse(msg)
- }
- case Failure(e) =>
- logError(s"Cannot register with master: ${masterEndpoint.address}", e)
- System.exit(1)
- }(ThreadUtils.sameThread)
+ private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
+ masterEndpoint.send(RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl))
}
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
@@ -394,6 +383,9 @@ private[deploy] class Worker(
}
override def receive: PartialFunction[Any, Unit] = synchronized {
+ case msg: RegisterWorkerResponse =>
+ handleRegisterResponse(msg)
+
case SendHeartbeat =>
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 831a7bcb12..da7253b2a5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -447,7 +447,7 @@ class MasterSuite extends SparkFunSuite
}
})
- master.self.ask(
+ master.self.send(
RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, "http://localhost:8080"))
val executors = (0 until 3).map { i =>
new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING)