aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-06-30 11:50:22 -0700
committerReynold Xin <rxin@apache.org>2014-06-30 11:50:22 -0700
commit358ae1534d01ad9e69364a21441a7ef23c2cb516 (patch)
tree1bb1c51a40d666c6f68fb6d5abf2d976db03a1b9 /core/src/test
parent6803642253b45c80ef8970606e350adabe4211ea (diff)
downloadspark-358ae1534d01ad9e69364a21441a7ef23c2cb516.tar.gz
spark-358ae1534d01ad9e69364a21441a7ef23c2cb516.tar.bz2
spark-358ae1534d01ad9e69364a21441a7ef23c2cb516.zip
[SPARK-2322] Exception in resultHandler should NOT crash DAGScheduler and shutdown SparkContext.
This should go into 1.0.1. Author: Reynold Xin <rxin@apache.org> Closes #1264 from rxin/SPARK-2322 and squashes the following commits: c77c07f [Reynold Xin] Added comment to SparkDriverExecutionException and a test case for accumulator. 5d8d920 [Reynold Xin] [SPARK-2322] Exception in resultHandler could crash DAGScheduler and shutdown SparkContext.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala56
1 files changed, 55 insertions, 1 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8dd2a9b9f7..9f498d579a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import scala.Tuple2
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.language.reflectiveCalls
@@ -38,6 +37,8 @@ class BuggyDAGEventProcessActor extends Actor {
}
}
+class DAGSchedulerSuiteDummyException extends Exception
+
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
with ImplicitSender with BeforeAndAfter with LocalSparkContext {
@@ -593,6 +594,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
+ // TODO: Fix this and un-ignore the test.
+ ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+ val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
+ override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
+ override def zero(initialValue: Int): Int = 0
+ override def addInPlace(r1: Int, r2: Int): Int = {
+ throw new DAGSchedulerSuiteDummyException
+ }
+ })
+
+ // Run this on executors
+ intercept[SparkDriverExecutionException] {
+ sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
+ }
+
+ // Run this within a local thread
+ intercept[SparkDriverExecutionException] {
+ sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
+ }
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
+ test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
+ val e1 = intercept[SparkDriverExecutionException] {
+ val rdd = sc.parallelize(1 to 10, 2)
+ sc.runJob[Int, Int](
+ rdd,
+ (context: TaskContext, iter: Iterator[Int]) => iter.size,
+ Seq(0),
+ allowLocal = true,
+ (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+ }
+ assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+ val e2 = intercept[SparkDriverExecutionException] {
+ val rdd = sc.parallelize(1 to 10, 2)
+ sc.runJob[Int, Int](
+ rdd,
+ (context: TaskContext, iter: Iterator[Int]) => iter.size,
+ Seq(0, 1),
+ allowLocal = false,
+ (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+ }
+ assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+ // Make sure we can still run local commands as well as cluster commands.
+ assert(sc.parallelize(1 to 10, 2).count() === 10)
+ assert(sc.parallelize(1 to 10, 2).first() === 1)
+ }
+
test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") {
val actorSystem = ActorSystem("test")
val supervisor = actorSystem.actorOf(