diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-05 17:50:25 -0600 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-05 17:50:25 -0600 |
commit | 8bd0e888f377f13ac239df4ffd49fc666095e764 (patch) | |
tree | 33f0831ab0dd3bba69107c0af1d78c1cccbc8339 /core/src | |
parent | ae26911ec0d768dcdae8b7d706ca4544e36535e6 (diff) | |
download | spark-8bd0e888f377f13ac239df4ffd49fc666095e764.tar.gz spark-8bd0e888f377f13ac239df4ffd49fc666095e764.tar.bz2 spark-8bd0e888f377f13ac239df4ffd49fc666095e764.zip |
Inline mergePair to look more like the narrow dep branch.
No functionality changes; I think this is just more consistent
given mergePair isn't called multiple times or recursively.
Also added a comment to explain the usual case of having two parent RDDs.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 10 |
1 file changed, 4 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 8fafd27bb6..4893fe8d78 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -84,6 +84,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { val split = s.asInstanceOf[CoGroupSplit] val numRdds = split.deps.size + // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs) val map = new JHashMap[K, Seq[ArrayBuffer[Any]]] def getSeq(k: K): Seq[ArrayBuffer[Any]] = { val seq = map.get(k) @@ -104,13 +105,10 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } case ShuffleCoGroupSplitDep(shuffleId) => { // Read map outputs of shuffle - def mergePair(pair: (K, Seq[Any])) { - val mySeq = getSeq(pair._1) - for (v <- pair._2) - mySeq(depNum) += v - } val fetcher = SparkEnv.get.shuffleFetcher - fetcher.fetch[K, Seq[Any]](shuffleId, split.index).foreach(mergePair) + for ((k, vs) <- fetcher.fetch[K, Seq[Any]](shuffleId, split.index)) { + getSeq(k)(depNum) ++= vs + } } } JavaConversions.mapAsScalaMap(map).iterator |