From f144e0413a1e42d193a86fa04af769e2da9dc58b Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 5 Jan 2013 15:06:20 -0800
Subject: Adding transform and union

---
 .../spark/streaming/api/java/JavaDStream.scala   | 14 +++++
 .../test/scala/spark/streaming/JavaAPISuite.java | 62 ++++++++++++++++++++--
 2 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index d0fa06ba7b..56e54c719a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -86,6 +86,20 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
   def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
     dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
   }
+
+  def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
+    implicit val cm: ClassManifest[U] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+    def scalaTransform (in: RDD[T]): RDD[U] = {
+      transformFunc.call(new JavaRDD[T](in)).rdd
+    }
+    dstream.transform(scalaTransform(_))
+  }
+  // TODO: transform with time
+
+  def union(that: JavaDStream[T]): JavaDStream[T] = {
+    dstream.union(that.dstream)
+  }
 }
 
 object JavaDStream {
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 2d1b0f35f9..c4629c8d97 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -5,6 +5,7 @@ import org.junit.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import spark.api.java.JavaRDD;
 import spark.api.java.function.FlatMapFunction;
 import spark.api.java.function.Function;
 import spark.api.java.function.Function2;
@@ -13,10 +14,7 @@ import spark.streaming.api.java.JavaDStream;
 import spark.streaming.api.java.JavaStreamingContext;
 
 import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -271,6 +269,62 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, result);
   }
 
+  @Test
+  public void testTransform() {
+    List<List<Integer>> inputData = Arrays.asList(
+        Arrays.asList(1,2,3),
+        Arrays.asList(4,5,6),
+        Arrays.asList(7,8,9));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(3,4,5),
+        Arrays.asList(6,7,8),
+        Arrays.asList(9,10,11));
+
+    JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+    JavaDStream<Integer> transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+      @Override
+      public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+        return in.map(new Function<Integer, Integer>() {
+          @Override
+          public Integer call(Integer i) throws Exception {
+            return i + 2;
+          }
+        });
+      }});
+    JavaTestUtils.attachTestOutputStream(transformed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
+  @Test
+  public void testUnion() {
+    List<List<Integer>> inputData1 = Arrays.asList(
+        Arrays.asList(1,1),
+        Arrays.asList(2,2),
+        Arrays.asList(3,3));
+
+    List<List<Integer>> inputData2 = Arrays.asList(
+        Arrays.asList(4,4),
+        Arrays.asList(5,5),
+        Arrays.asList(6,6));
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(1,1,4,4),
+        Arrays.asList(2,2,5,5),
+        Arrays.asList(3,3,6,6));
+
+    JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(sc, inputData1, 2);
+    JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(sc, inputData2, 2);
+
+    JavaDStream<Integer> unioned = stream1.union(stream2);
+    JavaTestUtils.attachTestOutputStream(unioned);
+    List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+    assertOrderInvariantEquals(expected, result);
+  }
+
   /*
    * Performs an order-invariant comparison of lists representing two RDD streams. This allows
    * us to account for ordering variation within individual RDD's which occurs during windowing.