diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-29 21:30:52 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-09-29 21:30:52 -0700 |
commit | 143ef4f90da22537114b8b658d6419a34f16ce64 (patch) | |
tree | 077a0639afd4594c65a98b65f9c76a25b832dcc5 /core/src/test/scala | |
parent | c45758ddde882dc8aa537c2bf770e51087f97a4d (diff) | |
download | spark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.gz spark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.bz2 spark-143ef4f90da22537114b8b658d6419a34f16ce64.zip |
Added a CoalescedRDD class for reducing the number of partitions in an RDD.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 04dbe3a3e4..961d05bc82 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -71,4 +71,35 @@ class RDDSuite extends FunSuite with BeforeAndAfter { val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint() assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)) } + + test("coalesced RDDs") { + sc = new SparkContext("local", "test") + val data = sc.parallelize(1 to 10, 10) + + val coalesced1 = new CoalescedRDD(data, 2) + assert(coalesced1.collect().toList === (1 to 10).toList) + assert(coalesced1.glom().collect().map(_.toList).toList === + List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))) + + // Check that the narrow dependency is also specified correctly + assert(coalesced1.dependencies.head.getParents(0).toList === List(0, 1, 2, 3, 4)) + assert(coalesced1.dependencies.head.getParents(1).toList === List(5, 6, 7, 8, 9)) + + val coalesced2 = new CoalescedRDD(data, 3) + assert(coalesced2.collect().toList === (1 to 10).toList) + assert(coalesced2.glom().collect().map(_.toList).toList === + List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))) + + val coalesced3 = new CoalescedRDD(data, 10) + assert(coalesced3.collect().toList === (1 to 10).toList) + assert(coalesced3.glom().collect().map(_.toList).toList === + (1 to 10).map(x => List(x)).toList) + + // If we try to coalesce into more partitions than the original RDD, it should just + // keep the original number of partitions. + val coalesced4 = new CoalescedRDD(data, 20) + assert(coalesced4.collect().toList === (1 to 10).toList) + assert(coalesced4.glom().collect().map(_.toList).toList === + (1 to 10).map(x => List(x)).toList) + } } |