author     witgo <witgo@qq.com>             2014-04-29 11:30:47 -0700
committer  Reynold Xin <rxin@apache.org>    2014-04-29 11:30:47 -0700
commit     7d1505841069c6ecc3fa7e4896db535f18e4ce84 (patch)
tree       7d28c5f3e8db400d508072953ec92334fb5a23ba /core
parent     8db0f7e28f5f0330a3344705ff48d8e7b97c383f (diff)
download   spark-7d1505841069c6ecc3fa7e4896db535f18e4ce84.tar.gz
           spark-7d1505841069c6ecc3fa7e4896db535f18e4ce84.tar.bz2
           spark-7d1505841069c6ecc3fa7e4896db535f18e4ce84.zip
SPARK-1509: add zipWithIndex zipWithUniqueId methods to java api
Author: witgo <witgo@qq.com>

Closes #423 from witgo/zipWithIndex and squashes the following commits:

039ec04 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
24d74c9 [witgo] review commit
763a5e4 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
59747d1 [witgo] review commit
7bf4d06 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
daa8f84 [witgo] review commit
4070613 [witgo] Merge branch 'master' of https://github.com/apache/spark into zipWithIndex
18e6c97 [witgo] java api zipWithIndex test
11e2e7f [witgo] add zipWithIndex zipWithUniqueId methods to java api
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala  | 22
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java            | 31
2 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 574a98636a..af06d1dca9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.java
import java.util.{Comparator, List => JList, Iterator => JIterator}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -264,6 +264,26 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
}
+ /**
+ * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
+ * 2*n+k, ..., where n is the number of partitions. So the ids may have gaps, but this method
+ * won't trigger a Spark job, unlike [[org.apache.spark.rdd.RDD#zipWithIndex]].
+ */
+ def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
+ JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
+ }
+
+ /**
+ * Zips this RDD with its element indices. The ordering is first based on the partition index
+ * and then the ordering of items within each partition. So the first item in the first
+ * partition gets index 0, and the last item in the last partition receives the largest index.
+ * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
+ * This method needs to trigger a Spark job when this RDD contains more than one partition.
+ */
+ def zipWithIndex(): JavaPairRDD[T, JLong] = {
+ JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
+ }
+
// Actions (launch a job to return a value to the user program)
/**
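For context, a minimal usage sketch of the two new Java API methods (not part of this commit). The class name, app name, and local context setup are hypothetical; the expected ids in the comments follow the semantics documented in the Javadoc above for a four-element RDD split across two partitions.

    // Illustrative sketch only; assumes a local Spark setup.
    import java.util.Arrays;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ZipExamples {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "zip-examples");

        // Two partitions, so n = 2 in the zipWithUniqueId formula above:
        // partition 0 holds "a", "b" and partition 1 holds "c", "d".
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b", "c", "d"), 2);

        // zipWithIndex: indices follow partition order (0, 1, 2, 3); a job is
        // triggered to compute per-partition offsets when there is more than one partition.
        JavaPairRDD<String, Long> withIndex = rdd.zipWithIndex();
        System.out.println(withIndex.collect());     // [(a,0), (b,1), (c,2), (d,3)]

        // zipWithUniqueId: item i of partition k gets id i*n + k, so partition 0
        // gets 0 and 2, partition 1 gets 1 and 3; no job is triggered.
        JavaPairRDD<String, Long> withUniqueId = rdd.zipWithUniqueId();
        System.out.println(withUniqueId.collect());  // [(a,0), (b,2), (c,1), (d,3)]

        sc.stop();
      }
    }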
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 76c6f5af82..c3e03cea91 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -182,13 +182,30 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(2, foreachCalls);
}
- @Test
- public void toLocalIterator() {
- List<Integer> correct = Arrays.asList(1, 2, 3, 4);
- JavaRDD<Integer> rdd = sc.parallelize(correct);
- List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
- Assert.assertTrue(correct.equals(result));
- }
+ @Test
+ public void toLocalIterator() {
+ List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+ JavaRDD<Integer> rdd = sc.parallelize(correct);
+ List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+ Assert.assertTrue(correct.equals(result));
+ }
+
+ @Test
+ public void zipWithUniqueId() {
+ List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+ JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithUniqueId();
+ JavaRDD<Long> indexes = zip.values();
+ Assert.assertTrue(new HashSet<Long>(indexes.collect()).size() == 4);
+ }
+
+ @Test
+ public void zipWithIndex() {
+ List<Integer> dataArray = Arrays.asList(1, 2, 3, 4);
+ JavaPairRDD<Integer, Long> zip = sc.parallelize(dataArray).zipWithIndex();
+ JavaRDD<Long> indexes = zip.values();
+ List<Long> correctIndexes = Arrays.asList(0L, 1L, 2L, 3L);
+ Assert.assertTrue(indexes.collect().equals(correctIndexes));
+ }
@SuppressWarnings("unchecked")
@Test