aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/Utils.scala20
-rw-r--r--core/src/main/scala/spark/ui/JettyUtils.scala2
-rw-r--r--core/src/main/scala/spark/ui/jobs/IndexPage.scala23
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala3
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala14
-rw-r--r--core/src/test/scala/spark/ui/UISuite.scala23
-rw-r--r--streaming/src/main/scala/spark/streaming/Duration.scala6
7 files changed, 80 insertions, 11 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index f3621c6bee..6966ee9ee9 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -486,6 +486,26 @@ private object Utils extends Logging {
}
/**
+ * Returns a human-readable string for a duration given in milliseconds, e.g. "35ms", "1.452s" or "1:04"
+ */
+ def msDurationToString(ms: Long): String = {
+   val secMs = 1000
+   val minMs = 60 * secMs
+   val hourMs = 60 * minMs
+   // Layout is chosen by magnitude: "35ms", "1.452s", "1:04" (m:ss) or "10:01:04" (h:mm:ss).
+   if (ms < secMs) {
+     "%dms".format(ms)
+   } else if (ms < minMs) {
+     "%d.%03ds".format(ms / secMs, ms % secMs)
+   } else if (ms < hourMs) {
+     "%d:%02d".format(ms / minMs, (ms % minMs) / secMs)
+   } else {
+     val rem = ms % hourMs
+     "%d:%02d:%02d".format(ms / hourMs, rem / minMs, (rem % minMs) / secMs)
+   }
+ }
+
+ /**
* Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
*/
def memoryMegabytesToString(megabytes: Long): String = {
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index 1f311bb75f..fde3606740 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -26,7 +26,7 @@ private[spark] object JettyUtils extends Logging {
createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
- createHandler(responder, "text/html")
+ createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 152e1a79b0..9b34cdd27f 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -19,8 +19,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
- val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total",
- "Shuffle Activity", "Stored RDD")
+ val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Progress",
+ "Tasks: Complete/Total", "Shuffle Activity", "Stored RDD")
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
@@ -43,6 +43,20 @@ private[spark] class IndexPage(parent: JobProgressUI) {
}
}
+ /** SVG progress bar; an empty bar is drawn when total <= 0 and the fill is capped at full width. */
+ def makeSlider(completed: Int, total: Int): Seq[Node] = {
+   val width = 130
+   val height = 15
+   val completeWidth = if (total > 0) math.min(completed.toDouble / total, 1.0) * width else 0.0
+
+   <svg width={width.toString} height={height.toString}>
+     <rect width={width.toString} height={height.toString}
+       fill="white" stroke="black" stroke-width="1" />
+     <rect width={completeWidth.toString} height={height.toString}
+       fill="rgb(206,206,247)" stroke="black" stroke-width="1" />
+   </svg>
+ }
+
def stageRow(showLink: Boolean = true)(s: Stage): Seq[Node] = {
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
@@ -55,6 +69,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case (false, true) => "Write"
case _ => ""
}
+ val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+ val totalTasks = s.numPartitions
<tr>
{if (showLink) {<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>}
@@ -63,7 +79,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
- <td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions}
+ <td>{makeSlider(completedTasks, totalTasks)}</td>
+ <td>{completedTasks} / {totalTasks}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ =>
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 46ba1d31af..a18bf0f81e 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -17,6 +17,7 @@ import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import spark.Success
+import spark.Utils
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -32,7 +33,7 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
sc.addSparkListener(listener)
}
- def formatDuration(ms: Long) = Duration(ms, "milliseconds").printHMS
+ def formatDuration(ms: Long): String = Utils.msDurationToString(ms) // human-readable, e.g. "35ms"
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 03200861e0..518f48cb81 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -19,20 +19,30 @@ private[spark] class StagePage(parent: JobProgressUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val stageId = request.getParameter("id").toInt
+
+ if (!listener.stageToTaskInfos.contains(stageId)) {
+ val content =
+ <div>
+ <h2>Summary Metrics</h2> No tasks have finished yet
+ <h2>Tasks</h2> No tasks have finished yet
+ </div>
+ return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId))
+ }
+
val tasks = listener.stageToTaskInfos(stageId)
val shuffleRead = listener.hasShuffleRead(stageId)
val shuffleWrite = listener.hasShuffleWrite(stageId)
val taskHeaders: Seq[String] =
- Seq("Task ID", "Service Time", "Locality Level", "Worker", "Launch Time") ++
+ Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil}
val taskTable = listingTable(taskHeaders, taskRow, tasks)
val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble}
- val serviceQuantiles = "Service Time" +: Distribution(serviceTimes).get.getQuantiles().map(
+ val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong))
def getQuantileCols(data: Seq[Double]) =
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
index 6519bf77c6..127ab5ebc2 100644
--- a/core/src/test/scala/spark/ui/UISuite.scala
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -2,10 +2,11 @@ package spark.ui
import org.scalatest.FunSuite
import org.eclipse.jetty.server.Server
-import util.{Try, Success, Failure}
import java.net.ServerSocket
+import scala.util.{Failure, Success, Try}
+import spark.Utils
-private[spark] class UISuite extends FunSuite {
+class UISuite extends FunSuite {
test("jetty port increases under contention") {
val startPort = 33333
val server = new Server(startPort)
@@ -23,7 +24,23 @@ private[spark] class UISuite extends FunSuite {
assert(boundPort != 0)
Try {new ServerSocket(boundPort)} match {
case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
- case Failure(e) =>
+ case Failure (e) =>
}
}
+
+ test("string formatting of time durations") {
+   val secMs = 1000
+   val minMs = secMs * 60
+   val hourMs = minMs * 60
+   val fmt = Utils.msDurationToString _
+   // Cases are ordered from the shortest to the longest duration.
+   assert(fmt(123) === "123ms")
+   assert(fmt(secMs) === "1.000s")
+   assert(fmt(secMs + 452) === "1.452s")
+   assert(fmt(minMs) === "1:00")
+   assert(fmt(minMs + 4 * secMs + 34) === "1:04")
+   assert(fmt(hourMs) === "1:00:00")
+   assert(fmt(10 * hourMs + minMs + 4 * secMs) === "10:01:04")
+   assert(fmt(10 * hourMs + 59 * minMs + 59 * secMs + 999) === "10:59:59")
+ }
}
diff --git a/streaming/src/main/scala/spark/streaming/Duration.scala b/streaming/src/main/scala/spark/streaming/Duration.scala
index ee26206e24..c2135195d8 100644
--- a/streaming/src/main/scala/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/spark/streaming/Duration.scala
@@ -1,5 +1,7 @@
package spark.streaming
+import spark.Utils
+
case class Duration (private val millis: Long) {
def < (that: Duration): Boolean = (this.millis < that.millis)
@@ -32,8 +34,10 @@ case class Duration (private val millis: Long) {
def toFormattedString: String = millis.toString
def milliseconds: Long = millis
-}
+ /** Human-readable form of this duration, as produced by Utils.msDurationToString. */
+ def prettyPrint: String = Utils.msDurationToString(millis)
+}
/**
* Helper object that creates instance of [[spark.streaming.Duration]] representing