author     Zhen Peng <zhenpeng01@baidu.com>   2014-05-26 21:30:25 -0700
committer  Reynold Xin <rxin@apache.org>      2014-05-26 21:30:25 -0700
commit  8d271c90fa496cb24e2b7362ef0497563591b97d (patch)
tree    15b0d7d53c4c15191d4811d9420e683b0d2e641b /core
parent  56c771cb2d00a5843c391ae6561536ee46e535d4 (diff)
SPARK-1929 DAGScheduler suspended by local task OOM
DAGScheduler does not handle local task OOM properly, and will wait for the job result forever.

Author: Zhen Peng <zhenpeng01@baidu.com>

Closes #883 from zhpengg/bugfix-dag-scheduler-oom and squashes the following commits:

76f7eda [Zhen Peng] remove redundant memory allocations
aa63161 [Zhen Peng] SPARK-1929 DAGScheduler suspended by local task OOM
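The root cause is a detail of the JVM throwable hierarchy: java.lang.OutOfMemoryError extends Error, not Exception, so the local-job path's existing "case e: Exception" handler never matches it and the listener waiting on the job result is never notified. A minimal, self-contained Scala sketch of that hierarchy (illustrative only, not Spark code):

    // An OutOfMemoryError slips past an Exception handler because it is an
    // Error; only a Throwable (or a dedicated OutOfMemoryError) case catches it.
    object OomCatchDemo {
      def main(args: Array[String]): Unit = {
        try {
          throw new OutOfMemoryError("simulated local task OOM")
        } catch {
          case e: Exception => println(s"caught as Exception: $e") // never reached
          case t: Throwable => println(s"caught as Throwable: $t") // OOM lands here
        }
      }
    }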
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala       |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 14
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ff411e24a3..c70aa0e6e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
 
-import java.io.NotSerializableException
+import java.io.{NotSerializableException, PrintWriter, StringWriter}
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
@@ -580,6 +580,10 @@ class DAGScheduler(
       case e: Exception =>
         jobResult = JobFailed(e)
         job.listener.jobFailed(e)
+      case oom: OutOfMemoryError =>
+        val exception = new SparkException("job failed for Out of memory exception", oom)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
     } finally {
       val s = job.finalStage
       stageIdToJobIds -= s.id  // clean up data structures that were populated for a local job,
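The added case wraps the error rather than forwarding it: JobListener.jobFailed takes an Exception, and an OutOfMemoryError (an Error) cannot be passed to it directly, so the fix wraps it in a SparkException and keeps the original error as the cause. A hedged sketch of the same wrap-and-notify pattern, using a stand-in listener trait (the names below are illustrative, not Spark's API):

    // Wrap-and-notify sketch: the listener only accepts Exception, so an
    // Error must be wrapped before the failure can be reported.
    object WrapAndNotifySketch {
      trait Listener { def jobFailed(e: Exception): Unit }

      def runLocalTask(task: () => Unit, listener: Listener): Unit =
        try {
          task()
        } catch {
          case e: Exception => listener.jobFailed(e)
          case oom: OutOfMemoryError =>
            // Keep the OOM as the cause so the stack trace is not lost.
            listener.jobFailed(new RuntimeException("job failed for OOM", oom))
        }

      def main(args: Array[String]): Unit = {
        val listener = new Listener {
          def jobFailed(e: Exception): Unit =
            println(s"notified: $e (cause: ${e.getCause})")
        }
        runLocalTask(() => throw new OutOfMemoryError("simulated"), listener)
      }
    }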
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d172dd1ac8..81e64c1846 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -256,6 +256,20 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("local job oom") {
+    val rdd = new MyRDD(sc, Nil) {
+      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+        throw new java.lang.OutOfMemoryError("test local job oom")
+      override def getPartitions = Array( new Partition { override def index = 0 } )
+      override def getPreferredLocations(split: Partition) = Nil
+      override def toString = "DAGSchedulerSuite Local RDD"
+    }
+    val jobId = scheduler.nextJobId.getAndIncrement()
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    assert(results.size == 0)
+    assertDataStructuresEmpty
+  }
+
   test("run trivial job w/ dependency") {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
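The test injects the failure by overriding compute to throw an OutOfMemoryError, submits the job with allowLocal = true (the fifth JobSubmitted argument) so it takes the local-job path, and then asserts that no results arrived and that the scheduler's bookkeeping was cleaned up. The behavior being guarded is easiest to see with a caller blocking on a latch; a self-contained sketch under illustrative names (not the suite's helpers; assumes Scala 2.12+ for the Runnable lambda):

    // If the OOM is not converted into a failure notification, a caller
    // blocking on the job result waits forever; the fix makes countDown()
    // run either way.
    import java.util.concurrent.{CountDownLatch, TimeUnit}

    object LocalJobOomSketch {
      def main(args: Array[String]): Unit = {
        val done = new CountDownLatch(1)

        // Local jobs run in their own thread; the submitting side blocks.
        val worker = new Thread(() => {
          try {
            throw new OutOfMemoryError("test local job oom") // injected failure
          } catch {
            case e: Exception => done.countDown()          // pre-fix: only this case
            case oom: OutOfMemoryError => done.countDown() // the fix: notify anyway
          }
        })
        worker.start()

        // Without the OutOfMemoryError case the error escapes the worker
        // thread, countDown() never runs, and this await would hang; the
        // timeout here is only so the sketch always terminates.
        val notified = done.await(5, TimeUnit.SECONDS)
        println(s"failure notification received: $notified")
      }
    }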