aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-07-15 22:50:11 -0700
committerReynold Xin <reynoldx@gmail.com>2013-07-15 22:50:11 -0700
commit69316603d6bf11ecf1ea3dab63df178bad835e2d (patch)
treee5934f7c1e485870b01ab764a26be2ed2a30bbd8 /core/src/main
parented8415b2fa08c24dab9b435dd388d29416f45527 (diff)
downloadspark-69316603d6bf11ecf1ea3dab63df178bad835e2d.tar.gz
spark-69316603d6bf11ecf1ea3dab63df178bad835e2d.tar.bz2
spark-69316603d6bf11ecf1ea3dab63df178bad835e2d.zip
Throw a more meaningful message when runJob is called to launch tasks on non-existent partitions.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala9
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 3d3b9ea011..8173ef709d 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -251,6 +251,15 @@ class DAGScheduler(
if (partitions.size == 0) {
return
}
+
+ // Check to make sure we are not launching a task on a partition that does not exist.
+ val maxPartitions = finalRdd.partitions.length
+ partitions.find(p => p >= maxPartitions).foreach { p =>
+ throw new IllegalArgumentException(
+ "Attempting to access a non-existent partition: " + p + ". " +
+ "Total number of partitions: " + maxPartitions)
+ }
+
val (toSubmit, waiter) = prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler, properties)
eventQueue.put(toSubmit)