diff options
author | Reynold Xin <rxin@apache.org> | 2014-06-26 21:13:26 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-26 21:13:26 -0700 |
commit | d1636dd72fc4966413baeb97ba55b313dc1da63d (patch) | |
tree | cb20bcc9304a4a3324b71b3b7b9f95c8523a0179 /core/src/main/scala | |
parent | bf578deaf2493081ceeb78dfd7617def5699a06e (diff) | |
download | spark-d1636dd72fc4966413baeb97ba55b313dc1da63d.tar.gz spark-d1636dd72fc4966413baeb97ba55b313dc1da63d.tar.bz2 spark-d1636dd72fc4966413baeb97ba55b313dc1da63d.zip |
[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
New UI:
![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png)
Author: Reynold Xin <rxin@apache.org>
Closes #1236 from rxin/ui-task-attempt and squashes the following commits:
3b645dd [Reynold Xin] Expose attemptId in Stage.
c0474b1 [Reynold Xin] Beefed up unit test.
c404bdd [Reynold Xin] Fix ReplayListenerSuite.
f56be4b [Reynold Xin] Fixed JsonProtocolSuite.
e29e0f7 [Reynold Xin] Minor update.
5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
Diffstat (limited to 'core/src/main/scala')
6 files changed, 39 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 378cf1aaeb..82163eadd5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId) @DeveloperApi case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent +@DeveloperApi case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String) extends SparkListenerEvent +@DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent /** An event used in the listener to shutdown the listener daemon thread. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 9a4be43ee2..8ec482a6f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -106,6 +106,8 @@ private[spark] class Stage( id } + def attemptId: Int = nextAttemptId + val name = callSite.short val details = callSite.long diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala index 4c62e4dc0b..6aecdfe8e6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala @@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi class TaskInfo( val taskId: Long, val index: Int, + val attempt: Int, val launchTime: Long, val executorId: String, val host: String, - val taskLocality: TaskLocality.TaskLocality) { + val taskLocality: TaskLocality.TaskLocality, + val speculative: Boolean) { /** * The time when the task started remotely getting the result. Will not be set if the diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c0898f64fb..83ff6b8550 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -335,17 +335,19 @@ private[spark] class TaskSetManager( /** * Dequeue a pending task for a given node and return its index and locality level. * Only search for tasks matching the given locality constraint. + * + * @return An option containing (task index within the task set, locality, is speculative?) */ private def findTask(execId: String, host: String, locality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value)] = + : Option[(Int, TaskLocality.Value, Boolean)] = { for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) { - return Some((index, TaskLocality.PROCESS_LOCAL)) + return Some((index, TaskLocality.PROCESS_LOCAL, false)) } if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) { - return Some((index, TaskLocality.NODE_LOCAL)) + return Some((index, TaskLocality.NODE_LOCAL, false)) } } @@ -354,23 +356,25 @@ private[spark] class TaskSetManager( rack <- sched.getRackForHost(host) index <- findTaskFromList(execId, getPendingTasksForRack(rack)) } { - return Some((index, TaskLocality.RACK_LOCAL)) + return Some((index, TaskLocality.RACK_LOCAL, false)) } } // Look for no-pref tasks after rack-local tasks since they can run anywhere. for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { - return Some((index, TaskLocality.PROCESS_LOCAL)) + return Some((index, TaskLocality.PROCESS_LOCAL, false)) } if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { for (index <- findTaskFromList(execId, allPendingTasks)) { - return Some((index, TaskLocality.ANY)) + return Some((index, TaskLocality.ANY, false)) } } // Finally, if all else has failed, find a speculative task - findSpeculativeTask(execId, host, locality) + findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) => + (taskIndex, allowedLocality, true) + } } /** @@ -391,7 +395,7 @@ private[spark] class TaskSetManager( } findTask(execId, host, allowedLocality) match { - case Some((index, taskLocality)) => { + case Some((index, taskLocality, speculative)) => { // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() @@ -400,7 +404,9 @@ private[spark] class TaskSetManager( taskSet.id, index, taskId, execId, host, taskLocality)) // Do various bookkeeping copiesRunning(index) += 1 - val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality) + val attemptNum = taskAttempts(index).size + val info = new TaskInfo( + taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8b65f0671b..8e3d5d1cd4 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { </div> // scalastyle:on val taskHeaders: Seq[String] = - Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++ - Seq("Duration", "GC Time", "Result Ser Time") ++ + Seq( + "Index", "ID", "Attempt", "Status", "Locality Level", "Executor", + "Launch Time", "Duration", "GC Time") ++ {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++ {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++ {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++ @@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { <tr> <td>{info.index}</td> <td>{info.taskId}</td> + <td sorttable_customkey={info.attempt.toString}>{ + if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString + }</td> <td>{info.status}</td> <td>{info.taskLocality}</td> <td>{info.host}</td> @@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { <td sorttable_customkey={gcTime.toString}> {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""} </td> + <!-- + TODO: Add this back after we add support to hide certain columns. <td sorttable_customkey={serializationTime.toString}> {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""} </td> + --> {if (shuffleRead) { <td sorttable_customkey={shuffleReadSortable}> {shuffleReadReadable} diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 7cecbfe62a..6245b4b802 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -32,6 +32,8 @@ import org.apache.spark.storage._ import org.apache.spark._ private[spark] object JsonProtocol { + // TODO: Remove this file and put JSON serialization into each individual class. + private implicit val format = DefaultFormats /** ------------------------------------------------- * @@ -194,10 +196,12 @@ private[spark] object JsonProtocol { def taskInfoToJson(taskInfo: TaskInfo): JValue = { ("Task ID" -> taskInfo.taskId) ~ ("Index" -> taskInfo.index) ~ + ("Attempt" -> taskInfo.attempt) ~ ("Launch Time" -> taskInfo.launchTime) ~ ("Executor ID" -> taskInfo.executorId) ~ ("Host" -> taskInfo.host) ~ ("Locality" -> taskInfo.taskLocality.toString) ~ + ("Speculative" -> taskInfo.speculative) ~ ("Getting Result Time" -> taskInfo.gettingResultTime) ~ ("Finish Time" -> taskInfo.finishTime) ~ ("Failed" -> taskInfo.failed) ~ @@ -487,16 +491,19 @@ private[spark] object JsonProtocol { def taskInfoFromJson(json: JValue): TaskInfo = { val taskId = (json \ "Task ID").extract[Long] val index = (json \ "Index").extract[Int] + val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1) val launchTime = (json \ "Launch Time").extract[Long] val executorId = (json \ "Executor ID").extract[String] val host = (json \ "Host").extract[String] val taskLocality = TaskLocality.withName((json \ "Locality").extract[String]) + val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false) val gettingResultTime = (json \ "Getting Result Time").extract[Long] val finishTime = (json \ "Finish Time").extract[Long] val failed = (json \ "Failed").extract[Boolean] val serializedSize = (json \ "Serialized Size").extract[Int] - val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality) + val taskInfo = + new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative) taskInfo.gettingResultTime = gettingResultTime taskInfo.finishTime = finishTime taskInfo.failed = failed |