aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2014-03-03 22:31:30 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-03-03 22:31:30 -0800
commit181ec5030792a10f3ce77e997d0e2eda9bcd6139 (patch)
tree9b88504e5a3eca8177e4ebe1257ea9ce56120c13 /streaming/src/test/java/org/apache
parentb14ede789abfabe25144385e8dc2fb96691aba81 (diff)
downloadspark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.gz
spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.tar.bz2
spark-181ec5030792a10f3ce77e997d0e2eda9bcd6139.zip
[java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Author: Prashant Sharma <prashant.s@imaginea.com> Author: Patrick Wendell <pwendell@gmail.com> Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits: 95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch. 85a954e [Prashant Sharma] Nit. import orderings. 673f7ac [Prashant Sharma] Added support for -java-home as well 80a13e8 [Prashant Sharma] Used fake class tag syntax 26eb3f6 [Prashant Sharma] Patrick's comments on PR. 35d8d79 [Prashant Sharma] Specified java 8 building in the docs 31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag. 4ab87d3 [Prashant Sharma] Review feedback on the pr c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java62
1 files changed, 32 insertions, 30 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 54a0791d04..e93bf18b6d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -247,14 +247,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Assert.assertEquals(expected, result);
}
- private class IntegerSum extends Function2<Integer, Integer, Integer> {
+ private class IntegerSum implements Function2<Integer, Integer, Integer> {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
}
- private class IntegerDifference extends Function2<Integer, Integer, Integer> {
+ private class IntegerDifference implements Function2<Integer, Integer, Integer> {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 - i2;
@@ -392,7 +392,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, Integer> transformed3 = stream.transform(
+ JavaPairDStream<String, Integer> transformed3 = stream.transformToPair(
new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
return null;
@@ -400,7 +400,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, Integer> transformed4 = stream.transform(
+ JavaPairDStream<String, Integer> transformed4 = stream.transformToPair(
new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
return null;
@@ -424,7 +424,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transformToPair(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
@Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
return null;
@@ -432,7 +432,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+ JavaPairDStream<String, String> pairTransformed4 = pairStream.transformToPair(
new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
@Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
return null;
@@ -482,7 +482,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
- JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWithToPair(
pairStream2,
new Function3<
JavaPairRDD<String, String>,
@@ -551,7 +551,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+ JavaPairDStream<Double, Double> transformed3 = stream1.transformWithToPair(
stream2,
new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -561,7 +561,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+ JavaPairDStream<Double, Double> transformed4 = stream1.transformWithToPair(
pairStream1,
new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -591,7 +591,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+ JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWithToPair(
stream2,
new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -601,7 +601,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
}
);
- JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+ JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWithToPair(
pairStream2,
new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
@Override
@@ -656,7 +656,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
List<JavaDStream<?>> listOfDStreams2 =
Arrays.<JavaDStream<?>>asList(stream1, stream2, pairStream1.toJavaDStream());
- JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transform(
+ JavaPairDStream<Integer, Tuple2<Integer, String>> transformed2 = ssc.transformToPair(
listOfDStreams2,
new Function2<List<JavaRDD<?>>, Time, JavaPairRDD<Integer, Tuple2<Integer, String>>>() {
public JavaPairRDD<Integer, Tuple2<Integer, String>> call(List<JavaRDD<?>> listOfRDDs, Time time) {
@@ -671,7 +671,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
return new Tuple2<Integer, Integer>(i, i);
}
};
- return rdd1.union(rdd2).map(mapToTuple).join(prdd3);
+ return rdd1.union(rdd2).mapToPair(mapToTuple).join(prdd3);
}
}
);
@@ -742,17 +742,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<Integer, String>(9, "s")));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<Integer,String> flatMapped = stream.flatMap(
- new PairFlatMapFunction<String, Integer, String>() {
- @Override
- public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
- List<Tuple2<Integer, String>> out = Lists.newArrayList();
- for (String letter: in.split("(?!^)")) {
- out.add(new Tuple2<Integer, String>(in.length(), letter));
- }
- return out;
+ JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
+ new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
}
- });
+ return out;
+ }
+ });
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -816,7 +816,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, Integer> pairStream = stream.map(
+ JavaPairDStream<String, Integer> pairStream = stream.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String in) throws Exception {
@@ -880,7 +880,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.map(
+ JavaPairDStream<Integer, String> reversed = pairStream.mapToPair(
new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
@@ -913,7 +913,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
@@ -983,7 +983,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream =
JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
@@ -1228,7 +1228,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(),
+ new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1300,7 +1301,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transformToPair(
new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
@Override
public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
@@ -1632,7 +1633,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testSocketString() {
- class Converter extends Function<InputStream, Iterable<String>> {
+
+ class Converter implements Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
List<String> out = new ArrayList<String>();