diff options
Diffstat (limited to 'core/src/main/scala/spark/scheduler/DAGScheduler.scala')
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index b320be8863..f599eb00bd 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -40,7 +40,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with eventQueue.put(HostLost(host)) } - // Called by TaskScheduler to cancel an entier TaskSet due to repeated failures. + // Called by TaskScheduler to cancel an entire TaskSet due to repeated failures. override def taskSetFailed(taskSet: TaskSet, reason: String) { eventQueue.put(TaskSetFailed(taskSet, reason)) } @@ -54,8 +54,6 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with // resubmit failed stages val POLL_TIMEOUT = 10L - private val lock = new Object // Used for access to the entire DAGScheduler - private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent] val nextRunId = new AtomicInteger(0) @@ -337,9 +335,12 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val rdd = job.finalStage.rdd val split = rdd.splits(job.partitions(0)) val taskContext = new TaskContext(job.finalStage.id, job.partitions(0), 0) - val result = job.func(taskContext, rdd.iterator(split, taskContext)) - taskContext.executeOnCompleteCallbacks() - job.listener.taskSucceeded(0, result) + try { + val result = job.func(taskContext, rdd.iterator(split, taskContext)) + job.listener.taskSucceeded(0, result) + } finally { + taskContext.executeOnCompleteCallbacks() + } } catch { case e: Exception => job.listener.jobFailed(e) |