aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-04 20:28:08 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commit0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3 (patch)
treed747b4effd1da474490cac4e4986d87ed1f9b36a
parent22a8c7be9aebe46c7ee332228967039be811043b (diff)
downloadspark-0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3.tar.gz
spark-0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3.tar.bz2
spark-0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3.zip
Reduce tests
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java69
1 files changed, 63 insertions, 6 deletions
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 9833478221..2d1b0f35f9 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -7,6 +7,7 @@ import org.junit.Before;
import org.junit.Test;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
import spark.streaming.JavaTestUtils;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;
@@ -92,8 +93,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
- JavaDStream windowedRDD = stream.window(new Time(2000));
- JavaTestUtils.attachTestOutputStream(windowedRDD);
+ JavaDStream windowed = stream.window(new Time(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
assertOrderInvariantEquals(expected, result);
@@ -116,8 +117,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(13,14,15,16,17,18));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
- JavaDStream windowedRDD = stream.window(new Time(4000), new Time(2000));
- JavaTestUtils.attachTestOutputStream(windowedRDD);
+ JavaDStream windowed = stream.window(new Time(4000), new Time(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4);
assertOrderInvariantEquals(expected, result);
@@ -139,8 +140,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(13,14,15,16,17,18));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
- JavaDStream windowedRDD = stream.tumble(new Time(2000));
- JavaTestUtils.attachTestOutputStream(windowedRDD);
+ JavaDStream windowed = stream.tumble(new Time(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3);
assertOrderInvariantEquals(expected, result);
@@ -214,6 +215,62 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
+ private class IntegerSum extends Function2<Integer, Integer, Integer> {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ }
+
+ private class IntegerDifference extends Function2<Integer, Integer, Integer> {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 - i2;
+ }
+ }
+
+ @Test
+ public void testReduce() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(15),
+ Arrays.asList(24));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream reduced = stream.reduce(new IntegerSum());
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(21),
+ Arrays.asList(39),
+ Arrays.asList(24));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ new IntegerDifference(), new Time(2000), new Time(1000));
+ JavaTestUtils.attachTestOutputStream(reducedWindowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
+
+ Assert.assertEquals(expected, result);
+ }
+
/*
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.