author     Patrick Wendell <pwendell@gmail.com>    2012-12-17 23:05:52 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2012-12-17 23:09:05 -0800
commit     bfac06e1f620efcd17beb16750dc57db6b424fb7
tree       9fc9cc56fd42070eee3e50e336d06910094b3a84
parent     b82a6dd2c72d6555aeaa2b523ddf564434f5e10c
SPARK-616: Logging dead workers in Web UI.
This patch keeps track of workers that have died and marks them
as such in the master web UI. It also handles workers that die and
then re-register under different actor IDs.
5 files changed, 19 insertions, 3 deletions
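The core idea of the patch: instead of deleting a failed worker's WorkerInfo, the master flips it to a DEAD state so the UI can still list it, and prunes stale DEAD entries for a host only when a replacement worker registers from that host. Below is a minimal, self-contained Scala sketch of that bookkeeping; Worker, registry, markDead, and register are hypothetical stand-ins for Spark's WorkerInfo and the master's workers set, not Spark's actual API.

import scala.collection.mutable

object DeadWorkerSketch {
  object State extends Enumeration { val ALIVE, DEAD = Value }

  // A plain class (not a case class) so identity-based equals/hashCode keep the
  // HashSet consistent even after `state` is mutated in place.
  class Worker(val id: String, val host: String) {
    var state: State.Value = State.ALIVE
  }

  val registry = mutable.HashSet[Worker]()

  // On failure: keep the entry but mark it DEAD so the UI can still show it.
  def markDead(w: Worker): Unit = w.state = State.DEAD

  // On registration: a restarted worker arrives under a fresh actor ID, so any
  // stale DEAD entries for the same host are dropped before adding the new one.
  def register(id: String, host: String): Worker = {
    registry --= registry.filter(w => w.host == host && w.state == State.DEAD)
    val w = new Worker(id, host)
    registry += w
    w
  }

  def main(args: Array[String]): Unit = {
    val w1 = register("worker-1", "host-a")
    markDead(w1)
    register("worker-2", "host-a") // replaces the DEAD entry for host-a
    registry.foreach(w => println(s"${w.id} ${w.state}")) // worker-2 ALIVE
  }
}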
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index b30c8e99b5..6ecebe626a 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -156,7 +156,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     if (spreadOutJobs) {
       // Try to spread out each job among all the nodes, until it has all its cores
       for (job <- waitingJobs if job.coresLeft > 0) {
-        val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+          .filter(canUse(job, _)).sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -203,6 +204,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
 
   def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
     publicAddress: String): WorkerInfo = {
+    // There may be one or more refs to dead workers on this same node (w/ different ID's), remove them.
+    workers.filter(w => (w.host == host) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
     val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
     workers += worker
     idToWorker(worker.id) = worker
@@ -213,7 +216,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
 
   def removeWorker(worker: WorkerInfo) {
     logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
-    workers -= worker
+    worker.setState(WorkerState.DEAD)
     idToWorker -= worker.id
     actorToWorker -= worker.actor
     addressToWorker -= worker.actor.path.address
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index a0a698ef04..5a7f5fef8a 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -14,7 +14,7 @@ private[spark] class WorkerInfo(
   val publicAddress: String) {
 
   var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
-
+  var state: WorkerState.Value = WorkerState.ALIVE
   var coresUsed = 0
   var memoryUsed = 0
 
@@ -42,4 +42,8 @@ private[spark] class WorkerInfo(
   def webUiAddress : String = {
     "http://" + this.publicAddress + ":" + this.webUiPort
   }
+
+  def setState(state: WorkerState.Value) = {
+    this.state = state
+  }
 }
diff --git a/core/src/main/scala/spark/deploy/master/WorkerState.scala b/core/src/main/scala/spark/deploy/master/WorkerState.scala
new file mode 100644
index 0000000000..0bf35014c8
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/WorkerState.scala
@@ -0,0 +1,7 @@
+package spark.deploy.master
+
+private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+  type WorkerState = Value
+
+  val ALIVE, DEAD, DECOMMISSIONED = Value
+}
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index c32ab30401..be69e9bf02 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -7,6 +7,7 @@
     <a href="@worker.webUiAddress">@worker.id</href>
   </td>
   <td>@{worker.host}:@{worker.port}</td>
+  <td>@worker.state</td>
   <td>@worker.cores (@worker.coresUsed Used)</td>
   <td>@{Utils.memoryMegabytesToString(worker.memory)}
     (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
index fad1af41dc..b249411a62 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
@@ -5,6 +5,7 @@
     <tr>
       <th>ID</th>
       <th>Address</th>
+      <th>State</th>
       <th>Cores</th>
       <th>Memory</th>
     </tr>
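For context on the Master.scala hunk above: once dead workers linger in the registry for the UI's benefit, every scheduling pass must filter on state, which is exactly what the added `.filter(_.state == WorkerState.ALIVE)` does. A hedged sketch of that filter in isolation follows; WorkerInfo and coresFree here are simplified stand-ins, not Spark's real classes, and the plain Enumeration form is used since the string-argument constructor seen in WorkerState.scala comes from older Scala versions.

object SchedulingFilterSketch {
  object WorkerState extends Enumeration {
    type WorkerState = Value
    val ALIVE, DEAD, DECOMMISSIONED = Value
  }

  case class WorkerInfo(id: String, coresFree: Int, state: WorkerState.Value)

  // Dead workers stay in the registry for the UI, so scheduling must skip them,
  // then rank the survivors by free cores (most free first).
  def usableWorkers(workers: Seq[WorkerInfo]): Seq[WorkerInfo] =
    workers.filter(_.state == WorkerState.ALIVE).sortBy(_.coresFree).reverse

  def main(args: Array[String]): Unit = {
    val all = Seq(
      WorkerInfo("w1", 8, WorkerState.ALIVE),
      WorkerInfo("w2", 16, WorkerState.DEAD), // most free cores, but dead: skipped
      WorkerInfo("w3", 4, WorkerState.ALIVE))
    println(usableWorkers(all).map(_.id)) // List(w1, w3)
  }
}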