From 4775c55641f281523f105f9272f164033242a0aa Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 8 Oct 2012 17:29:33 -0700 Subject: Change ShuffleFetcher to return an Iterator. --- .../scala/spark/BlockStoreShuffleFetcher.scala | 22 ++-- core/src/main/scala/spark/PairRDDFunctions.scala | 44 ++++---- core/src/main/scala/spark/RDD.scala | 5 +- core/src/main/scala/spark/ShuffleFetcher.scala | 10 +- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 8 +- .../main/scala/spark/rdd/MapPartitionsRDD.scala | 5 +- core/src/main/scala/spark/rdd/ShuffledRDD.scala | 112 +-------------------- 7 files changed, 49 insertions(+), 157 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 4554db2249..86432d0127 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -1,18 +1,12 @@ package spark -import java.io.EOFException -import java.net.URL - import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap -import spark.storage.BlockException import spark.storage.BlockManagerId -import it.unimi.dsi.fastutil.io.FastBufferedInputStream - private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { - def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) { + override def fetch[K, V](shuffleId: Int, reduceId: Int) = { logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) val blockManager = SparkEnv.get.blockManager @@ -31,14 +25,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2))) } - for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) { + def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = { + val blockId = blockPair._1 + val blockOption = blockPair._2 blockOption match { case Some(block) => { - val values = block - for(value <- values) { - val v = value.asInstanceOf[(K, V)] - func(v._1, v._2) - } + block.asInstanceOf[Iterator[(K, V)]] } case None => { val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r @@ -53,8 +45,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin } } } - - logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format( - shuffleId, reduceId, System.currentTimeMillis - startTime)) + blockManager.getMultiple(blocksByAddress).flatMap(unpackBlock) } } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 0240fd95c7..36cfda9cdb 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -1,10 +1,6 @@ package spark -import java.io.EOFException -import java.io.ObjectInputStream -import java.net.URL import java.util.{Date, HashMap => JHashMap} -import java.util.concurrent.atomic.AtomicLong import java.text.SimpleDateFormat import scala.collection.Map @@ -14,18 +10,11 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileOutputCommitter import org.apache.hadoop.mapred.FileOutputFormat import 
org.apache.hadoop.mapred.HadoopWriter import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.OutputCommitter import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} @@ -67,15 +56,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( partitioner: Partitioner, mapSideCombine: Boolean = true): RDD[(K, C)] = { val aggregator = - if (mapSideCombine) { - new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) - } else { - // Don't apply map-side combiner. - // A sanity check to make sure mergeCombiners is not defined. - assert(mergeCombiners == null) - new Aggregator[K, V, C](createCombiner, mergeValue, null, false) - } - new ShuffledAggregatedRDD(self, aggregator, partitioner) + new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) + if (mapSideCombine) { + val combiners = new ShuffledRDD[K, V, C](self, Some(aggregator), partitioner) + combiners.mapPartitions(aggregator.combineCombinersByKey(_), true) + } else { + // Don't apply map-side combiner. + // A sanity check to make sure mergeCombiners is not defined. + assert(mergeCombiners == null) + val values = new ShuffledRDD[K, V, V](self, None, partitioner) + values.mapPartitions(aggregator.combineValuesByKey(_), true) + } } /** @@ -184,7 +175,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( createCombiner _, mergeValue _, mergeCombiners _, partitioner) bufs.flatMapValues(buf => buf) } else { - new RepartitionShuffledRDD(self, partitioner) + new ShuffledRDD[K, V, V](self, None, partitioner) } } @@ -621,7 +612,16 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( * order of the keys). */ def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = { - new ShuffledSortedRDD(self, ascending, numSplits) + val shuffled = + new ShuffledRDD[K, V, V](self, None, new RangePartitioner(numSplits, self, ascending)) + shuffled.mapPartitions(iter => { + val buf = iter.toArray + if (ascending) { + buf.sortWith((x, y) => x._1 < y._1).iterator + } else { + buf.sortWith((x, y) => x._1 > y._1).iterator + } + }, true) } } diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index ddb420efff..338dff4061 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -282,8 +282,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial /** * Return a new RDD by applying a function to each partition of this RDD. 
*/ - def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] = - new MapPartitionsRDD(this, sc.clean(f)) + def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = + new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning) /** * Return a new RDD by applying a function to each partition of this RDD, while tracking the index diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala index daa35fe7f2..d9a94d4021 100644 --- a/core/src/main/scala/spark/ShuffleFetcher.scala +++ b/core/src/main/scala/spark/ShuffleFetcher.scala @@ -1,10 +1,12 @@ package spark private[spark] abstract class ShuffleFetcher { - // Fetch the shuffle outputs for a given ShuffleDependency, calling func exactly - // once on each key-value pair obtained. - def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) + /** + * Fetch the shuffle outputs for a given ShuffleDependency. + * @return An iterator over the elements of the fetched shuffle outputs. + */ + def fetch[K, V](shuffleId: Int, reduceId: Int) : Iterator[(K, V)] - // Stop the fetcher + /** Stop the fetcher */ def stop() {} } diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index f1defbe492..cc92f1203c 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -94,13 +94,13 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) } case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle - def mergePair(k: K, vs: Seq[Any]) { - val mySeq = getSeq(k) - for (v <- vs) + def mergePair(pair: (K, Seq[Any])) { + val mySeq = getSeq(pair._1) + for (v <- pair._2) mySeq(depNum) += v } val fetcher = SparkEnv.get.shuffleFetcher - fetcher.fetch[K, Seq[Any]](shuffleId, split.index, mergePair) + fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair) } } map.iterator diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala index b2c7a1cb9e..a904ef62c3 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala @@ -7,8 +7,11 @@ import spark.Split private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], - f: Iterator[T] => Iterator[U]) + f: Iterator[T] => Iterator[U], + preservesPartitioning: Boolean = false) extends RDD[U](prev.context) { + + override val partitioner = if (preservesPartitioning) prev.partitioner else None override def splits = prev.splits override val dependencies = List(new OneToOneDependency(prev)) diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index 7577909b83..04234491a6 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -1,11 +1,7 @@ package spark.rdd -import scala.collection.mutable.ArrayBuffer -import java.util.{HashMap => JHashMap} - import spark.Aggregator import spark.Partitioner -import spark.RangePartitioner import spark.RDD import spark.ShuffleDependency import spark.SparkEnv @@ -16,15 +12,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split { override def hashCode(): Int = idx } - /** * The resulting RDD from a shuffle (e.g. repartitioning of data). 
*/ -abstract class ShuffledRDD[K, V, C]( +class ShuffledRDD[K, V, C]( @transient parent: RDD[(K, V)], aggregator: Option[Aggregator[K, V, C]], - part: Partitioner) - extends RDD[(K, C)](parent.context) { + part: Partitioner) extends RDD[(K, C)](parent.context) { override val partitioner = Some(part) @@ -37,106 +31,8 @@ abstract class ShuffledRDD[K, V, C]( val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part) override val dependencies = List(dep) -} - - -/** - * Repartition a key-value pair RDD. - */ -class RepartitionShuffledRDD[K, V]( - @transient parent: RDD[(K, V)], - part: Partitioner) - extends ShuffledRDD[K, V, V]( - parent, - None, - part) { - - override def compute(split: Split): Iterator[(K, V)] = { - val buf = new ArrayBuffer[(K, V)] - val fetcher = SparkEnv.get.shuffleFetcher - def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) } - fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) - buf.iterator - } -} - - -/** - * A sort-based shuffle (that doesn't apply aggregation). It does so by first - * repartitioning the RDD by range, and then sort within each range. - */ -class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V]( - @transient parent: RDD[(K, V)], - ascending: Boolean, - numSplits: Int) - extends RepartitionShuffledRDD[K, V]( - parent, - new RangePartitioner(numSplits, parent, ascending)) { - - override def compute(split: Split): Iterator[(K, V)] = { - // By separating this from RepartitionShuffledRDD, we avoided a - // buf.iterator.toArray call, thus avoiding building up the buffer twice. - val buf = new ArrayBuffer[(K, V)] - def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) } - SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer) - if (ascending) { - buf.sortWith((x, y) => x._1 < y._1).iterator - } else { - buf.sortWith((x, y) => x._1 > y._1).iterator - } - } -} - - -/** - * The resulting RDD from shuffle and running (hash-based) aggregation. - */ -class ShuffledAggregatedRDD[K, V, C]( - @transient parent: RDD[(K, V)], - aggregator: Aggregator[K, V, C], - part : Partitioner) - extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) { override def compute(split: Split): Iterator[(K, C)] = { - val combiners = new JHashMap[K, C] - val fetcher = SparkEnv.get.shuffleFetcher - - if (aggregator.mapSideCombine) { - // Apply combiners on map partitions. In this case, post-shuffle we get a - // list of outputs from the combiners and merge them using mergeCombiners. - def mergePairWithMapSideCombiners(k: K, c: C) { - val oldC = combiners.get(k) - if (oldC == null) { - combiners.put(k, c) - } else { - combiners.put(k, aggregator.mergeCombiners(oldC, c)) - } - } - fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners) - } else { - // Do not apply combiners on map partitions (i.e. map side aggregation is - // turned off). Post-shuffle we get a list of values and we use mergeValue - // to merge them. 
- def mergePairWithoutMapSideCombiners(k: K, v: V) { - val oldC = combiners.get(k) - if (oldC == null) { - combiners.put(k, aggregator.createCombiner(v)) - } else { - combiners.put(k, aggregator.mergeValue(oldC, v)) - } - } - fetcher.fetch[K, V](dep.shuffleId, split.index, mergePairWithoutMapSideCombiners) - } - - return new Iterator[(K, C)] { - var iter = combiners.entrySet().iterator() - - def hasNext: Boolean = iter.hasNext() - - def next(): (K, C) = { - val entry = iter.next() - (entry.getKey, entry.getValue) - } - } + SparkEnv.get.shuffleFetcher.fetch[K, C](dep.shuffleId, split.index) } -} +} \ No newline at end of file -- cgit v1.2.3
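
For context, the essence of this patch is a switch from a push-style fetch API (a callback invoked once per key-value pair) to a pull-style API that returns an Iterator, which callers can compose with flatMap, mapPartitions, sorting, and aggregation. The standalone Scala sketch below is not part of the commit; the trait and method names are simplified, illustrative stand-ins for ShuffleFetcher before and after the change, and sortFetched mirrors how sortByKey now sorts fetched pairs inside a mapPartitions call rather than in a dedicated ShuffledSortedRDD subclass.

    // Hypothetical, simplified sketch of the API change (names are illustrative,
    // not the actual Spark classes).
    trait CallbackShuffleFetcher {
      // Old shape: fetch pushes each (K, V) pair into a caller-supplied callback.
      def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit): Unit
    }

    trait IteratorShuffleFetcher {
      // New shape: fetch returns an iterator that the caller pulls from lazily.
      def fetch[K, V](shuffleId: Int, reduceId: Int): Iterator[(K, V)]
    }

    // Example consumer: sort the fetched pairs of one reduce partition, in the
    // spirit of how sortByKey sorts inside mapPartitions after this change.
    def sortFetched[K: Ordering, V](fetcher: IteratorShuffleFetcher,
                                    shuffleId: Int, reduceId: Int): Iterator[(K, V)] = {
      fetcher.fetch[K, V](shuffleId, reduceId).toSeq.sortBy(_._1).iterator
    }

With the pull-based API the plain repartition case can stream pairs straight through without buffering them, and the aggregated and sorted variants become ordinary mapPartitions transformations (with preservesPartitioning = true), which is why RepartitionShuffledRDD, ShuffledSortedRDD, and ShuffledAggregatedRDD could be collapsed into the single ShuffledRDD above.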