about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-01-13 19:34:32 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-01-13 19:34:32 -0800
commitcb867e9ffb2c5e3d65d50c222fcce3631b94e4dd (patch)
treef0184718e240d3f33964b6f3c0c5c6ce581c29a2 /core
parent72408e8dfacc24652f376d1ee4dd6f04edb54804 (diff)
parent9a344098101d23fe2edba34dbfd9dbf191540e43 (diff)
downloadspark-cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd.tar.gz
spark-cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd.tar.bz2
spark-cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd.zip
Merge branch 'master' of github.com:mesos/spark
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 16
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index de0d9fad88..ce5f171911 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,7 +1,8 @@
package spark.rdd
+import java.util.{HashMap => JHashMap}
+import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -71,9 +72,16 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
val split = s.asInstanceOf[CoGroupSplit]
val numRdds = split.deps.size
- val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
+ val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
- map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any]))
+ val seq = map.get(k)
+ if (seq != null) {
+ seq
+ } else {
+ val seq = Array.fill(numRdds)(new ArrayBuffer[Any])
+ map.put(k, seq)
+ seq
+ }
}
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, itsSplit) => {
@@ -93,6 +101,6 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
}
}
- map.iterator
+ JavaConversions.mapAsScalaMap(map).iterator
}
}