diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-03-03 22:31:30 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-03-03 22:31:30 -0800 |
commit | 181ec5030792a10f3ce77e997d0e2eda9bcd6139 (patch) | |
tree | 9b88504e5a3eca8177e4ebe1257ea9ce56120c13 /streaming/src/test/java/org/apache | |
parent | b14ede789abfabe25144385e8dc2fb96691aba81 (diff) | |
download | spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.gz spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.bz2 spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.zip |
[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>
Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits:
95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 54a0791d04..e93bf18b6d 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -247,14 +247,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Assert.assertEquals(expected, result); } - private class IntegerSum extends Function2<Integer, Integer, Integer> { + private class IntegerSum implements Function2<Integer, Integer, Integer> { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 + i2; } } - private class IntegerDifference extends Function2<Integer, Integer, Integer> { + private class IntegerDifference implements Function2<Integer, Integer, Integer> { @Override public Integer call(Integer i1, Integer i2) throws Exception { return i1 - i2; @@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, Integer> transformed3 = stream.transform( + JavaPairDStream<String, Integer> transformed3 = stream.transformToPair( new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { return null; @@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, Integer> transformed4 = stream.transform( + JavaPairDStream<String, Integer> transformed4 = stream.transformToPair( new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { return null; @@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, String> pairTransformed3 = pairStream.transform( + JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair( new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { return null; @@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<String, String> pairTransformed4 = pairStream.transform( + JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair( new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { return null; @@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, stringStringKVStream2, 1); JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair( pairStream2, new Function3< JavaPairRDD<String, String>, @@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> transformed3 = stream1.transformWith( + JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair( stream2, new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> transformed4 = stream1.transformWith( + JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair( pairStream1, new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith( + JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair( stream2, new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa } ); - JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith( + JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair( pairStream2, new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { @Override @@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa List<JavaDStream<?>> listOfDStreams2 = Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream()); - JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform( + JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair( listOfDStreams2, new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() { public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) { @@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa return new Tuple2<Integer, Integer>(i, i); } }; - return rdd1.union(rdd2).map(mapToTuple).join(prdd3); + return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3); } } ); @@ -742,17 +742,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<Integer, String>(9, "s"))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<Integer,String> flatMapped = stream.flatMap( - new PairFlatMapFunction<String, Integer, String>() { - @Override - public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { - List<Tuple2<Integer, String>> out = Lists.newArrayList(); - for (String letter: in.split("(?!^)")) { - out.add(new Tuple2<Integer, String>(in.length(), letter)); - } - return out; + JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair( + new PairFlatMapFunction<String, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2<Integer, String>(in.length(), letter)); } - }); + return out; + } + }); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream<String, Integer> pairStream = stream.map( + JavaPairDStream<String, Integer> pairStream = stream.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String in) throws Exception { @@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.map( + JavaPairDStream<Integer, String> reversed = pairStream.mapToPair( new PairFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { @@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions( + JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair( new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception { @@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap( + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair( new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { @@ -1228,7 +1228,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), + new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1300,7 +1301,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, inputData, 1); JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<Integer, Integer> sorted = pairStream.transform( + JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair( new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { @Override public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { @@ -1632,7 +1633,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @Test public void testSocketString() { - class Converter extends Function<InputStream, Iterable<String>> { + + class Converter implements Function<InputStream, Iterable<String>> { public Iterable<String> call(InputStream in) throws IOException { BufferedReader reader = new BufferedReader(new InputStreamReader(in)); List<String> out = new ArrayList<String>(); |