author     Stephen Haberman <stephen@exigencecorp.com>  2013-02-05 21:26:44 -0600
committer  Stephen Haberman <stephen@exigencecorp.com>  2013-02-05 21:26:45 -0600
commit     f4d43cb43e64ec3436a129cf3f7a177374451060 (patch)
tree       5cf0f2208998c12cc0c7000f962f0a54ea6c52a5 /core/src/main/scala/spark
parent     f2bc7480131c7468eb6d3bc6089a4deadf0a2a88 (diff)
Remove unneeded zipWithIndex.
Also rename r -> rdd and drop the redundant explicit CoGroupSplitDep type ascriptions.
Diffstat (limited to 'core/src/main/scala/spark')
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 12
1 file changed, 7 insertions(+), 5 deletions(-)
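
For readers skimming the patch: the first hunk drops zipWithIndex from the deps_ loop because the index half of each (rdd, index) pair was never read, while the splits_ loop below keeps zipWithIndex because its j is needed to look up dependencies(j). The following is a minimal standalone sketch of that simplification; it is plain Scala with a made-up ZipWithIndexSketch object and String stand-ins rather than real Spark RDDs.

import scala.collection.mutable.ArrayBuffer

object ZipWithIndexSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the RDD[(_, _)] instances held in rdds
    val rdds = Seq("rddA", "rddB", "rddC")

    // Before: zipWithIndex builds (element, index) tuples, but the index
    // was never referenced in the body of the deps_ loop
    val withIndex = new ArrayBuffer[String]
    for ((rdd, index) <- rdds.zipWithIndex) {
      withIndex += "dep-for-" + rdd
    }

    // After: iterating the sequence directly is equivalent when the index
    // is unused, and skips allocating the intermediate tuples
    val direct = new ArrayBuffer[String]
    for (rdd <- rdds) {
      direct += "dep-for-" + rdd
    }

    assert(withIndex == direct)
    println(direct.mkString(", "))
  }
}
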
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 4893fe8d78..021118c8ba 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -47,7 +47,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
   @transient var deps_ = {
     val deps = new ArrayBuffer[Dependency[_]]
-    for ((rdd, index) <- rdds.zipWithIndex) {
+    for (rdd <- rdds) {
       if (rdd.partitioner == Some(part)) {
         logInfo("Adding one-to-one dependency with " + rdd)
         deps += new OneToOneDependency(rdd)
@@ -65,12 +65,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
   @transient var splits_ : Array[Split] = {
     val array = new Array[Split](part.numPartitions)
     for (i <- 0 until array.size) {
-      array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (r, j) =>
+      // Each CoGroupSplit will have a dependency per contributing RDD
+      array(i) = new CoGroupSplit(i, rdds.zipWithIndex.map { case (rdd, j) =>
+        // Assume each RDD contributed a single dependency, and get it
         dependencies(j) match {
           case s: ShuffleDependency[_, _] =>
-            new ShuffleCoGroupSplitDep(s.shuffleId): CoGroupSplitDep
+            new ShuffleCoGroupSplitDep(s.shuffleId)
           case _ =>
-            new NarrowCoGroupSplitDep(r, i, r.splits(i)): CoGroupSplitDep
+            new NarrowCoGroupSplitDep(rdd, i, rdd.splits(i))
         }
       }.toList)
     }
@@ -97,7 +99,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
       }
     }
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
-      case NarrowCoGroupSplitDep(rdd, itsSplitIndex, itsSplit) => {
+      case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
         for ((k, v) <- rdd.iterator(itsSplit, context)) {
           getSeq(k.asInstanceOf[K])(depNum) += v
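
The second hunk's removal of the ": CoGroupSplitDep" ascriptions works because both match branches construct subtypes of CoGroupSplitDep, so the compiler already resolves the match to that common supertype without per-branch annotations; the third hunk likewise swaps the unused itsSplitIndex binding for the usual _ wildcard. Below is a rough sketch of the type-inference point only, using simplified stand-ins: ShuffleDependencyStub, TypeAscriptionSketch, and the String fields are illustrative, not Spark's real definitions.

// Simplified stand-ins; the real Spark classes carry RDDs and Splits,
// not the String placeholders used here.
sealed trait CoGroupSplitDep
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
case class NarrowCoGroupSplitDep(rdd: String, index: Int, split: String) extends CoGroupSplitDep

// Hypothetical stand-in for ShuffleDependency, just to drive the match
case class ShuffleDependencyStub(shuffleId: Int)

object TypeAscriptionSketch {
  // Both branches already produce subtypes of CoGroupSplitDep, so the
  // declared result type is satisfied without ascribing ": CoGroupSplitDep"
  // on each branch; in the real code the CoGroupSplit parameter type plays
  // the same role as this return type.
  def toSplitDep(dep: Any, rdd: String, splitIndex: Int): CoGroupSplitDep =
    dep match {
      case s: ShuffleDependencyStub =>
        ShuffleCoGroupSplitDep(s.shuffleId)
      case _ =>
        NarrowCoGroupSplitDep(rdd, splitIndex, rdd + "-split-" + splitIndex)
    }

  def main(args: Array[String]): Unit = {
    println(toSplitDep(ShuffleDependencyStub(7), "rddA", 0))  // shuffle dep
    println(toSplitDep("one-to-one", "rddB", 0))              // narrow dep
  }
}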