author     Matei Zaharia <matei@eecs.berkeley.edu>    2013-10-07 21:22:08 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2013-10-08 23:16:51 -0700
commit     12d593129df8a434f66bd3d01812cab76f40e6b8 (patch)
tree       651bbef30ba19e9f5e708d393269948f85294eba /core
parent     0b35051f19bcd9c432574ad5c0a921d45cd902cb (diff)
Create fewer function objects in uses of AppendOnlyMap.changeValue
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala        30
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala  10
2 files changed, 20 insertions, 20 deletions
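The commit avoids allocating a new closure for every element processed: the update function object is created once, outside the loop, and reads the current key/value pair from a mutable var, so AppendOnlyMap.changeValue is always handed the same function instance. The same idea drives the CoGroupedRDD change, where a val closure replaces a def that rebuilt its anonymous function on each call. Below is a minimal, standalone sketch of the pattern, assuming a plain scala.collection.mutable.HashMap as a stand-in for Spark's AppendOnlyMap; the object and variable names are illustrative, not part of this commit.

// Illustrative sketch only (not Spark code): allocate the update closure once,
// outside the loop, and feed it the current element through a mutable var,
// instead of building a new function object per element.
import scala.collection.mutable

object ClosureReuseSketch {
  def main(args: Array[String]): Unit = {
    val pairs = Iterator(("a", 1), ("b", 2), ("a", 3))
    val combiners = new mutable.HashMap[String, Int]  // stand-in for AppendOnlyMap

    // One function object for the whole loop; it reads kv each time it is invoked.
    var kv: (String, Int) = null
    val update = (hadValue: Boolean, oldValue: Int) =>
      if (hadValue) oldValue + kv._2 else kv._2

    while (pairs.hasNext) {
      kv = pairs.next()
      // AppendOnlyMap.changeValue(k, update) is emulated here with a plain HashMap.
      val hadValue = combiners.contains(kv._1)
      combiners(kv._1) = update(hadValue, combiners.getOrElse(kv._1, 0))
    }
    println(combiners)  // expected: a -> 4, b -> 2 (order may vary)
  }
}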
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 84e15fc0c8..1a2ec55876 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -33,28 +33,26 @@ case class Aggregator[K, V, C] (
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
     val combiners = new AppendOnlyMap[K, C]
-    for ((k, v) <- iter) {
-      combiners.changeValue(k, (hadValue, oldValue) => {
-        if (hadValue) {
-          mergeValue(oldValue, v)
-        } else {
-          createCombiner(v)
-        }
-      })
+    var kv: Product2[K, V] = null
+    val update = (hadValue: Boolean, oldValue: C) => {
+      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
+    }
+    while (iter.hasNext) {
+      kv = iter.next()
+      combiners.changeValue(kv._1, update)
     }
     combiners.iterator
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
     val combiners = new AppendOnlyMap[K, C]
-    for ((k, c) <- iter) {
-      combiners.changeValue(k, (hadValue, oldValue) => {
-        if (hadValue) {
-          mergeCombiners(oldValue, c)
-        } else {
-          c
-        }
-      })
+    var kc: (K, C) = null
+    val update = (hadValue: Boolean, oldValue: C) => {
+      if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
+    }
+    while (iter.hasNext) {
+      kc = iter.next()
+      combiners.changeValue(kc._1, update)
     }
     combiners.iterator
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index f41a023bc1..d237797aa6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -106,10 +106,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
     val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
 
-    def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
-      map.changeValue(k, (hadValue, oldValue) => {
-        if (hadValue) oldValue else Array.fill(numRdds)(new ArrayBuffer[Any])
-      })
+    val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
+      if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
+    }
+
+    val getSeq = (k: K) => {
+      map.changeValue(k, update)
     }
 
     val ser = SparkEnv.get.serializerManager.get(serializerClass)