diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 6 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 16 |
2 files changed, 22 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 745e3fa4e8..852ed8fe1f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -852,6 +852,9 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { + partitions.foreach{ p => + require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") + } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite) @@ -955,6 +958,9 @@ class SparkContext( resultHandler: (Int, U) => Unit, resultFunc: => R): SimpleFutureAction[R] = { + partitions.foreach{ p => + require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p") + } val cleanF = clean(processPartition) val callSite = getCallSite val waiter = dagScheduler.submitJob( diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 85e8eb5dc3..f9e994b13d 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -373,6 +373,22 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(shuffled.lookup(5) === Seq(6,7)) assert(shuffled.lookup(-1) === Seq()) } + + test("lookup with bad partitioner") { + val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7))) + + val p = new Partitioner { + def numPartitions: Int = 2 + + def getPartition(key: Any): Int = key.hashCode() % 2 + } + val shuffled = pairs.partitionBy(p) + + assert(shuffled.partitioner === Some(p)) + assert(shuffled.lookup(1) === Seq(2)) + intercept[IllegalArgumentException] {shuffled.lookup(-1)} + } + } /* |