author     Reynold Xin <rxin@apache.org>        2014-06-26 14:00:45 -0700
committer  Patrick Wendell <pwendell@gmail.com> 2014-06-26 14:03:22 -0700
commit     6587ef7c1783961e6ef250afa387271a1bd6e277 (patch)
tree       f9fcb6525b5e656a2449bac8794f3e5e7ee74e60 /core
parent     32a1ad75313472b1b098f7ec99335686d3fe4fc3 (diff)
[SPARK-2286][UI] Report exception/errors for failed tasks that are not ExceptionFailure
Also added inline doc for each TaskEndReason.

Author: Reynold Xin <rxin@apache.org>

Closes #1225 from rxin/SPARK-2286 and squashes the following commits:

6a7959d [Reynold Xin] Fix unit test failure.
cf9d5eb [Reynold Xin] Merge branch 'master' into SPARK-2286
a61fae1 [Reynold Xin] Move to line above ...
38c7391 [Reynold Xin] [SPARK-2286][UI] Report exception/errors for failed tasks that are not ExceptionFailure.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/TaskEndReason.scala               | 68
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala    |  4
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala | 17
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala           | 12
-rw-r--r-- core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala          |  4
5 files changed, 77 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index a3074916d1..5e8bd8c8e5 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -30,27 +30,69 @@ import org.apache.spark.storage.BlockManagerId
@DeveloperApi
sealed trait TaskEndReason
+/**
+ * :: DeveloperApi ::
+ * Task succeeded.
+ */
@DeveloperApi
case object Success extends TaskEndReason
+/**
+ * :: DeveloperApi ::
+ * Various possible reasons why a task failed.
+ */
+@DeveloperApi
+sealed trait TaskFailedReason extends TaskEndReason {
+ /** Error message displayed in the web UI. */
+ def toErrorString: String
+}
+
+/**
+ * :: DeveloperApi ::
+ * A [[org.apache.spark.scheduler.ShuffleMapTask]] that completed successfully earlier, but we
+ * lost the executor before the stage completed. This means Spark needs to reschedule the task
+ * to be re-executed on a different executor.
+ */
@DeveloperApi
-case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+case object Resubmitted extends TaskFailedReason {
+ override def toErrorString: String = "Resubmitted (resubmitted due to lost executor)"
+}
+/**
+ * :: DeveloperApi ::
+ * Task failed to fetch shuffle data from a remote node. Probably means we have lost the remote
+ * executors the task is trying to fetch from, and thus need to rerun the previous stage.
+ */
@DeveloperApi
case class FetchFailed(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
- extends TaskEndReason
+ extends TaskFailedReason {
+ override def toErrorString: String = {
+ val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
+ s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId)"
+ }
+}
+/**
+ * :: DeveloperApi ::
+ * Task failed due to a runtime exception. This is the most common failure case and also captures
+ * user program exceptions.
+ */
@DeveloperApi
case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
metrics: Option[TaskMetrics])
- extends TaskEndReason
+ extends TaskFailedReason {
+ override def toErrorString: String = {
+ val stackTraceString = if (stackTrace == null) "null" else stackTrace.mkString("\n")
+ s"$className ($description}\n$stackTraceString"
+ }
+}
/**
* :: DeveloperApi ::
@@ -58,10 +100,18 @@ case class ExceptionFailure(
* it was fetched.
*/
@DeveloperApi
-case object TaskResultLost extends TaskEndReason
+case object TaskResultLost extends TaskFailedReason {
+ override def toErrorString: String = "TaskResultLost (result lost from block manager)"
+}
+/**
+ * :: DeveloperApi ::
+ * Task was killed intentionally and needs to be rescheduled.
+ */
@DeveloperApi
-case object TaskKilled extends TaskEndReason
+case object TaskKilled extends TaskFailedReason {
+ override def toErrorString: String = "TaskKilled (killed intentionally)"
+}
/**
* :: DeveloperApi ::
@@ -69,7 +119,9 @@ case object TaskKilled extends TaskEndReason
* the task crashed the JVM.
*/
@DeveloperApi
-case object ExecutorLostFailure extends TaskEndReason
+case object ExecutorLostFailure extends TaskFailedReason {
+ override def toErrorString: String = "ExecutorLostFailure (executor lost)"
+}
/**
* :: DeveloperApi ::
@@ -77,4 +129,6 @@ case object ExecutorLostFailure extends TaskEndReason
* deserializing the task result.
*/
@DeveloperApi
-case object UnknownReason extends TaskEndReason
+case object UnknownReason extends TaskFailedReason {
+ override def toErrorString: String = "UnknownReason"
+}
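
As a hedged illustration of what the sealed TaskFailedReason hierarchy buys callers (a standalone sketch, not part of this patch): any non-Success reason can now be rendered without enumerating every failure type, and the sealed trait keeps the match exhaustive.

// Sketch: Success aside, every TaskEndReason is a TaskFailedReason,
// so a caller can obtain a display string uniformly.
def errorStringFor(reason: TaskEndReason): Option[String] = reason match {
  case Success                   => None
  case failure: TaskFailedReason => Some(failure.toErrorString)
}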
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index b5bcdd7e99..c0898f64fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -641,7 +641,9 @@ private[spark] class TaskSetManager(
addPendingTask(index, readding=true)
}
- // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
+ // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage.
+ // The reason is that the next stage wouldn't be able to fetch the data from this dead
+ // executor, so we would need to rerun these tasks on other executors.
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
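
To make the comment above concrete, a minimal sketch (hypothetical helper, not the real scheduler API): a lost executor affects finished and still-running tasks differently, which is exactly why both Resubmitted and ExecutorLostFailure exist as reasons.

// Hypothetical sketch: shuffle map output lives on the executor that
// produced it, so a finished task's output dies with the executor and
// the task is re-enqueued; a task still running simply fails.
def reasonForLostExecutor(taskHadFinished: Boolean): TaskEndReason =
  if (taskHadFinished) Resubmitted   // output lost, must rerun
  else ExecutorLostFailure           // task itself was lost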
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index bfefe4dbc4..381a5443df 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable.{HashMap, ListBuffer}
-import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
+import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
@@ -51,6 +51,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
var totalShuffleRead = 0L
var totalShuffleWrite = 0L
+ // TODO: Should probably consolidate all following into a single hash map.
val stageIdToTime = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
@@ -183,17 +184,17 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
// Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
tasksActive.remove(info.taskId)
- val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+ val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
taskEnd.reason match {
case org.apache.spark.Success =>
stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
(None, Option(taskEnd.taskMetrics))
- case e: ExceptionFailure =>
+ case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
- (Some(e), e.metrics)
- case e: org.apache.spark.TaskEndReason =>
+ (Some(e.toErrorString), e.metrics)
+ case e: TaskFailedReason => // All other failure cases
stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
- (None, None)
+ (Some(e.toErrorString), None)
}
stageIdToTime.getOrElseUpdate(sid, 0L)
@@ -221,7 +222,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
- taskMap(info.taskId) = new TaskUIData(info, metrics, failureInfo)
+ taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
stageIdToTaskData(sid) = taskMap
}
}
@@ -256,7 +257,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
case class TaskUIData(
taskInfo: TaskInfo,
taskMetrics: Option[TaskMetrics] = None,
- exception: Option[ExceptionFailure] = None)
+ errorMessage: Option[String] = None)
private object JobProgressListener {
val DEFAULT_POOL_NAME = "default"
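
A condensed, hypothetical sketch of the listener's new match (simplified from onTaskEnd above): ExceptionFailure is pulled out first because it is the only failure case that may carry task metrics; every other TaskFailedReason still contributes an error string.

// Sketch (hypothetical helper): map a task-end reason to the optional
// error message and metrics that the UI stores per task.
def messageAndMetrics(
    reason: TaskEndReason,
    metricsFromEvent: TaskMetrics): (Option[String], Option[TaskMetrics]) =
  reason match {
    case Success             => (None, Option(metricsFromEvent))
    case e: ExceptionFailure => (Some(e.toErrorString), e.metrics)
    case e: TaskFailedReason => (Some(e.toErrorString), None)
  }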
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 4bce472036..8b65f0671b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -210,10 +210,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
(taskData: TaskUIData): Seq[Node] = {
- def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
- trace.map(e => <span style="display:block;">{e.toString}</span>)
-
- taskData match { case TaskUIData(info, metrics, exception) =>
+ taskData match { case TaskUIData(info, metrics, errorMessage) =>
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
@@ -283,12 +280,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
</td>
}}
<td>
- {exception.map { e =>
- <span>
- {e.className} ({e.description})<br/>
- {fmtStackTrace(e.stackTrace)}
- </span>
- }.getOrElse("")}
+ {errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}
</td>
</tr>
}
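
For illustration, a self-contained scala.xml sketch (hypothetical, assuming only the standard library) of the new error cell: the precomputed string goes straight into a <pre> element, which preserves the stack trace's newlines without the per-frame <span> formatting removed above.

import scala.xml.Node

// Sketch: render an optional error message; <pre> keeps embedded
// newlines intact, so no per-StackTraceElement markup is needed.
def errorCell(errorMessage: Option[String]): Seq[Node] =
  <td>{errorMessage.map { e => <pre>{e}</pre> }.getOrElse("")}</td>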
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a3f824a4e1..30971f7696 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -91,13 +91,13 @@ private[ui] class StageTableBase(
{s.name}
</a>
- val details = if (s.details.nonEmpty) (
+ val details = if (s.details.nonEmpty) {
<span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
class="expand-details">
+show details
</span>
<pre class="stage-details collapsed">{s.details}</pre>
- )
+ }
listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)