diff options
author | Bryan Cutler <bjcutler@us.ibm.com> | 2015-11-18 12:09:54 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-11-18 12:09:54 -0800 |
commit | 31921e0f0bd559d042148d1ea32f865fb3068f38 (patch) | |
tree | 9b255bcbce9e85e4aa5b473eee10d7ddd060a45c /extras | |
parent | 94624eacb0fdbbe210894151a956f8150cdf527e (diff) | |
download | spark-31921e0f0bd559d042148d1ea32f865fb3068f38.tar.gz spark-31921e0f0bd559d042148d1ea32f865fb3068f38.tar.bz2 spark-31921e0f0bd559d042148d1ea32f865fb3068f38.zip |
[SPARK-4557][STREAMING] Spark Streaming foreachRDD Java API method should accept a VoidFunction<...>
Currently streaming foreachRDD Java API uses a function prototype requiring a return value of null. This PR deprecates the old method and uses VoidFunction to allow for more concise declaration. Also added VoidFunction2 to Java API in order to use in Streaming methods. Unit test is added for using foreachRDD with VoidFunction, and changes have been tested with Java 7 and Java 8 using lambdas.
Author: Bryan Cutler <bjcutler@us.ibm.com>
Closes #9488 from BryanCutler/foreachRDD-VoidFunction-SPARK-4557.
Diffstat (limited to 'extras')
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 163ae92c12..4eee97bc89 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; +import org.apache.spark.Accumulator; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -361,6 +362,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ } @Test + public void testForeachRDD() { + final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0); + final Accumulator<Integer> accumEle = ssc.sc().accumulator(0); + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,1,1), + Arrays.asList(1,1,1)); + + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output + + stream.foreachRDD(rdd -> { + accumRdd.add(1); + rdd.foreach(x -> accumEle.add(1)); + }); + + // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java + stream.foreachRDD((rdd, time) -> null); + + JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(2, accumRdd.value().intValue()); + Assert.assertEquals(6, accumEle.value().intValue()); + } + + @Test public void testPairFlatMap() { List<List<String>> inputData = Arrays.asList( Arrays.asList("giants"), |