diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-20 09:01:29 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-20 09:01:29 -0800 |
commit | fb9956256d19b9f8f79de43099d2b5fc851bcf08 (patch) | |
tree | 8a4bd96ce7f122342dcdc11626dae46b90e0c24c /streaming/src/test | |
parent | 7e30c46aaf337eb95c9ec37ddc2ad79439430c96 (diff) | |
parent | 03d847999e8c54684128573b94973544026081b2 (diff) | |
download | spark-fb9956256d19b9f8f79de43099d2b5fc851bcf08.tar.gz spark-fb9956256d19b9f8f79de43099d2b5fc851bcf08.tar.bz2 spark-fb9956256d19b9f8f79de43099d2b5fc851bcf08.zip |
Merge branch 'mesos-master' into streaming
Conflicts:
core/src/main/scala/spark/rdd/CheckpointRDD.scala
streaming/src/main/scala/spark/streaming/dstream/ReducedWindowedDStream.scala
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 186 |
1 files changed, 183 insertions, 3 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 5d510fd89f..17cd5ed795 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -11,6 +11,7 @@ import org.junit.Before; import org.junit.Test; import scala.Tuple2; import spark.HashPartitioner; +import spark.api.java.JavaPairRDD; import spark.api.java.JavaRDD; import spark.api.java.JavaSparkContext; import spark.api.java.function.*; @@ -485,6 +486,141 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Integer>("new york", 1))); @Test + public void testPairMap() { // Maps pair -> pair of different type + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "california"), + new Tuple2<Integer, String>(3, "california"), + new Tuple2<Integer, String>(4, "new york"), + new Tuple2<Integer, String>(1, "new york")), + Arrays.asList( + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(3, "new york"), + new Tuple2<Integer, String>(1, "new york"))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> reversed = pairStream.map( + new PairFunction<Tuple2<String, Integer>, Integer, String>() { + @Override + public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { + return in.swap(); + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairMapPartitions() { // Maps pair -> pair of different type + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "california"), + new Tuple2<Integer, String>(3, "california"), + new Tuple2<Integer, String>(4, "new york"), + new Tuple2<Integer, String>(1, "new york")), + Arrays.asList( + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(3, "new york"), + new Tuple2<Integer, String>(1, "new york"))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions( + new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception { + LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + while (in.hasNext()) { + Tuple2<String, Integer> next = in.next(); + out.add(next.swap()); + } + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairMap2() { // Maps pair -> single + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1, 3, 4, 1), + Arrays.asList(5, 5, 3, 1)); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Integer> reversed = pairStream.map( + new Function<Tuple2<String, Integer>, Integer>() { + @Override + public Integer call(Tuple2<String, Integer> in) throws Exception { + return in._2(); + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair + List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("hi", 1), + new Tuple2<String, Integer>("ho", 2)), + Arrays.asList( + new Tuple2<String, Integer>("hi", 1), + new Tuple2<String, Integer>("ho", 2))); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "h"), + new Tuple2<Integer, String>(1, "i"), + new Tuple2<Integer, String>(2, "h"), + new Tuple2<Integer, String>(2, "o")), + Arrays.asList( + new Tuple2<Integer, String>(1, "h"), + new Tuple2<Integer, String>(1, "i"), + new Tuple2<Integer, String>(2, "h"), + new Tuple2<Integer, String>(2, "o"))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap( + new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { + List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + for (Character s : in._1().toCharArray()) { + out.add(new Tuple2<Integer, String>(in._2(), s.toString())); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test public void testPairGroupByKey() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; @@ -548,7 +684,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey( new Function<Integer, Integer>() { - @Override + @Override public Integer call(Integer i) throws Exception { return i; } @@ -669,7 +805,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey( - new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){ + new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { int out = 0; @@ -681,7 +817,7 @@ public class JavaAPISuite implements Serializable { } return Optional.of(out); } - }); + }); JavaTestUtils.attachTestOutputStream(updated); List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -741,6 +877,50 @@ public class JavaAPISuite implements Serializable { } @Test + public void testPairTransform() { + List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(2, 5)), + Arrays.asList( + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(1, 5))); + + List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5)), + Arrays.asList( + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5))); + + JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<Integer, Integer> sorted = pairStream.transform( + new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() { + @Override + public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + return in.sortByKey(); + } + }); + + JavaTestUtils.attachTestOutputStream(sorted); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test public void testMapValues() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; |