aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorBryan Cutler <bjcutler@us.ibm.com>2015-11-18 12:09:54 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-18 12:09:54 -0800
commit31921e0f0bd559d042148d1ea32f865fb3068f38 (patch)
tree9b255bcbce9e85e4aa5b473eee10d7ddd060a45c /extras
parent94624eacb0fdbbe210894151a956f8150cdf527e (diff)
downloadspark-31921e0f0bd559d042148d1ea32f865fb3068f38.tar.gz
spark-31921e0f0bd559d042148d1ea32f865fb3068f38.tar.bz2
spark-31921e0f0bd559d042148d1ea32f865fb3068f38.zip
[SPARK-4557][STREAMING] Spark Streaming foreachRDD Java API method should accept a VoidFunction<...>
Currently streaming foreachRDD Java API uses a function prototype requiring a return value of null. This PR deprecates the old method and uses VoidFunction to allow for more concise declaration. Also added VoidFunction2 to Java API in order to use in Streaming methods. Unit test is added for using foreachRDD with VoidFunction, and changes have been tested with Java 7 and Java 8 using lambdas. Author: Bryan Cutler <bjcutler@us.ibm.com> Closes #9488 from BryanCutler/foreachRDD-VoidFunction-SPARK-4557.
Diffstat (limited to 'extras')
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java26
1 files changed, 26 insertions, 0 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 163ae92c12..4eee97bc89 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -361,6 +362,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
}
@Test
+ public void testForeachRDD() {
+ final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,1,1),
+ Arrays.asList(1,1,1));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+ stream.foreachRDD(rdd -> {
+ accumRdd.add(1);
+ rdd.foreach(x -> accumEle.add(1));
+ });
+
+ // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+ stream.foreachRDD((rdd, time) -> null);
+
+ JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(2, accumRdd.value().intValue());
+ Assert.assertEquals(6, accumEle.value().intValue());
+ }
+
+ @Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants"),