aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-01-22 22:04:21 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-01-22 22:04:21 -0800
commite0f7fb7f9f497b34d42f9ba147197cf9ffc51607 (patch)
tree65b789005b712b8d4b61256c348b78f53f0a8554 /streaming/src/test/java/org/apache
parent3c3fa632e6ba45ce536065aa1145698385301fb2 (diff)
downloadspark-e0f7fb7f9f497b34d42f9ba147197cf9ffc51607.tar.gz
spark-e0f7fb7f9f497b34d42f9ba147197cf9ffc51607.tar.bz2
spark-e0f7fb7f9f497b34d42f9ba147197cf9ffc51607.zip
[SPARK-5315][Streaming] Fix reduceByWindow Java API not work bug
`reduceByWindow` for Java API is actually not Java compatible, change to make it Java compatible. Current solution is to deprecate the old one and add a new API, but since old API actually is not correct, so is keeping the old one meaningful? just to keep the binary compatible? Also even adding new API still need to add to Mima exclusion, I'm not sure to change the API, or deprecate the old API and add a new one, which is the best solution? Author: jerryshao <saisai.shao@intel.com> Closes #4104 from jerryshao/SPARK-5315 and squashes the following commits: 5bc8987 [jerryshao] Address the comment c7aa1b4 [jerryshao] Deprecate the old one to keep binary compatible 8e9dc67 [jerryshao] Fix JavaDStream reduceByWindow signature error
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java20
1 files changed, 18 insertions, 2 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index d92e7fe899..d4c4074565 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -306,7 +306,17 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@SuppressWarnings("unchecked")
@Test
- public void testReduceByWindow() {
+ public void testReduceByWindowWithInverse() {
+ testReduceByWindow(true);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReduceByWindowWithoutInverse() {
+ testReduceByWindow(false);
+ }
+
+ private void testReduceByWindow(boolean withInverse) {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3),
Arrays.asList(4,5,6),
@@ -319,8 +329,14 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ JavaDStream<Integer> reducedWindowed = null;
+ if (withInverse) {
+ reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
+ } else {
+ reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ new Duration(2000), new Duration(1000));
+ }
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);