author    Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-10-08 18:13:00 -0700
committer Josh Rosen <joshrosen@eecs.berkeley.edu>    2012-10-13 14:59:20 -0700
commit    10bcd217d2c9fcd7822d4399cfb9a0c9a05bc56e (patch)
tree      4869648ee9f4466e1e5c88caae19db82d8a83af1 /core/src/main
parent    4775c55641f281523f105f9272f164033242a0aa (diff)
Remove mapSideCombine field from Aggregator.
Instead, the presence or absence of a ShuffleDependency's aggregator will control whether map-side combining is performed.
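To make the new contract concrete, here is a minimal sketch using stand-in definitions (the Aggregator shape matches this patch; this ShuffleDependency is simplified to the one field that matters here): map-side combining is enabled exactly when the dependency carries an aggregator.

    case class Aggregator[K, V, C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C)

    // Simplified stand-in, not the real ShuffleDependency signature.
    class ShuffleDependency[K, V, C](val aggregator: Option[Aggregator[K, V, C]])

    val sum = Aggregator[String, Int, Int](v => v, _ + _, _ + _)
    val combining = new ShuffleDependency(Some(sum))                // map-side combining runs
    val passThrough = new ShuffleDependency[String, Int, Int](None) // no map-side combining
    assert(combining.aggregator.isDefined && passThrough.aggregator.isEmpty)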
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/Aggregator.scala               | 6
-rw-r--r--  core/src/main/scala/spark/Dependency.scala               | 2
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala          | 7
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 2
4 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 8d4f982413..df8ce9c054 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -9,15 +9,11 @@ import scala.collection.JavaConversions._
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
 * @param mergeCombiners function to merge outputs from multiple mergeValue calls.
- * @param mapSideCombine whether to apply combiners on map partitions, also
- * known as map-side aggregations. When set to false,
- * mergeCombiners function is not used.
*/
case class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C,
- val mapSideCombine: Boolean = true) {
+ val mergeCombiners: (C, C) => C) {
def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
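The diff truncates combineValuesByKey just after the JHashMap allocation. Below is a hedged reconstruction of the remainder, written as a standalone function with the closures passed in; it is inferred from the visible signature, and the real method body may differ in detail.

    import java.util.{HashMap => JHashMap}
    import scala.collection.JavaConversions._

    def combineValuesByKey[K, V, C](
        iter: Iterator[(K, V)],
        createCombiner: V => C,
        mergeValue: (C, V) => C): Iterator[(K, C)] = {
      val combiners = new JHashMap[K, C]
      for ((k, v) <- iter) {
        val oldC = combiners.get(k)
        if (oldC == null) {
          combiners.put(k, createCombiner(v))   // first value seen for this key
        } else {
          combiners.put(k, mergeValue(oldC, v)) // fold the value into the existing combiner
        }
      }
      combiners.iterator                        // via JavaConversions: Iterator[(K, C)]
    }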
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 19a51dd5b8..5a67073ef4 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -22,7 +22,7 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* Represents a dependency on the output of a shuffle stage.
* @param shuffleId the shuffle id
* @param rdd the parent RDD
- * @param aggregator optional aggregator; this allows for map-side combining
+ * @param aggregator optional aggregator; if provided, map-side combining will be performed
* @param partitioner partitioner used to partition the shuffle output
*/
class ShuffleDependency[K, V, C](
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 04234491a6..8b1c29b065 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -14,6 +14,13 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
/**
* The resulting RDD from a shuffle (e.g. repartitioning of data).
+ * @param parent the parent RDD.
+ * @param aggregator if provided, this aggregator will be used to perform map-side combining.
+ * @param part the partitioner used to partition the RDD
+ * @tparam K the key class.
+ * @tparam V the value class.
+ * @tparam C if map-side combiners are used, then this is the combiner type; otherwise,
+ *           this is the same as V.
*/
class ShuffledRDD[K, V, C](
@transient parent: RDD[(K, V)],
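A hedged illustration of the @tparam C note above, using a toy aggregator (the case class shape from this patch) rather than a real ShuffledRDD, which would need a SparkContext: with map-side combining, C is the combiner type; without an aggregator, C is simply V.

    import scala.collection.mutable.ArrayBuffer

    case class Aggregator[K, V, C](
        createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)

    // Group values per key: combiner type C = ArrayBuffer[Int] differs from V = Int.
    val groupValues = Aggregator[String, Int, ArrayBuffer[Int]](
      v => ArrayBuffer(v),            // createCombiner
      (buf, v) => { buf += v; buf },  // mergeValue
      (b1, b2) => { b1 ++= b2; b1 })  // mergeCombiners

    // Combining shuffle:    ShuffledRDD[String, Int, ArrayBuffer[Int]], Some(groupValues)
    // Plain repartitioning:  ShuffledRDD[String, Int, Int], aggregator = None (C = V)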
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 86796d3677..c97be18844 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -114,7 +114,7 @@ private[spark] class ShuffleMapTask(
val partitioner = dep.partitioner
val bucketIterators =
- if (dep.aggregator.isDefined && dep.aggregator.get.mapSideCombine) {
+ if (dep.aggregator.isDefined) {
val aggregator = dep.aggregator.get.asInstanceOf[Aggregator[Any, Any, Any]]
// Apply combiners (map-side aggregation) to the map output.
val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[Any, Any])
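For context, here is a hedged standalone sketch (not the patch's code; combineIntoBuckets is a hypothetical helper) of what this map-side-combine branch goes on to do: route each record to its output bucket by partition and fold values into per-bucket combiners.

    import java.util.{HashMap => JHashMap}

    def combineIntoBuckets[K, V, C](
        records: Iterator[(K, V)],
        numOutputSplits: Int,
        getPartition: K => Int,          // e.g. partitioner.getPartition
        createCombiner: V => C,
        mergeValue: (C, V) => C): Array[JHashMap[K, C]] = {
      val buckets = Array.tabulate(numOutputSplits)(_ => new JHashMap[K, C])
      for ((k, v) <- records) {
        val bucket = buckets(getPartition(k))
        val existing = bucket.get(k)
        if (existing == null) bucket.put(k, createCombiner(v)) // first value for this key
        else bucket.put(k, mergeValue(existing, v))            // map-side combine
      }
      buckets
    }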