author     Matei Zaharia <matei@eecs.berkeley.edu>    2012-10-20 23:33:37 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2012-11-07 16:49:53 -0800
commit     bb1bce79240da22c2677d9f8159683cdf73158c2 (patch)
tree       24973a07126c7e2e3c6920942b932e0e988d1f43
parent     e2b8477487fd6edfabfbaaea8dca97bffb6d0d40 (diff)
Various fixes to standalone mode and web UI:
- Don't report a job as finishing multiple times
- Don't show state of workers as LOADING when they're running
- Show start and finish times in web UI
- Sort web UI tables by ID and time by default
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala             6
-rw-r--r--  core/src/main/scala/spark/deploy/WebUI.scala                     30
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala            23
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala             33
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterWebUI.scala        2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala             2
-rw-r--r--  core/src/main/twirl/spark/deploy/master/index.scala.html         15
-rw-r--r--  core/src/main/twirl/spark/deploy/master/job_row.scala.html       11
-rw-r--r--  core/src/main/twirl/spark/deploy/master/job_table.scala.html     9
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_row.scala.html    6
-rw-r--r--  core/src/main/twirl/spark/deploy/master/worker_table.scala.html  4
-rw-r--r--  core/src/main/twirl/spark/deploy/worker/executor_row.scala.html  8
-rw-r--r--  core/src/main/twirl/spark/deploy/worker/index.scala.html         6
13 files changed, 110 insertions, 45 deletions
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index d2b63d6e0d..7a1089c816 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -67,8 +67,8 @@ private[spark] case object RequestMasterState
// Master to MasterWebUI
private[spark]
-case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
- completedJobs: List[JobInfo])
+case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo],
+ completedJobs: Array[JobInfo])
// WorkerWebUI to Worker
private[spark] case object RequestWorkerState
@@ -78,4 +78,4 @@ private[spark] case object RequestWorkerState
private[spark]
case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
- coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
+ coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
diff --git a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala
new file mode 100644
index 0000000000..ad1a1092b2
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/WebUI.scala
@@ -0,0 +1,30 @@
+package spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+ val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+ def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+ def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+ def formatDuration(milliseconds: Long): String = {
+ val seconds = milliseconds.toDouble / 1000
+ if (seconds < 60) {
+ return "%.0f s".format(seconds)
+ }
+ val minutes = seconds / 60
+ if (minutes < 10) {
+ return "%.1f min".format(minutes)
+ } else if (minutes < 60) {
+ return "%.0f min".format(minutes)
+ }
+ val hours = minutes / 60
+ return "%.1f h".format(hours)
+ }
+}
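
The helper above picks a display unit from the magnitude of the duration: whole seconds under a minute, minutes with one decimal under ten minutes, whole minutes under an hour, then fractional hours. A quick sketch of the expected output, assuming the WebUI object from this patch is on the classpath:

    // Hypothetical spot-checks of the thresholds in formatDuration above.
    import spark.deploy.WebUI

    WebUI.formatDuration(45 * 1000L)       // "45 s"    (under a minute)
    WebUI.formatDuration(5 * 60 * 1000L)   // "5.0 min" (under ten minutes)
    WebUI.formatDuration(30 * 60 * 1000L)  // "30 min"  (under an hour)
    WebUI.formatDuration(90 * 60 * 1000L)  // "1.5 h"   (an hour or more)

One caveat on the shared DATE_FORMAT: java.text.SimpleDateFormat is not thread-safe, so reusing a single instance assumes date formatting happens on one rendering thread at a time.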
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 8795c09cc1..130b031a2a 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,11 +5,17 @@ import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
-private[spark]
-class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark] class JobInfo(
+ val startTime: Long,
+ val id: String,
+ val desc: JobDescription,
+ val submitDate: Date,
+ val actor: ActorRef)
+{
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
+ var endTime = -1L
private var nextExecutorId = 0
@@ -41,4 +47,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
_retryCount += 1
_retryCount
}
+
+ def markFinished(endState: JobState.Value) {
+ state = endState
+ endTime = System.currentTimeMillis()
+ }
+
+ def duration: Long = {
+ if (endTime != -1) {
+ endTime - startTime
+ } else {
+ System.currentTimeMillis() - startTime
+ }
+ }
}
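
The pair of fields added here gives JobInfo a live duration: while the job runs, duration tracks the wall clock, and once markFinished records an endTime it is frozen. A standalone sketch of that pattern (stand-in class; the real JobInfo also carries the description, actor, and executor map):

    class TimedJob(val startTime: Long) {
      var endTime = -1L                        // -1 means "still running"
      def markFinished() { endTime = System.currentTimeMillis() }
      def duration: Long =
        if (endTime != -1) endTime - startTime
        else System.currentTimeMillis() - startTime
    }

    val job = new TimedJob(System.currentTimeMillis())
    Thread.sleep(100)
    println(job.duration)     // elapsed time so far, still growing
    job.markFinished()
    val d = job.duration      // frozen at endTime - startTime
    Thread.sleep(100)
    assert(job.duration == d)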
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6010f7cff2..5ef7411f4d 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -123,7 +123,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
case RequestMasterState => {
- sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
+ sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
}
}
@@ -179,8 +179,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
- val date = new Date
- val job = new JobInfo(newJobId(date), desc, date, actor)
+ val now = System.currentTimeMillis()
+ val date = new Date(now)
+ val job = new JobInfo(now, newJobId(date), desc, date, actor)
jobs += job
idToJob(job.id) = job
actorToJob(sender) = job
@@ -189,19 +190,21 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
def removeJob(job: JobInfo) {
- logInfo("Removing job " + job.id)
- jobs -= job
- idToJob -= job.id
- actorToJob -= job.actor
- addressToWorker -= job.actor.path.address
- completedJobs += job // Remember it in our history
- waitingJobs -= job
- for (exec <- job.executors.values) {
- exec.worker.removeExecutor(exec)
- exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+ if (jobs.contains(job)) {
+ logInfo("Removing job " + job.id)
+ jobs -= job
+ idToJob -= job.id
+ actorToJob -= job.actor
+ addressToWorker -= job.actor.path.address
+ completedJobs += job // Remember it in our history
+ waitingJobs -= job
+ for (exec <- job.executors.values) {
+ exec.worker.removeExecutor(exec)
+ exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+ }
+ job.markFinished(JobState.FINISHED) // TODO: Mark it as FAILED if it failed
+ schedule()
}
- job.state = JobState.FINISHED
- schedule()
}
/** Generate a new job ID given a job's submission date */
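
Wrapping the body of removeJob in a jobs.contains(job) check is what stops a job from being reported as finished more than once: a second call finds the job already gone and does nothing, so the job is never appended to completedJobs twice and schedule() is not re-run. A minimal standalone sketch of that idempotency guard, with stand-in collections:

    import scala.collection.mutable

    val jobs = mutable.HashSet("job-0001")
    val completedJobs = mutable.ArrayBuffer[String]()

    def removeJob(id: String) {
      if (jobs.contains(id)) {     // guard: only act on the first call
        jobs -= id
        completedJobs += id
      }
    }

    removeJob("job-0001")
    removeJob("job-0001")              // duplicate call is a no-op
    assert(completedJobs.size == 1)    // recorded in history exactly once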
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 700a41c770..3cdd3721f5 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -36,7 +36,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
// A bit ugly and inefficient, but we won't have a number of jobs
// so large that it will make a significant difference.
- (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match {
+ (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
case Some(job) => spark.deploy.master.html.job_details.render(job)
case _ => null
}
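
This small change is forced by the MasterState message now carrying Arrays: ::: is a concatenation method defined only on List, while ++ works on either collection. The behavior at the call site is unchanged, e.g.:

    // ::: would not compile on Arrays; ++ concatenates them the same way.
    val active    = Array("job-0002", "job-0003")
    val completed = Array("job-0001")
    (active ++ completed).find(_ == "job-0001")   // Some("job-0001")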
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 474c9364fd..67d41dda29 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -123,7 +123,7 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
+ master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
case ExecutorStateChanged(jobId, execId, state, message) =>
master ! ExecutorStateChanged(jobId, execId, state, message)
diff --git a/core/src/main/twirl/spark/deploy/master/index.scala.html b/core/src/main/twirl/spark/deploy/master/index.scala.html
index 7562076b00..18c32e5a1f 100644
--- a/core/src/main/twirl/spark/deploy/master/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/index.scala.html
@@ -1,5 +1,6 @@
@(state: spark.deploy.MasterState)
@import spark.deploy.master._
+@import spark.Utils
@spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
@@ -8,9 +9,11 @@
<div class="span12">
<ul class="unstyled">
<li><strong>URL:</strong> spark://@(state.uri)</li>
- <li><strong>Number of Workers:</strong> @state.workers.size </li>
- <li><strong>Cores:</strong> @state.workers.map(_.cores).sum Total, @state.workers.map(_.coresUsed).sum Used</li>
- <li><strong>Memory:</strong> @state.workers.map(_.memory).sum Total, @state.workers.map(_.memoryUsed).sum Used</li>
+ <li><strong>Workers:</strong> @state.workers.size </li>
+ <li><strong>Cores:</strong> @{state.workers.map(_.cores).sum} Total,
+ @{state.workers.map(_.coresUsed).sum} Used</li>
+ <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
+ @{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Jobs:</strong> @state.activeJobs.size Running, @state.completedJobs.size Completed </li>
</ul>
</div>
@@ -21,7 +24,7 @@
<div class="span12">
<h3> Cluster Summary </h3>
<br/>
- @worker_table(state.workers)
+ @worker_table(state.workers.sortBy(_.id))
</div>
</div>
@@ -32,7 +35,7 @@
<div class="span12">
<h3> Running Jobs </h3>
<br/>
- @job_table(state.activeJobs)
+ @job_table(state.activeJobs.sortBy(_.startTime).reverse)
</div>
</div>
@@ -43,7 +46,7 @@
<div class="span12">
<h3> Completed Jobs </h3>
<br/>
- @job_table(state.completedJobs)
+ @job_table(state.completedJobs.sortBy(_.endTime).reverse)
</div>
</div>
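
With no explicit order before, rows appeared in whatever order the master's collections produced. The sortBy calls above give each table a stable default: workers ascending by ID, and both job tables newest-first (sortBy yields ascending order, so .reverse puts the most recent startTime or endTime on top). On hypothetical data:

    case class J(id: String, startTime: Long, endTime: Long)
    val js = Array(J("job-0001", 100L, 500L), J("job-0002", 300L, 400L))

    js.sortBy(_.startTime).reverse.map(_.id)  // Array(job-0002, job-0001): newest start first
    js.sortBy(_.endTime).reverse.map(_.id)    // Array(job-0001, job-0002): latest finish first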
diff --git a/core/src/main/twirl/spark/deploy/master/job_row.scala.html b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
index 7c4865bb6e..fff7953e7d 100644
--- a/core/src/main/twirl/spark/deploy/master/job_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_row.scala.html
@@ -1,5 +1,9 @@
@(job: spark.deploy.master.JobInfo)
+@import spark.Utils
+@import spark.deploy.WebUI.formatDate
+@import spark.deploy.WebUI.formatDuration
+
<tr>
<td>
<a href="job?jobId=@(job.id)">@job.id</a>
@@ -13,8 +17,9 @@
, @job.coresLeft
}
</td>
- <td>@job.desc.memoryPerSlave</td>
- <td>@job.submitDate</td>
+ <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
+ <td>@formatDate(job.submitDate)</td>
<td>@job.desc.user</td>
<td>@job.state.toString()</td>
-</tr>
\ No newline at end of file
+ <td>@formatDuration(job.duration)</td>
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/master/job_table.scala.html b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
index 52bad6c4b8..d267d6e85e 100644
--- a/core/src/main/twirl/spark/deploy/master/job_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/job_table.scala.html
@@ -1,4 +1,4 @@
-@(jobs: List[spark.deploy.master.JobInfo])
+@(jobs: Array[spark.deploy.master.JobInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
@@ -6,10 +6,11 @@
<th>JobID</th>
<th>Description</th>
<th>Cores</th>
- <th>Memory per Slave</th>
- <th>Submit Date</th>
+ <th>Memory per Node</th>
+ <th>Submit Time</th>
<th>User</th>
<th>State</th>
+ <th>Duration</th>
</tr>
</thead>
<tbody>
@@ -17,4 +18,4 @@
@job_row(j)
}
</tbody>
-</table>
\ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
index 017cc4859e..3dcba3a545 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_row.scala.html
@@ -1,11 +1,13 @@
@(worker: spark.deploy.master.WorkerInfo)
+@import spark.Utils
+
<tr>
<td>
<a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
</td>
<td>@{worker.host}:@{worker.port}</td>
<td>@worker.cores (@worker.coresUsed Used)</td>
- <td>@{spark.Utils.memoryMegabytesToString(worker.memory)}
- (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
+ <td>@{Utils.memoryMegabytesToString(worker.memory)}
+ (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
</tr>
diff --git a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
index 2028842297..fad1af41dc 100644
--- a/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
+++ b/core/src/main/twirl/spark/deploy/master/worker_table.scala.html
@@ -1,4 +1,4 @@
-@(workers: List[spark.deploy.master.WorkerInfo])
+@(workers: Array[spark.deploy.master.WorkerInfo])
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
@@ -14,4 +14,4 @@
@worker_row(w)
}
</tbody>
-</table>
\ No newline at end of file
+</table>
diff --git a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
index c3842dbf85..ea9542461e 100644
--- a/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/executor_row.scala.html
@@ -1,20 +1,20 @@
@(executor: spark.deploy.worker.ExecutorRunner)
+@import spark.Utils
+
<tr>
<td>@executor.execId</td>
<td>@executor.cores</td>
- <td>@executor.memory</td>
+ <td>@Utils.memoryMegabytesToString(executor.memory)</td>
<td>
<ul class="unstyled">
<li><strong>ID:</strong> @executor.jobId</li>
<li><strong>Name:</strong> @executor.jobDesc.name</li>
<li><strong>User:</strong> @executor.jobDesc.user</li>
- <li><strong>Cores:</strong> @executor.jobDesc.cores </li>
- <li><strong>Memory per Slave:</strong> @executor.jobDesc.memoryPerSlave</li>
</ul>
</td>
<td>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
</td>
-</tr>
\ No newline at end of file
+</tr>
diff --git a/core/src/main/twirl/spark/deploy/worker/index.scala.html b/core/src/main/twirl/spark/deploy/worker/index.scala.html
index 69746ed02c..b247307dab 100644
--- a/core/src/main/twirl/spark/deploy/worker/index.scala.html
+++ b/core/src/main/twirl/spark/deploy/worker/index.scala.html
@@ -1,5 +1,7 @@
@(worker: spark.deploy.WorkerState)
+@import spark.Utils
+
@spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
<!-- Worker Details -->
@@ -12,8 +14,8 @@
(WebUI at <a href="@worker.masterWebUiUrl">@worker.masterWebUiUrl</a>)
</li>
<li><strong>Cores:</strong> @worker.cores (@worker.coresUsed Used)</li>
- <li><strong>Memory:</strong> @{spark.Utils.memoryMegabytesToString(worker.memory)}
- (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
+ <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(worker.memory)}
+ (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
</ul>
</div>
</div>