aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala11
-rw-r--r--core/src/main/scala/spark/ui/exec/ExecutorsUI.scala5
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala16
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala19
4 files changed, 45 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index a1ebd48b01..c693b722ac 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -51,6 +51,17 @@ class TaskInfo(
def running: Boolean = !finished
+ def status: String = {
+ if (running)
+ "RUNNING"
+ else if (failed)
+ "FAILED"
+ else if (successful)
+ "SUCCESS"
+ else
+ "UNKNOWN"
+ }
+
def duration: Long = {
if (!finished) {
throw new UnsupportedOperationException("duration() called on unfinished tasks")
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index bad5c442ab..db1c902955 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -124,6 +124,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
if (!executorToTasksActive.contains(eid))
executorToTasksActive(eid) = HashSet[Long]()
executorToTasksActive(eid) += taskStart.taskInfo.taskId
+ val taskList = executorToTaskInfos.getOrElse(
+ eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ executorToTaskInfos(eid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -142,6 +146,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
executorToTaskInfos(eid) = taskList
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 44dcf82d11..6e332415db 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -65,6 +65,7 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
+ val stageToTasksActive = HashMap[Int, HashSet[Long]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
@@ -93,8 +94,22 @@ private[spark] class JobProgressListener extends SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
activeStages += stageSubmitted.stage
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val sid = taskStart.task.stageId
+ if (!stageToTasksActive.contains(sid))
+ stageToTasksActive(sid) = HashSet[Long]()
+ stageToTasksActive(sid) += taskStart.taskInfo.taskId
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ stageToTaskInfos(sid) = taskList
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
+ if (!stageToTasksActive.contains(sid))
+ stageToTasksActive(sid) = HashSet[Long]()
+ stageToTasksActive(sid) -= taskEnd.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@@ -106,6 +121,7 @@ private[spark] class JobProgressListener extends SparkListener {
}
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 292966f23a..654f347723 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -41,8 +41,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
if (!listener.stageToTaskInfos.contains(stageId)) {
val content =
<div>
- <h2>Summary Metrics</h2> No tasks have finished yet
- <h2>Tasks</h2> No tasks have finished yet
+ <h2>Summary Metrics</h2> No tasks have started yet
+ <h2>Tasks</h2> No tasks have started yet
</div>
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
@@ -53,7 +53,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleWrite = listener.hasShuffleWrite(stageId)
val taskHeaders: Seq[String] =
- Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
+ Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("Details")
@@ -61,7 +61,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskTable = listingTable(taskHeaders, taskRow, tasks)
// Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => Option(t._2).isDefined)
+ val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined))
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@@ -108,10 +108,17 @@ private[spark] class StagePage(parent: JobProgressUI) {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
+
+ val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
+ else metrics.map(m => m.executorRunTime).getOrElse(1)
+ val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
+ else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
+
<tr>
<td>{info.taskId}</td>
- <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
- {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
+ <td>{info.status}</td>
+ <td sorttable_customkey={duration.toString}>
+ {formatDuration}
</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>