aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala21
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala25
3 files changed, 37 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ff5d796ee2..6a354ed4d1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -520,10 +520,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Distribute a local Scala collection to form an RDD.
*
- * @note Parallelize acts lazily. If `seq` is a mutable collection and is
- * altered after the call to parallelize and before the first action on the
- * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
- * the argument to avoid this.
+ * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
+ * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
+ * modified collection. Pass a copy of the argument to avoid this.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 87b22de6ae..f12d0cffab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -111,7 +111,8 @@ private object ParallelCollectionRDD {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
- * it efficient to run Spark over RDDs representing large sets of numbers.
+ * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
+ * is an inclusive Range, we use inclusive range for the last slice.
*/
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
@@ -127,19 +128,15 @@ private object ParallelCollectionRDD {
})
}
seq match {
- case r: Range.Inclusive => {
- val sign = if (r.step < 0) {
- -1
- } else {
- 1
- }
- slice(new Range(
- r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
- }
case r: Range => {
- positions(r.length, numSlices).map({
- case (start, end) =>
+ positions(r.length, numSlices).zipWithIndex.map({ case ((start, end), index) =>
+ // If the range is inclusive, use inclusive range for the last slice
+ if (r.isInclusive && index == numSlices - 1) {
+ new Range.Inclusive(r.start + start * r.step, r.end, r.step)
+ }
+ else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+ }
}).toSeq.asInstanceOf[Seq[Seq[T]]]
}
case nr: NumericRange[_] => {
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index 1b112f1a41..cd193ae4f5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -76,6 +76,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices(0).mkString(",") === (0 to 32).mkString(","))
assert(slices(1).mkString(",") === (33 to 66).mkString(","))
assert(slices(2).mkString(",") === (67 to 100).mkString(","))
+ assert(slices(2).isInstanceOf[Range.Inclusive])
}
test("empty data") {
@@ -227,4 +228,28 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
}
+
+ test("inclusive ranges with Int.MaxValue and Int.MinValue") {
+ val data1 = 1 to Int.MaxValue
+ val slices1 = ParallelCollectionRDD.slice(data1, 3)
+ assert(slices1.size === 3)
+ assert(slices1.map(_.size).sum === Int.MaxValue)
+ assert(slices1(2).isInstanceOf[Range.Inclusive])
+ val data2 = -2 to Int.MinValue by -1
+ val slices2 = ParallelCollectionRDD.slice(data2, 3)
+ assert(slices2.size === 3)
+ assert(slices2.map(_.size).sum === Int.MaxValue)
+ assert(slices2(2).isInstanceOf[Range.Inclusive])
+ }
+
+ test("empty ranges with Int.MaxValue and Int.MinValue") {
+ val data1 = Int.MaxValue until Int.MaxValue
+ val slices1 = ParallelCollectionRDD.slice(data1, 5)
+ assert(slices1.size === 5)
+ for (i <- 0 until 5) assert(slices1(i).size === 0)
+ val data2 = Int.MinValue until Int.MinValue
+ val slices2 = ParallelCollectionRDD.slice(data2, 5)
+ assert(slices2.size === 5)
+ for (i <- 0 until 5) assert(slices2(i).size === 0)
+ }
}