aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/Aggregator.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/Aggregator.scala')
-rw-r--r--core/src/main/scala/spark/Aggregator.scala18
1 files changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 136b4da61e..9af401986d 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -28,18 +28,18 @@ import scala.collection.JavaConversions._
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
case class Aggregator[K, V, C] (
- val createCombiner: V => C,
- val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C) {
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C) {
- def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
- for ((k, v) <- iter) {
- val oldC = combiners.get(k)
+ for (kv <- iter) {
+ val oldC = combiners.get(kv._1)
if (oldC == null) {
- combiners.put(k, createCombiner(v))
+ combiners.put(kv._1, createCombiner(kv._2))
} else {
- combiners.put(k, mergeValue(oldC, v))
+ combiners.put(kv._1, mergeValue(oldC, kv._2))
}
}
combiners.iterator
@@ -47,7 +47,7 @@ case class Aggregator[K, V, C] (
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
- for ((k, c) <- iter) {
+ iter.foreach { case(k, c) =>
val oldC = combiners.get(k)
if (oldC == null) {
combiners.put(k, c)