author     Reynold Xin <rxin@apache.org>    2014-06-26 21:13:26 -0700
committer  Reynold Xin <rxin@apache.org>    2014-06-26 21:13:26 -0700
commit     d1636dd72fc4966413baeb97ba55b313dc1da63d (patch)
tree       cb20bcc9304a4a3324b71b3b7b9f95c8523a0179 /core/src/main/scala
parent     bf578deaf2493081ceeb78dfd7617def5699a06e (diff)
[SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
New UI:

![screen shot 2014-06-26 at 1 43 52 pm](https://cloud.githubusercontent.com/assets/323388/3404643/82b9ddc6-fd73-11e3-96f9-f7592a7aee79.png)

Author: Reynold Xin <rxin@apache.org>

Closes #1236 from rxin/ui-task-attempt and squashes the following commits:

3b645dd [Reynold Xin] Expose attemptId in Stage.
c0474b1 [Reynold Xin] Beefed up unit test.
c404bdd [Reynold Xin] Fix ReplayListenerSuite.
f56be4b [Reynold Xin] Fixed JsonProtocolSuite.
e29e0f7 [Reynold Xin] Minor update.
5e4354a [Reynold Xin] [SPARK-2297][UI] Make task attempt and speculation more explicit in UI.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala           2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala        4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala  24
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala         11
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala         9
6 files changed, 39 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 378cf1aaeb..82163eadd5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,9 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
 @DeveloperApi
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
   extends SparkListenerEvent
 
+@DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
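This hunk brings the application start/end events in line with the other annotated events visible above. As a rough sketch (this listener is hypothetical, not part of the patch), the events are consumed by overriding the corresponding `SparkListener` callbacks:

```scala
import org.apache.spark.scheduler._

// Hypothetical listener (illustration only) consuming the two newly
// annotated application lifecycle events.
class AppLifecycleListener extends SparkListener {
  override def onApplicationStart(start: SparkListenerApplicationStart) {
    println(s"App '${start.appName}' started by ${start.sparkUser} at ${start.time}")
  }
  override def onApplicationEnd(end: SparkListenerApplicationEnd) {
    println(s"App ended at ${end.time}")
  }
}
```

It would be registered with `sc.addSparkListener(new AppLifecycleListener)`.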
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 9a4be43ee2..8ec482a6f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -106,6 +106,8 @@ private[spark] class Stage(
     id
   }
 
+  def attemptId: Int = nextAttemptId
+
   val name = callSite.short
   val details = callSite.long
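Exposing `attemptId` works because `nextAttemptId` (whose increment inside `newAttemptId()` is visible at the top of the hunk) counts attempts as they are created. A standalone sketch of the assumed counter semantics, with the two methods reproduced outside `Stage`:

```scala
// Standalone illustration (assumed semantics based on the hunk above):
// newAttemptId() returns the current counter and then bumps it, so
// attemptId always reflects the number of attempts created so far.
class Counter {
  private var nextAttemptId = 0
  def newAttemptId(): Int = {
    val id = nextAttemptId
    nextAttemptId += 1
    id
  }
  def attemptId: Int = nextAttemptId
}

val c = new Counter
assert(c.newAttemptId() == 0)  // first attempt gets id 0
assert(c.attemptId == 1)       // one attempt has been created
```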
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 4c62e4dc0b..6aecdfe8e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -27,10 +27,12 @@ import org.apache.spark.annotation.DeveloperApi
 class TaskInfo(
     val taskId: Long,
     val index: Int,
+    val attempt: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
-    val taskLocality: TaskLocality.TaskLocality) {
+    val taskLocality: TaskLocality.TaskLocality,
+    val speculative: Boolean) {
 
   /**
    * The time when the task started remotely getting the result. Will not be set if the
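With the widened constructor, every `TaskInfo` now records which attempt of its task it represents and whether it was launched speculatively. A sketch with made-up values:

```scala
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}

// Illustration only: the first launch of a task, attempt numbered from 1.
val firstAttempt = new TaskInfo(
  taskId = 0L, index = 0, attempt = 1, launchTime = System.currentTimeMillis(),
  executorId = "exec-1", host = "host-a",
  taskLocality = TaskLocality.PROCESS_LOCAL, speculative = false)

// A speculative copy of the same task would carry the next attempt number
// and the speculative flag set.
val speculativeCopy = new TaskInfo(
  taskId = 1L, index = 0, attempt = 2, launchTime = System.currentTimeMillis(),
  executorId = "exec-2", host = "host-b",
  taskLocality = TaskLocality.ANY, speculative = true)
```

The 1-based `attempt` numbering comes from the `TaskSetManager` change below.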
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index c0898f64fb..83ff6b8550 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -335,17 +335,19 @@ private[spark] class TaskSetManager(
   /**
    * Dequeue a pending task for a given node and return its index and locality level.
    * Only search for tasks matching the given locality constraint.
+   *
+   * @return An option containing (task index within the task set, locality, is speculative?)
    */
   private def findTask(execId: String, host: String, locality: TaskLocality.Value)
-    : Option[(Int, TaskLocality.Value)] =
+    : Option[(Int, TaskLocality.Value, Boolean)] =
   {
     for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
       for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
-        return Some((index, TaskLocality.NODE_LOCAL))
+        return Some((index, TaskLocality.NODE_LOCAL, false))
       }
     }
@@ -354,23 +356,25 @@ private[spark] class TaskSetManager(
         rack <- sched.getRackForHost(host)
         index <- findTaskFromList(execId, getPendingTasksForRack(rack))
       } {
-        return Some((index, TaskLocality.RACK_LOCAL))
+        return Some((index, TaskLocality.RACK_LOCAL, false))
       }
     }
 
     // Look for no-pref tasks after rack-local tasks since they can run anywhere.
     for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
-      return Some((index, TaskLocality.PROCESS_LOCAL))
+      return Some((index, TaskLocality.PROCESS_LOCAL, false))
     }
 
     if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
       for (index <- findTaskFromList(execId, allPendingTasks)) {
-        return Some((index, TaskLocality.ANY))
+        return Some((index, TaskLocality.ANY, false))
       }
     }
 
     // Finally, if all else has failed, find a speculative task
-    findSpeculativeTask(execId, host, locality)
+    findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
+      (taskIndex, allowedLocality, true)
+    }
   }
 
   /**
@@ -391,7 +395,7 @@ private[spark] class TaskSetManager(
       }
 
       findTask(execId, host, allowedLocality) match {
-        case Some((index, taskLocality)) => {
+        case Some((index, taskLocality, speculative)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
           val taskId = sched.newTaskId()
@@ -400,7 +404,9 @@ private[spark] class TaskSetManager(
             taskSet.id, index, taskId, execId, host, taskLocality))
           // Do various bookkeeping
           copiesRunning(index) += 1
-          val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
+          val attemptNum = taskAttempts(index).size
+          val info = new TaskInfo(
+            taskId, index, attemptNum + 1, curTime, execId, host, taskLocality, speculative)
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
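Every non-speculative branch of `findTask` now returns a literal `false` as the third tuple element, while the speculative fallback widens `findSpeculativeTask`'s two-tuple via `map`. In `resourceOffer`, the attempt number is derived from the bookkeeping list, so attempts are numbered from 1 (`taskAttempts(index).size` is 0 the first time a task is launched). A standalone sketch of the Option-widening pattern, with stand-in types and values:

```scala
// Stand-in for findSpeculativeTask: returns (task index, locality) or None.
// String replaces TaskLocality.Value here purely for illustration.
def findSpeculativeTask(): Option[(Int, String)] = Some((7, "ANY"))

// Widening the two-tuple to a three-tuple and tagging it speculative,
// mirroring the final fallback in findTask above.
val result: Option[(Int, String, Boolean)] =
  findSpeculativeTask().map { case (taskIndex, allowedLocality) =>
    (taskIndex, allowedLocality, true)
  }
assert(result == Some((7, "ANY", true)))
```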
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8b65f0671b..8e3d5d1cd4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -95,8 +95,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         </div>
       // scalastyle:on
       val taskHeaders: Seq[String] =
-        Seq("Task Index", "Task ID", "Status", "Locality Level", "Executor", "Launch Time") ++
-        Seq("Duration", "GC Time", "Result Ser Time") ++
+        Seq(
+          "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
+          "Launch Time", "Duration", "GC Time") ++
         {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
         {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
@@ -245,6 +246,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       <tr>
         <td>{info.index}</td>
         <td>{info.taskId}</td>
+        <td sorttable_customkey={info.attempt.toString}>{
+          if (info.speculative) s"${info.attempt} (speculative)" else info.attempt.toString
+        }</td>
         <td>{info.status}</td>
         <td>{info.taskLocality}</td>
         <td>{info.host}</td>
@@ -255,9 +259,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         <td sorttable_customkey={gcTime.toString}>
           {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
         </td>
+        <!--
+        TODO: Add this back after we add support to hide certain columns.
         <td sorttable_customkey={serializationTime.toString}>
           {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
         </td>
+        -->
         {if (shuffleRead) {
           <td sorttable_customkey={shuffleReadSortable}>
             {shuffleReadReadable}
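The serialization-time cell is parked in an XML comment rather than deleted, pending column-hiding support. For the new Attempt cell, `sorttable_customkey` keeps the column sorting numeric even when the rendered text carries a suffix; a sketch of that Scala XML pattern with hypothetical values:

```scala
// Illustration only: the numeric attempt drives sorting via
// sorttable_customkey, while the displayed text may be decorated.
val attempt = 2
val speculative = true
val cell =
  <td sorttable_customkey={attempt.toString}>{
    if (speculative) s"$attempt (speculative)" else attempt.toString
  }</td>
// Renders as: <td sorttable_customkey="2">2 (speculative)</td>
```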
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7cecbfe62a..6245b4b802 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -32,6 +32,8 @@ import org.apache.spark.storage._
 import org.apache.spark._
 
 private[spark] object JsonProtocol {
+  // TODO: Remove this file and put JSON serialization into each individual class.
+
   private implicit val format = DefaultFormats
 
   /** ------------------------------------------------- *
@@ -194,10 +196,12 @@ private[spark] object JsonProtocol {
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
+    ("Attempt" -> taskInfo.attempt) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~
     ("Locality" -> taskInfo.taskLocality.toString) ~
+    ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
     ("Failed" -> taskInfo.failed) ~
@@ -487,16 +491,19 @@ private[spark] object JsonProtocol {
   def taskInfoFromJson(json: JValue): TaskInfo = {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
+    val attempt = (json \ "Attempt").extractOpt[Int].getOrElse(1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val host = (json \ "Host").extract[String]
     val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
+    val speculative = (json \ "Speculative").extractOpt[Boolean].getOrElse(false)
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
     val serializedSize = (json \ "Serialized Size").extract[Int]
-    val taskInfo = new TaskInfo(taskId, index, launchTime, executorId, host, taskLocality)
+    val taskInfo =
+      new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
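`extractOpt` with `getOrElse` is what keeps old event logs readable: JSON written before this patch has no "Attempt" or "Speculative" key, so deserialization falls back to attempt 1 and non-speculative instead of throwing. A sketch of that backward-compatibility pattern:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val format = DefaultFormats

// Event-log JSON from before this patch: no "Attempt" or "Speculative" keys.
val oldJson = parse("""{"Task ID": 42, "Index": 0}""")

// extractOpt returns None for missing keys, so the defaults kick in.
val attempt = (oldJson \ "Attempt").extractOpt[Int].getOrElse(1)                  // 1
val speculative = (oldJson \ "Speculative").extractOpt[Boolean].getOrElse(false)  // false
```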