author     Matei Zaharia <matei.zaharia@gmail.com>  2013-08-16 14:02:34 -0700
committer  Matei Zaharia <matei.zaharia@gmail.com>  2013-08-16 14:02:34 -0700
commit     e89ffc7b3cd797b3dcc5946eb9dcdb64d15ea23d (patch)
tree       b52785b01560da251f045446f042ddcfb9ef9a2f /core
parent     1fb1b0992838c8cdd57eec45793e67a0490f1a52 (diff)
parent     53b2639a1e26e94f4a4fa8856f462b281c90d8b1 (diff)
Merge pull request #839 from jegonzal/zip_partitions
Currying RDD.zipPartitions
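
Why currying: Scala infers types one parameter list at a time, so moving the RDD arguments into the first list lets the compiler fix the iterator element types before it checks the closure, and callers no longer need to annotate the closure's parameters. A minimal call-site sketch against the pre-Apache `spark` package of this era; sc, rdd1, and rdd2 are illustrative names, not from the patch:

import spark.SparkContext

val sc = new SparkContext("local", "zipPartitionsExample")
val rdd1 = sc.parallelize(1 to 6, 2)                     // RDD[Int], 2 partitions
val rdd2 = sc.parallelize(Seq("a", "b", "c", "d"), 2)    // RDD[String], 2 partitions

// Old shape: the closure came first, so its parameter types had to be
// written out for inference to succeed:
//   rdd1.zipPartitions((a: Iterator[Int], b: Iterator[String]) => ..., rdd2)

// New curried shape: rdd2 fixes B = String in the first parameter list,
// so the closure's parameter types are inferred in the second.
val sizes = rdd1.zipPartitions(rdd2) { (a, b) =>
  Iterator(a.size + b.size)
}
println(sizes.collect().mkString(", "))  // one size sum per partition pair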
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala                   | 21
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala  |  6
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java           |  2
-rw-r--r--  core/src/test/scala/spark/ZippedPartitionsSuite.scala |  2
4 files changed, 14 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 160b3e9d83..503ea6ccbf 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -515,22 +515,19 @@ abstract class RDD[T: ClassManifest](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
- def zipPartitions[B: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B]) => Iterator[V],
- rdd2: RDD[B]): RDD[V] =
+ def zipPartitions[B: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B])
+ (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C]): RDD[V] =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C])
+ (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C],
- rdd4: RDD[D]): RDD[V] =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
+ (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
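
The same pattern extends to the three- and four-RDD overloads: all the iterator element types are fixed by the RDDs in the first parameter list. A sketch of the ternary form, mirroring the ZippedPartitionsSuite fixtures further down (data1/data2/data3 as defined there; the body is an illustrative stand-in for procZippedData):

// Element types (Int, String, Double) are inferred from data1, data2, data3;
// the closure needs no type annotations under the curried signature.
val zipped = data1.zipPartitions(data2, data3) { (i1, i2, i3) =>
  Iterator(i1.size, i2.size, i3.size)
}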
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index e0255ed23e..2c2b138f16 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -207,12 +207,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of elements in each partition.
*/
def zipPartitions[U, V](
- f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V],
- other: JavaRDDLike[U, _]): JavaRDD[V] = {
+ other: JavaRDDLike[U, _],
+ f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
- rdd.zipPartitions(fn, other.rdd)(other.classManifest, f.elementType()))(f.elementType())
+ rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
}
// Actions (launch a job to return a value to the user program)
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 4ab271de1a..c337c49268 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -748,7 +748,7 @@ public class JavaAPISuite implements Serializable {
}
};
- JavaRDD<Integer> sizes = rdd1.zipPartitions(sizesFn, rdd2);
+ JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
index 5e6d7b09d8..bb5d379273 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -40,7 +40,7 @@ class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
- val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
+ val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)