diff options
3 files changed, 42 insertions, 2 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 127973b658..bc5d81f12d 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -90,6 +90,10 @@ object MimaExcludes { // SPARK-5297 Java FileStream do not work with custom key/values ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream") + ) ++ Seq( + // SPARK-5315 Spark Streaming Java API returns Scala DStream + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow") ) case v if v.startsWith("1.2") => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index e0542eda13..c382a12f4d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -211,7 +211,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval + * @deprecated As this API is not Java compatible. */ + @deprecated("Use Java-compatible version of reduceByWindow", "1.3.0") def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, @@ -222,6 +224,24 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T /** * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a sliding window over this DStream. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByWindow( + reduceFunc: JFunction2[T, T, T], + windowDuration: Duration, + slideDuration: Duration + ): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a sliding window over this DStream. However, the reduction is done incrementally * using the old window's reduced value : * 1. reduce the new values that entered the window (e.g., adding new counts) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index d92e7fe899..d4c4074565 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -306,7 +306,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa @SuppressWarnings("unchecked") @Test - public void testReduceByWindow() { + public void testReduceByWindowWithInverse() { + testReduceByWindow(true); + } + + @SuppressWarnings("unchecked") + @Test + public void testReduceByWindowWithoutInverse() { + testReduceByWindow(false); + } + + private void testReduceByWindow(boolean withInverse) { List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1,2,3), Arrays.asList(4,5,6), @@ -319,8 +329,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList(24)); JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(), + JavaDStream<Integer> reducedWindowed = null; + if (withInverse) { + reducedWindowed = stream.reduceByWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + } else { + reducedWindowed = stream.reduceByWindow(new IntegerSum(), + new Duration(2000), new Duration(1000)); + } JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); |