aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-06-22 10:40:41 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-06-24 12:40:41 -0700
commitf6e64b5cd6e6ac5ae3bee05af2832e1f71992310 (patch)
tree03daf4cd1d4312f9bdf0e82d948d434226eeca60 /core/src/main
parent7e9f1ed0decbbb261432dcbcd2179b829dbcbc82 (diff)
downloadspark-f6e64b5cd6e6ac5ae3bee05af2832e1f71992310.tar.gz
spark-f6e64b5cd6e6ac5ae3bee05af2832e1f71992310.tar.bz2
spark-f6e64b5cd6e6ac5ae3bee05af2832e1f71992310.zip
Updating based on changes to JobLogger (and one small change to JobLogger)
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala6
-rw-r--r--core/src/main/scala/spark/scheduler/JobLogger.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/JobResult.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/JobWaiter.scala2
-rw-r--r--core/src/main/scala/spark/ui/jobs/JobProgressUI.scala21
5 files changed, 18 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index bdd8792ce9..82d419453b 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -256,7 +256,7 @@ class DAGScheduler(
eventQueue.put(toSubmit)
waiter.awaitResult() match {
case JobSucceeded => {}
- case JobFailed(exception: Exception) =>
+ case JobFailed(exception: Exception, _) =>
logInfo("Failed to run " + callSite)
throw exception
}
@@ -324,7 +324,7 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None))))
}
return true
}
@@ -671,7 +671,7 @@ class DAGScheduler(
val job = resultStageToJob(resultStage)
val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))))
activeJobs -= job
resultStageToJob -= resultStage
}
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 178bfaba3d..6a9d52f356 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -275,7 +275,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
var info = "JOB_ID=" + job.runId
reason match {
case JobSucceeded => info += " STATUS=SUCCESS"
- case JobFailed(exception) =>
+ case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala
index 654131ee84..a0fdf391e6 100644
--- a/core/src/main/scala/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/spark/scheduler/JobResult.scala
@@ -6,4 +6,4 @@ package spark.scheduler
private[spark] sealed trait JobResult
private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception) extends JobResult
+private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
index 3cc6a86345..6ff2e29434 100644
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala
@@ -35,7 +35,7 @@ private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Un
throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
}
jobFinished = true
- jobResult = JobFailed(exception)
+ jobResult = JobFailed(exception, None)
this.notifyAll()
}
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index dea4f0e3b0..70f8e431f2 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -70,25 +70,28 @@ private[spark] class JobProgressListener extends SparkListener {
activeStages += stageSubmitted.stage
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val sid = taskEnd.event.task.stageId
- taskEnd.event.reason match {
+ val sid = taskEnd.task.stageId
+ taskEnd.reason match {
case Success =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
case _ =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
}
val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]())
- taskList += ((taskEnd.event.taskInfo, taskEnd.event.taskMetrics))
+ taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics))
stageToTaskInfos(sid) = taskList
}
- override def onJobEnd(jobEnd: SparkListenerEvents) {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
jobEnd match {
- case failed: SparkListenerJobFailed =>
- val stage = failed.failedStage
- activeStages -= stage
- failedStages += stage
- trimIfNecessary(failedStages)
+ case end: SparkListenerJobEnd =>
+ end.jobResult match {
+ case JobFailed(ex, Some(stage)) =>
+ activeStages -= stage
+ failedStages += stage
+ trimIfNecessary(failedStages)
+ case _ =>
+ }
case _ =>
}
}