diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-11-27 22:38:09 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-11-27 22:38:09 -0800 |
commit | 3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887 (patch) | |
tree | 53f78e1930e1f0b2a43aa8af8f4ac80739b054eb | |
parent | 27e43abd192440de5b10a5cc022fd5705362b276 (diff) | |
download | spark-3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887.tar.gz spark-3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887.tar.bz2 spark-3ebd8e18853bfca6f0bcd99ac79f0c6717aa0887.zip |
Added zip to Java API
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDDLike.scala | 10 | ||||
-rw-r--r-- | core/src/test/scala/spark/JavaAPISuite.java | 15 |
2 files changed, 25 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 13fcee1004..482eb9281a 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -172,6 +172,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] = rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env)) + /** + * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, + * second element in each RDD, etc. Assumes that the two RDDs have the *same number of + * partitions* and the *same number of elements in each partition* (e.g. one was made through + * a map on the other). + */ + def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = { + JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest) + } + // Actions (launch a job to return a value to the user program) /** diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 5875506179..007bb28692 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -44,6 +44,8 @@ public class JavaAPISuite implements Serializable { public void tearDown() { sc.stop(); sc = null; + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port"); } static class ReverseIntComparator implements Comparator<Integer>, Serializable { @@ -553,4 +555,17 @@ public class JavaAPISuite implements Serializable { } }).collect().toString()); } + + @Test + public void zip() { + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); + JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() { + @Override + public Double call(Integer x) { + return 1.0 * x; + } + }); + JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles); + zipped.count(); + } } |