author     Josh Rosen <joshrosen@eecs.berkeley.edu>   2012-10-08 17:29:33 -0700
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>   2012-10-13 14:59:20 -0700
commit     4775c55641f281523f105f9272f164033242a0aa (patch)
tree       84c9bfef3dc0c013bf53ee4620e1ee3a50100cca /core/src/main
parent     110832e88f0c25174836c7f92b1ab45b0ededf86 (diff)
Change ShuffleFetcher to return an Iterator.
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala   22
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala            44
-rw-r--r--  core/src/main/scala/spark/RDD.scala                          5
-rw-r--r--  core/src/main/scala/spark/ShuffleFetcher.scala              10
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala             8
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsRDD.scala         5
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala            112
7 files changed, 49 insertions, 157 deletions
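The commit replaces ShuffleFetcher's callback-style fetch (which invoked a supplied func on every key-value pair) with a fetch that returns an Iterator[(K, V)], so each caller decides how to consume the shuffle output. The following self-contained sketch uses toy stand-ins rather than the real Spark classes to contrast the two styles:

    // Illustrative toy stand-ins only, not the real spark.ShuffleFetcher API.
    object FetchStyleSketch {
      // Old style (before this commit): the fetcher pushes each pair into a callback.
      def fetchWithCallback[K, V](pairs: Seq[(K, V)], func: (K, V) => Unit): Unit =
        pairs.foreach { case (k, v) => func(k, v) }

      // New style (this commit): the fetcher hands back an iterator and the caller
      // decides how to consume it (aggregate, sort, or just stream through it).
      def fetchAsIterator[K, V](pairs: Seq[(K, V)]): Iterator[(K, V)] =
        pairs.iterator

      def main(args: Array[String]): Unit = {
        val data = Seq("a" -> 1, "b" -> 2)
        fetchWithCallback(data, (k: String, v: Int) => println(s"callback: $k -> $v"))
        fetchAsIterator(data).foreach { case (k, v) => println(s"iterator: $k -> $v") }
      }
    }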
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 4554db2249..86432d0127 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -1,18 +1,12 @@
package spark
-import java.io.EOFException
-import java.net.URL
-
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import spark.storage.BlockException
import spark.storage.BlockManagerId
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
- def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
+ override def fetch[K, V](shuffleId: Int, reduceId: Int) = {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
@@ -31,14 +25,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
(address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
}
- for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) {
+ def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = {
+ val blockId = blockPair._1
+ val blockOption = blockPair._2
blockOption match {
case Some(block) => {
- val values = block
- for(value <- values) {
- val v = value.asInstanceOf[(K, V)]
- func(v._1, v._2)
- }
+ block.asInstanceOf[Iterator[(K, V)]]
}
case None => {
val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
@@ -53,8 +45,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
}
}
}
-
- logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format(
- shuffleId, reduceId, System.currentTimeMillis - startTime))
+ blockManager.getMultiple(blocksByAddress).flatMap(unpackBlock)
}
}
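With the iterator-based API, BlockStoreShuffleFetcher builds its result by flatMapping unpackBlock over the (blockId, Option[Iterator[Any]]) pairs returned by blockManager.getMultiple: present blocks stream through lazily, and a missing block is turned into a failure. A minimal stand-alone sketch of that unpacking pattern (names and the exception type are illustrative, not the real block manager API):

    // Self-contained sketch: flatten a stream of (blockId, Option[Iterator[Any]])
    // pairs into one Iterator[(K, V)], failing on a missing block.
    object UnpackBlocksSketch {
      def unpackAll[K, V](blocks: Iterator[(String, Option[Iterator[Any]])]): Iterator[(K, V)] =
        blocks.flatMap {
          case (_, Some(block)) => block.asInstanceOf[Iterator[(K, V)]]
          case (blockId, None)  => throw new RuntimeException("Missing block: " + blockId)
        }

      def main(args: Array[String]): Unit = {
        val fetched: Iterator[(String, Option[Iterator[Any]])] = Iterator(
          ("shuffle_0_0_0", Some(Iterator(("x", 1), ("y", 2)))),
          ("shuffle_0_1_0", Some(Iterator(("z", 3)))))
        unpackAll[String, Int](fetched).foreach(println)
      }
    }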
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0240fd95c7..36cfda9cdb 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,10 +1,6 @@
package spark
-import java.io.EOFException
-import java.io.ObjectInputStream
-import java.net.URL
import java.util.{Date, HashMap => JHashMap}
-import java.util.concurrent.atomic.AtomicLong
import java.text.SimpleDateFormat
import scala.collection.Map
@@ -14,18 +10,11 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@@ -67,15 +56,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
partitioner: Partitioner,
mapSideCombine: Boolean = true): RDD[(K, C)] = {
val aggregator =
- if (mapSideCombine) {
- new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
- } else {
- // Don't apply map-side combiner.
- // A sanity check to make sure mergeCombiners is not defined.
- assert(mergeCombiners == null)
- new Aggregator[K, V, C](createCombiner, mergeValue, null, false)
- }
- new ShuffledAggregatedRDD(self, aggregator, partitioner)
+ new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ if (mapSideCombine) {
+ val combiners = new ShuffledRDD[K, V, C](self, Some(aggregator), partitioner)
+ combiners.mapPartitions(aggregator.combineCombinersByKey(_), true)
+ } else {
+ // Don't apply map-side combiner.
+ // A sanity check to make sure mergeCombiners is not defined.
+ assert(mergeCombiners == null)
+ val values = new ShuffledRDD[K, V, V](self, None, partitioner)
+ values.mapPartitions(aggregator.combineValuesByKey(_), true)
+ }
}
/**
@@ -184,7 +175,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.flatMapValues(buf => buf)
} else {
- new RepartitionShuffledRDD(self, partitioner)
+ new ShuffledRDD[K, V, V](self, None, partitioner)
}
}
@@ -621,7 +612,16 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
* order of the keys).
*/
def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = {
- new ShuffledSortedRDD(self, ascending, numSplits)
+ val shuffled =
+ new ShuffledRDD[K, V, V](self, None, new RangePartitioner(numSplits, self, ascending))
+ shuffled.mapPartitions(iter => {
+ val buf = iter.toArray
+ if (ascending) {
+ buf.sortWith((x, y) => x._1 < y._1).iterator
+ } else {
+ buf.sortWith((x, y) => x._1 > y._1).iterator
+ }
+ }, true)
}
}
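Instead of the dedicated RepartitionShuffledRDD / ShuffledSortedRDD / ShuffledAggregatedRDD subclasses, PairRDDFunctions now composes a single ShuffledRDD with a post-shuffle mapPartitions: combineByKey applies the Aggregator per partition, and sortByKey sorts each range-partitioned partition in place. A hedged sketch of that per-partition sort, written against plain Scala iterators rather than the RDD API:

    // Sketch of the per-partition sort the new sortByKey applies after a
    // range-partitioned shuffle; a real RDD would run this through
    // mapPartitions with preservesPartitioning = true.
    object SortPartitionSketch {
      def sortPartition[K, V](iter: Iterator[(K, V)], ascending: Boolean)
                             (implicit ord: Ordering[K]): Iterator[(K, V)] = {
        val sorted = iter.toIndexedSeq.sortBy(_._1)
        (if (ascending) sorted else sorted.reverse).iterator
      }

      def main(args: Array[String]): Unit = {
        val partition = Iterator(("b", 2), ("a", 1), ("c", 3))
        println(sortPartition(partition, ascending = true).toList)  // List((a,1), (b,2), (c,3))
      }
    }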
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index ddb420efff..338dff4061 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -282,8 +282,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
- new MapPartitionsRDD(this, sc.clean(f))
+ def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] =
+ new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala
index daa35fe7f2..d9a94d4021 100644
--- a/core/src/main/scala/spark/ShuffleFetcher.scala
+++ b/core/src/main/scala/spark/ShuffleFetcher.scala
@@ -1,10 +1,12 @@
package spark
private[spark] abstract class ShuffleFetcher {
- // Fetch the shuffle outputs for a given ShuffleDependency, calling func exactly
- // once on each key-value pair obtained.
- def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit)
+ /**
+ * Fetch the shuffle outputs for a given ShuffleDependency.
+ * @return An iterator over the elements of the fetched shuffle outputs.
+ */
+ def fetch[K, V](shuffleId: Int, reduceId: Int) : Iterator[(K, V)]
- // Stop the fetcher
+ /** Stop the fetcher */
def stop() {}
}
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index f1defbe492..cc92f1203c 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -94,13 +94,13 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
- def mergePair(k: K, vs: Seq[Any]) {
- val mySeq = getSeq(k)
- for (v <- vs)
+ def mergePair(pair: (K, Seq[Any])) {
+ val mySeq = getSeq(pair._1)
+ for (v <- pair._2)
mySeq(depNum) += v
}
val fetcher = SparkEnv.get.shuffleFetcher
- fetcher.fetch[K, Seq[Any]](shuffleId, split.index, mergePair)
+ fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
}
}
map.iterator
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index b2c7a1cb9e..a904ef62c3 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -7,8 +7,11 @@ import spark.Split
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
- f: Iterator[T] => Iterator[U])
+ f: Iterator[T] => Iterator[U],
+ preservesPartitioning: Boolean = false)
extends RDD[U](prev.context) {
+
+ override val partitioner = if (preservesPartitioning) prev.partitioner else None
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
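The new preservesPartitioning flag lets those post-shuffle mapPartitions calls keep advertising the ShuffledRDD's partitioner: a transformation that only rewrites values or reorders pairs never moves a key out of its partition, so downstream operations can still rely on the partitioning. A small stand-alone illustration (not Spark code) of why this is safe for value-only transformations but not for key-changing ones:

    // Toy hash partitioning, used only to show which transformations keep
    // every key in its original partition.
    object PreservesPartitioningSketch {
      def partitionOf(key: String, numPartitions: Int): Int =
        math.abs(key.hashCode) % numPartitions

      def main(args: Array[String]): Unit = {
        val n = 4
        val shuffledPartition = Seq(("a", 1), ("b", 2), ("a", 3))
        // Value-only transformation: keys are untouched, so the parent's
        // partitioner can still be advertised downstream.
        val valueOnly = shuffledPartition.map { case (k, v) => (k, v * 10) }
        assert(valueOnly.map(p => partitionOf(p._1, n)) == shuffledPartition.map(p => partitionOf(p._1, n)))
        // Key-changing transformation: keys may now belong to other partitions,
        // so the partitioner must be dropped (preservesPartitioning = false).
        val keyChanging = shuffledPartition.map { case (k, v) => (k + v, v) }
        println(keyChanging.map(p => partitionOf(p._1, n)))
      }
    }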
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 7577909b83..04234491a6 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,11 +1,7 @@
package spark.rdd
-import scala.collection.mutable.ArrayBuffer
-import java.util.{HashMap => JHashMap}
-
import spark.Aggregator
import spark.Partitioner
-import spark.RangePartitioner
import spark.RDD
import spark.ShuffleDependency
import spark.SparkEnv
@@ -16,15 +12,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override def hashCode(): Int = idx
}
-
/**
* The resulting RDD from a shuffle (e.g. repartitioning of data).
*/
-abstract class ShuffledRDD[K, V, C](
+class ShuffledRDD[K, V, C](
@transient parent: RDD[(K, V)],
aggregator: Option[Aggregator[K, V, C]],
- part: Partitioner)
- extends RDD[(K, C)](parent.context) {
+ part: Partitioner) extends RDD[(K, C)](parent.context) {
override val partitioner = Some(part)
@@ -37,106 +31,8 @@ abstract class ShuffledRDD[K, V, C](
val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
override val dependencies = List(dep)
-}
-
-
-/**
- * Repartition a key-value pair RDD.
- */
-class RepartitionShuffledRDD[K, V](
- @transient parent: RDD[(K, V)],
- part: Partitioner)
- extends ShuffledRDD[K, V, V](
- parent,
- None,
- part) {
-
- override def compute(split: Split): Iterator[(K, V)] = {
- val buf = new ArrayBuffer[(K, V)]
- val fetcher = SparkEnv.get.shuffleFetcher
- def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
- fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- buf.iterator
- }
-}
-
-
-/**
- * A sort-based shuffle (that doesn't apply aggregation). It does so by first
- * repartitioning the RDD by range, and then sort within each range.
- */
-class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
- @transient parent: RDD[(K, V)],
- ascending: Boolean,
- numSplits: Int)
- extends RepartitionShuffledRDD[K, V](
- parent,
- new RangePartitioner(numSplits, parent, ascending)) {
-
- override def compute(split: Split): Iterator[(K, V)] = {
- // By separating this from RepartitionShuffledRDD, we avoided a
- // buf.iterator.toArray call, thus avoiding building up the buffer twice.
- val buf = new ArrayBuffer[(K, V)]
- def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) }
- SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- if (ascending) {
- buf.sortWith((x, y) => x._1 < y._1).iterator
- } else {
- buf.sortWith((x, y) => x._1 > y._1).iterator
- }
- }
-}
-
-
-/**
- * The resulting RDD from shuffle and running (hash-based) aggregation.
- */
-class ShuffledAggregatedRDD[K, V, C](
- @transient parent: RDD[(K, V)],
- aggregator: Aggregator[K, V, C],
- part : Partitioner)
- extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) {
override def compute(split: Split): Iterator[(K, C)] = {
- val combiners = new JHashMap[K, C]
- val fetcher = SparkEnv.get.shuffleFetcher
-
- if (aggregator.mapSideCombine) {
- // Apply combiners on map partitions. In this case, post-shuffle we get a
- // list of outputs from the combiners and merge them using mergeCombiners.
- def mergePairWithMapSideCombiners(k: K, c: C) {
- val oldC = combiners.get(k)
- if (oldC == null) {
- combiners.put(k, c)
- } else {
- combiners.put(k, aggregator.mergeCombiners(oldC, c))
- }
- }
- fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners)
- } else {
- // Do not apply combiners on map partitions (i.e. map side aggregation is
- // turned off). Post-shuffle we get a list of values and we use mergeValue
- // to merge them.
- def mergePairWithoutMapSideCombiners(k: K, v: V) {
- val oldC = combiners.get(k)
- if (oldC == null) {
- combiners.put(k, aggregator.createCombiner(v))
- } else {
- combiners.put(k, aggregator.mergeValue(oldC, v))
- }
- }
- fetcher.fetch[K, V](dep.shuffleId, split.index, mergePairWithoutMapSideCombiners)
- }
-
- return new Iterator[(K, C)] {
- var iter = combiners.entrySet().iterator()
-
- def hasNext: Boolean = iter.hasNext()
-
- def next(): (K, C) = {
- val entry = iter.next()
- (entry.getKey, entry.getValue)
- }
- }
+ SparkEnv.get.shuffleFetcher.fetch[K, C](dep.shuffleId, split.index)
}
-}
+}
\ No newline at end of file
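The hash-based merging removed from ShuffledAggregatedRDD.compute is not lost; it moves into the Aggregator methods (combineValuesByKey / combineCombinersByKey) that PairRDDFunctions now applies via mapPartitions. A hedged sketch of that per-partition hash aggregation in plain Scala, not the actual spark.Aggregator source:

    // Per-partition hash aggregation in the shape mapPartitions expects.
    import scala.collection.mutable

    object CombineValuesSketch {
      def combineValuesByKey[K, V, C](iter: Iterator[(K, V)],
                                      createCombiner: V => C,
                                      mergeValue: (C, V) => C): Iterator[(K, C)] = {
        val combiners = new mutable.HashMap[K, C]
        for ((k, v) <- iter) {
          combiners(k) = combiners.get(k) match {
            case Some(c) => mergeValue(c, v)
            case None    => createCombiner(v)
          }
        }
        combiners.iterator
      }

      def main(args: Array[String]): Unit = {
        val pairs = Iterator(("a", 1), ("b", 2), ("a", 3))
        println(combineValuesByKey[String, Int, Int](pairs, v => v, _ + _).toList)
      }
    }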