author     Patrick Wendell <pwendell@gmail.com>  2013-06-29 11:26:30 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2013-06-29 11:26:30 -0700
commit     e721ff7e5a2d738750c40e95d6ba53898eaa7051 (patch)
tree       ee5baba0b4d210b8215226c33c0017d34500453a
parent     473961d82e70f4cdb9e3d12b0dfcb4b70e6121ab (diff)
Allowing details for failed stages
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala                  |  6
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala |  1
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala     |  1
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressUI.scala                   | 30
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StagePage.scala                       | 87
5 files changed, 74 insertions(+), 51 deletions(-)
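
What the patch does, before the per-file diffs: when a task fails with an ExceptionFailure, the cluster and local task-set managers now forward that failure to the scheduler listener instead of swallowing it; JobProgressListener stores the exception alongside each task's TaskInfo and TaskMetrics; and StagePage gains a "Details" column showing the exception class and stack trace. Below is a minimal, self-contained sketch of that flow. The Listener trait, the ProgressListener class, and the four-argument taskEnded signature are simplified stand-ins for illustration, not Spark's actual API.

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Simplified stand-ins for Spark's TaskInfo / TaskMetrics / ExceptionFailure.
case class TaskInfo(taskId: Long, hostPort: String)
case class TaskMetrics(executorRunTime: Long)
case class ExceptionFailure(className: String, description: String,
                            stackTrace: Seq[StackTraceElement])

trait Listener {
  // For failed tasks the metrics reference may be null (the managers pass
  // null placeholders), which is why the UI wraps every access in Option(...).
  def taskEnded(stageId: Int, info: TaskInfo, metrics: TaskMetrics,
                failure: Option[ExceptionFailure]): Unit
}

// Records one (info, metrics, failure) triple per finished task, keyed by
// stage id: the same shape stageToTaskInfos takes in this patch.
class ProgressListener extends Listener {
  val stageToTaskInfos =
    HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]]()

  def taskEnded(stageId: Int, info: TaskInfo, metrics: TaskMetrics,
                failure: Option[ExceptionFailure]): Unit = {
    val taskList = stageToTaskInfos.getOrElse(
      stageId, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]())
    taskList += ((info, metrics, failure))
    stageToTaskInfos(stageId) = taskList
  }
}
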
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 82d419453b..d9ddc41aa2 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -618,8 +618,11 @@ class DAGScheduler(
handleExecutorLost(bmAddress.executorId, Some(task.generation))
}
+ case ExceptionFailure(className, description, stackTrace) =>
+ // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
+
case other =>
- // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage
+ // Unrecognized failure - abort all jobs depending on this stage
abortStage(idToStage(task.stageId), task + " failed: " + other)
}
}
@@ -667,6 +670,7 @@ class DAGScheduler(
*/
private def abortStage(failedStage: Stage, reason: String) {
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
+ failedStage.completionTime = Some(System.currentTimeMillis())
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
val error = new SparkException("Job failed: " + reason)
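
One thing to note in the DAGScheduler hunk above: Scala tries match cases in order, so the new ExceptionFailure arm only takes effect because it sits before the catch-all "case other" arm that aborts the stage. A tiny runnable illustration, using hypothetical stand-in types rather than Spark's real TaskEndReason hierarchy:

object MatchOrder {
  sealed trait TaskEndReason
  case class ExceptionFailure(description: String) extends TaskEndReason
  case class OtherFailure(message: String) extends TaskEndReason

  def handle(reason: TaskEndReason): String = reason match {
    // Must come before the catch-all: cases are tried top to bottom.
    case ExceptionFailure(desc) => "deferred to TaskScheduler: " + desc
    case other                  => "abort stage: " + other
  }

  def main(args: Array[String]): Unit = {
    println(handle(ExceptionFailure("NPE in user code")))
    println(handle(OtherFailure("lost executor")))
  }
}
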
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index d72b0bfc9f..6965cde5da 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -571,6 +571,7 @@ private[spark] class ClusterTaskSetManager(
return
case ef: ExceptionFailure =>
+ sched.listener.taskEnded(tasks(index), ef, null, null, info, null)
val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 70b69bb26f..499116f653 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -152,6 +152,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
info.markFailed()
decreaseRunningTasks(1)
val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
+ sched.listener.taskEnded(task, reason, null, null, info, null)
if (!finished(index)) {
copiesRunning(index) -= 1
numFailures(index) += 1
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index f584d1e187..aafa414055 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -12,12 +12,11 @@ import scala.Seq
import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
import spark.ui.JettyUtils._
-import spark.SparkContext
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
-import spark.Success
-import spark.Utils
+import collection.mutable
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -51,7 +50,8 @@ private[spark] class JobProgressListener extends SparkListener {
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
- val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]()
+ val stageToTaskInfos =
+ HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {}
@@ -67,8 +67,6 @@ private[spark] class JobProgressListener extends SparkListener {
if (stages.size > RETAINED_STAGES) {
val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => {
- stageToTasksComplete.remove(s.id)
- stageToTasksFailed.remove(s.id)
stageToTaskInfos.remove(s.id)
})
stages.trimEnd(toRemove)
@@ -80,14 +78,18 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
- taskEnd.reason match {
- case Success =>
- stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- case _ =>
- stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- }
- val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]())
- taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics))
+ val failureInfo: Option[ExceptionFailure] =
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ Some(e)
+ case _ =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ None
+ }
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]())
+ taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics, failureInfo))
stageToTaskInfos(sid) = taskList
}
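
A behavioral detail in the onTaskEnd rewrite above: previously any reason other than Success counted as a failure, whereas now only ExceptionFailure increments the failed counter, and every other reason (Success included) increments the completed one. A stand-alone sketch of the new classification, with simplified hypothetical types:

object FailureClassification {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case class ExceptionFailure(className: String, description: String,
                              stackTrace: Seq[StackTraceElement]) extends TaskEndReason

  // Failures keep their exception so the UI can render a "Details" cell;
  // all other reasons contribute only to the completed-task counter.
  def failureInfo(reason: TaskEndReason): Option[ExceptionFailure] = reason match {
    case e: ExceptionFailure => Some(e)
    case _                   => None
  }
}
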
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index c9294a7261..ed96fc2994 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -9,7 +9,7 @@ import scala.xml.Node
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.util.Distribution
-import spark.Utils
+import spark.{ExceptionFailure, Utils}
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
@@ -38,56 +38,71 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskHeaders: Seq[String] =
Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
- {if (shuffleWrite) Seq("Shuffle Write") else Nil}
+ {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
+ Seq("Details")
val taskTable = listingTable(taskHeaders, taskRow, tasks)
- val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble}
- val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
- ms => parent.formatDuration(ms.toLong))
-
- def getQuantileCols(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
-
- val shuffleReadSizes = tasks.map {
- case(info, metrics) =>
- metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
- }
- val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
-
- val shuffleWriteSizes = tasks.map {
- case(info, metrics) =>
- metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
- }
- val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
-
- val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
- if (shuffleRead) shuffleReadQuantiles else Nil,
- if (shuffleWrite) shuffleWriteQuantiles else Nil)
-
- val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
- val quantileTable = listingTable(quantileHeaders, quantileRow, listings)
+ // Excludes tasks which failed and have incomplete metrics
+ val validTasks = tasks.filter(t => Option(t._2).isDefined)
+
+ val summaryTable: Option[Seq[Node]] =
+ if (validTasks.size == 0) {
+ None
+ }
+ else {
+ val serviceTimes = validTasks.map{case (info, metrics, exception) =>
+ metrics.executorRunTime.toDouble}
+ val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
+ ms => parent.formatDuration(ms.toLong))
+
+ def getQuantileCols(data: Seq[Double]) =
+ Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+
+ val shuffleReadSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+ }
+ val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+
+ val shuffleWriteSizes = validTasks.map {
+ case(info, metrics, exception) =>
+ metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+ }
+ val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+
+ val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+ if (shuffleRead) shuffleReadQuantiles else Nil,
+ if (shuffleWrite) shuffleWriteQuantiles else Nil)
+
+ val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+ def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
+ Some(listingTable(quantileHeaders, quantileRow, listings))
+ }
val content =
- <h2>Summary Metrics</h2> ++ quantileTable ++ <h2>Tasks</h2> ++ taskTable;
+ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable;
headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
- def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
- def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = {
- val (info, metrics) = taskData
+ def taskRow(taskData: (TaskInfo, TaskMetrics, Option[ExceptionFailure])): Seq[Node] = {
+ def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
+ trace.map(e => <span style="display:block;">{e.toString}</span>)
+ val (info, metrics, exception) = taskData
<tr>
<td>{info.taskId}</td>
- <td>{parent.formatDuration(metrics.executorRunTime)}</td>
+ <td>{Option(metrics).map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
- {metrics.shuffleReadMetrics.map{m =>
- <td>{Utils.memoryBytesToString(m.remoteBytesRead)}</td>}.getOrElse("") }
- {metrics.shuffleWriteMetrics.map{m =>
- <td>{Utils.memoryBytesToString(m.shuffleBytesWritten)}</td>}.getOrElse("") }
+ {Option(metrics).flatMap{m => m.shuffleReadMetrics}.map{s =>
+ <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
+ {Option(metrics).flatMap{m => m.shuffleWriteMetrics}.map{s =>
+ <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
+ <td>{exception.map(e =>
+ <span>{e.className}<br/>{fmtStackTrace(e.stackTrace)}</span>).getOrElse("")}</td>
</tr>
}
}
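
A closing note on taskRow: it relies on scala.Option(x) returning None for a null reference, so a failed task whose metrics were never filled in renders as blank cells instead of throwing a NullPointerException. A small runnable sketch (Metrics and NullMetricsGuard are hypothetical names, not Spark classes):

object NullMetricsGuard {
  case class Metrics(executorRunTime: Long)

  def main(args: Array[String]): Unit = {
    val metrics: Metrics = null  // a failed task may report no metrics at all

    // Option(null) is None, so this yields "" rather than an NPE.
    val durationCell =
      Option(metrics).map(m => m.executorRunTime.toString + " ms").getOrElse("")
    println("duration cell: '" + durationCell + "'")

    // fmtStackTrace renders one <span> per frame; here, one line per frame.
    val trace = new Exception("example").getStackTrace.toSeq
    trace.take(3).foreach(frame => println(frame.toString))
  }
}
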