diff options
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 47 | ||||
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaTestUtils.scala | 5 |
2 files changed, 48 insertions, 4 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 17cd5ed795..4530af5f6a 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -13,6 +13,8 @@ import scala.Tuple2; import spark.HashPartitioner; import spark.api.java.JavaPairRDD; import spark.api.java.JavaRDD; +import spark.api.java.JavaRDDLike; +import spark.api.java.JavaPairRDD; import spark.api.java.JavaSparkContext; import spark.api.java.function.*; import spark.storage.StorageLevel; @@ -294,8 +296,9 @@ public class JavaAPISuite implements Serializable { Arrays.asList(6,7,8), Arrays.asList(9,10,11)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> transformed = + stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() { @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { return in.map(new Function<Integer, Integer>() { @@ -921,6 +924,46 @@ public class JavaAPISuite implements Serializable { } @Test + public void testPairToNormalRDDTransform() { + List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(1, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(2, 5)), + Arrays.asList( + new Tuple2<Integer, Integer>(2, 5), + new Tuple2<Integer, Integer>(3, 5), + new Tuple2<Integer, Integer>(4, 5), + new Tuple2<Integer, Integer>(1, 5))); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(3,1,4,2), + Arrays.asList(2,3,4,1)); + + JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream( + ssc, inputData, 1); + JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaDStream<Integer> firstParts = pairStream.transform( + new Function<JavaPairRDD<Integer, Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception { + return in.map(new Function<Tuple2<Integer, Integer>, Integer>() { + @Override + public Integer call(Tuple2<Integer, Integer> in) { + return in._1(); + } + }); + } + }); + + JavaTestUtils.attachTestOutputStream(firstParts); + List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + public void testMapValues() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 52ea28732a..64a7e7cbf9 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -31,8 +31,9 @@ trait JavaTestBase extends TestSuiteBase { * Attach a provided stream to it's associated StreamingContext as a * [[spark.streaming.TestOutputStream]]. **/ - def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( - dstream: JavaDStreamLike[T, This]) = { + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R], + R <: spark.api.java.JavaRDDLike[T, R]]( + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] val ostream = new TestOutputStream(dstream.dstream, |