aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2014-04-08 18:15:52 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-08 18:15:59 -0700
commitce8ec5456169682f27f846e7b8d51e6c4bcf75e3 (patch)
tree029a7ba0926eb1a8384ba73e74fc0bb018121528 /streaming/src/test/java/org/apache
parent12c077d5aa0b76a808a55db625c9677a52bd43f9 (diff)
downloadspark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.gz
spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.bz2
spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.zip
Spark 1271: Co-Group and Group-By should pass Iterable[X]
Author: Holden Karau <holden@pigscanfly.ca> Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits: f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator 77048f8 [Holden Karau] Fix merge up to master d3fe909 [Holden Karau] use toSeq instead 7a092a3 [Holden Karau] switch resultitr to resultiterable eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables c5075aa [Holden Karau] If guava 14 had iterables 2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API 11e730c [Holden Karau] Fix streaming tests 66b583d [Holden Karau] Fix the core test suite to compile 4ed579b [Holden Karau] Refactor from iterator to iterable d052c07 [Holden Karau] Python tests now pass with iterator pandas 3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work" cd1e81c [Holden Karau] Try and make pickling list iterators work c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well 88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming a5ee714 [Holden Karau] oops, was checking wrong iterator e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming ec8cc3e [Holden Karau] Fix test issues\! 4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD" ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas" b692868 [Holden Karau] Revert 7e533f7 [Holden Karau] Fix the bug 8a5153a [Holden Karau] Revert me, but we have some stuff to debug b4e86a9 [Holden Karau] Add a join based on the problem in SVD c4510e2 [Holden Karau] Revert this but for now put things in list pandas b4e0b1d [Holden Karau] Fix style issues 71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness. b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work 37888ec [Holden Karau] core/tests now pass 249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes 6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy" fe992fe [Holden Karau] hmmm try and fix up basic operation suite 172705c [Holden Karau] Fix Java API suite caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy 88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator 4991af6 [Holden Karau] Fix some tests be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after 687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java58
1 files changed, 49 insertions, 9 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index e93bf18b6d..13fa64894b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.*;
import java.util.*;
+import java.lang.Iterable;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -45,6 +46,18 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+ public void equalIterator(Iterator<?> a, Iterator<?> b) {
+ while (a.hasNext() && b.hasNext()) {
+ Assert.assertEquals(a.next(), b.next());
+ }
+ Assert.assertEquals(a.hasNext(), b.hasNext());
+ }
+
+ public void equalIterable(Iterable<?> a, Iterable<?> b) {
+ equalIterator(a.iterator(), b.iterator());
+ }
+
+
@SuppressWarnings("unchecked")
@Test
public void testCount() {
@@ -1016,11 +1029,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
+ JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
+ List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected.size(), result.size());
+ Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = result.iterator();
+ Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator();
+ while (resultItr.hasNext() && expectedItr.hasNext()) {
+ Iterator<Tuple2<String, Iterable<String>>> resultElements = resultItr.next().iterator();
+ Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator();
+ while (resultElements.hasNext() && expectedElements.hasNext()) {
+ Tuple2<String, Iterable<String>> resultElement = resultElements.next();
+ Tuple2<String, List<String>> expectedElement = expectedElements.next();
+ Assert.assertEquals(expectedElement._1(), resultElement._1());
+ equalIterable(expectedElement._2(), resultElement._2());
+ }
+ Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
+ }
}
@SuppressWarnings("unchecked")
@@ -1128,7 +1154,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<Integer>> groupWindowed =
+ JavaPairDStream<String, Iterable<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1471,11 +1497,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
- JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
+ JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
+ List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected.size(), result.size());
+ Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr = result.iterator();
+ Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr = expected.iterator();
+ while (resultItr.hasNext() && expectedItr.hasNext()) {
+ Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements = resultItr.next().iterator();
+ Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements = expectedItr.next().iterator();
+ while (resultElements.hasNext() && expectedElements.hasNext()) {
+ Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement = resultElements.next();
+ Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement = expectedElements.next();
+ Assert.assertEquals(expectedElement._1(), resultElement._1());
+ equalIterable(expectedElement._2()._1(), resultElement._2()._1());
+ equalIterable(expectedElement._2()._2(), resultElement._2()._2());
+ }
+ Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
+ }
}
@SuppressWarnings("unchecked")