diff options
author | Reynold Xin <rxin@apache.org> | 2014-06-30 11:50:22 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-30 11:50:22 -0700 |
commit | 358ae1534d01ad9e69364a21441a7ef23c2cb516 (patch) | |
tree | 1bb1c51a40d666c6f68fb6d5abf2d976db03a1b9 /core/src/test/scala | |
parent | 6803642253b45c80ef8970606e350adabe4211ea (diff) | |
download | spark-358ae1534d01ad9e69364a21441a7ef23c2cb516.tar.gz spark-358ae1534d01ad9e69364a21441a7ef23c2cb516.tar.bz2 spark-358ae1534d01ad9e69364a21441a7ef23c2cb516.zip |
[SPARK-2322] Exception in resultHandler should NOT crash DAGScheduler and shutdown SparkContext.
This should go into 1.0.1.
Author: Reynold Xin <rxin@apache.org>
Closes #1264 from rxin/SPARK-2322 and squashes the following commits:
c77c07f [Reynold Xin] Added comment to SparkDriverExecutionException and a test case for accumulator.
5d8d920 [Reynold Xin] [SPARK-2322] Exception in resultHandler could crash DAGScheduler and shutdown SparkContext.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8dd2a9b9f7..9f498d579a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import scala.Tuple2 import scala.collection.mutable.{HashSet, HashMap, Map} import scala.language.reflectiveCalls @@ -38,6 +37,8 @@ class BuggyDAGEventProcessActor extends Actor { } } +class DAGSchedulerSuiteDummyException extends Exception + class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike with ImplicitSender with BeforeAndAfter with LocalSparkContext { @@ -593,6 +594,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + // TODO: Fix this and un-ignore the test. + ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") { + val acc = new Accumulator[Int](0, new AccumulatorParam[Int] { + override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2 + override def zero(initialValue: Int): Int = 0 + override def addInPlace(r1: Int, r2: Int): Int = { + throw new DAGSchedulerSuiteDummyException + } + }) + + // Run this on executors + intercept[SparkDriverExecutionException] { + sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } + } + + // Run this within a local thread + intercept[SparkDriverExecutionException] { + sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1) + } + + // Make sure we can still run local commands as well as cluster commands. + assert(sc.parallelize(1 to 10, 2).count() === 10) + assert(sc.parallelize(1 to 10, 2).first() === 1) + } + + test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") { + val e1 = intercept[SparkDriverExecutionException] { + val rdd = sc.parallelize(1 to 10, 2) + sc.runJob[Int, Int]( + rdd, + (context: TaskContext, iter: Iterator[Int]) => iter.size, + Seq(0), + allowLocal = true, + (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException) + } + assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException]) + + val e2 = intercept[SparkDriverExecutionException] { + val rdd = sc.parallelize(1 to 10, 2) + sc.runJob[Int, Int]( + rdd, + (context: TaskContext, iter: Iterator[Int]) => iter.size, + Seq(0, 1), + allowLocal = false, + (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException) + } + assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException]) + + // Make sure we can still run local commands as well as cluster commands. + assert(sc.parallelize(1 to 10, 2).count() === 10) + assert(sc.parallelize(1 to 10, 2).first() === 1) + } + test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") { val actorSystem = ActorSystem("test") val supervisor = actorSystem.actorOf( |