diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-06-13 23:59:42 -0700 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-06-13 23:59:42 -0700 |
commit | 6738178d0daf1bbe7441db7c0c773a29bb2ec388 (patch) | |
tree | 5bcaf32baabc3e93a25463a6d44f4e3a2f01cb37 | |
parent | d93851aedf4821a4895afd1a397a875658358137 (diff) | |
download | spark-6738178d0daf1bbe7441db7c0c773a29bb2ec388.tar.gz spark-6738178d0daf1bbe7441db7c0c773a29bb2ec388.tar.bz2 spark-6738178d0daf1bbe7441db7c0c773a29bb2ec388.zip |
SPARK-772: groupByKey should disable map side combine.
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 8 |
1 files changed, 5 insertions, 3 deletions
```diff
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 15593db0d9..fa4bbfc76f 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -19,7 +19,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.OutputFormat
 
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
 
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
@@ -187,11 +187,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
   def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+    // groupByKey shouldn't use map side combine because map side combine does not
+    // reduce the amount of data shuffled and requires all map side data be inserted
+    // into a hash table, leading to more objects in the old gen.
     def createCombiner(v: V) = ArrayBuffer(v)
     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner)
+      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
     bufs.asInstanceOf[RDD[(K, Seq[V])]]
   }
 
```