diff options
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 58 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 4 |
2 files changed, 51 insertions, 11 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index e93bf18b6d..13fa64894b 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -23,6 +23,7 @@ import org.junit.Assert; import org.junit.Test; import java.io.*; import java.util.*; +import java.lang.Iterable; import com.google.common.base.Optional; import com.google.common.collect.Lists; @@ -45,6 +46,18 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; // see http://stackoverflow.com/questions/758570/. public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { + public void equalIterator(Iterator<?> a, Iterator<?> b) { + while (a.hasNext() && b.hasNext()) { + Assert.assertEquals(a.next(), b.next()); + } + Assert.assertEquals(a.hasNext(), b.hasNext()); + } + + public void equalIterable(Iterable<?> a, Iterable<?> b) { + equalIterator(a.iterator(), b.iterator()); + } + + @SuppressWarnings("unchecked") @Test public void testCount() { @@ -1016,11 +1029,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey(); + JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey(); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); + List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected.size(), result.size()); + Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = result.iterator(); + Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator(); + while (resultItr.hasNext() && expectedItr.hasNext()) { + Iterator<Tuple2<String, Iterable<String>>> resultElements = resultItr.next().iterator(); + Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator(); + while (resultElements.hasNext() && expectedElements.hasNext()) { + Tuple2<String, Iterable<String>> resultElement = resultElements.next(); + Tuple2<String, List<String>> expectedElement = expectedElements.next(); + Assert.assertEquals(expectedElement._1(), resultElement._1()); + equalIterable(expectedElement._2(), resultElement._2()); + } + Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext()); + } } @SuppressWarnings("unchecked") @@ -1128,7 +1154,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); - JavaPairDStream<String, List<Integer>> groupWindowed = + JavaPairDStream<String, Iterable<Integer>> groupWindowed = pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -1471,11 +1497,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa ssc, stringStringKVStream2, 1); JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); - JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); + JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); - - Assert.assertEquals(expected, result); + List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected.size(), result.size()); + Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr = result.iterator(); + Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr = expected.iterator(); + while (resultItr.hasNext() && expectedItr.hasNext()) { + Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements = resultItr.next().iterator(); + Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements = expectedItr.next().iterator(); + while (resultElements.hasNext() && expectedElements.hasNext()) { + Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement = resultElements.next(); + Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement = expectedElements.next(); + Assert.assertEquals(expectedElement._1(), resultElement._1()); + equalIterable(expectedElement._2()._1(), resultElement._2()._1()); + equalIterable(expectedElement._2()._2(), resultElement._2()._2()); + } + Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext()); + } } @SuppressWarnings("unchecked") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index bb73dbf29b..8aec27e394 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -117,7 +117,7 @@ class BasicOperationsSuite extends TestSuiteBase { test("groupByKey") { testOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), - (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(), + (s: DStream[String]) => s.map(x => (x, 1)).groupByKey().mapValues(_.toSeq), Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ), true ) @@ -251,7 +251,7 @@ class BasicOperationsSuite extends TestSuiteBase { Seq( ) ) val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))) + s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq)) } testOperation(inputData1, inputData2, operation, outputData, true) } |