diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-07 21:22:08 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-08 23:16:51 -0700 |
commit | 12d593129df8a434f66bd3d01812cab76f40e6b8 (patch) | |
tree | 651bbef30ba19e9f5e708d393269948f85294eba /core | |
parent | 0b35051f19bcd9c432574ad5c0a921d45cd902cb (diff) | |
download | spark-12d593129df8a434f66bd3d01812cab76f40e6b8.tar.gz spark-12d593129df8a434f66bd3d01812cab76f40e6b8.tar.bz2 spark-12d593129df8a434f66bd3d01812cab76f40e6b8.zip |
Create fewer function objects in uses of AppendOnlyMap.changeValue
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/Aggregator.scala | 30 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 10 |
2 files changed, 20 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 84e15fc0c8..1a2ec55876 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -33,28 +33,26 @@ case class Aggregator[K, V, C] ( def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { val combiners = new AppendOnlyMap[K, C] - for ((k, v) <- iter) { - combiners.changeValue(k, (hadValue, oldValue) => { - if (hadValue) { - mergeValue(oldValue, v) - } else { - createCombiner(v) - } - }) + var kv: Product2[K, V] = null + val update = (hadValue: Boolean, oldValue: C) => { + if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) + } + while (iter.hasNext) { + kv = iter.next() + combiners.changeValue(kv._1, update) } combiners.iterator } def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = { val combiners = new AppendOnlyMap[K, C] - for ((k, c) <- iter) { - combiners.changeValue(k, (hadValue, oldValue) => { - if (hadValue) { - mergeCombiners(oldValue, c) - } else { - c - } - }) + var kc: (K, C) = null + val update = (hadValue: Boolean, oldValue: C) => { + if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2 + } + while (iter.hasNext) { + kc = iter.next() + combiners.changeValue(kc._1, update) } combiners.iterator } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index f41a023bc1..d237797aa6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -106,10 +106,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs) val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]] - def getSeq(k: K): Seq[ArrayBuffer[Any]] = { - map.changeValue(k, (hadValue, oldValue) => { - if (hadValue) oldValue else Array.fill(numRdds)(new ArrayBuffer[Any]) - }) + val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => { + if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any]) + } + + val getSeq = (k: K) => { + map.changeValue(k, update) } val ser = SparkEnv.get.serializerManager.get(serializerClass) |