aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-11 23:40:57 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-11 23:40:57 -0800
commit18f4889d96b61b59569ec05f64900da1477404d0 (patch)
tree3dcb95406babf79d7234aa3851366d5b162dde72 /core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
parent4d9b0ab420df383869fa586b229ac00f234b8749 (diff)
parent288a878999848adb130041d1e40c14bfc879cec6 (diff)
downloadspark-18f4889d96b61b59569ec05f64900da1477404d0.tar.gz
spark-18f4889d96b61b59569ec05f64900da1477404d0.tar.bz2
spark-18f4889d96b61b59569ec05f64900da1477404d0.zip
Merge remote-tracking branch 'apache/master' into error-handling
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala7
1 files changed, 3 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c118ddfc01..1248409e35 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -99,8 +99,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
- // A sanity check to make sure mergeCombiners is not defined.
- assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
values.mapPartitionsWithContext((context, iter) => {
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
@@ -267,8 +265,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
// into a hash table, leading to more objects in the old gen.
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+ def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
+ createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
@@ -339,7 +338,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- : RDD[(K, C)] = {
+ : RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}