aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2012-08-29 23:32:08 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2012-08-29 23:32:08 -0700
commit5945bcdcc56a71324357b02c21bef80dd7efd13a (patch)
tree39feabf60f9f54329e75dac08a88983b1a602d55
parentc68e820b2ae566674792c41275376e8a7c443797 (diff)
downloadspark-5945bcdcc56a71324357b02c21bef80dd7efd13a.tar.gz
spark-5945bcdcc56a71324357b02c21bef80dd7efd13a.tar.bz2
spark-5945bcdcc56a71324357b02c21bef80dd7efd13a.zip
Added a new flag in Aggregator to indicate applying map side combiners.
-rw-r--r--core/src/main/scala/spark/Aggregator.scala12
-rw-r--r--core/src/main/scala/spark/ShuffledRDD.scala13
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala2
3 files changed, 18 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 6f99270b1e..6516bea157 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -1,7 +1,17 @@
package spark
+/** A set of functions used to aggregate data.
+ *
+ * @param createCombiner function to create the initial value of the aggregation.
+ * @param mergeValue function to merge a new value into the aggregation result.
+ * @param mergeCombiners function to merge outputs from multiple mergeValue functions.
+ * @param mapSideCombine whether to apply combiners on map partitions, also
+ * known as map-side aggregations. When set to false,
+ *                         the mergeCombiners function is not used.
+ */
class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C)
+ val mergeCombiners: (C, C) => C,
+ val mapSideCombine: Boolean = true)
extends Serializable
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 8293048caa..3616d8e47e 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -29,10 +29,9 @@ class ShuffledRDD[K, V, C](
val combiners = new JHashMap[K, C]
val fetcher = SparkEnv.get.shuffleFetcher
- if (aggregator.mergeCombiners != null) {
- // If mergeCombiners is specified, combiners are applied on the map
- // partitions. In this case, post-shuffle we get a list of outputs from
- // the combiners and merge them using mergeCombiners.
+ if (aggregator.mapSideCombine) {
+ // Apply combiners on map partitions. In this case, post-shuffle we get a
+ // list of outputs from the combiners and merge them using mergeCombiners.
def mergePairWithMapSideCombiners(k: K, c: C) {
val oldC = combiners.get(k)
if (oldC == null) {
@@ -43,9 +42,9 @@ class ShuffledRDD[K, V, C](
}
fetcher.fetch[K, C](dep.shuffleId, split.index, mergePairWithMapSideCombiners)
} else {
- // If mergeCombiners is not specified, no combiner is applied on the map
- // partitions (i.e. map side aggregation is turned off). Post-shuffle we
- // get a list of values and we use mergeValue to merge them.
+ // Do not apply combiners on map partitions (i.e. map side aggregation is
+ // turned off). Post-shuffle we get a list of values and we use mergeValue
+ // to merge them.
def mergePairWithoutMapSideCombiners(k: K, v: V) {
val oldC = combiners.get(k)
if (oldC == null) {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 940932cc51..a281ae94c5 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -108,7 +108,7 @@ class ShuffleMapTask(
val partitioner = dep.partitioner
val bucketIterators =
- if (aggregator.mergeCombiners != null) {
+ if (aggregator.mapSideCombine) {
// Apply combiners (map-side aggregation) to the map output.
val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[Any, Any])
for (elem <- rdd.iterator(split)) {