aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-04 18:30:48 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commit22a8c7be9aebe46c7ee332228967039be811043b (patch)
tree057f92f90692712e2d91731de157a6c70a81edbe /streaming
parent91b3d414481cd06e0b75f621550a38042f9b2ffd (diff)
downloadspark-22a8c7be9aebe46c7ee332228967039be811043b.tar.gz
spark-22a8c7be9aebe46c7ee332228967039be811043b.tar.bz2
spark-22a8c7be9aebe46c7ee332228967039be811043b.zip
Adding more tests
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala2
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java181
2 files changed, 173 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index e9391642f8..d0fa06ba7b 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -35,7 +35,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
def context(): StreamingContext = dstream.context()
- def window(windowTime: Time) = {
+ def window(windowTime: Time): JavaDStream[T] = {
dstream.window(windowTime)
}
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 13dfd921cf..9833478221 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -1,19 +1,20 @@
package spark.streaming;
+import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import spark.api.java.JavaRDD;
+import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
-import spark.api.java.function.Function2;
import spark.streaming.JavaTestUtils;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -38,21 +39,31 @@ public class JavaAPISuite implements Serializable {
@Test
public void testCount() {
List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3,4), Arrays.asList(3,4,5), Arrays.asList(3));
+ Arrays.asList(1,2,3,4),
+ Arrays.asList(3,4,5),
+ Arrays.asList(3));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(4),
+ Arrays.asList(3),
+ Arrays.asList(1));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
-
- Assert.assertEquals(result,
- Arrays.asList(Arrays.asList(4), Arrays.asList(3), Arrays.asList(1)));
+ assertOrderInvariantEquals(expected, result);
}
@Test
public void testMap() {
List<List<String>> inputData = Arrays.asList(
- Arrays.asList("hello", "world"), Arrays.asList("goodnight", "moon"));
+ Arrays.asList("hello", "world"),
+ Arrays.asList("goodnight", "moon"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(5,5),
+ Arrays.asList(9,4));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@@ -64,6 +75,158 @@ public class JavaAPISuite implements Serializable {
JavaTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2);
- Assert.assertEquals(result, Arrays.asList(Arrays.asList(5, 5), Arrays.asList(9, 4)));
+ assertOrderInvariantEquals(expected, result);
}
+
+ @Test
+ public void testWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6,1,2,3),
+ Arrays.asList(7,8,9,4,5,6),
+ Arrays.asList(7,8,9));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream windowedRDD = stream.window(new Time(2000));
+ JavaTestUtils.attachTestOutputStream(windowedRDD);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testWindowWithSlideTime() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9),
+ Arrays.asList(10,11,12),
+ Arrays.asList(13,14,15),
+ Arrays.asList(16,17,18));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6),
+ Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12),
+ Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
+ Arrays.asList(13,14,15,16,17,18));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream windowedRDD = stream.window(new Time(4000), new Time(2000));
+ JavaTestUtils.attachTestOutputStream(windowedRDD);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testTumble() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9),
+ Arrays.asList(10,11,12),
+ Arrays.asList(13,14,15),
+ Arrays.asList(16,17,18));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6),
+ Arrays.asList(7,8,9,10,11,12),
+ Arrays.asList(13,14,15,16,17,18));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream windowedRDD = stream.tumble(new Time(2000));
+ JavaTestUtils.attachTestOutputStream(windowedRDD);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("yankees"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
+ @Override
+ public Boolean call(String s) throws Exception {
+ return s.contains("a");
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<String>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testGlom() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<List<String>>> expected = Arrays.asList(
+ Arrays.asList(Arrays.asList("giants", "dodgers")),
+ Arrays.asList(Arrays.asList("yankees", "red socks")));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream glommed = stream.glom();
+ JavaTestUtils.attachTestOutputStream(glommed);
+ List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapPartitions() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("GIANTSDODGERS"),
+ Arrays.asList("YANKEESRED SOCKS"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
+ @Override
+ public Iterable<String> call(Iterator<String> in) {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ /*
+ * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+ * us to account for ordering variation within individual RDD's which occurs during windowing.
+ */
+ public static <T extends Comparable> void assertOrderInvariantEquals(
+ List<List<T>> expected, List<List<T>> actual) {
+ for (List<T> list: expected) {
+ Collections.sort(list);
+ }
+ for (List<T> list: actual) {
+ Collections.sort(list);
+ }
+ Assert.assertEquals(expected, actual);
+ }
+
}