diff options
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 4893fe8d78..021118c8ba 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -47,7 +47,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) @transient var deps_ = { val deps = new ArrayBuffer[Dependency[_]] - for ((rdd, index) <- rdds.zipWithIndex) { + for (rdd <- rdds) { if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) deps += new OneToOneDependency(rdd) @@ -65,12 +65,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) @transient var splits_ : Array[Split] = { val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { - array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) => + // Each CoGroupSplit will have a dependency per contributing RDD + array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) => + // Assume each RDD contributed a single dependency, and get it dependencies(j) match { case s: ShuffleDependency[_, _] => - new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep + new ShuffleCoGroupSplitDep(s.shuffleId) case _ => - new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep + new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i)) } }.toList) } @@ -97,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } } for ((dep, depNum) <- split.deps.zipWithIndex) dep match { - case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => { + case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { // Read them from the parent for ((k, v) <- rdd.iterator(itsSplit, context)) { getSeq(k.asInstanceOf[K])(depNum) += v |