aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-29 21:30:52 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-29 21:30:52 -0700
commit143ef4f90da22537114b8b658d6419a34f16ce64 (patch)
tree077a0639afd4594c65a98b65f9c76a25b832dcc5 /core/src/main
parentc45758ddde882dc8aa537c2bf770e51087f97a4d (diff)
downloadspark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.gz
spark-143ef4f90da22537114b8b658d6419a34f16ce64.tar.bz2
spark-143ef4f90da22537114b8b658d6419a34f16ce64.zip
Added a CoalescedRDD class for reducing the number of partitions in an RDD.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/CoalescedRDD.scala | 44
1 file changed, 44 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/CoalescedRDD.scala b/core/src/main/scala/spark/CoalescedRDD.scala
new file mode 100644
index 0000000000..a96f749543
--- /dev/null
+++ b/core/src/main/scala/spark/CoalescedRDD.scala
@@ -0,0 +1,44 @@
+package spark
+
+private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split // one coalesced partition: wraps the contiguous group of parent splits it computes
+
+/**
+ * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
+ * this RDD computes zero or more of the parent ones. Will produce exactly `maxPartitions` if the
+ * parent had more than this many partitions, or fewer if the parent had fewer.
+ *
+ * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
+ * or to avoid having a large number of small tasks when processing a directory with many files.
+ */
+class CoalescedRDD[T: ClassManifest](prev: RDD[T], maxPartitions: Int)
+  extends RDD[T](prev.context)
+  with Logging {
+
+  @transient val splits_ : Array[Split] = { // @transient: excluded from serialization — NOTE(review): assumed to be accessed only on the driver; confirm
+    val prevSplits = prev.splits
+    if (prevSplits.length < maxPartitions) {
+      prevSplits.zipWithIndex.map{ case (s, idx) => new CoalescedRDDSplit(idx, Array(s)) } // parent already has fewer splits than requested: wrap one-to-one, no coalescing
+    } else {
+      (0 until maxPartitions).map { i =>
+        val rangeStart = (i * prevSplits.length) / maxPartitions // integer division spreads parent splits as evenly as possible
+        val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions // half-open ranges [rangeStart, rangeEnd) tile every parent split exactly once
+        new CoalescedRDDSplit(i, prevSplits.slice(rangeStart, rangeEnd))
+      }.toArray
+    }
+  }
+
+  override def splits = splits_
+
+  override def compute(split: Split): Iterator[T] = { // concatenate the iterators of all parent splits grouped under this split
+    split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap {
+      parentSplit => prev.iterator(parentSplit)
+    }
+  }
+
+  val dependencies = List( // narrow dependency: output partition `id` depends only on its own parents' indices
+    new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] =
+        splits(id).asInstanceOf[CoalescedRDDSplit].parents.map(_.index)
+    }
+  )
+}