Diffstat (limited to 'core')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 13
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 697c195e4a..5b01ddb298 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1375,18 +1375,27 @@ class DAGSchedulerSuite
assert(sc.parallelize(1 to 10, 2).count() === 10)
}
+ /**
+ * The job will fail on the first task that throws a DAGSchedulerSuiteDummyException.
+ * Any subsequent task will instead throw a legitimate java.lang.UnsupportedOperationException.
+ * With multiple tasks, there is a race between the resulting SparkDriverExecutionExceptions,
+ * whose causes differ, as to which one ends up representing the job's result.
+ */
test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
val e = intercept[SparkDriverExecutionException] {
+ // The number of partitions determines the number of tasks in the job
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
- Seq(0, 1),
+ // To make the assertion deterministic, limit the job to a single task by
+ // running it on just one partition (here, the first partition, id = 0)
+ Seq(0),
(part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
}
assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
- // Make sure we can still run commands
+ // Make sure we can still run commands on our SparkContext
assert(sc.parallelize(1 to 10, 2).count() === 10)
}
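
For context, a minimal self-contained sketch of the behavior this test exercises, assuming a local Spark installation (DummyResultHandlerException and MisbehavedResultHandlerSketch are hypothetical names standing in for the suite's DAGSchedulerSuiteDummyException and test body): when the resultHandler passed to sc.runJob throws, the DAGScheduler wraps the failure in a SparkDriverExecutionException, and running the job on a single partition guarantees the handler fires exactly once, so the wrapped cause is deterministic.

import org.apache.spark.{SparkConf, SparkContext, SparkDriverExecutionException, TaskContext}

// Hypothetical stand-in for the suite's DAGSchedulerSuiteDummyException.
class DummyResultHandlerException extends Exception

object MisbehavedResultHandlerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    try {
      val rdd = sc.parallelize(1 to 10, 2)
      try {
        sc.runJob[Int, Int](
          rdd,
          (context: TaskContext, iter: Iterator[Int]) => iter.size,
          Seq(0), // a single partition, so the result handler fires exactly once
          (part: Int, result: Int) => throw new DummyResultHandlerException)
      } catch {
        case e: SparkDriverExecutionException =>
          // The scheduler wraps the handler's exception; with one task the cause
          // is always the dummy exception, never a racing second failure.
          assert(e.getCause.isInstanceOf[DummyResultHandlerException])
      }
      // The SparkContext survives the misbehaving handler and can run new jobs.
      assert(sc.parallelize(1 to 10, 2).count() == 10)
    } finally {
      sc.stop()
    }
  }
}

With Seq(0, 1), as in the pre-patch test, two handler invocations can fail independently, and which failure's cause lands on the SparkDriverExecutionException depends on task completion order; that nondeterminism is exactly the flakiness the patch removes.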