author     Josh Rosen <joshrosen@eecs.berkeley.edu>   2012-10-13 14:57:33 -0700
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>   2012-10-13 14:59:20 -0700
commit     33cd3a0c12bf487a9060135c6cf2a3efa7943c77 (patch)
tree       f99125fea55f30258d44fdd81921765865e95f68 /core/src/main
parent     10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e (diff)
Remove map-side combining from ShuffleMapTask.
This separation of concerns simplifies the ShuffleDependency and ShuffledRDD interfaces. Map-side combining can be performed in a mapPartitions() call prior to shuffling the RDD. I don't anticipate much of a performance impact: in both approaches, each tuple is hashed twice, once when it is assigned to an output bucket and once in the combiner's hashtable. The same steps are performed, just in a different order and through one extra Iterator.
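
A minimal sketch of the new combineByKey flow, assembled from the PairRDDFunctions change in the diff below. The method name combineByKeySketch is hypothetical and the snippet is illustrative only, written against the 2012 `spark` package API touched by this patch, not a drop-in replacement:

    import spark.{Aggregator, Partitioner, RDD}
    import spark.rdd.ShuffledRDD

    // Sketch of the mapSideCombine = true branch after this patch.
    def combineByKeySketch[K: ClassManifest, V: ClassManifest, C](
        self: RDD[(K, V)],
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner): RDD[(K, C)] = {
      val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
      // 1. Combine values within each map partition (previously done inside ShuffleMapTask).
      val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
      // 2. Shuffle the already-combined (K, C) pairs; ShuffledRDD no longer carries an Aggregator.
      val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
      // 3. Merge combiners for each key on the reduce side.
      partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
    }

The reduce-side mapPartitions() call preserves the previous semantics: the shuffle fetch now returns already-combined (K, C) pairs from each map output, which still need to be merged across maps.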
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/Dependency.scala                |  4
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala          | 11
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala          | 17
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala           | 15
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala    | 10
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala  | 43
-rw-r--r--  core/src/main/scala/spark/scheduler/Stage.scala           |  2
7 files changed, 37 insertions, 65 deletions
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 5a67073ef4..d5f54d6cbd 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -22,13 +22,11 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* Represents a dependency on the output of a shuffle stage.
* @param shuffleId the shuffle id
* @param rdd the parent RDD
- * @param aggregator optional aggregator; if provided, map-side combining will be performed
* @param partitioner partitioner used to partition the shuffle output
*/
-class ShuffleDependency[K, V, C](
+class ShuffleDependency[K, V](
val shuffleId: Int,
@transient rdd: RDD[(K, V)],
- val aggregator: Option[Aggregator[K, V, C]],
val partitioner: Partitioner)
extends Dependency(rdd)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 36cfda9cdb..9cb2378048 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -58,13 +58,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val aggregator =
new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (mapSideCombine) {
- val combiners = new ShuffledRDD[K, V, C](self, Some(aggregator), partitioner)
- combiners.mapPartitions(aggregator.combineCombinersByKey(_), true)
+ val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
+ val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
+ partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
- val values = new ShuffledRDD[K, V, V](self, None, partitioner)
+ val values = new ShuffledRDD[K, V](self, partitioner)
values.mapPartitions(aggregator.combineValuesByKey(_), true)
}
}
@@ -175,7 +176,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
createCombiner _, mergeValue _, mergeCombiners _, partitioner)
bufs.flatMapValues(buf => buf)
} else {
- new ShuffledRDD[K, V, V](self, None, partitioner)
+ new ShuffledRDD[K, V](self, partitioner)
}
}
@@ -613,7 +614,7 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
*/
def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = {
val shuffled =
- new ShuffledRDD[K, V, V](self, None, new RangePartitioner(numSplits, self, ascending))
+ new ShuffledRDD[K, V](self, new RangePartitioner(numSplits, self, ascending))
shuffled.mapPartitions(iter => {
val buf = iter.toArray
if (ascending) {
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index cc92f1203c..551085815c 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,9 +1,5 @@
package spark.rdd
-import java.net.URL
-import java.io.EOFException
-import java.io.ObjectInputStream
-
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -43,13 +39,14 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
override val dependencies = {
val deps = new ArrayBuffer[Dependency[_]]
for ((rdd, index) <- rdds.zipWithIndex) {
- if (rdd.partitioner == Some(part)) {
- logInfo("Adding one-to-one dependency with " + rdd)
- deps += new OneToOneDependency(rdd)
+ val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
+ if (mapSideCombinedRDD.partitioner == Some(part)) {
+ logInfo("Adding one-to-one dependency with " + mapSideCombinedRDD)
+ deps += new OneToOneDependency(mapSideCombinedRDD)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- deps += new ShuffleDependency[Any, Any, ArrayBuffer[Any]](
- context.newShuffleId, rdd, Some(aggr), part)
+ deps += new ShuffleDependency[Any, ArrayBuffer[Any]](
+ context.newShuffleId, mapSideCombinedRDD, part)
}
}
deps.toList
@@ -62,7 +59,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
for (i <- 0 until array.size) {
array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
dependencies(j) match {
- case s: ShuffleDependency[_, _, _] =>
+ case s: ShuffleDependency[_, _] =>
new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
case _ =>
new NarrowCoGroupSplitDep(r, r.splits(i)): CoGroupSplitDep
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 8b1c29b065..3a173ece1a 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,6 +1,5 @@
package spark.rdd
-import spark.Aggregator
import spark.Partitioner
import spark.RDD
import spark.ShuffleDependency
@@ -15,17 +14,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
/**
* The resulting RDD from a shuffle (e.g. repartitioning of data).
* @param parent the parent RDD.
- * @param aggregator if provided, this aggregator will be used to perform map-side combining.
* @param part the partitioner used to partition the RDD
* @tparam K the key class.
* @tparam V the value class.
- * @tparam C if map side combiners are used, then this is the combiner type; otherwise,
- * this is the same as V.
*/
-class ShuffledRDD[K, V, C](
+class ShuffledRDD[K, V](
@transient parent: RDD[(K, V)],
- aggregator: Option[Aggregator[K, V, C]],
- part: Partitioner) extends RDD[(K, C)](parent.context) {
+ part: Partitioner) extends RDD[(K, V)](parent.context) {
override val partitioner = Some(part)
@@ -36,10 +31,10 @@ class ShuffledRDD[K, V, C](
override def preferredLocations(split: Split) = Nil
- val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
+ val dep = new ShuffleDependency(context.newShuffleId, parent, part)
override val dependencies = List(dep)
- override def compute(split: Split): Iterator[(K, C)] = {
- SparkEnv.get.shuffleFetcher.fetch[K, C](dep.shuffleId, split.index)
+ override def compute(split: Split): Iterator[(K, V)] = {
+ SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index)
}
}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 6f4c6bffd7..aaaed59c4a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -104,7 +104,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
* The priority value passed in will be used if the stage doesn't already exist with
* a lower priority (we assume that priorities always increase across jobs for now).
*/
- def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_,_], priority: Int): Stage = {
+ def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
@@ -119,7 +119,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
* as a result stage for the final RDD used directly in an action. The stage will also be given
* the provided priority.
*/
- def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]], priority: Int): Stage = {
+ def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_]], priority: Int): Stage = {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of splits is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
@@ -149,7 +149,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
cacheTracker.registerRDD(r.id, r.splits.size)
for (dep <- r.dependencies) {
dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
+ case shufDep: ShuffleDependency[_,_] =>
parents += getShuffleMapStage(shufDep, priority)
case _ =>
visit(dep.rdd)
@@ -172,7 +172,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (locs(p) == Nil) {
for (dep <- rdd.dependencies) {
dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
+ case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
missing += mapStage
@@ -549,7 +549,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
visitedRdds += rdd
for (dep <- rdd.dependencies) {
dep match {
- case shufDep: ShuffleDependency[_,_,_] =>
+ case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
visitedStages += mapStage
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index c97be18844..60105c42b6 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -22,7 +22,7 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new JHashMap[Int, Array[Byte]]
- def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_,_]): Array[Byte] = {
+ def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
val old = serializedInfoCache.get(stageId)
if (old != null) {
@@ -41,14 +41,14 @@ private[spark] object ShuffleMapTask {
}
}
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
+ def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
synchronized {
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance
val objIn = ser.deserializeStream(in)
val rdd = objIn.readObject().asInstanceOf[RDD[_]]
- val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_,_]]
+ val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
return (rdd, dep)
}
}
@@ -71,7 +71,7 @@ private[spark] object ShuffleMapTask {
private[spark] class ShuffleMapTask(
stageId: Int,
var rdd: RDD[_],
- var dep: ShuffleDependency[_,_,_],
+ var dep: ShuffleDependency[_,_],
var partition: Int,
@transient var locs: Seq[String])
extends Task[MapStatus](stageId)
@@ -113,33 +113,14 @@ private[spark] class ShuffleMapTask(
val numOutputSplits = dep.partitioner.numPartitions
val partitioner = dep.partitioner
- val bucketIterators =
- if (dep.aggregator.isDefined) {
- val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]]
- // Apply combiners (map-side aggregation) to the map output.
- val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])
- for (elem <- rdd.iterator(split)) {
- val (k, v) = elem.asInstanceOf[(Any, Any)]
- val bucketId = partitioner.getPartition(k)
- val bucket = buckets(bucketId)
- val existing = bucket.get(k)
- if (existing == null) {
- bucket.put(k, aggregator.createCombiner(v))
- } else {
- bucket.put(k, aggregator.mergeValue(existing, v))
- }
- }
- buckets.map(_.iterator)
- } else {
- // No combiners (no map-side aggregation). Simply partition the map output.
- val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
- for (elem <- rdd.iterator(split)) {
- val pair = elem.asInstanceOf[(Any, Any)]
- val bucketId = partitioner.getPartition(pair._1)
- buckets(bucketId) += pair
- }
- buckets.map(_.iterator)
- }
+ // Partition the map output.
+ val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
+ for (elem <- rdd.iterator(split)) {
+ val pair = elem.asInstanceOf[(Any, Any)]
+ val bucketId = partitioner.getPartition(pair._1)
+ buckets(bucketId) += pair
+ }
+ val bucketIterators = buckets.map(_.iterator)
val compressedSizes = new Array[Byte](numOutputSplits)
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 1149c00a23..4846b66729 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -22,7 +22,7 @@ import spark.storage.BlockManagerId
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
- val shuffleDep: Option[ShuffleDependency[_,_,_]], // Output shuffle if stage is a map stage
+ val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage],
val priority: Int)
extends Logging {