author    Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2013-04-28 05:11:03 -0700
committer Shivaram Venkataraman <shivaram@eecs.berkeley.edu>  2013-04-28 05:11:03 -0700
commit    0cc6642b7c6fbb4167956b668603f2ea6fb5ac8e (patch)
tree      c27862f3254e2c7c5685eb9b9e42b24bb43f0d24
parent    c9c4954d994c5ba824e71c1c5cd8d5de531caf78 (diff)
Rename to zipPartitions and style changes
-rw-r--r--  core/src/main/scala/spark/RDD.scala                          24
-rw-r--r--  core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala   22
-rw-r--r--  core/src/test/scala/spark/MapZippedPartitionsSuite.scala      2
3 files changed, 25 insertions(+), 23 deletions(-)
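
This commit renames RDD.zipAndMapPartitions to zipPartitions (three overloads,
covering two, three, and four input RDDs) and applies indentation fixes. As a
minimal usage sketch of the renamed two-RDD overload, not taken from this
commit, assuming a live SparkContext named sc and inputs with matching
partition counts:

  // Hypothetical example: the combining function receives one iterator per
  // RDD for each aligned pair of partitions and returns an iterator of results.
  val rdd1 = sc.makeRDD(Array("a", "bb", "ccc", "dddd"), 2)
  val rdd2 = sc.makeRDD(Array(1, 2, 3, 4), 2)
  val sums = rdd1.zipPartitions(
    (as: Iterator[String], bs: Iterator[Int]) =>
      as.zip(bs).map { case (a, b) => a.length + b },
    rdd2)
  sums.collect()  // Array(2, 4, 6, 8)
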
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8e7e1457c1..bded55238f 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -439,22 +439,22 @@ abstract class RDD[T: ClassManifest](
*/
def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
- def zipAndMapPartitions[B: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B]) => Iterator[V],
- rdd2: RDD[B]) =
+ def zipPartitions[B: ClassManifest, V: ClassManifest](
+ f: (Iterator[T], Iterator[B]) => Iterator[V],
+ rdd2: RDD[B]) =
new MapZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
- def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C]) =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
+ f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
+ rdd2: RDD[B],
+ rdd3: RDD[C]) =
new MapZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
- def zipAndMapPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C],
- rdd4: RDD[D]) =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
+ f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
+ rdd2: RDD[B],
+ rdd3: RDD[C],
+ rdd4: RDD[D]) =
new MapZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
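
The three- and four-RDD overloads follow the same shape; note that the
combining function is the first parameter, followed by the remaining RDDs.
A sketch of the three-RDD variant with hypothetical data, not part of this
diff:

  // Zip three RDDs partition-by-partition into triples.
  val ids    = sc.makeRDD(Array(1, 2, 3, 4), 2)
  val names  = sc.makeRDD(Array("a", "b", "c", "d"), 2)
  val scores = sc.makeRDD(Array(1.0, 2.0, 3.0, 4.0), 2)
  val rows = ids.zipPartitions(
    (i: Iterator[Int], n: Iterator[String], s: Iterator[Double]) =>
      i.zip(n).zip(s).map { case ((id, name), score) => (id, name, score) },
    names, scores)
  rows.collect()  // Array((1,"a",1.0), (2,"b",2.0), (3,"c",3.0), (4,"d",4.0))
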
diff --git a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala
index 6653b3b444..3520fd24b0 100644
--- a/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapZippedPartitionsRDD.scala
@@ -4,13 +4,13 @@ import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
private[spark] class MapZippedPartition(
- idx: Int,
- @transient rdds: Seq[RDD[_]]
- ) extends Partition {
+ idx: Int,
+ @transient rdds: Seq[RDD[_]])
+ extends Partition {
override val index: Int = idx
var partitionValues = rdds.map(rdd => rdd.partitions(idx))
- def partitions = partitionValues
+ def partitions = partitionValues
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
@@ -68,7 +68,8 @@ class MapZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManife
}
}
-class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
+class MapZippedPartitionsRDD3
+ [A: ClassManifest, B: ClassManifest, C: ClassManifest, V: ClassManifest](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
@@ -78,8 +79,8 @@ class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManife
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[MapZippedPartition].partitions
- f(rdd1.iterator(partitions(0), context),
- rdd2.iterator(partitions(1), context),
+ f(rdd1.iterator(partitions(0), context),
+ rdd2.iterator(partitions(1), context),
rdd3.iterator(partitions(2), context))
}
@@ -91,7 +92,8 @@ class MapZippedPartitionsRDD3[A: ClassManifest, B: ClassManifest, C: ClassManife
}
}
-class MapZippedPartitionsRDD4[A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
+class MapZippedPartitionsRDD4
+ [A: ClassManifest, B: ClassManifest, C: ClassManifest, D:ClassManifest, V: ClassManifest](
sc: SparkContext,
f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
var rdd1: RDD[A],
@@ -102,8 +104,8 @@ class MapZippedPartitionsRDD4[A: ClassManifest, B: ClassManifest, C: ClassManife
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[MapZippedPartition].partitions
- f(rdd1.iterator(partitions(0), context),
- rdd2.iterator(partitions(1), context),
+ f(rdd1.iterator(partitions(0), context),
+ rdd2.iterator(partitions(1), context),
rdd3.iterator(partitions(2), context),
rdd4.iterator(partitions(3), context))
}
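
The hunks above are style-only; the partition-alignment logic lives in a base
class that this diff does not touch. A reconstruction sketched from context
(the real getPartitions may differ): each MapZippedPartition at index i holds
the i-th partition of every parent RDD, so all parents must have the same
number of partitions.

  // Sketch, assuming a field `rdds: Seq[RDD[_]]` on the base RDD class.
  override def getPartitions: Array[Partition] = {
    val sizes = rdds.map(_.partitions.size)
    if (!sizes.forall(_ == sizes(0))) {
      throw new IllegalArgumentException(
        "Can't zip RDDs with unequal numbers of partitions")
    }
    // One zipped partition per index, each referencing all parent RDDs.
    Array.tabulate[Partition](sizes(0))(i => new MapZippedPartition(i, rdds))
  }
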
diff --git a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
index f65a646416..834b517cbc 100644
--- a/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/MapZippedPartitionsSuite.scala
@@ -24,7 +24,7 @@ class MapZippedPartitionsSuite extends FunSuite with LocalSparkContext {
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
- val zippedRDD = data1.zipAndMapPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
+ val zippedRDD = data1.zipPartitions(MapZippedPartitionsSuite.procZippedData, data2, data3)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)
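
MapZippedPartitionsSuite.procZippedData is defined outside this hunk. A sketch
consistent with expectedSizes = Array(2, 3, 1, 2, 3, 1), where each of the two
partitions contributes the element counts of data1, data2, and data3 in turn
(so data1, not shown here, would hold two elements per partition):

  // Hypothetical reconstruction: emit the size of each input iterator.
  def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]): Iterator[Int] =
    Iterator(i.toArray.size, s.toArray.size, d.toArray.size)
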