From bd336f5f406386c929f2d1f9aecd7d5190a1a087 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 10 Jan 2013 17:13:04 -0800 Subject: Changed CoGroupRDD's hash map from Scala to Java. --- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index de0d9fad88..2e051c81c8 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -1,7 +1,8 @@ package spark.rdd +import java.util.{HashMap => JHashMap} import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Split, TaskContext} import spark.{Dependency, OneToOneDependency, ShuffleDependency} @@ -71,7 +72,7 @@ class CoGroupedRDD[K](@transient rdds: Seq[RDD[(_, _)]], part: Partitioner) override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { val split = s.asInstanceOf[CoGroupSplit] val numRdds = split.deps.size - val map = new HashMap[K, Seq[ArrayBuffer[Any]]] + val map = new JHashMap[K, Seq[ArrayBuffer[Any]]] def getSeq(k: K): Seq[ArrayBuffer[Any]] = { map.getOrElseUpdate(k, Array.fill(numRdds)(new ArrayBuffer[Any])) } -- cgit v1.2.3