aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-06-13 23:59:42 -0700
committerReynold Xin <rxin@cs.berkeley.edu>2013-06-13 23:59:42 -0700
commit6738178d0daf1bbe7441db7c0c773a29bb2ec388 (patch)
tree5bcaf32baabc3e93a25463a6d44f4e3a2f01cb37 /core/src
parentd93851aedf4821a4895afd1a397a875658358137 (diff)
downloadspark-6738178d0daf1bbe7441db7c0c773a29bb2ec388.tar.gz
spark-6738178d0daf1bbe7441db7c0c773a29bb2ec388.tar.bz2
spark-6738178d0daf1bbe7441db7c0c773a29bb2ec388.zip
SPARK-772: groupByKey should disable map side combine.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala8
1 files changed, 5 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 15593db0d9..fa4bbfc76f 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -19,7 +19,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
import spark.partial.BoundedDouble
import spark.partial.PartialResult
@@ -187,11 +187,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+ // groupByKey shouldn't use map side combine because map side combine does not
+ // reduce the amount of data shuffled and requires all map side data be inserted
+ // into a hash table, leading to more objects in the old gen.
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner)
+ createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}