diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-04 20:28:08 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3 (patch) | |
tree | d747b4effd1da474490cac4e4986d87ed1f9b36a | |
parent | 22a8c7be9aebe46c7ee332228967039be811043b (diff) | |
download | spark-0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3.tar.gz spark-0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3.tar.bz2 spark-0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3.zip |
Reduce tests
-rw-r--r-- | streaming/src/test/scala/spark/streaming/JavaAPISuite.java | 69 |
1 files changed, 63 insertions, 6 deletions
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index 9833478221..2d1b0f35f9 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -7,6 +7,7 @@ import org.junit.Before; import org.junit.Test; import spark.api.java.function.FlatMapFunction; import spark.api.java.function.Function; +import spark.api.java.function.Function2; import spark.streaming.JavaTestUtils; import spark.streaming.api.java.JavaDStream; import spark.streaming.api.java.JavaStreamingContext; @@ -92,8 +93,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); - JavaDStream windowedRDD = stream.window(new Time(2000)); - JavaTestUtils.attachTestOutputStream(windowedRDD); + JavaDStream windowed = stream.window(new Time(2000)); + JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4); assertOrderInvariantEquals(expected, result); @@ -116,8 +117,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(13,14,15,16,17,18)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); - JavaDStream windowedRDD = stream.window(new Time(4000), new Time(2000)); - JavaTestUtils.attachTestOutputStream(windowedRDD); + JavaDStream windowed = stream.window(new Time(4000), new Time(2000)); + JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4); assertOrderInvariantEquals(expected, result); @@ -139,8 +140,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(13,14,15,16,17,18)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); - JavaDStream windowedRDD = stream.tumble(new Time(2000)); - JavaTestUtils.attachTestOutputStream(windowedRDD); + JavaDStream windowed = stream.tumble(new Time(2000)); + JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3); assertOrderInvariantEquals(expected, result); @@ -214,6 +215,62 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } + private class IntegerSum extends Function2<Integer, Integer, Integer> { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + } + + private class IntegerDifference extends Function2<Integer, Integer, Integer> { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 - i2; + } + } + + @Test + public void testReduce() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(21), + Arrays.asList(39), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + new IntegerDifference(), new Time(2000), new Time(1000)); + JavaTestUtils.attachTestOutputStream(reducedWindowed); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4); + + Assert.assertEquals(expected, result); + } + /* * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. |