diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-06-22 10:40:41 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-06-24 12:40:41 -0700 |
commit | f6e64b5cd6e6ac5ae3bee05af2832e1f71992310 (patch) | |
tree | 03daf4cd1d4312f9bdf0e82d948d434226eeca60 /core/src/main | |
parent | 7e9f1ed0decbbb261432dcbcd2179b829dbcbc82 (diff) | |
download | spark-f6e64b5cd6e6ac5ae3bee05af2832e1f71992310.tar.gz spark-f6e64b5cd6e6ac5ae3bee05af2832e1f71992310.tar.bz2 spark-f6e64b5cd6e6ac5ae3bee05af2832e1f71992310.zip |
Updating based on changes to JobLogger (and one small change to JobLogger)
Diffstat (limited to 'core/src/main')
5 files changed, 18 insertions, 15 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index bdd8792ce9..82d419453b 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -256,7 +256,7 @@ class DAGScheduler( eventQueue.put(toSubmit) waiter.awaitResult() match { case JobSucceeded => {} - case JobFailed(exception: Exception) => + case JobFailed(exception: Exception, _) => logInfo("Failed to run " + callSite) throw exception } @@ -324,7 +324,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None)))) } return true } @@ -671,7 +671,7 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))) activeJobs -= job resultStageToJob -= resultStage } diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index 178bfaba3d..6a9d52f356 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -275,7 +275,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { var info = "JOB_ID=" + job.runId
reason match {
case JobSucceeded => info += " STATUS=SUCCESS"
- case JobFailed(exception) =>
+ case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala index 654131ee84..a0fdf391e6 100644 --- a/core/src/main/scala/spark/scheduler/JobResult.scala +++ b/core/src/main/scala/spark/scheduler/JobResult.scala @@ -6,4 +6,4 @@ package spark.scheduler private[spark] sealed trait JobResult private[spark] case object JobSucceeded extends JobResult -private[spark] case class JobFailed(exception: Exception) extends JobResult +private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala index 3cc6a86345..6ff2e29434 100644 --- a/core/src/main/scala/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala @@ -35,7 +35,7 @@ private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Un throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter") } jobFinished = true - jobResult = JobFailed(exception) + jobResult = JobFailed(exception, None) this.notifyAll() } } diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index dea4f0e3b0..70f8e431f2 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -70,25 +70,28 @@ private[spark] class JobProgressListener extends SparkListener { activeStages += stageSubmitted.stage override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - val sid = taskEnd.event.task.stageId - taskEnd.event.reason match { + val sid = taskEnd.task.stageId + taskEnd.reason match { case Success => stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 case _ => stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 } val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, 
TaskMetrics)]()) - taskList += ((taskEnd.event.taskInfo, taskEnd.event.taskMetrics)) + taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics)) stageToTaskInfos(sid) = taskList } - override def onJobEnd(jobEnd: SparkListenerEvents) { + override def onJobEnd(jobEnd: SparkListenerJobEnd) { jobEnd match { - case failed: SparkListenerJobFailed => - val stage = failed.failedStage - activeStages -= stage - failedStages += stage - trimIfNecessary(failedStages) + case end: SparkListenerJobEnd => + end.jobResult match { + case JobFailed(ex, Some(stage)) => + activeStages -= stage + failedStages += stage + trimIfNecessary(failedStages) + case _ => + } case _ => } }