diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-22 23:35:51 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-22 23:35:51 -0700 |
commit | 72d2e1dd777696640f64aaf92fecab64c6387df0 (patch) | |
tree | e39435e82233d4d2a8e8fd4eb10dda3683e10b48 /streaming/src/test/java | |
parent | 06664987990debcb4439a9dc26e1859508c601f5 (diff) | |
download | spark-72d2e1dd777696640f64aaf92fecab64c6387df0.tar.gz spark-72d2e1dd777696640f64aaf92fecab64c6387df0.tar.bz2 spark-72d2e1dd777696640f64aaf92fecab64c6387df0.zip |
Fixed bug in Java transformWith, added more Java testcases for transform and transformWith, added missing variations of Java join and cogroup, updated various Scala and Java API docs.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 219 |
1 files changed, 204 insertions, 15 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 9f885f07f2..16622a3459 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -320,17 +320,20 @@ public class JavaAPISuite implements Serializable { Arrays.asList(9,10,11)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> transformed = - stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { - @Override - public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { - return in.map(new Function<Integer, Integer>() { - @Override - public Integer call(Integer i) throws Exception { - return i + 2; - } - }); - }}); + JavaDStream<Integer> transformed = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return in.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + } + } + ); + JavaTestUtils.attachTestOutputStream(transformed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -338,6 +341,84 @@ public class JavaAPISuite implements Serializable { } @Test + public void testVariousTransform() { + // tests whether all variations of transform can be called from Java + + List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + + List<List<Tuple2<String, Integer>>> pairInputData = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1)); + + JavaDStream<Integer> transformed1 = stream.transform( + new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> transformed2 = stream.transform( + new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed3 = stream.transform( + new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, Integer> transformed4 = stream.transform( + new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() { + @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed1 = pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaDStream<Integer> pairTransformed2 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() { + @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed3 = pairStream.transform( + new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception { + return null; + } + } + ); + + JavaPairDStream<String, String> pairTransformed4 = pairStream.transform( + new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() { + @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception { + return null; + } + } + ); + + } + + @Test public void testTransformWith() { List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( Arrays.asList(new Tuple2<String, String>("california", "dodgers"), @@ -374,10 +455,18 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( pairStream2, - new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() { - @Override - public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception { - return stringStringJavaPairRDD.join(stringStringJavaPairRDD2); + new Function3 < + JavaPairRDD<String, String>, + JavaPairRDD<String, String>, + Time, + JavaPairRDD<String, Tuple2<String, String>> + >() { + @Override public JavaPairRDD<String, Tuple2<String, String>> call( + JavaPairRDD<String, String> rdd1, + JavaPairRDD<String, String> rdd2, + Time time + ) throws Exception { + return rdd1.join(rdd2); } } ); @@ -389,6 +478,106 @@ public class JavaAPISuite implements Serializable { } + @Test + public void testVariousTransformWith() { + // tests whether all variations of transformWith can be called from Java + + List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1)); + List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x")); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1); + JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1); + + List<List<Tuple2<String, Integer>>> pairInputData1 = + Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1))); + List<List<Tuple2<Double, Character>>> pairInputData2 = + Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x'))); + JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1)); + JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream( + JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1)); + + JavaDStream<Double> transformed1 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> transformed2 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed3 = stream1.transformWith( + stream2, + new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> transformed4 = stream1.transformWith( + pairStream1, + new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed1 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith( + pairStream1, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() { + @Override + public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception { + return null; + } + } + ); + + JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith( + stream2, + new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception { + return null; + } + } + ); + + + JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith( + pairStream2, + new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() { + @Override + public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception { + return null; + } + } + ); + } + @Test public void testFlatMap() { List<List<String>> inputData = Arrays.asList( |