diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-04-07 21:15:21 -0400 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-04-07 21:15:21 -0400 |
commit | 054feb6448578de5542f9ef54d4cc88f706c22f5 (patch) | |
tree | f42a57c3ebc893461bd0de18db05c9da92af5c53 /core | |
parent | b5900d47b1386f5bc21df5db32b09b2d44e9dba7 (diff) | |
download | spark-054feb6448578de5542f9ef54d4cc88f706c22f5.tar.gz spark-054feb6448578de5542f9ef54d4cc88f706c22f5.tar.bz2 spark-054feb6448578de5542f9ef54d4cc88f706c22f5.zip |
Fixed a bug with zip
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/ZippedRDD.scala | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index e80ec17aa5..35b0e06785 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest]( @transient rdd2: RDD[U] ) extends Partition { - var split1 = rdd1.partitions(idx) - var split2 = rdd1.partitions(idx) + var partition1 = rdd1.partitions(idx) + var partition2 = rdd2.partitions(idx) override val index: Int = idx - def splits = (split1, split2) + def partitions = (partition1, partition2) @throws(classOf[IOException]) private def writeObject(oos: ObjectOutputStream) { - // Update the reference to parent split at the time of task serialization - split1 = rdd1.partitions(idx) - split2 = rdd2.partitions(idx) + // Update the reference to parent partition at the time of task serialization + partition1 = rdd1.partitions(idx) + partition2 = rdd2.partitions(idx) oos.defaultWriteObject() } } @@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = { - val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits - rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context)) + val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions + rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context)) } override def getPreferredLocations(s: Partition): Seq[String] = { - val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits - rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2)) + val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions + rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2)) } override def clearDependencies() { |