diff options
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 20 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/JettyUtils.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/IndexPage.scala | 23 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/spark/ui/jobs/StagePage.scala | 14 | ||||
-rw-r--r-- | core/src/test/scala/spark/ui/UISuite.scala | 23 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/Duration.scala | 6 |
7 files changed, 80 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index f3621c6bee..6966ee9ee9 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -486,6 +486,26 @@ private object Utils extends Logging { } /** + * Returns a human-readable string representing a duration such as "35ms" + */ + def msDurationToString(ms: Long): String = { + val second = 1000 + val minute = 60 * second + val hour = 60 * minute + + ms match { + case t if t < second => + "%dms".format(t) + case t if t < minute => + "%d.%03ds".format(t / second, t % second) + case t if t < hour => + "%d:%02d".format(t / minute, (t % minute) / second) + case t => + "%d:%02d:%02d".format(t / hour, t % hour / minute, (t % hour) % minute / second) + } + } + + /** * Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB". */ def memoryMegabytesToString(megabytes: Long): String = { diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala index 1f311bb75f..fde3606740 100644 --- a/core/src/main/scala/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/spark/ui/JettyUtils.scala @@ -26,7 +26,7 @@ private[spark] object JettyUtils extends Logging { createHandler(responder, "text/json", (in: JValue) => pretty(render(in))) implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler = - createHandler(responder, "text/html") + createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString) implicit def textResponderToHandler(responder: Responder[String]): Handler = createHandler(responder, "text/plain") diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index 152e1a79b0..9b34cdd27f 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -19,8 +19,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { val dateFmt = parent.dateFmt def render(request: HttpServletRequest): Seq[Node] = { - val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total", - "Shuffle Activity", "Stored RDD") + val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Progress", + "Tasks: Complete/Total", "Shuffle Activity", "Stored RDD") val activeStages = listener.activeStages.toSeq val completedStages = listener.completedStages.reverse.toSeq val failedStages = listener.failedStages.reverse.toSeq @@ -43,6 +43,20 @@ private[spark] class IndexPage(parent: JobProgressUI) { } } + def makeSlider(completed: Int, total: Int): Seq[Node] = { + val width=130 + val height=15 + val completeWidth = (completed.toDouble / total) * width + + <svg width={width.toString} height={height.toString}> + <rect width={width.toString} height={height.toString} + fill="white" stroke="black" stroke-width="1" /> + <rect width={completeWidth.toString} height={height.toString} + fill="rgb(206,206,247)" stroke="black" stroke-width="1" /> + </svg> + } + + def stageRow(showLink: Boolean = true)(s: Stage): Seq[Node] = { val submissionTime = s.submissionTime match { case Some(t) => dateFmt.format(new Date(t)) @@ -55,6 +69,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { case (false, true) => "Write" case _ => "" } + val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0) + val totalTasks = s.numPartitions <tr> {if (showLink) {<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>} @@ -63,7 +79,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { <td>{submissionTime}</td> <td>{getElapsedTime(s.submissionTime, s.completionTime.getOrElse(System.currentTimeMillis()))}</td> - <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions} + <td>{makeSlider(completedTasks, totalTasks)}</td> + <td>{completedTasks} / {totalTasks} {listener.stageToTasksFailed.getOrElse(s.id, 0) match { case f if f > 0 => "(%s failed)".format(f) case _ => diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index 46ba1d31af..a18bf0f81e 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -17,6 +17,7 @@ import spark.scheduler._ import spark.scheduler.cluster.TaskInfo import spark.executor.TaskMetrics import spark.Success +import spark.Utils /** Web UI showing progress status of all jobs in the given SparkContext. */ private[spark] class JobProgressUI(val sc: SparkContext) { @@ -32,7 +33,7 @@ private[spark] class JobProgressUI(val sc: SparkContext) { sc.addSparkListener(listener) } - def formatDuration(ms: Long) = Duration(ms, "milliseconds").printHMS + def formatDuration(ms: Long) = Utils.msDurationToString(ms) def getHandlers = Seq[(String, Handler)]( ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)), diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 03200861e0..518f48cb81 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -19,20 +19,30 @@ private[spark] class StagePage(parent: JobProgressUI) { def render(request: HttpServletRequest): Seq[Node] = { val stageId = request.getParameter("id").toInt + + if (!listener.stageToTaskInfos.contains(stageId)) { + val content = + <div> + <h2>Summary Metrics</h2> No tasks have finished yet + <h2>Tasks</h2> No tasks have finished yet + </div> + return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId)) + } + val tasks = listener.stageToTaskInfos(stageId) val shuffleRead = listener.hasShuffleRead(stageId) val shuffleWrite = listener.hasShuffleWrite(stageId) val taskHeaders: Seq[String] = - Seq("Task ID", "Service Time", "Locality Level", "Worker", "Launch Time") ++ + Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++ {if (shuffleRead) Seq("Shuffle Read") else Nil} ++ {if (shuffleWrite) Seq("Shuffle Write") else Nil} val taskTable = listingTable(taskHeaders, taskRow, tasks) val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble} - val serviceQuantiles = "Service Time" +: Distribution(serviceTimes).get.getQuantiles().map( + val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( ms => parent.formatDuration(ms.toLong)) def getQuantileCols(data: Seq[Double]) = diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala index 6519bf77c6..127ab5ebc2 100644 --- a/core/src/test/scala/spark/ui/UISuite.scala +++ b/core/src/test/scala/spark/ui/UISuite.scala @@ -2,10 +2,11 @@ package spark.ui import org.scalatest.FunSuite import org.eclipse.jetty.server.Server -import util.{Try, Success, Failure} import java.net.ServerSocket +import scala.util.{Failure, Success, Try} +import spark.Utils -private[spark] class UISuite extends FunSuite { +class UISuite extends FunSuite { test("jetty port increases under contention") { val startPort = 33333 val server = new Server(startPort) @@ -23,7 +24,23 @@ private[spark] class UISuite extends FunSuite { assert(boundPort != 0) Try {new ServerSocket(boundPort)} match { case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) - case Failure(e) => + case Failure (e) => } } + + test("string formatting of time durations") { + val second = 1000 + val minute = second * 60 + val hour = minute * 60 + def str = Utils.msDurationToString(_) + + assert(str(123) === "123ms") + assert(str(second) === "1.000s") + assert(str(second + 452) === "1.452s") + assert(str(hour) === "1:00:00") + assert(str(minute) === "1:00") + assert(str(minute + 4 * second + 34) === "1:04") + assert(str(10 * hour + minute + 4 * second) === "10:01:04") + assert(str(10 * hour + 59 * minute + 59 * second + 999) === "10:59:59") + } } diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala index ee26206e24..c2135195d8 100644 --- a/streaming/src/main/scala/spark/streaming/Duration.scala +++ b/streaming/src/main/scala/spark/streaming/Duration.scala @@ -1,5 +1,7 @@ package spark.streaming +import spark.Utils + case class Duration (private val millis: Long) { def < (that: Duration): Boolean = (this.millis < that.millis) @@ -32,8 +34,10 @@ case class Duration (private val millis: Long) { def toFormattedString: String = millis.toString def milliseconds: Long = millis -} + def prettyPrint = Utils.msDurationToString(millis) + +} /** * Helper object that creates instance of [[spark.streaming.Duration]] representing |