aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala19
1 files changed, 13 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d49401f2fe..380055728f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -176,10 +176,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerWebUiPort, publicAddress)
- registerWorker(worker)
- persistenceEngine.addWorker(worker)
- sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
- schedule()
+ if (registerWorker(worker)) {
+ persistenceEngine.addWorker(worker)
+ sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
+ schedule()
+ } else {
+ val workerAddress = worker.actor.path.address
+ logWarning("Worker registration failed. Attempted to re-register worker at same address: " +
+ workerAddress)
+ sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress)
+ }
}
}
@@ -511,7 +517,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
- def registerWorker(worker: WorkerInfo): Unit = {
+ def registerWorker(worker: WorkerInfo): Boolean = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
@@ -523,13 +529,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
- return
+ return false
}
workers += worker
idToWorker(worker.id) = worker
actorToWorker(worker.actor) = worker
addressToWorker(workerAddress) = worker
+ true
}
def removeWorker(worker: WorkerInfo) {