author     shellberg <sah@zepler.org>  2015-10-15 18:07:10 +0100
committer  Sean Owen <sowen@cloudera.com>  2015-10-15 18:07:10 +0100
commit     523adc24a683930304f408d477607edfe9de7b76 (patch)
tree       dcd2d6d9825842ef76215f8a9db6a327613a55b7 /core
parent     aec4400beffc569c13cceea2d0c481dfa3f34175 (diff)
[SPARK-11066] Update DAGScheduler's "misbehaved ResultHandler"
Restrict the job to a single task so that the exception asserted as the cause of the job failure is the deliberately thrown DAGSchedulerSuiteDummyException, and not an UnsupportedOperationException that a second or subsequent task can propagate through a race condition during execution.

Author: shellberg <sah@zepler.org>

Closes #9076 from shellberg/shellberg-DAGSchedulerSuite-misbehavedResultHandlerTest-patch-1.
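As a minimal standalone sketch of the pattern the patch relies on (not part of this commit): with a single partition there is exactly one resultHandler invocation, so the cause wrapped by the SparkDriverExecutionException is deterministic. The object name and the plain RuntimeException standing in for the suite-private DAGSchedulerSuiteDummyException are illustrative assumptions; the runJob overload matches the one used in the diff below.

import org.apache.spark.{SparkConf, SparkContext, SparkDriverExecutionException, TaskContext}

object MisbehavedHandlerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    try {
      val rdd = sc.parallelize(1 to 10, 2)
      try {
        sc.runJob[Int, Int](
          rdd,
          (context: TaskContext, iter: Iterator[Int]) => iter.size,
          Seq(0), // one partition => one task => one resultHandler call
          (part: Int, result: Int) => throw new RuntimeException("misbehaved handler"))
      } catch {
        case e: SparkDriverExecutionException =>
          // Deterministic: the only possible cause is the handler's own exception.
          assert(e.getCause.isInstanceOf[RuntimeException])
      }
      // The misbehaved handler must not have crashed the SparkContext.
      assert(sc.parallelize(1 to 10, 2).count() == 10)
    } finally {
      sc.stop()
    }
  }
}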
Diffstat (limited to 'core')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  13
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 697c195e4a..5b01ddb298 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1375,18 +1375,27 @@ class DAGSchedulerSuite
assert(sc.parallelize(1 to 10, 2).count() === 10)
}
+ /**
+ * The job will fail on the first task, which throws a DAGSchedulerSuiteDummyException.
+ * Any subsequent task will instead throw a legitimate java.lang.UnsupportedOperationException.
+ * With multiple tasks, there is a race between the resulting SparkDriverExecutionExceptions,
+ * whose differing causes determine which one is reported as the job's failure.
+ */
test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
val e = intercept[SparkDriverExecutionException] {
+ // The number of parallelized partitions determines the number of tasks in the job
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
- Seq(0, 1),
+ // For a robust test assertion, limit the job to a single task; that is,
+ // with multiple RDD partitions, pass the id of any one partition, e.g. the first (id = 0)
+ Seq(0),
(part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
}
assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
- // Make sure we can still run commands
+ // Make sure we can still run commands on our SparkContext
assert(sc.parallelize(1 to 10, 2).count() === 10)
}
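As a toy, Spark-free sketch of the race described in the comment above (every name here is illustrative; this is not Spark's real JobWaiter API): two tasks complete concurrently, the first resultHandler call throws, and whichever task reports second finds the waiter already finished and raises UnsupportedOperationException, so which exception the driver records as the failure cause depends on timing.

object HandlerRaceSketch {
  // Toy stand-in for Spark's JobWaiter; the real class and its locking differ.
  class ToyWaiter(handler: Int => Unit) {
    private var finished = false
    def taskSucceeded(result: Int): Unit = synchronized {
      if (finished) {
        // Mirrors the behaviour the suite comment describes for late tasks.
        throw new UnsupportedOperationException("taskSucceeded() called on a finished waiter")
      }
      try handler(result)
      catch { case t: Throwable => finished = true; throw t }
    }
  }

  def main(args: Array[String]): Unit = {
    val waiter = new ToyWaiter(_ => throw new RuntimeException("misbehaved handler"))
    // Two "tasks" finish concurrently; which exception surfaces from which
    // task depends on scheduling, hence the flaky assertion the patch removes.
    val threads = (1 to 2).map { i =>
      new Thread(new Runnable {
        def run(): Unit =
          try waiter.taskSucceeded(i)
          catch { case t: Throwable => println(s"task $i threw ${t.getClass.getSimpleName}") }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}

Running this repeatedly can print the two exception types in either order, which is exactly the nondeterminism the patch removes by scheduling only one task.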