about summary refs log tree commit diff
diff options
context:
space:
mode:
authorKaren Feng <karenfeng.us@gmail.com>2013-07-24 10:53:02 -0700
committerKaren Feng <karenfeng.us@gmail.com>2013-07-24 10:53:02 -0700
commit93c6015f82d4d27a6f09686b1e849be1cbbd0615 (patch)
treef6e5bfed3e18153f9f745a78f2a1ac377e4a673d
parenta73f3ee536502c08c9643a7109461e124f815006 (diff)
downloadspark-93c6015f82d4d27a6f09686b1e849be1cbbd0615.tar.gz
spark-93c6015f82d4d27a6f09686b1e849be1cbbd0615.tar.bz2
spark-93c6015f82d4d27a6f09686b1e849be1cbbd0615.zip
Shows task status and running tasks on Stage Page: fixes SPARK-804 and 811
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala12
-rw-r--r--core/src/main/scala/spark/ui/exec/ExecutorsUI.scala5
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala16
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala19
4 files changed, 46 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index a1ebd48b01..f840100eca 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -51,6 +51,18 @@ class TaskInfo(
def running: Boolean = !finished
+ def status: String = {
+ if (running)
+ return "RUNNING"
+ if (failed)
+ return "FAILED"
+ if (successful)
+ return "SUCCESSFUL"
+ if (finished)
+ return "FINISHED"
+ "UNKNOWN"
+ }
+
def duration: Long = {
if (!finished) {
throw new UnsupportedOperationException("duration() called on unfinished tasks")
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index bad5c442ab..db1c902955 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -124,6 +124,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
if (!executorToTasksActive.contains(eid))
executorToTasksActive(eid) = HashSet[Long]()
executorToTasksActive(eid) += taskStart.taskInfo.taskId
+ val taskList = executorToTaskInfos.getOrElse(
+ eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ executorToTaskInfos(eid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -142,6 +146,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
executorToTaskInfos(eid) = taskList
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 44dcf82d11..6e332415db 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -65,6 +65,7 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
+ val stageToTasksActive = HashMap[Int, HashSet[Long]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
@@ -93,8 +94,22 @@ private[spark] class JobProgressListener extends SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
activeStages += stageSubmitted.stage
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val sid = taskStart.task.stageId
+ if (!stageToTasksActive.contains(sid))
+ stageToTasksActive(sid) = HashSet[Long]()
+ stageToTasksActive(sid) += taskStart.taskInfo.taskId
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ stageToTaskInfos(sid) = taskList
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
+ if (!stageToTasksActive.contains(sid))
+ stageToTasksActive(sid) = HashSet[Long]()
+ stageToTasksActive(sid) -= taskEnd.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -106,6 +121,7 @@ private[spark] class JobProgressListener extends SparkListener {
}
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 292966f23a..b77b29045e 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -41,8 +41,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
if (!listener.stageToTaskInfos.contains(stageId)) {
val content =
<div>
- <h2>Summary Metrics</h2> No tasks have finished yet
- <h2>Tasks</h2> No tasks have finished yet
+ <h2>Summary Metrics</h2> No tasks have started yet
+ <h2>Tasks</h2> No tasks have started yet
</div>
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
@@ -53,7 +53,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleWrite = listener.hasShuffleWrite(stageId)
val taskHeaders: Seq[String] =
- Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
+ Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("Details")
@@ -61,7 +61,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskTable = listingTable(taskHeaders, taskRow, tasks)
// Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => Option(t._2).isDefined)
+ val validTasks = tasks.filter(t => t._1.status == "SUCCESSFUL" && (Option(t._2).isDefined))
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@@ -108,10 +108,17 @@ private[spark] class StagePage(parent: JobProgressUI) {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
+
+ val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
+ else metrics.map(m => m.executorRunTime).getOrElse(1)
+ val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
+ else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+
<tr>
<td>{info.taskId}</td>
- <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
- {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
+ <td>{info.status}</td>
+ <td sorttable_customkey={duration.toString}>
+ {formatDuration}
</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>