 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      |  6 +++++-
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 14 ++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e24a3..c70aa0e6e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.NotSerializableException
+import java.io.{NotSerializableException, PrintWriter, StringWriter}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -580,6 +580,10 @@ class DAGScheduler(
       case e: Exception =>
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val exception = new SparkException("Job failed due to OutOfMemoryError", oom)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
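
For context, a minimal standalone Scala sketch (not part of the patch; names are illustrative) of why the extra case is needed: java.lang.OutOfMemoryError extends java.lang.Error, not Exception, so the existing `case e: Exception` handler never matches it, and without the new case the local-job thread would die without ever calling job.listener.jobFailed.

object OomCatchSketch {
  def main(args: Array[String]): Unit = {
    try {
      throw new OutOfMemoryError("simulated OOM")   // stands in for a failing local task
    } catch {
      case e: Exception =>
        println(s"Exception handler: $e")           // never reached: OOM is an Error
      case oom: OutOfMemoryError =>
        // Wrap the Error in an Exception so listener code that expects
        // Exception-based failures can report it on the normal path.
        val wrapped = new RuntimeException("Job failed due to OutOfMemoryError", oom)
        println(s"Error handler: ${wrapped.getMessage}, cause = ${wrapped.getCause}")
    }
  }
}

The patch wraps the error in a SparkException rather than a RuntimeException, but the control flow is the same: convert the fatal Error into an Exception and hand it to the usual failure-reporting path.
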
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1ac8..81e64c1846 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array(new Partition { override def index = 0 })
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    assert(results.size == 0)
+    assertDataStructuresEmpty
+  }
+
 test("run trivial job w/ dependency") {
 val baseRdd = makeRdd(1, Nil)
 val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
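
As a rough illustration of what the new test asserts, that the listener's failure callback fires and no result is ever recorded, here is a self-contained sketch; the Listener trait and runLocally helper below are hypothetical stand-ins for Spark's JobListener and the DAGScheduler's local-run path, not Spark API.

object LocalJobOomSketch {
  trait Listener {
    def taskSucceeded(result: Any): Unit
    def jobFailed(e: Exception): Unit
  }

  // Runs the compute function and routes both Exceptions and
  // OutOfMemoryError to the listener, mirroring the patched catch block.
  def runLocally(compute: () => Any, listener: Listener): Unit = {
    try {
      listener.taskSucceeded(compute())
    } catch {
      case e: Exception => listener.jobFailed(e)
      case oom: OutOfMemoryError =>
        listener.jobFailed(new RuntimeException("Job failed due to OutOfMemoryError", oom))
    }
  }

  def main(args: Array[String]): Unit = {
    val results = scala.collection.mutable.Buffer.empty[Any]
    runLocally(
      () => throw new OutOfMemoryError("test local job oom"),
      new Listener {
        def taskSucceeded(result: Any): Unit = results += result
        def jobFailed(e: Exception): Unit = println(s"failed: ${e.getMessage}")
      })
    assert(results.isEmpty)   // mirrors assert(results.size == 0) in the test
  }
}
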