aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-01-22 22:04:21 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-01-22 22:04:21 -0800
commite0f7fb7f9f497b34d42f9ba147197cf9ffc51607 (patch)
tree65b789005b712b8d4b61256c348b78f53f0a8554 /streaming
parent3c3fa632e6ba45ce536065aa1145698385301fb2 (diff)
downloadspark-e0f7fb7f9f497b34d42f9ba147197cf9ffc51607.tar.gz
spark-e0f7fb7f9f497b34d42f9ba147197cf9ffc51607.tar.bz2
spark-e0f7fb7f9f497b34d42f9ba147197cf9ffc51607.zip
[SPARK-5315][Streaming] Fix reduceByWindow Java API not work bug
`reduceByWindow` for Java API is actually not Java compatible, change to make it Java compatible. Current solution is to deprecate the old one and add a new API, but since old API actually is not correct, so is keeping the old one meaningful? just to keep the binary compatible? Also even adding new API still need to add to Mima exclusion, I'm not sure to change the API, or deprecate the old API and add a new one, which is the best solution? Author: jerryshao <saisai.shao@intel.com> Closes #4104 from jerryshao/SPARK-5315 and squashes the following commits: 5bc8987 [jerryshao] Address the comment c7aa1b4 [jerryshao] Deprecate the old one to keep binary compatible 8e9dc67 [jerryshao] Fix JavaDStream reduceByWindow signature error
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala20
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java20
2 files changed, 38 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index e0542eda13..c382a12f4d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -211,7 +211,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
+ * @deprecated As this API is not Java compatible.
*/
+ @deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
def reduceByWindow(
reduceFunc: (T, T) => T,
windowDuration: Duration,
@@ -222,6 +224,24 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a sliding window over this DStream.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByWindow(
+ reduceFunc: JFunction2[T, T, T],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): JavaDStream[T] = {
+ dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a sliding window over this DStream. However, the reduction is done incrementally
* using the old window's reduced value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index d92e7fe899..d4c4074565 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -306,7 +306,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
- public void testReduceByWindow() {
+ public void testReduceByWindowWithInverse() {
+ testReduceByWindow(true);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReduceByWindowWithoutInverse() {
+ testReduceByWindow(false);
+ }
+
+ private void testReduceByWindow(boolean withInverse) {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3),
Arrays.asList(4,5,6),
@@ -319,8 +329,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ JavaDStream<Integer> reducedWindowed = null;
+ if (withInverse) {
+ reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
+ } else {
+ reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ new Duration(2000), new Duration(1000));
+ }
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);