about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author: Patrick Wendell <pwendell@gmail.com>, 2013-07-29 16:32:55 -0700
committer: Patrick Wendell <pwendell@gmail.com>, 2013-07-29 16:32:55 -0700
commit: c99b67440537aacba836624b0bfebc513b20f6b3 (patch)
tree: 45d17fe82b9bb6918d42258470aa81ea749f8935 /core
parent: fe7298b587b89abffefab2febac4e3861ca2c1c4 (diff)
parent: 2d6da9195ab8feeaf3fc478ec09d22568d6aa64b (diff)
download: spark-c99b67440537aacba836624b0bfebc513b20f6b3.tar.gz
spark-c99b67440537aacba836624b0bfebc513b20f6b3.tar.bz2
spark-c99b67440537aacba836624b0bfebc513b20f6b3.zip
Merge pull request #735 from karenfeng/ui-807
Totals for shuffle data and CPU time
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/ui/exec/ExecutorsUI.scala 14
-rw-r--r-- core/src/main/scala/spark/ui/jobs/IndexPage.scala 56
-rw-r--r-- core/src/main/scala/spark/ui/jobs/JobProgressUI.scala 64
-rw-r--r-- core/src/main/scala/spark/ui/jobs/StagePage.scala 35
4 files changed, 121 insertions, 48 deletions
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index db1c902955..b70153fd30 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -113,7 +113,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
- val executorToTasksActive = HashMap[String, HashSet[Long]]()
+ val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
@@ -121,9 +121,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
- if (!executorToTasksActive.contains(eid))
- executorToTasksActive(eid) = HashSet[Long]()
- executorToTasksActive(eid) += taskStart.taskInfo.taskId
+ val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ activeTasks += taskStart.taskInfo
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
@@ -132,9 +131,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
- if (!executorToTasksActive.contains(eid))
- executorToTasksActive(eid) = HashSet[Long]()
- executorToTasksActive(eid) -= taskEnd.taskInfo.taskId
+ val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+ activeTasks -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -142,7 +140,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
(Some(e), e.metrics)
case _ =>
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
- (None, Some(taskEnd.taskMetrics))
+ (None, Option(taskEnd.taskMetrics))
}
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index f31af3cda6..646ae5ecbc 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -25,9 +25,10 @@ import scala.Some
import scala.xml.{NodeSeq, Node}
import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
import spark.storage.StorageLevel
+import spark.ui.Page._
+import spark.ui.UIUtils._
+import spark.Utils
/** Page showing list of all ongoing and recently finished stages */
private[spark] class IndexPage(parent: JobProgressUI) {
@@ -38,6 +39,12 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
+ val now = System.currentTimeMillis()
+
+ var activeTime = 0L
+ for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
+ activeTime += t.timeRunning(now)
+ }
/** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
@@ -48,7 +55,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<th>Submitted</th>
<th>Duration</th>
<th colspan="2">Tasks: Complete/Total</th>
- <th>Shuffle Activity</th>
+ <th>Shuffle Read</th>
+ <th>Shuffle Write</th>
<th>Stored RDD</th>
</thead>
<tbody>
@@ -57,11 +65,33 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</table>
}
+ val summary: NodeSeq =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.totalTime + activeTime)}
+ </li>
+ {if (listener.totalShuffleRead > 0)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(listener.totalShuffleRead)}
+ </li>
+ }
+ {if (listener.totalShuffleWrite > 0)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(listener.totalShuffleWrite)}
+ </li>
+ }
+ </ul>
+ </div>
val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
val completedStageTable = stageTable(stageRow, completedStages)
val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
- val content = <h2>Active Stages</h2> ++ activeStageTable ++
+ val content = summary ++
+ <h2>Active Stages</h2> ++ activeStageTable ++
<h2>Completed Stages</h2> ++ completedStageTable ++
<h2>Failed Stages</h2> ++ failedStageTable
@@ -94,13 +124,16 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
- val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
- val shuffleInfo = (read, write) match {
- case (true, true) => "Read/Write"
- case (true, false) => "Read"
- case (false, true) => "Write"
- case _ => ""
+
+ val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
+ }
+ val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
}
+
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions
@@ -117,7 +150,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case _ =>
}}
</td>
- <td>{shuffleInfo}</td>
+ <td>{shuffleRead}</td>
+ <td>{shuffleWrite}</td>
<td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
<a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
{Option(s.rdd.name).getOrElse(s.rdd.id)}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 6e332415db..09d24b6302 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -65,7 +65,15 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
- val stageToTasksActive = HashMap[Int, HashSet[Long]]()
+ // Total metrics reflect metrics only for completed tasks
+ var totalTime = 0L
+ var totalShuffleRead = 0L
+ var totalShuffleWrite = 0L
+
+ val stageToTime = HashMap[Int, Long]()
+ val stageToShuffleRead = HashMap[Int, Long]()
+ val stageToShuffleWrite = HashMap[Int, Long]()
+ val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
@@ -86,6 +94,12 @@ private[spark] class JobProgressListener extends SparkListener {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id)
+ stageToTime.remove(s.id)
+ stageToShuffleRead.remove(s.id)
+ stageToShuffleWrite.remove(s.id)
+ stageToTasksActive.remove(s.id)
+ stageToTasksComplete.remove(s.id)
+ stageToTasksFailed.remove(s.id)
})
stages.trimEnd(toRemove)
}
@@ -96,9 +110,8 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId
- if (!stageToTasksActive.contains(sid))
- stageToTasksActive(sid) = HashSet[Long]()
- stageToTasksActive(sid) += taskStart.taskInfo.taskId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
@@ -107,9 +120,8 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
- if (!stageToTasksActive.contains(sid))
- stageToTasksActive(sid) = HashSet[Long]()
- stageToTasksActive(sid) -= taskEnd.taskInfo.taskId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -117,8 +129,26 @@ private[spark] class JobProgressListener extends SparkListener {
(Some(e), e.metrics)
case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Some(taskEnd.taskMetrics))
+ (None, Option(taskEnd.taskMetrics))
}
+
+ stageToTime.getOrElseUpdate(sid, 0L)
+ val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+ stageToTime(sid) += time
+ totalTime += time
+
+ stageToShuffleRead.getOrElseUpdate(sid, 0L)
+ val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+ s.remoteBytesRead).getOrElse(0L)
+ stageToShuffleRead(sid) += shuffleRead
+ totalShuffleRead += shuffleRead
+
+ stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+ val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+ s.shuffleBytesWritten).getOrElse(0L)
+ stageToShuffleWrite(sid) += shuffleWrite
+ totalShuffleWrite += shuffleWrite
+
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
@@ -139,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener {
case _ =>
}
}
-
- /** Is this stage's input from a shuffle read. */
- def hasShuffleRead(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
-
- /** Is this stage's output to a shuffle write. */
- def hasShuffleWrite(stageID: Int): Boolean = {
- // This is written in a slightly complicated way to avoid having to scan all tasks
- for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
- if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
- }
- return false // No tasks have finished for this stage
- }
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 654f347723..e327cb3947 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -37,6 +37,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val stageId = request.getParameter("id").toInt
+ val now = System.currentTimeMillis()
if (!listener.stageToTaskInfos.contains(stageId)) {
val content =
@@ -49,8 +50,35 @@ private[spark] class StagePage(parent: JobProgressUI) {
val tasks = listener.stageToTaskInfos(stageId)
- val shuffleRead = listener.hasShuffleRead(stageId)
- val shuffleWrite = listener.hasShuffleWrite(stageId)
+ val shuffleRead = listener.stageToShuffleRead(stageId) > 0
+ val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
+
+ var activeTime = 0L
+ listener.stageToTasksActive(stageId).foreach { t =>
+ activeTime += t.timeRunning(now)
+ }
+
+ val summary =
+ <div>
+ <ul class="unstyled">
+ <li>
+ <strong>CPU time: </strong>
+ {parent.formatDuration(listener.stageToTime(stageId) + activeTime)}
+ </li>
+ {if (shuffleRead)
+ <li>
+ <strong>Shuffle read: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
+ </li>
+ }
+ {if (shuffleWrite)
+ <li>
+ <strong>Shuffle write: </strong>
+ {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
+ </li>
+ }
+ </ul>
+ </div>
val taskHeaders: Seq[String] =
Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
@@ -98,7 +126,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
}
val content =
- <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable;
+ summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
+ <h2>Tasks</h2> ++ taskTable;
headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}