Diffstat (limited to 'core/src/main/scala/spark/rdd/ShuffledRDD.scala')
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala | 125
1 file changed, 10 insertions(+), 115 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index be120acc71..f832633646 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,30 +1,23 @@
package spark.rdd
-import scala.collection.mutable.ArrayBuffer
-import java.util.{HashMap => JHashMap}
+import spark.{OneToOneDependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Split, TaskContext}
-import spark.Aggregator
-import spark.Partitioner
-import spark.RangePartitioner
-import spark.RDD
-import spark.ShuffleDependency
-import spark.SparkEnv
-import spark.Split
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
override def hashCode(): Int = idx
}
-
/**
* The resulting RDD from a shuffle (e.g. repartitioning of data).
+ * @param parent the parent RDD.
+ * @param part the partitioner used to partition the RDD.
+ * @tparam K the key class.
+ * @tparam V the value class.
*/
-abstract class ShuffledRDD[K, V, C](
+class ShuffledRDD[K, V](
@transient parent: RDD[(K, V)],
- aggregator: Option[Aggregator[K, V, C]],
- part: Partitioner)
- extends RDD[(K, C)](parent.context) {
+ part: Partitioner) extends RDD[(K, V)](parent.context) {
override val partitioner = Some(part)
@@ -35,108 +28,10 @@ abstract class ShuffledRDD[K, V, C](
override def preferredLocations(split: Split) = Nil
- val dep = new ShuffleDependency(parent, aggregator, part)
+ val dep = new ShuffleDependency(parent, part)
override val dependencies = List(dep)
-}
-
-
-/**
- * Repartition a key-value pair RDD.
- */
-class RepartitionShuffledRDD[K, V](
- @transient parent: RDD[(K, V)],
- part: Partitioner)
- extends ShuffledRDD[K, V, V](
- parent,
- None,
- part) {
-
- override def compute(split: Split): Iterator[(K, V)] = {
- val buf = new ArrayBuffer[(K, V)]
- val fetcher = SparkEnv.get.shuffleFetcher
- def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
- fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- buf.iterator
- }
-}
-
-
-/**
- * A sort-based shuffle (that doesn't apply aggregation). It does so by first
- * repartitioning the RDD by range, and then sort within each range.
- */
-class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
- @transient parent: RDD[(K, V)],
- ascending: Boolean,
- numSplits: Int)
- extends RepartitionShuffledRDD[K, V](
- parent,
- new RangePartitioner(numSplits, parent, ascending)) {
-
- override def compute(split: Split): Iterator[(K, V)] = {
- // By separating this from RepartitionShuffledRDD, we avoided a
- // buf.iterator.toArray call, thus avoiding building up the buffer twice.
- val buf = new ArrayBuffer[(K, V)]
- def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) }
- SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- if (ascending) {
- buf.sortWith((x, y) => x._1 < y._1).iterator
- } else {
- buf.sortWith((x, y) => x._1 > y._1).iterator
- }
- }
-}
-
-
-/**
- * The resulting RDD from shuffle and running (hash-based) aggregation.
- */
-class ShuffledAggregatedRDD[K, V, C](
- @transient parent: RDD[(K, V)],
- aggregator: Aggregator[K, V, C],
- part : Partitioner)
- extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) {
-
- override def compute(split: Split): Iterator[(K, C)] = {
- val combiners = new JHashMap[K, C]
- val fetcher = SparkEnv.get.shuffleFetcher
-
- if (aggregator.mapSideCombine) {
- // Apply combiners on map partitions. In this case, post-shuffle we get a
- // list of outputs from the combiners and merge them using mergeCombiners.
- def mergePairWithMapSideCombiners(k: K, c: C) {
- val oldC = combiners.get(k)
- if (oldC == null) {
- combiners.put(k, c)
- } else {
- combiners.put(k, aggregator.mergeCombiners(oldC, c))
- }
- }
- fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners)
- } else {
- // Do not apply combiners on map partitions (i.e. map side aggregation is
- // turned off). Post-shuffle we get a list of values and we use mergeValue
- // to merge them.
- def mergePairWithoutMapSideCombiners(k: K, v: V) {
- val oldC = combiners.get(k)
- if (oldC == null) {
- combiners.put(k, aggregator.createCombiner(v))
- } else {
- combiners.put(k, aggregator.mergeValue(oldC, v))
- }
- }
- fetcher.fetch[K, V](dep.shuffleId, split.index, mergePairWithoutMapSideCombiners)
- }
-
- return new Iterator[(K, C)] {
- var iter = combiners.entrySet().iterator()
-
- def hasNext: Boolean = iter.hasNext()
- def next(): (K, C) = {
- val entry = iter.next()
- (entry.getKey, entry.getValue)
- }
- }
+ override def compute(split: Split, context: TaskContext): Iterator[(K, V)] = {
+ SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index)
}
}
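
After this change, ShuffledRDD only fetches the already-partitioned map output for its split; aggregation and sorting are left to the code that builds the shuffle, since the aggregator parameter was removed. Below is a minimal usage sketch against the Spark API of this era; the SparkContext arguments, the partition count, and the sample data are illustrative assumptions, not part of the commit.

// Hypothetical example: repartition a pair RDD by key hash into 4 splits.
import spark.{HashPartitioner, SparkContext}
import spark.rdd.ShuffledRDD

val sc = new SparkContext("local", "shuffled-rdd-example")
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

// Each record is routed to one of 4 reduce splits by key hash. Any combining
// or sorting is applied by the caller (e.g. in PairRDDFunctions), not by
// ShuffledRDD itself.
val shuffled = new ShuffledRDD[String, Int](pairs, new HashPartitioner(4))
println(shuffled.collect().toSeq)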