aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-10-21 05:34:09 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-10-21 05:34:09 -0700
commit06664987990debcb4439a9dc26e1859508c601f5 (patch)
tree28641321cde005c2ea41edfe61c343bfc1ce5359 /streaming/src/test/java
parentcf64f63f8a3b54dec37e991856260ac63f7e222e (diff)
downloadspark-06664987990debcb4439a9dc26e1859508c601f5.tar.gz
spark-06664987990debcb4439a9dc26e1859508c601f5.tar.bz2
spark-06664987990debcb4439a9dc26e1859508c601f5.zip
Updated TransformDStream to allow n-ary DStream transform. Added transformWith, leftOuterJoin and rightOuterJoin operations to DStream for Scala and Java APIs. Also added n-ary union and n-ary transform operations to StreamingContext for Scala and Java APIs.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java89
1 files changed, 86 insertions, 3 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..9f885f07f2 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -223,7 +223,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(mapped);
- List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -338,6 +338,58 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testTransformWith() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+ pairStream2,
+ new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() {
+ @Override
+ public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception {
+ return stringStringJavaPairRDD.join(stringStringJavaPairRDD2);
+ }
+ }
+ );
+
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+
+ }
+
+ @Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("go", "giants"),
@@ -1099,7 +1151,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -1142,7 +1194,38 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testLeftOuterJoin() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks") ));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants") ),
+ Arrays.asList(new Tuple2<String, String>("new york", "islanders") )
+
+ );
+
+ List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2);
+ JavaDStream<Long> counted = joined.count();
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}