aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-06-20 09:02:56 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-06-22 10:31:36 -0700
commit9fd5dc3ea9016b59667fd903992843605087e43b (patch)
tree1c5736f3aa15fc7f47dd1b10735e2d836014ca2e
parentbc4a811c57ee126559721a3a4284510baef184d1 (diff)
downloadspark-9fd5dc3ea9016b59667fd903992843605087e43b.tar.gz
spark-9fd5dc3ea9016b59667fd903992843605087e43b.tar.bz2
spark-9fd5dc3ea9016b59667fd903992843605087e43b.zip
Initial steps towards job progress UI
-rw-r--r--core/src/main/scala/spark/SparkContext.scala8
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala1
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala1
-rw-r--r--core/src/main/scala/spark/ui/JobProgressUI.scala178
-rw-r--r--core/src/main/scala/spark/ui/SparkUI.scala2
-rw-r--r--core/src/main/scala/spark/ui/WebUI.scala3
6 files changed, 181 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index cc634b3ec4..155a5ee721 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -94,10 +94,6 @@ class SparkContext(
isLocal)
SparkEnv.set(env)
- // Start the Spark UI
- private[spark] val ui = new SparkUI(this)
- ui.start()
-
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
@@ -215,6 +211,10 @@ class SparkContext(
@volatile private var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
+ // Start the Spark UI
+ private[spark] val ui = new SparkUI(this)
+ ui.start()
+
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val conf = SparkHadoopUtil.newConfiguration()
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index f7d60be5db..bdd8792ce9 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -503,6 +503,7 @@ class DAGScheduler(
case _ => "Unkown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
+ stage.completionTime = Some(System.currentTimeMillis)
val stageComp = StageCompleted(stageToInfos(stage))
sparkListeners.foreach{_.onStageCompleted(stageComp)}
running -= stage
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 7fc9e13fd9..539cf8233b 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -34,6 +34,7 @@ private[spark] class Stage(
/** When first task was submitted to scheduler. */
var submissionTime: Option[Long] = None
+ var completionTime: Option[Long] = None
private var nextAttemptId = 0
diff --git a/core/src/main/scala/spark/ui/JobProgressUI.scala b/core/src/main/scala/spark/ui/JobProgressUI.scala
index eb782d85c8..1f12df10b8 100644
--- a/core/src/main/scala/spark/ui/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/JobProgressUI.scala
@@ -1,14 +1,182 @@
package spark.ui
-import spark.SparkContext
-import spark.scheduler.SparkListener
+import spark.{Utils, SparkContext}
+import spark.scheduler._
+import spark.scheduler.SparkListenerTaskEnd
+import spark.scheduler.StageCompleted
+import spark.scheduler.SparkListenerStageSubmitted
+import org.eclipse.jetty.server.Handler
+import javax.servlet.http.HttpServletRequest
+import xml.Node
+import WebUI._
+import collection.mutable._
+import spark.Success
+import akka.util.Duration
+import java.text.SimpleDateFormat
+import java.util.Date
+import spark.scheduler.cluster.TaskInfo
+import collection.mutable
+import org.hsqldb.lib.HashMappedList
+import spark.executor.TaskMetrics
+import spark.scheduler.SparkListenerTaskEnd
+import scala.Some
+import spark.scheduler.SparkListenerStageSubmitted
+import scala.Seq
+import spark.scheduler.StageCompleted
+import spark.scheduler.SparkListenerJobStart
private[spark]
-class JobProgressUI(sc: SparkContext) {
- sc.addSparkListener()
+class JobProgressUI(sc: SparkContext) extends UIComponent {
+ val listener = new JobProgressListener
+ val fmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss")
+
+ sc.addSparkListener(listener)
+
+ def getHandlers = Seq[(String, Handler)](
+ ("/stages/stage", (request: HttpServletRequest) => stagePage(request)),
+ ("/stages", (request: HttpServletRequest) => indexPage)
+ )
+
+ def stagePage(request: HttpServletRequest): Seq[Node] = {
+ val stageId = request.getParameter("id").toInt
+ val content =
+ <h2>Percentile Metrics</h2>
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>Service Time</th>
+ <th>Remote Bytes Read</th>
+ <th>Shuffle Bytes Written</th>
+ </tr>
+ </thead>
+ <tbody>
+ {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }}
+ </tbody>
+ </table>
+ <h2>Tasks</h2>
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>Task ID</th>
+ <th>Service Time (ms)</th>
+ <th>Locality level</th>
+ <th>Worker</th>
+ <th>Launch Time</th>
+ </tr>
+ </thead>
+ <tbody>
+ {listener.stageToTaskInfos(stageId).map{ case(i, m) => taskRow(i, m) }}
+ </tbody>
+ </table>;
+ WebUI.headerSparkPage(content, "Stage Details: %s".format(stageId))
+ }
+
+
+
+ def taskRow(info: TaskInfo, metrics: TaskMetrics): Seq[Node] = {
+ <tr>
+ <td>{info.taskId}</td>
+ <td>{metrics.executorRunTime}</td>
+ <td>{info.taskLocality}</td>
+ <td>{info.hostPort}</td>
+ <td>{fmt.format(new Date(info.launchTime))}</td>
+ </tr>
+ }
+
+ def indexPage: Seq[Node] = {
+ val content =
+ <h2>Active Stages</h2>
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>Stage ID</th>
+ <th>Origin</th>
+ <th>Submitted</th>
+ <th>Duration</th>
+ <th>Tasks: Complete/Total</th>
+ </tr>
+ </thead>
+ <tbody>
+ {listener.activeStages.map(stageRow)}
+ </tbody>
+ </table>
+ <h2>Completed Stages</h2>
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <tr>
+ <th>Stage ID</th>
+ <th>Origin</th>
+ <th>Submitted</th>
+ <th>Duration</th>
+ <th>Tasks: Complete/Total</th>
+ </tr>
+ </thead>
+ <tbody>
+ {listener.completedStages.map(stageRow)}
+ </tbody>
+ </table>;
+
+ WebUI.headerSparkPage(content, "Spark Stages")
+ }
+
+ def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+ submitted match {
+ case Some(t) => Duration(completed - t, "milliseconds").printHMS
+ case _ => "Unknown"
+ }
+ }
+
+ def stageRow(s: Stage): Seq[Node] = {
+ val submissionTime = s.submissionTime match {
+ case Some(t) => fmt.format(new Date(t))
+ case None => "Unknown"
+ }
+ <tr>
+ <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>
+ <td>{s.origin}</td>
+ <td>{submissionTime}</td>
+ <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+ <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions}
+ {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+ case f if f > 0 => "(%s failed)".format(f)
+ case _ =>
+ }}
+ </td>
+ </tr>
+ }
}
-class JobProgressListener extends SparkListener {
+private[spark] class JobProgressListener extends SparkListener {
+ val activeStages = HashSet[Stage]()
+ val stageToTasksComplete = HashMap[Int, Int]()
+ val stageToTasksFailed = HashMap[Int, Int]()
+ val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]()
+ val completedStages = ListBuffer[Stage]() // Todo (pwendell): Evict these over time
+
+ override def onJobStart(jobStart: SparkListenerJobStart) { }
+
+ override def onStageCompleted(stageCompleted: StageCompleted) = {
+ val stage = stageCompleted.stageInfo.stage
+ activeStages -= stage
+ stage +=: completedStages
+ }
+
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
+ activeStages += stageSubmitted.stage
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val sid = taskEnd.event.task.stageId
+ taskEnd.event.reason match {
+ case Success =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ case _ =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ }
+ val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]())
+ taskList += ((taskEnd.event.taskInfo, taskEnd.event.taskMetrics))
+ stageToTaskInfos(sid) = taskList
+ }
+ override def onJobEnd(jobEnd: SparkListenerEvents) { }
}
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 6c9affc1f7..63b75df36f 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -14,7 +14,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
("*", (request: HttpServletRequest) => WebUI.headerSparkPage(<h1>Test</h1>, "Test page"))
)
- val components = Seq(new BlockManagerUI(sc))
+ val components = Seq(new BlockManagerUI(sc), new JobProgressUI(sc))
def start() {
/** Start an HTTP server to run the Web interface */
diff --git a/core/src/main/scala/spark/ui/WebUI.scala b/core/src/main/scala/spark/ui/WebUI.scala
index 16251e727a..e80d48dd4a 100644
--- a/core/src/main/scala/spark/ui/WebUI.scala
+++ b/core/src/main/scala/spark/ui/WebUI.scala
@@ -58,7 +58,6 @@ object WebUI extends Logging {
handler
} else {
val contextHandler = new ContextHandler(path)
- println("Adding handler for path: " + path)
contextHandler.setHandler(handler)
contextHandler.asInstanceOf[org.eclipse.jetty.server.Handler]
}
@@ -84,7 +83,7 @@ object WebUI extends Logging {
/** Page with Spark logo, title, and Spark UI headers */
def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = {
val newContent =
- <h2><a href="/storage">Storage</a> | <a href="/jobs">Jobs</a> </h2><hl/>;
+ <h2><a href="/storage">Storage</a> | <a href="/stages">Jobs</a> </h2><hl/>;
sparkPage(newContent ++ content, title)
}