aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala24
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java41
2 files changed, 63 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index edfa474677..84acec7d8e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaRDDLike}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
-import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, _}
+import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, Function3 => JFunction3, VoidFunction => JVoidFunction, VoidFunction2 => JVoidFunction2, _}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.api.java.JavaDStream._
@@ -308,7 +308,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction)
*/
+ @deprecated("Use foreachRDD(foreachFunc: JVoidFunction[R])", "1.6.0")
def foreachRDD(foreachFunc: JFunction[R, Void]) {
dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
}
@@ -316,12 +319,31 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
+ *
+ * @deprecated As of release 1.6.0, replaced by foreachRDD(JVoidFunction2)
*/
+ @deprecated("Use foreachRDD(foreachFunc: JVoidFunction2[R, Time])", "1.6.0")
def foreachRDD(foreachFunc: JFunction2[R, Time, Void]) {
dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
}
/**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JVoidFunction[R]) {
+ dstream.foreachRDD(rdd => foreachFunc.call(wrapRDD(rdd)))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreachRDD(foreachFunc: JVoidFunction2[R, Time]) {
+ dstream.foreachRDD((rdd, time) => foreachFunc.call(wrapRDD(rdd), time))
+ }
+
+ /**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of 'this' DStream.
*/
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c521714922..609bb4413b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -37,7 +37,9 @@ import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
+import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -45,7 +47,6 @@ import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.util.Utils;
-import org.apache.spark.SparkConf;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -770,6 +771,44 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
+ public void testForeachRDD() {
+ final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,1,1),
+ Arrays.asList(1,1,1));
+
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaTestUtils.attachTestOutputStream(stream.count()); // dummy output
+
+ stream.foreachRDD(new VoidFunction<JavaRDD<Integer>>() {
+ @Override
+ public void call(JavaRDD<Integer> rdd) {
+ accumRdd.add(1);
+ rdd.foreach(new VoidFunction<Integer>() {
+ @Override
+ public void call(Integer i) {
+ accumEle.add(1);
+ }
+ });
+ }
+ });
+
+ // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
+ stream.foreachRDD(new VoidFunction2<JavaRDD<Integer>, Time>() {
+ @Override
+ public void call(JavaRDD<Integer> rdd, Time time) {
+ }
+ });
+
+ JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(2, accumRdd.value().intValue());
+ Assert.assertEquals(6, accumEle.value().intValue());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testPairFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants"),