aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-06-27 14:40:45 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-06-27 14:41:03 -0700
commite1bbf1a080296e4d3d692379f06b7db0d0629573 (patch)
treeb68e0a5e2813bd2a767c1a5f72716ee63a15a6a3
parenta2dbb4807136b3c66ffd353340a54ad704c6f99e (diff)
downloadspark-e1bbf1a080296e4d3d692379f06b7db0d0629573.tar.gz
spark-e1bbf1a080296e4d3d692379f06b7db0d0629573.tar.bz2
spark-e1bbf1a080296e4d3d692379f06b7db0d0629573.zip
[SPARK-8606] Prevent exceptions in RDD.getPreferredLocations() from crashing DAGScheduler
If `RDD.getPreferredLocations()` throws an exception it may crash the DAGScheduler and SparkContext. This patch addresses this by adding a try-catch block. Author: Josh Rosen <joshrosen@databricks.com> Closes #7023 from JoshRosen/SPARK-8606 and squashes the following commits: 770b169 [Josh Rosen] Fix getPreferredLocations() DAGScheduler crash with try block. 44a9b55 [Josh Rosen] Add test of a buggy getPartitions() method 19aa9f7 [Josh Rosen] Add (failing) regression test for getPreferredLocations() DAGScheduler crash (cherry picked from commit 0b5abbf5f96a5f6bfd15a65e8788cf3fa96fe54c) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala37
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala31
2 files changed, 53 insertions, 15 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7fc1f0a752..fd20ebbaf0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -886,22 +886,29 @@ class DAGScheduler(
return
}
- val tasks: Seq[Task[_]] = stage match {
- case stage: ShuffleMapStage =>
- partitionsToCompute.map { id =>
- val locs = getPreferredLocs(stage.rdd, id)
- val part = stage.rdd.partitions(id)
- new ShuffleMapTask(stage.id, taskBinary, part, locs)
- }
+ val tasks: Seq[Task[_]] = try {
+ stage match {
+ case stage: ShuffleMapStage =>
+ partitionsToCompute.map { id =>
+ val locs = getPreferredLocs(stage.rdd, id)
+ val part = stage.rdd.partitions(id)
+ new ShuffleMapTask(stage.id, taskBinary, part, locs)
+ }
- case stage: ResultStage =>
- val job = stage.resultOfJob.get
- partitionsToCompute.map { id =>
- val p: Int = job.partitions(id)
- val part = stage.rdd.partitions(p)
- val locs = getPreferredLocs(stage.rdd, p)
- new ResultTask(stage.id, taskBinary, part, locs, id)
- }
+ case stage: ResultStage =>
+ val job = stage.resultOfJob.get
+ partitionsToCompute.map { id =>
+ val p: Int = job.partitions(id)
+ val part = stage.rdd.partitions(p)
+ val locs = getPreferredLocs(stage.rdd, p)
+ new ResultTask(stage.id, taskBinary, part, locs, id)
+ }
+ }
+ } catch {
+ case NonFatal(e) =>
+ abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
+ runningStages -= stage
+ return
}
if (tasks.size > 0) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b2ecbb232e..a13db529a4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -757,6 +757,37 @@ class DAGSchedulerSuite
assert(sc.parallelize(1 to 10, 2).first() === 1)
}
+ test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
+ val e1 = intercept[DAGSchedulerSuiteDummyException] {
+ val rdd = new MyRDD(sc, 2, Nil) {
+ override def getPartitions: Array[Partition] = {
+ throw new DAGSchedulerSuiteDummyException
+ }
+ }
+ rdd.reduceByKey(_ + _, 1).count()
+ }
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
+ test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
+ val e1 = intercept[SparkException] {
+ val rdd = new MyRDD(sc, 2, Nil) {
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ throw new DAGSchedulerSuiteDummyException
+ }
+ }
+ rdd.count()
+ }
+ assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
test("accumulator not calculated for resubmitted result stage") {
// just for register
val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)