author     Bryan Cutler <bjcutler@us.ibm.com>          2015-11-18 12:09:54 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com> 2015-11-18 12:09:54 -0800
commit     31921e0f0bd559d042148d1ea32f865fb3068f38 (patch)
tree       9b255bcbce9e85e4aa5b473eee10d7ddd060a45c
parent     94624eacb0fdbbe210894151a956f8150cdf527e (diff)
[SPARK-4557][STREAMING] Spark Streaming foreachRDD Java API method should accept a VoidFunction<...>
Currently the streaming foreachRDD Java API uses a function prototype that requires a return value of null. This PR deprecates the old method and uses VoidFunction to allow a more concise declaration. It also adds VoidFunction2 to the Java API for use in the Streaming methods. A unit test is added for using foreachRDD with VoidFunction, and the changes have been tested with Java 7 and Java 8 using lambdas.

Author: Bryan Cutler <bjcutler@us.ibm.com>

Closes #9488 from BryanCutler/foreachRDD-VoidFunction-SPARK-4557.
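For illustration only (not part of the commit), the sketch below contrasts the old and new overloads from user code; the class name ForeachRDDExample and the JavaDStream<String> parameter "lines" are hypothetical placeholders.

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;

public class ForeachRDDExample {

  // Pre-1.6.0 overload: foreachRDD(Function<R, Void>) forces the body to return null.
  static void oldStyle(JavaDStream<String> lines) {
    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) {
        System.out.println("RDD count: " + rdd.count());
        return null; // boilerplate required only to satisfy the old signature
      }
    });
  }

  // New overload: foreachRDD(VoidFunction<R>) drops the dummy return value.
  static void newStyle(JavaDStream<String> lines) {
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
      @Override
      public void call(JavaRDD<String> rdd) {
        System.out.println("RDD count: " + rdd.count());
      }
    });

    // With Java 8, a block-bodied lambda resolves to the VoidFunction overload,
    // as exercised by the Java8APISuite test added in this commit.
    lines.foreachRDD(rdd -> {
      System.out.println("RDD count: " + rdd.count());
    });
  }
}
```

The old overloads remain but are deprecated, so existing Java 7 code keeps compiling.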
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java           | 27
-rw-r--r--  extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java     | 26
-rw-r--r--  project/MimaExcludes.scala                                                         |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 24
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java               | 41
5 files changed, 120 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
new file mode 100644
index 0000000000..6c576ab678
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A two-argument function that takes arguments of type T1 and T2 with no return value.
+ */
+public interface VoidFunction2<T1, T2> extends Serializable {
+ public void call(T1 v1, T2 v2) throws Exception;
+}
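As a usage sketch (not part of the commit; the class name BatchLogger and the "lines" stream are hypothetical), the new two-argument interface pairs with the foreachRDD(VoidFunction2) overload added further down in this diff to receive the batch Time along with each RDD:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

class BatchLogger {
  static void logBatchSizes(JavaDStream<String> lines) {
    lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
      @Override
      public void call(JavaRDD<String> rdd, Time time) {
        // Time identifies the batch that produced this RDD.
        System.out.println("Batch " + time + ": " + rdd.count() + " records");
      }
    });
  }
}
```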
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 163ae92c12..4eee97bc89 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -361,6 +362,31 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
}
@Test
+ public void testForeachRDD() {
+ final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,1,1),
+ Arrays.asList(1,1,1));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+ stream.foreachRDD(rdd -> {
+ accumRdd.add(1);
+ rdd.foreach(x -> accumEle.add(1));
+ });
+
+ // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+ stream.foreachRDD((rdd, time) -> null);
+
+ JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(2, accumRdd.value().intValue());
+ Assert.assertEquals(6, accumEle.value().intValue());
+ }
+
+ @Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants"),
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index eb70d27c34..bb45d1bb12 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -142,6 +142,10 @@ object MimaExcludes {
"org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream"),
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createRDD")
+ ) ++ Seq(
+ // SPARK-4557 Changed foreachRDD to use VoidFunction
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.api.java.JavaDStreamLike.foreachRDD")
)
case v if v.startsWith("1.5") =>
Seq(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index edfa474677..84acec7d8e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, VoidFunction => JVoidFunction, VoidFunction2 => JVoidFunction2, _}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.java.JavaDStream._
@@ -308,7 +308,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction)
*/
+ @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0")
def foreachRDD(foreachFunc: JFunction[R, Void]) {
dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
}
@@ -316,12 +319,31 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction2)
*/
+ @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0")
def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JVoidFunction[R]) {
+ dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JVoidFunction2[R, Time]) {
+ dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+ }
+
+ /**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c521714922..609bb4413b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -37,7 +37,9 @@ import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
+import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -45,7 +47,6 @@ import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
-import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -770,6 +771,44 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
+ public void testForeachRDD() {
+ final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,1,1),
+ Arrays.asList(1,1,1));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+ stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
+ @Override
+ public void call(JavaRDD<Integer> rdd) {
+ accumRdd.add(1);
+ rdd.foreach(new VoidFunction<Integer>() {
+ @Override
+ public void call(Integer i) {
+ accumEle.add(1);
+ }
+ });
+ }
+ });
+
+ // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+ stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
+ @Override
+ public void call(JavaRDD<Integer> rdd, Time time) {
+ }
+ });
+
+ JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(2, accumRdd.value().intValue());
+ Assert.assertEquals(6, accumEle.value().intValue());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants"),