aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala12
1 files changed, 7 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 4893fe8d78..021118c8ba 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -47,7 +47,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
@transient var deps_ = {
val deps = new ArrayBuffer[Dependency[_]]
- for ((rdd, index) <- rdds.zipWithIndex) {
+ for (rdd <- rdds) {
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
deps += new OneToOneDependency(rdd)
@@ -65,12 +65,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
@transient var splits_ : Array[Split] = {
val array = new Array[Split](part.numPartitions)
for (i <- 0 until array.size) {
- array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
+ // Each CoGroupSplit will have a dependency per contributing RDD
+ array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) =>
+ // Assume each RDD contributed a single dependency, and get it
dependencies(j) match {
case s: ShuffleDependency[_, _] =>
- new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
+ new ShuffleCoGroupSplitDep(s.shuffleId)
case _ =>
- new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep
+ new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
}
}.toList)
}
@@ -97,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
}
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
- case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
+ case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
for ((k, v) <- rdd.iterator(itsSplit, context)) {
getSeq(k.asInstanceOf[K])(depNum) += v