diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-05 21:26:44 -0600 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-05 21:26:45 -0600 |
commit | f4d43cb43e64ec3436a129cf3f7a177374451060 (patch) | |
tree | 5cf0f2208998c12cc0c7000f962f0a54ea6c52a5 /core | |
parent | f2bc7480131c7468eb6d3bc6089a4deadf0a2a88 (diff) | |
download | spark-f4d43cb43e64ec3436a129cf3f7a177374451060.tar.gz spark-f4d43cb43e64ec3436a129cf3f7a177374451060.tar.bz2 spark-f4d43cb43e64ec3436a129cf3f7a177374451060.zip |
Remove unneeded zipWithIndex.
Also rename r->rdd and remove unneeded extra type info.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 4893fe8d78..021118c8ba 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -47,7 +47,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) @transient var deps_ = { val deps = new ArrayBuffer[Dependency[_]] - for ((rdd, index) <- rdds.zipWithIndex) { + for (rdd <- rdds) { if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) deps += new OneToOneDependency(rdd) @@ -65,12 +65,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) @transient var splits_ : Array[Split] = { val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { - array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) => + // Each CoGroupSplit will have a dependency per contributing RDD + array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) => + // Assume each RDD contributed a single dependency, and get it dependencies(j) match { case s: ShuffleDependency[_, _] => - new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep + new ShuffleCoGroupSplitDep(s.shuffleId) case _ => - new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep + new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i)) } }.toList) } @@ -97,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } } for ((dep, depNum) <- split.deps.zipWithIndex) dep match { - case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => { + case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { // Read them from the parent for ((k, v) <- rdd.iterator(itsSplit, context)) { getSeq(k.asInstanceOf[K])(depNum) += v |