aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-02-05 18:28:44 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-02-05 18:28:44 -0800
commit9cfa06837998f30e50b160bc7aaaad3b33a23c5e (patch)
treea280dd8fec5ef830853924217003f83ac541d25d /core
parent03eefbb2005e9e88fd79eecef3fc612e9f2ee623 (diff)
parent8bd0e888f377f13ac239df4ffd49fc666095e764 (diff)
downloadspark-9cfa06837998f30e50b160bc7aaaad3b33a23c5e.tar.gz
spark-9cfa06837998f30e50b160bc7aaaad3b33a23c5e.tar.bz2
spark-9cfa06837998f30e50b160bc7aaaad3b33a23c5e.zip
Merge pull request #450 from stephenh/inlinemergepair
Inline mergePair to look more like the narrow dep branch.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala10
1 files changed, 4 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 8fafd27bb6..4893fe8d78 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -84,6 +84,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
val split = s.asInstanceOf[CoGroupSplit]
val numRdds = split.deps.size
+ // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
val seq = map.get(k)
@@ -104,13 +105,10 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
- def mergePair(pair: (K, Seq[Any])) {
- val mySeq = getSeq(pair._1)
- for (v <- pair._2)
- mySeq(depNum) += v
- }
val fetcher = SparkEnv.get.shuffleFetcher
- fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair)
+ for ((k, vs) <- fetcher.fetch[K, Seq[Any]](shuffleId, split.index)) {
+ getSeq(k)(depNum) ++= vs
+ }
}
}
JavaConversions.mapAsScalaMap(map).iterator