author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-10-23 22:01:45 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-10-23 22:01:45 -0700
commit     0bd20c63e2ed3ca8e3577c2805678d9d34218ffd (patch)
tree       dc779da79fa94a12ee18697a4d3b3424dbd21acd /core/src
parent     7849216bba414b2b2a63a4b093bea8f6397384d9 (diff)
parent     33cd3a0c12bf487a9060135c6cf2a3efa7943c77 (diff)
Merge remote-tracking branch 'JoshRosen/shuffle_refactoring' into dev
Conflicts:
	core/src/main/scala/spark/Dependency.scala
	core/src/main/scala/spark/rdd/CoGroupedRDD.scala
	core/src/main/scala/spark/rdd/ShuffledRDD.scala
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/Aggregator.scala                 37
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala   22
-rw-r--r--  core/src/main/scala/spark/Dependency.scala                  4
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala           45
-rw-r--r--  core/src/main/scala/spark/RDD.scala                         5
-rw-r--r--  core/src/main/scala/spark/ShuffleFetcher.scala             10
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala           23
-rw-r--r--  core/src/main/scala/spark/rdd/MapPartitionsRDD.scala        5
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala           118
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala     10
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala   43
-rw-r--r--  core/src/main/scala/spark/scheduler/Stage.scala             2
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala               39
13 files changed, 113 insertions, 250 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index b0daa70cfd..df8ce9c054 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -1,17 +1,44 @@
package spark
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.JavaConversions._
+
/** A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
- * @param mapSideCombine whether to apply combiners on map partitions, also
- * known as map-side aggregations. When set to false,
- * mergeCombiners function is not used.
*/
case class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C,
- val mapSideCombine: Boolean = true)
+ val mergeCombiners: (C, C) => C) {
+
+ def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
+ val combiners = new JHashMap[K, C]
+ for ((k, v) <- iter) {
+ val oldC = combiners.get(k)
+ if (oldC == null) {
+ combiners.put(k, createCombiner(v))
+ } else {
+ combiners.put(k, mergeValue(oldC, v))
+ }
+ }
+ combiners.iterator
+ }
+
+ def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
+ val combiners = new JHashMap[K, C]
+ for ((k, c) <- iter) {
+ val oldC = combiners.get(k)
+ if (oldC == null) {
+ combiners.put(k, c)
+ } else {
+ combiners.put(k, mergeCombiners(oldC, c))
+ }
+ }
+ combiners.iterator
+ }
+}
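
The Aggregator change above moves hash-based combining out of the shuffle machinery and into two reusable methods: combineValuesByKey builds partial combiners from raw (K, V) pairs on the map side, and combineCombinersByKey merges already-combined outputs on the reduce side. A minimal stand-alone sketch with an illustrative sum aggregator (the data and types below are not from the patch):

    import spark.Aggregator

    // Sum values per key: the combiner type C is just Int.
    val agg = new Aggregator[String, Int, Int](
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue
      (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners

    // Map side: partial sums within one partition.
    val partial = agg.combineValuesByKey(Iterator(("a", 1), ("b", 1), ("a", 1)))

    // Reduce side: merge partial results fetched from several map outputs.
    val merged = agg.combineCombinersByKey(partial ++ Iterator(("a", 2)))
    // merged.toList contains ("a", 4) and ("b", 1).
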
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 4554db2249..86432d0127 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -1,18 +1,12 @@
package spark
-import java.io.EOFException
-import java.net.URL
-
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import spark.storage.BlockException
import spark.storage.BlockManagerId
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
- def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit) {
+ override def fetch[K, V](shuffleId: Int, reduceId: Int) = {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
@@ -31,14 +25,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
(address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
}
- for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) {
+ def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = {
+ val blockId = blockPair._1
+ val blockOption = blockPair._2
blockOption match {
case Some(block) => {
- val values = block
- for(value <- values) {
- val v = value.asInstanceOf[(K, V)]
- func(v._1, v._2)
- }
+ block.asInstanceOf[Iterator[(K, V)]]
}
case None => {
val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
@@ -53,8 +45,6 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
}
}
}
-
- logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format(
- shuffleId, reduceId, System.currentTimeMillis - startTime))
+ blockManager.getMultiple(blocksByAddress).flatMap(unpackBlock)
}
}
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index dfc7e292b7..b85d2732db 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -22,12 +22,10 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* Represents a dependency on the output of a shuffle stage.
* @param shuffleId the shuffle id
* @param rdd the parent RDD
- * @param aggregator optional aggregator; this allows for map-side combining
* @param partitioner partitioner used to partition the shuffle output
*/
-class ShuffleDependency[K, V, C](
+class ShuffleDependency[K, V](
@transient rdd: RDD[(K, V)],
- val aggregator: Option[Aggregator[K, V, C]],
val partitioner: Partitioner)
extends Dependency(rdd) {
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index d693b4e820..e5bb639cfd 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,10 +1,6 @@
package spark
-import java.io.EOFException
-import java.io.ObjectInputStream
-import java.net.URL
import java.util.{Date, HashMap => JHashMap}
-import java.util.concurrent.atomic.AtomicLong
import java.text.SimpleDateFormat
import scala.collection.Map
@@ -14,18 +10,11 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
@@ -64,15 +53,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
partitioner: Partitioner,
mapSideCombine: Boolean = true): RDD[(K, C)] = {
val aggregator =
- if (mapSideCombine) {
- new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
- } else {
- // Don't apply map-side combiner.
- // A sanity check to make sure mergeCombiners is not defined.
- assert(mergeCombiners == null)
- new Aggregator[K, V, C](createCombiner, mergeValue, null, false)
- }
- new ShuffledAggregatedRDD(self, aggregator, partitioner)
+ new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ if (mapSideCombine) {
+ val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
+ val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
+ partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
+ } else {
+ // Don't apply map-side combiner.
+ // A sanity check to make sure mergeCombiners is not defined.
+ assert(mergeCombiners == null)
+ val values = new ShuffledRDD[K, V](self, partitioner)
+ values.mapPartitions(aggregator.combineValuesByKey(_), true)
+ }
}
/**
@@ -181,7 +173,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.flatMapValues(buf => buf)
} else {
- new RepartitionShuffledRDD(self, partitioner)
+ new ShuffledRDD[K, V](self, partitioner)
}
}
@@ -618,7 +610,16 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
* order of the keys).
*/
def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = {
- new ShuffledSortedRDD(self, ascending, numSplits)
+ val shuffled =
+ new ShuffledRDD[K, V](self, new RangePartitioner(numSplits, self, ascending))
+ shuffled.mapPartitions(iter => {
+ val buf = iter.toArray
+ if (ascending) {
+ buf.sortWith((x, y) => x._1 < y._1).iterator
+ } else {
+ buf.sortWith((x, y) => x._1 > y._1).iterator
+ }
+ }, true)
}
}
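
With the specialized ShuffledAggregatedRDD and ShuffledSortedRDD gone, combineByKey (and therefore reduceByKey) is expressed as plain composition of mapPartitions and ShuffledRDD, mirroring the new method body shown above. A rough sketch in local mode (the sample data and partition counts are illustrative):

    import spark.{Aggregator, HashPartitioner, SparkContext}
    import spark.rdd.ShuffledRDD

    val sc = new SparkContext("local", "combine-by-key-sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
    val agg = new Aggregator[String, Int, Int]((v: Int) => v, _ + _, _ + _)

    // Map-side combine, shuffle the partial combiners, then merge them.
    val mapSideCombined = pairs.mapPartitions(agg.combineValuesByKey(_), true)
    val partitioned = new ShuffledRDD[String, Int](mapSideCombined, new HashPartitioner(2))
    val reduced = partitioned.mapPartitions(agg.combineCombinersByKey(_), true)
    // reduced.collect() yields ("a", 4) and ("b", 2), in some order.
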
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index ddb420efff..338dff4061 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -282,8 +282,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
- new MapPartitionsRDD(this, sc.clean(f))
+ def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
+ preservesPartitioning: Boolean = false): RDD[U] =
+ new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
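
The new preservesPartitioning flag lets a per-partition transformation that leaves keys untouched keep the parent's partitioner, so a later co-partitioned operation can skip another shuffle. A small illustrative sketch (the data and partitioner are not from the patch):

    import spark.{HashPartitioner, SparkContext}
    import spark.SparkContext._

    val sc = new SparkContext("local", "preserves-partitioning-sketch")
    val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i))

    // Keys are unchanged, so the partitioner from groupByKey carries through.
    val grouped = pairs.groupByKey(new HashPartitioner(4))
    val sizes = grouped.mapPartitions(
      iter => iter.map { case (k, vs) => (k, vs.size) },
      preservesPartitioning = true)
    // sizes.partitioner == grouped.partitioner, so joining sizes against
    // another RDD that uses the same HashPartitioner needs no extra shuffle.
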
diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala
index daa35fe7f2..d9a94d4021 100644
--- a/core/src/main/scala/spark/ShuffleFetcher.scala
+++ b/core/src/main/scala/spark/ShuffleFetcher.scala
@@ -1,10 +1,12 @@
package spark
private[spark] abstract class ShuffleFetcher {
- // Fetch the shuffle outputs for a given ShuffleDependency, calling func exactly
- // once on each key-value pair obtained.
- def fetch[K, V](shuffleId: Int, reduceId: Int, func: (K, V) => Unit)
+ /**
+ * Fetch the shuffle outputs for a given ShuffleDependency.
+ * @return An iterator over the elements of the fetched shuffle outputs.
+ */
+ def fetch[K, V](shuffleId: Int, reduceId: Int) : Iterator[(K, V)]
- // Stop the fetcher
+ /** Stop the fetcher */
def stop() {}
}
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index ace2500627..50bec9e63b 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,9 +1,5 @@
package spark.rdd
-import java.net.URL
-import java.io.EOFException
-import java.io.ObjectInputStream
-
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -43,12 +39,13 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
override val dependencies = {
val deps = new ArrayBuffer[Dependency[_]]
for ((rdd, index) <- rdds.zipWithIndex) {
- if (rdd.partitioner == Some(part)) {
- logInfo("Adding one-to-one dependency with " + rdd)
- deps += new OneToOneDependency(rdd)
+ val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
+ if (mapSideCombinedRDD.partitioner == Some(part)) {
+ logInfo("Adding one-to-one dependency with " + mapSideCombinedRDD)
+ deps += new OneToOneDependency(mapSideCombinedRDD)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](rdd, Some(aggr), part)
+ deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
}
}
deps.toList
@@ -61,7 +58,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
for (i <- 0 until array.size) {
array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
dependencies(j) match {
- case s: ShuffleDependency[_, _, _] =>
+ case s: ShuffleDependency[_, _] =>
new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
case _ =>
new NarrowCoGroupSplitDep(r, r.splits(i)): CoGroupSplitDep
@@ -93,13 +90,13 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
- def mergePair(k: K, vs: Seq[Any]) {
- val mySeq = getSeq(k)
- for (v <- vs)
+ def mergePair(pair: (K, Seq[Any])) {
+ val mySeq = getSeq(pair._1)
+ for (v <- pair._2)
mySeq(depNum) += v
}
val fetcher = SparkEnv.get.shuffleFetcher
- fetcher.fetch[K, Seq[Any]](shuffleId, split.index, mergePair)
+ fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
}
}
map.iterator
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index b2c7a1cb9e..a904ef62c3 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -7,8 +7,11 @@ import spark.Split
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
- f: Iterator[T] => Iterator[U])
+ f: Iterator[T] => Iterator[U],
+ preservesPartitioning: Boolean = false)
extends RDD[U](prev.context) {
+
+ override val partitioner = if (preservesPartitioning) prev.partitioner else None
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index be120acc71..145e419c53 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,11 +1,6 @@
package spark.rdd
-import scala.collection.mutable.ArrayBuffer
-import java.util.{HashMap => JHashMap}
-
-import spark.Aggregator
import spark.Partitioner
-import spark.RangePartitioner
import spark.RDD
import spark.ShuffleDependency
import spark.SparkEnv
@@ -16,15 +11,16 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override def hashCode(): Int = idx
}
-
/**
* The resulting RDD from a shuffle (e.g. repartitioning of data).
+ * @param parent the parent RDD.
+ * @param part the partitioner used to partition the RDD
+ * @tparam K the key class.
+ * @tparam V the value class.
*/
-abstract class ShuffledRDD[K, V, C](
+class ShuffledRDD[K, V](
@transient parent: RDD[(K, V)],
- aggregator: Option[Aggregator[K, V, C]],
- part: Partitioner)
- extends RDD[(K, C)](parent.context) {
+ part: Partitioner) extends RDD[(K, V)](parent.context) {
override val partitioner = Some(part)
@@ -35,108 +31,10 @@ abstract class ShuffledRDD[K, V, C](
override def preferredLocations(split: Split) = Nil
- val dep = new ShuffleDependency(parent, aggregator, part)
+ val dep = new ShuffleDependency(parent, part)
override val dependencies = List(dep)
-}
-
-
-/**
- * Repartition a key-value pair RDD.
- */
-class RepartitionShuffledRDD[K, V](
- @transient parent: RDD[(K, V)],
- part: Partitioner)
- extends ShuffledRDD[K, V, V](
- parent,
- None,
- part) {
override def compute(split: Split): Iterator[(K, V)] = {
- val buf = new ArrayBuffer[(K, V)]
- val fetcher = SparkEnv.get.shuffleFetcher
- def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
- fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- buf.iterator
- }
-}
-
-
-/**
- * A sort-based shuffle (that doesn't apply aggregation). It does so by first
- * repartitioning the RDD by range, and then sort within each range.
- */
-class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
- @transient parent: RDD[(K, V)],
- ascending: Boolean,
- numSplits: Int)
- extends RepartitionShuffledRDD[K, V](
- parent,
- new RangePartitioner(numSplits, parent, ascending)) {
-
- override def compute(split: Split): Iterator[(K, V)] = {
- // By separating this from RepartitionShuffledRDD, we avoided a
- // buf.iterator.toArray call, thus avoiding building up the buffer twice.
- val buf = new ArrayBuffer[(K, V)]
- def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) }
- SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
- if (ascending) {
- buf.sortWith((x, y) => x._1 < y._1).iterator
- } else {
- buf.sortWith((x, y) => x._1 > y._1).iterator
- }
- }
-}
-
-
-/**
- * The resulting RDD from shuffle and running (hash-based) aggregation.
- */
-class ShuffledAggregatedRDD[K, V, C](
- @transient parent: RDD[(K, V)],
- aggregator: Aggregator[K, V, C],
- part : Partitioner)
- extends ShuffledRDD[K, V, C](parent, Some(aggregator), part) {
-
- override def compute(split: Split): Iterator[(K, C)] = {
- val combiners = new JHashMap[K, C]
- val fetcher = SparkEnv.get.shuffleFetcher
-
- if (aggregator.mapSideCombine) {
- // Apply combiners on map partitions. In this case, post-shuffle we get a
- // list of outputs from the combiners and merge them using mergeCombiners.
- def mergePairWithMapSideCombiners(k: K, c: C) {
- val oldC = combiners.get(k)
- if (oldC == null) {
- combiners.put(k, c)
- } else {
- combiners.put(k, aggregator.mergeCombiners(oldC, c))
- }
- }
- fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners)
- } else {
- // Do not apply combiners on map partitions (i.e. map side aggregation is
- // turned off). Post-shuffle we get a list of values and we use mergeValue
- // to merge them.
- def mergePairWithoutMapSideCombiners(k: K, v: V) {
- val oldC = combiners.get(k)
- if (oldC == null) {
- combiners.put(k, aggregator.createCombiner(v))
- } else {
- combiners.put(k, aggregator.mergeValue(oldC, v))
- }
- }
- fetcher.fetch[K, V](dep.shuffleId, split.index, mergePairWithoutMapSideCombiners)
- }
-
- return new Iterator[(K, C)] {
- var iter = combiners.entrySet().iterator()
-
- def hasNext: Boolean = iter.hasNext()
-
- def next(): (K, C) = {
- val entry = iter.next()
- (entry.getKey, entry.getValue)
- }
- }
+ SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index)
}
}
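
ShuffledRDD is now a single concrete class that only repartitions: compute simply returns the fetcher's iterator, and any aggregation or sorting is layered on top with mapPartitions. A minimal sketch of direct use for repartitioning (the partitioner and data are illustrative):

    import spark.{HashPartitioner, SparkContext}
    import spark.rdd.ShuffledRDD

    val sc = new SparkContext("local", "shuffled-rdd-sketch")
    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")), 3)

    // Pure repartitioning: each record lands in the split chosen by the
    // partitioner; no combining happens during the shuffle itself.
    val repartitioned = new ShuffledRDD[Int, String](pairs, new HashPartitioner(4))
    // repartitioned has 4 splits, the same elements as pairs, and its
    // partitioner is Some(<the HashPartitioner above>).
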
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 6f4c6bffd7..aaaed59c4a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -104,7 +104,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
* The priority value passed in will be used if the stage doesn't already exist with
* a lower priority (we assume that priorities always increase across jobs for now).
*/
- def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_,_], priority: Int): Stage = {
+ def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
@@ -119,7 +119,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
* as a result stage for the final RDD used directly in an action. The stage will also be given
* the provided priority.
*/
- def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]], priority: Int): Stage = {
+ def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of splits is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
@@ -149,7 +149,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
cacheTracker.registerRDD(r.id, r.splits.size)
for (dep <- r.dependencies) {
dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
+ case shufDep: ShuffleDependency[_,_] =>
parents += getShuffleMapStage(shufDep, priority)
case _ =>
visit(dep.rdd)
@@ -172,7 +172,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (locs(p) == Nil) {
for (dep <- rdd.dependencies) {
dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
+ case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
missing += mapStage
@@ -549,7 +549,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
visitedRdds += rdd
for (dep <- rdd.dependencies) {
dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
+ case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
visitedStages += mapStage
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 86796d3677..60105c42b6 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -22,7 +22,7 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new JHashMap[Int, Array[Byte]]
- def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
+ def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
val old = serializedInfoCache.get(stageId)
if (old != null) {
@@ -41,14 +41,14 @@ private[spark] object ShuffleMapTask {
}
}
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
+ def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
synchronized {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance
val objIn = ser.deserializeStream(in)
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
- val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
+ val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
return (rdd, dep)
}
}
@@ -71,7 +71,7 @@ private[spark] object ShuffleMapTask {
private[spark] class ShuffleMapTask(
stageId: Int,
var rdd: RDD[_],
- var dep: ShuffleDependency[_,_,_],
+ var dep: ShuffleDependency[_,_],
var partition: Int,
@transient var locs: Seq[String])
extends Task[MapStatus](stageId)
@@ -113,33 +113,14 @@ private[spark] class ShuffleMapTask(
val numOutputSplits = dep.partitioner.numPartitions
val partitioner = dep.partitioner
- val bucketIterators =
- if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) {
- val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]]
- // Apply combiners (map-side aggregation) to the map output.
- val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])
- for (elem <- rdd.iterator(split)) {
- val (k, v) = elem.asInstanceOf[(Any, Any)]
- val bucketId = partitioner.getPartition(k)
- val bucket = buckets(bucketId)
- val existing = bucket.get(k)
- if (existing == null) {
- bucket.put(k, aggregator.createCombiner(v))
- } else {
- bucket.put(k, aggregator.mergeValue(existing, v))
- }
- }
- buckets.map(_.iterator)
- } else {
- // No combiners (no map-side aggregation). Simply partition the map output.
- val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
- for (elem <- rdd.iterator(split)) {
- val pair = elem.asInstanceOf[(Any, Any)]
- val bucketId = partitioner.getPartition(pair._1)
- buckets(bucketId) += pair
- }
- buckets.map(_.iterator)
- }
+ // Partition the map output.
+ val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
+ for (elem <- rdd.iterator(split)) {
+ val pair = elem.asInstanceOf[(Any, Any)]
+ val bucketId = partitioner.getPartition(pair._1)
+ buckets(bucketId) += pair
+ }
+ val bucketIterators = buckets.map(_.iterator)
val compressedSizes = new Array[Byte](numOutputSplits)
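
With map-side aggregation removed from ShuffleMapTask, the remaining bucketing is pure partitioning of the map output. Isolated, and with an illustrative partitioner and input, the loop above amounts to:

    import scala.collection.mutable.ArrayBuffer
    import spark.HashPartitioner

    val partitioner = new HashPartitioner(3)
    val mapOutput: Iterator[(Any, Any)] = Iterator(("x", 1), ("y", 2), ("x", 3))

    val buckets = Array.fill(3)(new ArrayBuffer[(Any, Any)])
    for (pair <- mapOutput) {
      buckets(partitioner.getPartition(pair._1)) += pair
    }
    // Each buckets(i).iterator becomes one shuffle block; merging values per
    // key now happens on the reduce side via Aggregator.combineValuesByKey.
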
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 1149c00a23..4846b66729 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -22,7 +22,7 @@ import spark.storage.BlockManagerId
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
- val shuffleDep: Option[ShuffleDependency[_,_,_]], // Output shuffle if stage is a map stage
+ val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val priority: Int)
extends Logging {
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 7f8ec5d48f..8170100f1d 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -12,8 +12,8 @@ import org.scalacheck.Prop._
import com.google.common.io.Files
-import spark.rdd.ShuffledAggregatedRDD
-import SparkContext._
+import spark.rdd.ShuffledRDD
+import spark.SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
@@ -216,41 +216,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
}
-
- test("map-side combine") {
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1), (1, 1)), 2)
-
- // Test with map-side combine on.
- val sums = pairs.reduceByKey(_+_).collect()
- assert(sums.toSet === Set((1, 8), (2, 1)))
-
- // Turn off map-side combine and test the results.
- val aggregator = new Aggregator[Int, Int, Int](
- (v: Int) => v,
- _+_,
- _+_,
- false)
- val shuffledRdd = new ShuffledAggregatedRDD(
- pairs, aggregator, new HashPartitioner(2))
- assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
-
- // Turn map-side combine off and pass a wrong mergeCombine function. Should
- // not see an exception because mergeCombine should not have been called.
- val aggregatorWithException = new Aggregator[Int, Int, Int](
- (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
- val shuffledRdd1 = new ShuffledAggregatedRDD(
- pairs, aggregatorWithException, new HashPartitioner(2))
- assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
-
- // Now run the same mergeCombine function with map-side combine on. We
- // expect to see an exception thrown.
- val aggregatorWithException1 = new Aggregator[Int, Int, Int](
- (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
- val shuffledRdd2 = new ShuffledAggregatedRDD(
- pairs, aggregatorWithException1, new HashPartitioner(2))
- evaluating { shuffledRdd2.collect() } should produce [SparkException]
- }
}
object ShuffleSuite {