aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala16
2 files changed, 22 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 745e3fa4e8..852ed8fe1f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -852,6 +852,9 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
+ partitions.foreach{ p =>
+ require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+ }
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
@@ -955,6 +958,9 @@ class SparkContext(
resultHandler: (Int, U) => Unit,
resultFunc: => R): SimpleFutureAction[R] =
{
+ partitions.foreach{ p =>
+ require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
+ }
val cleanF = clean(processPartition)
val callSite = getCallSite
val waiter = dagScheduler.submitJob(
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 85e8eb5dc3..f9e994b13d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -373,6 +373,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
assert(shuffled.lookup(5) === Seq(6,7))
assert(shuffled.lookup(-1) === Seq())
}
+
+ test("lookup with bad partitioner") {
+ val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
+
+ val p = new Partitioner {
+ def numPartitions: Int = 2
+
+ def getPartition(key: Any): Int = key.hashCode() % 2
+ }
+ val shuffled = pairs.partitionBy(p)
+
+ assert(shuffled.partitioner === Some(p))
+ assert(shuffled.lookup(1) === Seq(2))
+ intercept[IllegalArgumentException] {shuffled.lookup(-1)}
+ }
+
}
/*