diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-13 19:34:32 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-01-13 19:34:32 -0800 |
commit | cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd (patch) | |
tree | f0184718e240d3f33964b6f3c0c5c6ce581c29a2 /core | |
parent | 72408e8dfacc24652f376d1ee4dd6f04edb54804 (diff) | |
parent | 9a344098101d23fe2edba34dbfd9dbf191540e43 (diff) | |
download | spark-cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd.tar.gz spark-cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd.tar.bz2 spark-cb867e9ffb2c5e3d65d50c222fcce3631b94e4dd.zip |
Merge branch 'master' of github.com:mesos/spark
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index de0d9fad88..ce5f171911 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -1,7 +1,8 @@
 package spark.rdd
 
+import java.util.{HashMap => JHashMap}
+import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext}
 import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -71,9 +72,16 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
   override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
     val split = s.asInstanceOf[CoGroupSplit]
     val numRdds = split.deps.size
-    val map = new HashMap[K, Seq[ArrayBuffer[Any]]]
+    val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
     def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
-      map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any]))
+      val seq = map.get(k)
+      if (seq != null) {
+        seq
+      } else {
+        val seq = Array.fill(numRdds)(new ArrayBuffer[Any])
+        map.put(k, seq)
+        seq
+      }
     }
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, itsSplit) => {
@@ -93,6 +101,6 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner)
         fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
       }
     }
-    map.iterator
+    JavaConversions.mapAsScalaMap(map).iterator
   }
 }