aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-10-21 05:34:09 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-10-21 05:34:09 -0700
commit06664987990debcb4439a9dc26e1859508c601f5 (patch)
tree28641321cde005c2ea41edfe61c343bfc1ce5359 /streaming/src/test
parentcf64f63f8a3b54dec37e991856260ac63f7e222e (diff)
downloadspark-06664987990debcb4439a9dc26e1859508c601f5.tar.gz
spark-06664987990debcb4439a9dc26e1859508c601f5.tar.bz2
spark-06664987990debcb4439a9dc26e1859508c601f5.zip
Updated TransformDStream to allow n-ary DStream transform. Added transformWith, leftOuterJoin and rightOuterJoin operations to DStream for Scala and Java APIs. Also added n-ary union and n-ary transform operations to StreamingContext for Scala and Java APIs.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java89
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala103
2 files changed, 187 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..9f885f07f2 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -223,7 +223,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(mapped);
- List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -338,6 +338,58 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testTransformWith() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
+ pairStream2,
+ new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() {
+ @Override
+ public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception {
+ return stringStringJavaPairRDD.join(stringStringJavaPairRDD2);
+ }
+ }
+ );
+
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+
+ }
+
+ @Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("go", "giants"),
@@ -1099,7 +1151,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@@ -1142,7 +1194,38 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testLeftOuterJoin() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks") ));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants") ),
+ Arrays.asList(new Tuple2<String, String>("new york", "islanders") )
+
+ );
+
+ List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ ssc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2);
+ JavaDStream<Long> counted = joined.count();
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 11586f72b6..a2ac510a98 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -18,7 +18,10 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
-import scala.runtime.RichInt
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+
import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
@@ -143,6 +146,72 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
+ test("union") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 8, 101 to 108, 201 to 208)
+ testOperation(
+ input,
+ (s: DStream[Int]) => s.union(s.map(_ + 4)) ,
+ output
+ )
+ }
+
+ test("StreamingContext.union") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 12, 101 to 112, 201 to 212)
+ // union over 3 DStreams
+ testOperation(
+ input,
+ (s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))),
+ output
+ )
+ }
+
+ test("transform") {
+ val input = Seq(1 to 4, 5 to 8, 9 to 12)
+ testOperation(
+ input,
+ (r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform
+ input.map(_.map(_.toString))
+ )
+ }
+
+ test("transformWith") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (1, "x")), ("b", (1, "x")) ),
+ Seq( ("", (1, "x")) ),
+ Seq( ),
+ Seq( )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ val t1 = s1.map(x => (x, 1))
+ val t2 = s2.map(x => (x, "x"))
+ t1.transformWith( // RDD.join in transform
+ t2,
+ (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2)
+ )
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("StreamingContext.transform") {
+ val input = Seq(1 to 4, 101 to 104, 201 to 204)
+ val output = Seq(1 to 12, 101 to 112, 201 to 212)
+
+ // transform over 3 DStreams by doing union of the 3 RDDs
+ val operation = (s: DStream[Int]) => {
+ s.context.transform(
+ Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams
+ (rdds: Seq[RDD[_]], time: Time) =>
+ rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs
+ )
+ }
+
+ testOperation(input, operation, output)
+ }
+
test("cogroup") {
val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() )
val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() )
@@ -168,7 +237,37 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
- s1.map(x => (x,1)).join(s2.map(x => (x,"x")))
+ s1.map(x => (x, 1)).join(s2.map(x => (x, "x")))
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("leftOuterJoin") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ),
+ Seq( ("", (1, Some("x"))), ("a", (1, None)) ),
+ Seq( ("", (1, None)) ),
+ Seq( )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x")))
+ }
+ testOperation(inputData1, inputData2, operation, outputData, true)
+ }
+
+ test("rightOuterJoin") {
+ val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() )
+ val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") )
+ val outputData = Seq(
+ Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ),
+ Seq( ("", (Some(1), "x")), ("b", (None, "x")) ),
+ Seq( ),
+ Seq( ("", (None, "x")) )
+ )
+ val operation = (s1: DStream[String], s2: DStream[String]) => {
+ s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x")))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}