aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorSundeep Narravula <sundeepn@yahoo-inc.com>2013-11-30 16:18:12 -0800
committerSundeep Narravula <sundeepn@yahoo-inc.com>2013-11-30 16:18:12 -0800
commit4d53830eb79174cfd9641f6342727bc980d5c3e0 (patch)
tree8a5a0c804d4ce7621c4de0d899ec73062875bafd /core/src
parent743a31a7ca4421cbbd5b615b773997a06a7ab4ee (diff)
downloadspark-4d53830eb79174cfd9641f6342727bc980d5c3e0.tar.gz
spark-4d53830eb79174cfd9641f6342727bc980d5c3e0.tar.bz2
spark-4d53830eb79174cfd9641f6342727bc980d5c3e0.zip
Scheduler quits when createStage fails.
The current scheduler thread does not handle exceptions thrown by createStage while launching new jobs. The thread dies on any exception triggered at that level, leaving the cluster hanging with no scheduler.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala10
1 files changed, 9 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4457525ac8..f6a4482679 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -384,7 +384,15 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
- val finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+ var finalStage:Stage = null
+ try {
+ finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+ } catch {
+ case e: Exception =>
+ logWarning("Creating new stage failed due to exception - job: " + jobId )
+ listener.jobFailed(e)
+ return false
+ }
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +