diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-02-16 16:36:12 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-02-19 08:31:58 -0800 |
commit | 35880de42edb30cf705036083710c85a74a351fa (patch) | |
tree | 8edb0259819b287ad353e99c9fd250e73b5d6c01 /streaming/src/test/java | |
parent | 9d49a6b03fb91d516bf40e50f67e87155c69dba1 (diff) | |
download | spark-35880de42edb30cf705036083710c85a74a351fa.tar.gz spark-35880de42edb30cf705036083710c85a74a351fa.tar.bz2 spark-35880de42edb30cf705036083710c85a74a351fa.zip |
Use RDD type for `transform` operator in Java.
This is an improved implementation of the `transform` operator in Java.
The main difference is that this allows all four possible types of
transform functions:
1. JavaRDD -> JavaRDD
2. JavaRDD -> JavaPairRDD
3. JavaPairRDD -> JavaPairRDD
4. JavaPairRDD -> JavaRDD
whereas previously only (1) and (3) were possible.
Conflicts:
streaming/src/test/java/spark/streaming/JavaAPISuite.java
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 89 |
1 file changed, 87 insertions, 2 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 4fe2de5a1a..9be680dbdc 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -294,8 +294,9 @@ public class JavaAPISuite implements Serializable { Arrays.asList(6,7,8), Arrays.asList(9,10,11)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> transformed = + stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { return in.map(new Function<Integer, Integer>() { @@ -742,6 +743,90 @@ public class JavaAPISuite implements Serializable { } @Test + public void testPairTransform() { + List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(2, 5)), + Arrays.asList( + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(1, 5))); + + List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5)), + Arrays.asList( + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5))); + + JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<Integer, Integer> 
pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<Integer, Integer> sorted = pairStream.transform( + new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { + @Override + public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + return in.sortByKey(); + } + }); + + JavaTestUtils.attachTestOutputStream(sorted); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairToNormalRDDTransform() { + List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(2, 5)), + Arrays.asList( + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(1, 5))); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(3,1,4,2), + Arrays.asList(2,3,4,1)); + + JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaDStream<Integer> firstParts = pairStream.transform( + new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + return in.map(new Function<Tuple2<Integer, Integer>, Integer>() { + @Override + public Integer call(Tuple2<Integer, Integer> in) { + return in._1(); + } + }); + } + }); + + JavaTestUtils.attachTestOutputStream(firstParts); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + public void testMapValues() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; |