about | summary | refs | log | tree | commit | diff
path: root/streaming
diff options
context:
space:
mode:
author Bryan Cutler <bjcutler@us.ibm.com> 2015-11-18 12:09:54 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-11-18 12:09:54 -0800
commit 31921e0f0bd559d042148d1ea32f865fb3068f38 (patch)
tree 9b255bcbce9e85e4aa5b473eee10d7ddd060a45c /streaming
parent 94624eacb0fdbbe210894151a956f8150cdf527e (diff)
download spark-31921e0f0bd559d042148d1ea32f865fb3068f38.tar.gz
spark-31921e0f0bd559d042148d1ea32f865fb3068f38.tar.bz2
spark-31921e0f0bd559d042148d1ea32f865fb3068f38.zip
[SPARK-4557][STREAMING] Spark Streaming foreachRDD Java API method should accept a VoidFunction<...>
Currently, the streaming foreachRDD Java API uses a function prototype that requires a return value of null. This PR deprecates the old methods and adds overloads that take a VoidFunction, allowing a more concise declaration. It also adds VoidFunction2 to the Java API for use in Streaming methods. A unit test is added for using foreachRDD with VoidFunction, and the changes have been tested with Java 7 and with Java 8 using lambdas.

Author: Bryan Cutler <bjcutler@us.ibm.com>

Closes #9488 from BryanCutler/foreachRDD-VoidFunction-SPARK-4557.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala 24
-rw-r--r-- streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java 41
2 files changed, 63 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index edfa474677..84acec7d8e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, VoidFunction => JVoidFunction, VoidFunction2 => JVoidFunction2, _}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.java.JavaDStream._
@@ -308,7 +308,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction)
*/
+ @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0")
def foreachRDD(foreachFunc: JFunction[R, Void]) {
dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
}
@@ -316,12 +319,31 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction2)
*/
+ @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0")
def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JVoidFunction[R]) {
+ dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JVoidFunction2[R, Time]) {
+ dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+ }
+
+ /**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c521714922..609bb4413b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -37,7 +37,9 @@ import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
+import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -45,7 +47,6 @@ import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
-import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -770,6 +771,44 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
+ public void testForeachRDD() {
+ final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,1,1),
+ Arrays.asList(1,1,1));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+ stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
+ @Override
+ public void call(JavaRDD<Integer> rdd) {
+ accumRdd.add(1);
+ rdd.foreach(new VoidFunction<Integer>() {
+ @Override
+ public void call(Integer i) {
+ accumEle.add(1);
+ }
+ });
+ }
+ });
+
+ // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+ stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
+ @Override
+ public void call(JavaRDD<Integer> rdd, Time time) {
+ }
+ });
+
+ JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(2, accumRdd.value().intValue());
+ Assert.assertEquals(6, accumEle.value().intValue());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants"),