diff options
author | Reynold Xin <rxin@apache.org> | 2013-12-01 12:46:58 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2013-12-01 12:46:58 -0800 |
commit | 740922f25d5f81617fbe02c7bcd1610d6426bbef (patch) | |
tree | fe5e688d66cd96519ad79636d51223d8e8ad3e26 /core | |
parent | 60e23a58b288dae3c87da28e1506323b1d88ee9e (diff) | |
parent | be3ea2394fa2e626fb6b5f2cd46e7156016c9b3f (diff) | |
download | spark-740922f25d5f81617fbe02c7bcd1610d6426bbef.tar.gz spark-740922f25d5f81617fbe02c7bcd1610d6426bbef.tar.bz2 spark-740922f25d5f81617fbe02c7bcd1610d6426bbef.zip |
Merge pull request #219 from sundeepn/schedulerexception
Scheduler quits when newStage fails
The current scheduler thread does not handle exceptions thrown by newStage while launching new jobs. The thread dies on any exception triggered at that level, leaving the cluster hanging with no scheduler.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index bc37a70e98..a785a16a36 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -413,7 +413,17 @@ class DAGScheduler( private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = { event match { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => - val finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) + var finalStage: Stage = null + try { + // New stage creation at times and if its not protected, the scheduler thread is killed. + // e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted + finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) + } catch { + case e: Exception => + logWarning("Creating new stage failed due to exception - job: " + jobId, e) + listener.jobFailed(e) + return false + } val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length + |