diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 13 |
1 files changed, 11 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 697c195e4a..5b01ddb298 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1375,18 +1375,27 @@ class DAGSchedulerSuite assert(sc.parallelize(1 to 10, 2).count() === 10) } + /** + * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException. + * Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException. + * If multiple tasks, there exists a race condition between the SparkDriverExecutionExceptions + * and their differing causes as to which will represent result for job... + */ test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") { val e = intercept[SparkDriverExecutionException] { + // Number of parallelized partitions implies number of tasks of job val rdd = sc.parallelize(1 to 10, 2) sc.runJob[Int, Int]( rdd, (context: TaskContext, iter: Iterator[Int]) => iter.size, - Seq(0, 1), + // For a robust test assertion, limit number of job tasks to 1; that is, + // if multiple RDD partitions, use id of any one partition, say, first partition id=0 + Seq(0), (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException) } assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException]) - // Make sure we can still run commands + // Make sure we can still run commands on our SparkContext assert(sc.parallelize(1 to 10, 2).count() === 10) } |