aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/JavaAPISuite.java28
1 files changed, 28 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 4234f6eac7..2862ed3019 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -897,4 +897,32 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
}
+
+ @Test
+ public void collectPartition() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
+
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+
+ Assert.assertEquals(Arrays.asList(1, 2), rdd1.collectPartition(0));
+ Assert.assertEquals(Arrays.asList(3, 4), rdd1.collectPartition(1));
+ Assert.assertEquals(Arrays.asList(5, 6, 7), rdd1.collectPartition(2));
+
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(2, 0)),
+ rdd2.collectPartition(0));
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
+ new Tuple2<Integer, Integer>(4, 0)),
+ rdd2.collectPartition(1));
+ Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
+ new Tuple2<Integer, Integer>(6, 0),
+ new Tuple2<Integer, Integer>(7, 1)),
+ rdd2.collectPartition(2));
+ }
+
}