about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-11-24 17:49:14 -0800
committerMark Hamstra <markhamstra@gmail.com>2013-12-03 09:57:32 -0800
commitc9fcd909d0f86b08935a132409888b30e989bca4 (patch)
tree2e2c6d8a7eaabb38b799873b0c9442864f8181ee /core
parent9ae2d094a967782e3f5a624dd854059a40430ee6 (diff)
downloadspark-c9fcd909d0f86b08935a132409888b30e989bca4.tar.gz
spark-c9fcd909d0f86b08935a132409888b30e989bca4.tar.bz2
spark-c9fcd909d0f86b08935a132409888b30e989bca4.zip
Local jobs post SparkListenerJobEnd, and DAGScheduler data structure
cleanup always occurs before any posting of SparkListenerJobEnd.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 17
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala |  2
2 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 01c5133e6e..b371a2412f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -597,12 +597,13 @@ class DAGScheduler(
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
- case LocalJobCompleted(stage) =>
- val jobId = stageIdToJobIds(stage.id).head
+ case LocalJobCompleted(job, result) =>
+ val stage = job.finalStage
stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job,
stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through
stageToInfos -= stage // completion events or stage abort
- jobIdToStageIds -= jobId
+ jobIdToStageIds -= job.jobId
+ listenerBus.post(SparkListenerJobEnd(job, result))
case TaskSetFailed(taskSet, reason) =>
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
@@ -672,6 +673,7 @@ class DAGScheduler(
// Broken out for easier testing in DAGSchedulerSuite.
protected def runLocallyWithinThread(job: ActiveJob) {
+ var jobResult: JobResult = JobSucceeded
try {
SparkEnv.set(env)
val rdd = job.finalStage.rdd
@@ -686,9 +688,10 @@ class DAGScheduler(
}
} catch {
case e: Exception =>
+ jobResult = JobFailed(e, Some(job.finalStage))
job.listener.jobFailed(e)
} finally {
- eventProcessActor ! LocalJobCompleted(job.finalStage)
+ eventProcessActor ! LocalJobCompleted(job, jobResult)
}
}
@@ -835,8 +838,8 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
- listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
jobIdToStageIdsRemove(job.jobId)
+ listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -987,10 +990,10 @@ class DAGScheduler(
val error = new SparkException("Job %d cancelled".format(jobId))
val job = idToActiveJob(jobId)
job.listener.jobFailed(error)
- listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
jobIdToStageIds -= jobId
activeJobs -= job
idToActiveJob -= jobId
+ listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
}
}
@@ -1009,11 +1012,11 @@ class DAGScheduler(
val job = resultStageToJob(resultStage)
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
- listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
jobIdToStageIdsRemove(job.jobId)
idToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
+ listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index bf8dfb5ac7..aa496b7ac6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,7 +65,7 @@ private[scheduler] case class CompletionEvent(
taskMetrics: TaskMetrics)
extends DAGSchedulerEvent
-private[scheduler] case class LocalJobCompleted(stage: Stage) extends DAGSchedulerEvent
+private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent
private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent