aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-29 21:30:52 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-29 21:30:52 -0700
commit143ef4f90da22537114b8b658d6419a34f16ce64 (patch)
tree077a0639afd4594c65a98b65f9c76a25b832dcc5 /core/src
parentc45758ddde882dc8aa537c2bf770e51087f97a4d (diff)
downloadspark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.gz
spark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.bz2
spark-143ef4f90da22537114b8b658d6419a34f16ce64.zip
Added a CoalescedRDD class for reducing the number of partitions in an RDD.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/CoalescedRDD.scala44
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala31
2 files changed, 75 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/CoalescedRDD.scala b/core/src/main/scala/spark/CoalescedRDD.scala
new file mode 100644
index 0000000000..a96f749543
--- /dev/null
+++ b/core/src/main/scala/spark/CoalescedRDD.scala
@@ -0,0 +1,44 @@
+package spark
+
+private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split
+
+/**
+ * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
+ * this RDD computes zero or more of the parent ones. Will produce exactly `maxPartitions` if the
+ * parent had more than this many partitions, or fewer if the parent had fewer.
+ *
+ * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
+ * or to avoid having a large number of small tasks when processing a directory with many files.
+ */
+class CoalescedRDD[T: ClassManifest](prev: RDD[T], maxPartitions: Int)
+ extends RDD[T](prev.context)
+ with Logging {
+
+ @transient val splits_ : Array[Split] = {
+ val prevSplits = prev.splits
+ if (prevSplits.length < maxPartitions) {
+ prevSplits.zipWithIndex.map{ case (s, idx) => new CoalescedRDDSplit(idx, Array(s)) }
+ } else {
+ (0 until maxPartitions).map { i =>
+ val rangeStart = (i * prevSplits.length) / maxPartitions
+ val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions
+ new CoalescedRDDSplit(i, prevSplits.slice(rangeStart, rangeEnd))
+ }.toArray
+ }
+ }
+
+ override def splits = splits_
+
+ override def compute(split: Split): Iterator[T] = {
+ split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap {
+ parentSplit => prev.iterator(parentSplit)
+ }
+ }
+
+ val dependencies = List(
+ new NarrowDependency(prev) {
+ def getParents(id: Int): Seq[Int] =
+ splits(id).asInstanceOf[CoalescedRDDSplit].parents.map(_.index)
+ }
+ )
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 04dbe3a3e4..961d05bc82 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -71,4 +71,35 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint()
assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
}
+
+ test("coalesced RDDs") {
+ sc = new SparkContext("local", "test")
+ val data = sc.parallelize(1 to 10, 10)
+
+ val coalesced1 = new CoalescedRDD(data, 2)
+ assert(coalesced1.collect().toList === (1 to 10).toList)
+ assert(coalesced1.glom().collect().map(_.toList).toList ===
+ List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
+
+ // Check that the narrow dependency is also specified correctly
+ assert(coalesced1.dependencies.head.getParents(0).toList === List(0, 1, 2, 3, 4))
+ assert(coalesced1.dependencies.head.getParents(1).toList === List(5, 6, 7, 8, 9))
+
+ val coalesced2 = new CoalescedRDD(data, 3)
+ assert(coalesced2.collect().toList === (1 to 10).toList)
+ assert(coalesced2.glom().collect().map(_.toList).toList ===
+ List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
+
+ val coalesced3 = new CoalescedRDD(data, 10)
+ assert(coalesced3.collect().toList === (1 to 10).toList)
+ assert(coalesced3.glom().collect().map(_.toList).toList ===
+ (1 to 10).map(x => List(x)).toList)
+
+ // If we try to coalesce into more partitions than the original RDD, it should just
+ // keep the original number of partitions.
+ val coalesced4 = new CoalescedRDD(data, 20)
+ assert(coalesced4.collect().toList === (1 to 10).toList)
+ assert(coalesced4.glom().collect().map(_.toList).toList ===
+ (1 to 10).map(x => List(x)).toList)
+ }
}