about | summary | refs | log | tree | commit | diff
path: root/core/src/test/scala
diff options
context:
space:
mode:
author: Matei Zaharia <matei@eecs.berkeley.edu> 2012-09-29 21:30:52 -0700
committer: Matei Zaharia <matei@eecs.berkeley.edu> 2012-09-29 21:30:52 -0700
commit: 143ef4f90da22537114b8b658d6419a34f16ce64 (patch)
tree: 077a0639afd4594c65a98b65f9c76a25b832dcc5 /core/src/test/scala
parent: c45758ddde882dc8aa537c2bf770e51087f97a4d (diff)
download: spark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.gz
spark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.bz2
spark-143ef4f90da22537114b8b658d6419a34f16ce64.zip
Added a CoalescedRDD class for reducing the number of partitions in an RDD.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- core/src/test/scala/spark/RDDSuite.scala 31
1 files changed, 31 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 04dbe3a3e4..961d05bc82 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -71,4 +71,35 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint()
assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
}
+ // Checks that CoalescedRDD preserves element order and groups parent partitions as expected.
+ test("coalesced RDDs") {
+ sc = new SparkContext("local", "test")
+ val data = sc.parallelize(1 to 10, 10)
+ // Coalescing to 2 partitions: same elements in the same order, five per partition.
+ val coalesced1 = new CoalescedRDD(data, 2)
+ assert(coalesced1.collect().toList === (1 to 10).toList)
+ assert(coalesced1.glom().collect().map(_.toList).toList ===
+ List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
+ // Check that the narrow dependency is also specified correctly: output partition 0 should
+ // depend on parent partitions 0-4, and output partition 1 on parent partitions 5-9.
+ assert(coalesced1.dependencies.head.getParents(0).toList === List(0, 1, 2, 3, 4))
+ assert(coalesced1.dependencies.head.getParents(1).toList === List(5, 6, 7, 8, 9))
+ // 10 elements do not split evenly 3 ways; the expected grouping puts the extra element last.
+ val coalesced2 = new CoalescedRDD(data, 3)
+ assert(coalesced2.collect().toList === (1 to 10).toList)
+ assert(coalesced2.glom().collect().map(_.toList).toList ===
+ List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
+ // Coalescing to 10 partitions (the count data was created with) should leave one element each.
+ val coalesced3 = new CoalescedRDD(data, 10)
+ assert(coalesced3.collect().toList === (1 to 10).toList)
+ assert(coalesced3.glom().collect().map(_.toList).toList ===
+ (1 to 10).map(x => List(x)).toList)
+
+ // If we try to coalesce into more partitions than the original RDD has, it should just
+ // keep the original number of partitions (so again one element per partition here).
+ val coalesced4 = new CoalescedRDD(data, 20)
+ assert(coalesced4.collect().toList === (1 to 10).toList)
+ assert(coalesced4.glom().collect().map(_.toList).toList ===
+ (1 to 10).map(x => List(x)).toList)
+ }
}