aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2012-12-17 23:05:52 -0800
committerPatrick Wendell <pwendell@gmail.com>2012-12-17 23:09:05 -0800
commitbfac06e1f620efcd17beb16750dc57db6b424fb7 (patch)
tree9fc9cc56fd42070eee3e50e336d06910094b3a84
parentb82a6dd2c72d6555aeaa2b523ddf564434f5e10c (diff)
downloadspark-bfac06e1f620efcd17beb16750dc57db6b424fb7.tar.gz
spark-bfac06e1f620efcd17beb16750dc57db6b424fb7.tar.bz2
spark-bfac06e1f620efcd17beb16750dc57db6b424fb7.zip
SPARK-616: Logging dead workers in Web UI.
This patch keeps track of which workers have died and marks them as such in the master web UI. It also handles workers which die and re-register using different actor ID's.
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala7
-rw-r--r--core/src/main/scala/spark/deploy/master/WorkerInfo.scala6
-rw-r--r--core/src/main/scala/spark/deploy/master/WorkerState.scala7
-rw-r--r--core/src/main/twirl/spark/deploy/master/worker_row.scala.html1
-rw-r--r--core/src/main/twirl/spark/deploy/master/worker_table.scala.html1
5 files changed, 19 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index b30c8e99b5..6ecebe626a 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -156,7 +156,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
if (spreadOutJobs) {
// Try to spread out each job among all the nodes, until it has all its cores
for (job <- waitingJobs if job.coresLeft > 0) {
- val usableWorkers = workers.toArray.filter(canUse(job, _)).sortBy(_.coresFree).reverse
+ val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+ .filter(canUse(job, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(job.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -203,6 +204,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
publicAddress: String): WorkerInfo = {
+ // There may be one or more refs to dead workers on this same node (w/ different ID's), remove them.
+ workers.filter(w => (w.host == host) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
@@ -213,7 +216,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
- workers -= worker
+ worker.setState(WorkerState.DEAD)
idToWorker -= worker.id
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index a0a698ef04..5a7f5fef8a 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -14,7 +14,7 @@ private[spark] class WorkerInfo(
val publicAddress: String) {
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
-
+ var state: WorkerState.Value = WorkerState.ALIVE
var coresUsed = 0
var memoryUsed = 0
@@ -42,4 +42,8 @@ private[spark] class WorkerInfo(
def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}
+
+ def setState(state: WorkerState.Value) = {
+ this.state = state
+ }
}
diff --git a/core/src/main/scala/spark/deploy/master/WorkerState.scala b/core/src/main/scala/spark/deploy/master/WorkerState.scala
new file mode 100644
index 0000000000..0bf35014c8
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/WorkerState.scala
@@ -0,0 +1,7 @@
+package spark.deploy.master
+
+private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+ type WorkerState = Value
+
+ val ALIVE, DEAD, DECOMMISSIONED = Value
+}
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index c32ab30401..be69e9bf02 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -7,6 +7,7 @@
<a href="@worker.webUiAddress">@worker.id</href>
</td>
<td>@{worker.host}:@{worker.port}</td>
+ <td>@worker.state</td>
<td>@worker.cores (@worker.coresUsed Used)</td>
<td>@{Utils.memoryMegabytesToString(worker.memory)}
(@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
index fad1af41dc..b249411a62 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
@@ -5,6 +5,7 @@
<tr>
<th>ID</th>
<th>Address</th>
+ <th>State</th>
<th>Cores</th>
<th>Memory</th>
</tr>