aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-12-01 12:46:58 -0800
committerReynold Xin <rxin@apache.org>2013-12-01 12:46:58 -0800
commit740922f25d5f81617fbe02c7bcd1610d6426bbef (patch)
treefe5e688d66cd96519ad79636d51223d8e8ad3e26 /core
parent60e23a58b288dae3c87da28e1506323b1d88ee9e (diff)
parentbe3ea2394fa2e626fb6b5f2cd46e7156016c9b3f (diff)
downloadspark-740922f25d5f81617fbe02c7bcd1610d6426bbef.tar.gz
spark-740922f25d5f81617fbe02c7bcd1610d6426bbef.tar.bz2
spark-740922f25d5f81617fbe02c7bcd1610d6426bbef.zip
Merge pull request #219 from sundeepn/schedulerexception
Scheduler quits when newStage fails The current scheduler thread does not handle exceptions from newStage stage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no schduler.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala12
1 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bc37a70e98..a785a16a36 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -413,7 +413,17 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
- val finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+ var finalStage: Stage = null
+ try {
+ // New stage creation at times and if its not protected, the scheduler thread is killed.
+ // e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted
+ finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+ } catch {
+ case e: Exception =>
+ logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+ listener.jobFailed(e)
+ return false
+ }
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +