aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java29
1 files changed, 19 insertions, 10 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 0d2145da9a..8b7d7709bf 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -28,6 +28,7 @@ import java.util.*;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
+import com.google.common.collect.Sets;
import org.apache.spark.SparkConf;
import org.apache.spark.HashPartitioner;
@@ -441,13 +442,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
new Tuple2<String, String>("new york", "islanders")));
- List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
- Arrays.asList(
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("dodgers", "giants")),
new Tuple2<String, Tuple2<String, String>>("new york",
- new Tuple2<String, String>("yankees", "mets"))),
- Arrays.asList(
+ new Tuple2<String, String>("yankees", "mets"))),
+ Sets.newHashSet(
new Tuple2<String, Tuple2<String, String>>("california",
new Tuple2<String, String>("sharks", "ducks")),
new Tuple2<String, Tuple2<String, String>>("new york",
@@ -482,8 +483,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList();
+ for (List<Tuple2<String, Tuple2<String, String>>> res: result) {
+ unorderedResult.add(Sets.newHashSet(res));
+ }
- Assert.assertEquals(expected, result);
+ Assert.assertEquals(expected, unorderedResult);
}
@@ -1196,15 +1201,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("hello", "moon"),
Arrays.asList("hello"));
- List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
+ List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Long>("hello", 1L),
new Tuple2<String, Long>("world", 1L)),
- Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Long>("hello", 2L),
new Tuple2<String, Long>("world", 1L),
new Tuple2<String, Long>("moon", 1L)),
- Arrays.asList(
+ Sets.newHashSet(
new Tuple2<String, Long>("hello", 2L),
new Tuple2<String, Long>("moon", 1L)));
@@ -1214,8 +1219,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList();
+ for (List<Tuple2<String, Long>> res: result) {
+ unorderedResult.add(Sets.newHashSet(res));
+ }
- Assert.assertEquals(expected, result);
+ Assert.assertEquals(expected, unorderedResult);
}
@Test