diff options
author | shellberg <sah@zepler.org> | 2015-10-15 18:07:10 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-10-15 18:07:10 +0100 |
commit | 523adc24a683930304f408d477607edfe9de7b76 (patch) | |
tree | dcd2d6d9825842ef76215f8a9db6a327613a55b7 /core | |
parent | aec4400beffc569c13cceea2d0c481dfa3f34175 (diff) | |
download | spark-523adc24a683930304f408d477607edfe9de7b76.tar.gz spark-523adc24a683930304f408d477607edfe9de7b76.tar.bz2 spark-523adc24a683930304f408d477607edfe9de7b76.zip |
[SPARK-11066] Update DAGScheduler's "misbehaved ResultHandler" test
Restrict the job to a single task so that the exception asserted as the cause of the job failure is the deliberately thrown DAGSchedulerSuiteDummyException, not an UnsupportedOperationException from a second or subsequent task, which can surface instead because of a race condition during execution.
Author: shellberg <sah@zepler.org>
Closes #9076 from shellberg/shellberg-DAGSchedulerSuite-misbehavedResultHandlerTest-patch-1.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 13 |
1 files changed, 11 insertions, 2 deletions
```diff
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 697c195e4a..5b01ddb298 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1375,18 +1375,27 @@ class DAGSchedulerSuite
     assert(sc.parallelize(1 to 10, 2).count() === 10)
   }
 
+  /**
+   * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
+   * Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
+   * If multiple tasks, there exists a race condition between the SparkDriverExecutionExceptions
+   * and their differing causes as to which will represent result for job...
+   */
   test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
     val e = intercept[SparkDriverExecutionException] {
+      // Number of parallelized partitions implies number of tasks of job
       val rdd = sc.parallelize(1 to 10, 2)
       sc.runJob[Int, Int](
         rdd,
         (context: TaskContext, iter: Iterator[Int]) => iter.size,
-        Seq(0, 1),
+        // For a robust test assertion, limit number of job tasks to 1; that is,
+        // if multiple RDD partitions, use id of any one partition, say, first partition id=0
+        Seq(0),
         (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
     }
     assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
 
-    // Make sure we can still run commands
+    // Make sure we can still run commands on our SparkContext
     assert(sc.parallelize(1 to 10, 2).count() === 10)
   }
 
```