diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-11 23:40:57 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-11 23:40:57 -0800 |
commit | 18f4889d96b61b59569ec05f64900da1477404d0 (patch) | |
tree | 3dcb95406babf79d7234aa3851366d5b162dde72 /core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | |
parent | 4d9b0ab420df383869fa586b229ac00f234b8749 (diff) | |
parent | 288a878999848adb130041d1e40c14bfc879cec6 (diff) | |
download | spark-18f4889d96b61b59569ec05f64900da1477404d0.tar.gz spark-18f4889d96b61b59569ec05f64900da1477404d0.tar.bz2 spark-18f4889d96b61b59569ec05f64900da1477404d0.zip |
Merge remote-tracking branch 'apache/master' into error-handling
Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 7 |
1 files changed, 3 insertions, 4 deletions
```diff
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index c118ddfc01..1248409e35 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -99,8 +99,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       }, preservesPartitioning = true)
     } else {
       // Don't apply map-side combiner.
-      // A sanity check to make sure mergeCombiners is not defined.
-      assert(mergeCombiners == null)
       val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
       values.mapPartitionsWithContext((context, iter) => {
         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
@@ -267,8 +265,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     // into a hash table, leading to more objects in the old gen.
     def createCombiner(v: V) = ArrayBuffer(v)
     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+    def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
+      createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
     bufs.asInstanceOf[RDD[(K, Seq[V])]]
   }
 
@@ -339,7 +338,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * existing partitioner/parallelism level.
    */
   def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
-      : RDD[(K, C)] = {
+    : RDD[(K, C)] = {
     combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
   }
 
```