aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-11-27 22:38:09 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-11-27 22:38:09 -0800
commit3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887 (patch)
tree53f78e1930e1f0b2a43aa8af8f4ac80739b054eb
parent27e43abd192440de5b10a5cc022fd5705362b276 (diff)
downloadspark-3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887.tar.gz
spark-3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887.tar.bz2
spark-3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887.zip
Added zip to Java API
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala10
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java15
2 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 13fcee1004..482eb9281a 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -172,6 +172,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env))
+ /**
+ * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+ * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+ * partitions* and the *same number of elements in each partition* (e.g. one was made through
+ * a map on the other).
+ *
+ * @tparam U element type of `other`; it becomes the value type of the returned pairs
+ * @param other the RDD whose elements are paired with the elements of this RDD
+ * @return a JavaPairRDD wrapping the zip of the two underlying RDDs, with this RDD's elements
+ * as keys and `other`'s elements as values
+ */
+ def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
+ JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
+ }
+
// Actions (launch a job to return a value to the user program)
/**
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5875506179..007bb28692 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -44,6 +44,8 @@ public class JavaAPISuite implements Serializable {
public void tearDown() {
sc.stop();
sc = null;
+ // Clear the port so Akka doesn't rebind to it; Akka doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
}
static class ReverseIntComparator implements Comparator<Integer>, Serializable {
@@ -553,4 +555,17 @@ public class JavaAPISuite implements Serializable {
}
}).collect().toString());
}
+
+ /**
+ * Zips an RDD of the integers 1..5 with an RDD of doubles derived from it through map(), and
+ * checks that the zipped RDD holds exactly one pair per input element.
+ */
+ @Test
+ public void zip() {
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
+ JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
+ @Override
+ public Double call(Integer x) {
+ return 1.0 * x;
+ }
+ });
+ JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
+ // Assert on the result instead of discarding it, so a zip that drops or duplicates
+ // elements fails the test rather than passing silently.
+ org.junit.Assert.assertEquals(5, zipped.count());
+ }
}