diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-21 05:34:09 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-21 05:34:09 -0700 |
commit | 06664987990debcb4439a9dc26e1859508c601f5 (patch) | |
tree | 28641321cde005c2ea41edfe61c343bfc1ce5359 /streaming/src/test | |
parent | cf64f63f8a3b54dec37e991856260ac63f7e222e (diff) | |
download | spark-06664987990debcb4439a9dc26e1859508c601f5.tar.gz spark-06664987990debcb4439a9dc26e1859508c601f5.tar.bz2 spark-06664987990debcb4439a9dc26e1859508c601f5.zip |
Updated TransformDStream to support n-ary DStream transforms. Added the transformWith, leftOuterJoin, and rightOuterJoin operations to DStream in both the Scala and Java APIs. Also added n-ary union and n-ary transform operations to StreamingContext in both the Scala and Java APIs.
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 89 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 103 |
2 files changed, 187 insertions, 5 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff87..9f885f07f2 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -223,7 +223,7 @@ public class JavaAPISuite implements Serializable { } }); JavaTestUtils.attachTestOutputStream(mapped); - List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -338,6 +338,58 @@ public class JavaAPISuite implements Serializable { } @Test + public void testTransformWith() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("dodgers", "giants")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("sharks", "ducks")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("rangers", "islanders")))); + + JavaDStream<Tuple2<String, 
String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith( + pairStream2, + new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() { + @Override + public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception { + return stringStringJavaPairRDD.join(stringStringJavaPairRDD2); + } + } + ); + + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + + } + + @Test public void testFlatMap() { List<List<String>> inputData = Arrays.asList( Arrays.asList("go", "giants"), @@ -1099,7 +1151,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); JavaTestUtils.attachTestOutputStream(grouped); - List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } @@ -1142,7 +1194,38 @@ public class JavaAPISuite implements Serializable { JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2); JavaTestUtils.attachTestOutputStream(joined); - List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<List<Tuple2<String, 
Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testLeftOuterJoin() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks") )); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants") ), + Arrays.asList(new Tuple2<String, String>("new york", "islanders") ) + + ); + + List<List<Long>> expected = Arrays.asList(Arrays.asList(2L), Arrays.asList(1L)); + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + ssc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, Optional<String>>> joined = pairStream1.leftOuterJoin(pairStream2); + JavaDStream<Long> counted = joined.count(); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Long>> result = JavaTestUtils.runStreams(ssc, 2, 2); Assert.assertEquals(expected, result); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 11586f72b6..a2ac510a98 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -18,7 +18,10 @@ package org.apache.spark.streaming import org.apache.spark.streaming.StreamingContext._ -import scala.runtime.RichInt + 
+import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext._ + import util.ManualClock class BasicOperationsSuite extends TestSuiteBase { @@ -143,6 +146,72 @@ class BasicOperationsSuite extends TestSuiteBase { ) } + test("union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 8, 101 to 108, 201 to 208) + testOperation( + input, + (s: DStream[Int]) => s.union(s.map(_ + 4)) , + output + ) + } + + test("StreamingContext.union") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + // union over 3 DStreams + testOperation( + input, + (s: DStream[Int]) => s.context.union(Seq(s, s.map(_ + 4), s.map(_ + 8))), + output + ) + } + + test("transform") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.transform(rdd => rdd.map(_.toString)), // RDD.map in transform + input.map(_.map(_.toString)) + ) + } + + test("transformWith") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, "x")), ("b", (1, "x")) ), + Seq( ("", (1, "x")) ), + Seq( ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + val t1 = s1.map(x => (x, 1)) + val t2 = s2.map(x => (x, "x")) + t1.transformWith( // RDD.join in transform + t2, + (rdd1: RDD[(String, Int)], rdd2: RDD[(String, String)]) => rdd1.join(rdd2) + ) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("StreamingContext.transform") { + val input = Seq(1 to 4, 101 to 104, 201 to 204) + val output = Seq(1 to 12, 101 to 112, 201 to 212) + + // transform over 3 DStreams by doing union of the 3 RDDs + val operation = (s: DStream[Int]) => { + s.context.transform( + Seq(s, s.map(_ + 4), s.map(_ + 8)), // 3 DStreams + (rdds: Seq[RDD[_]], time: Time) => + rdds.head.context.union(rdds.map(_.asInstanceOf[RDD[Int]])) // union of RDDs 
+ ) + } + + testOperation(input, operation, output) + } + test("cogroup") { val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) @@ -168,7 +237,37 @@ class BasicOperationsSuite extends TestSuiteBase { Seq( ) ) val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).join(s2.map(x => (x,"x"))) + s1.map(x => (x, 1)).join(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("leftOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, Some("x"))), ("b", (1, Some("x"))) ), + Seq( ("", (1, Some("x"))), ("a", (1, None)) ), + Seq( ("", (1, None)) ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).leftOuterJoin(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("rightOuterJoin") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (Some(1), "x")), ("b", (Some(1), "x")) ), + Seq( ("", (Some(1), "x")), ("b", (None, "x")) ), + Seq( ), + Seq( ("", (None, "x")) ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x, 1)).rightOuterJoin(s2.map(x => (x, "x"))) } testOperation(inputData1, inputData2, operation, outputData, true) } |