diff options
author | Reynold Xin <rxin@apache.org> | 2014-06-30 11:50:22 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-06-30 11:52:30 -0700 |
commit | d468b3d748a2459fb937feb9a6a84ec5f2258dc3 (patch) | |
tree | f0b96ffc75e6e9e31eeb09af596cade020e97961 /core/src/test | |
parent | febec742c82852c736fe09110667e82092f2e5f9 (diff) | |
download | spark-d468b3d748a2459fb937feb9a6a84ec5f2258dc3.tar.gz spark-d468b3d748a2459fb937feb9a6a84ec5f2258dc3.tar.bz2 spark-d468b3d748a2459fb937feb9a6a84ec5f2258dc3.zip |
[SPARK-2322] Exception in resultHandler should NOT crash DAGScheduler and shutdown SparkContext.
This should go into 1.0.1.
Author: Reynold Xin <rxin@apache.org>
Closes #1264 from rxin/SPARK-2322 and squashes the following commits:
c77c07f [Reynold Xin] Added comment to SparkDriverExecutionException and a test case for accumulator.
5d8d920 [Reynold Xin] [SPARK-2322] Exception in resultHandler could crash DAGScheduler and shutdown SparkContext.
(cherry picked from commit 358ae1534d01ad9e69364a21441a7ef23c2cb516)
Signed-off-by: Reynold Xin <rxin@apache.org>
Conflicts:
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 23a350be56..cea0fe11e1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.scheduler -import scala.Tuple2 import scala.collection.mutable.{HashSet, HashMap, Map} import scala.language.reflectiveCalls @@ -37,6 +36,8 @@ class BuggyDAGEventProcessActor extends Actor { } } +class DAGSchedulerSuiteDummyException extends Exception + class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike with ImplicitSender with BeforeAndAfter with LocalSparkContext { @@ -578,6 +579,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F assertDataStructuresEmpty } + // TODO: Fix this and un-ignore the test. + ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") { + val acc = new Accumulator[Int](0, new AccumulatorParam[Int] { + override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2 + override def zero(initialValue: Int): Int = 0 + override def addInPlace(r1: Int, r2: Int): Int = { + throw new DAGSchedulerSuiteDummyException + } + }) + + // Run this on executors + intercept[SparkDriverExecutionException] { + sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) } + } + + // Run this within a local thread + intercept[SparkDriverExecutionException] { + sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1) + } + + // Make sure we can still run local commands as well as cluster commands. + assert(sc.parallelize(1 to 10, 2).count() === 10) + assert(sc.parallelize(1 to 10, 2).first() === 1) + } + + test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") { + val e1 = intercept[SparkDriverExecutionException] { + val rdd = sc.parallelize(1 to 10, 2) + sc.runJob[Int, Int]( + rdd, + (context: TaskContext, iter: Iterator[Int]) => iter.size, + Seq(0), + allowLocal = true, + (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException) + } + assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException]) + + val e2 = intercept[SparkDriverExecutionException] { + val rdd = sc.parallelize(1 to 10, 2) + sc.runJob[Int, Int]( + rdd, + (context: TaskContext, iter: Iterator[Int]) => iter.size, + Seq(0, 1), + allowLocal = false, + (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException) + } + assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException]) + + // Make sure we can still run local commands as well as cluster commands. + assert(sc.parallelize(1 to 10, 2).count() === 10) + assert(sc.parallelize(1 to 10, 2).first() === 1) + } + test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") { val actorSystem = ActorSystem("test") val supervisor = actorSystem.actorOf( |