aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-05 15:06:20 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commitf144e0413a1e42d193a86fa04af769e2da9dc58b (patch)
tree1678a899ef0052021d995de14e612e718c0c11eb /streaming
parent0d0bab25bd0dfefdd5a91d22a4e81d347d255cf3 (diff)
downloadspark-f144e0413a1e42d193a86fa04af769e2da9dc58b.tar.gz
spark-f144e0413a1e42d193a86fa04af769e2da9dc58b.tar.bz2
spark-f144e0413a1e42d193a86fa04af769e2da9dc58b.zip
Adding transform and union
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala14
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java62
2 files changed, 72 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index d0fa06ba7b..56e54c719a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -86,6 +86,20 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) = {
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
}
+
+ def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T]): RDD[U] = {
+ transformFunc.call(new JavaRDD[T](in)).rdd
+ }
+ dstream.transform(scalaTransform(_))
+ }
+ // TODO: transform with time
+
+ def union(that: JavaDStream[T]): JavaDStream[T] = {
+ dstream.union(that.dstream)
+ }
}
object JavaDStream {
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 2d1b0f35f9..c4629c8d97 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -5,6 +5,7 @@ import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import spark.api.java.JavaRDD;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
import spark.api.java.function.Function2;
@@ -13,10 +14,7 @@ import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;
import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -271,6 +269,62 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(expected, result);
}
+ @Test
+ public void testTransform() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3,4,5),
+ Arrays.asList(6,7,8),
+ Arrays.asList(9,10,11));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return in.map(new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i + 2;
+ }
+ });
+ }});
+ JavaTestUtils.attachTestOutputStream(transformed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testUnion() {
+ List<List<Integer>> inputData1 = Arrays.asList(
+ Arrays.asList(1,1),
+ Arrays.asList(2,2),
+ Arrays.asList(3,3));
+
+ List<List<Integer>> inputData2 = Arrays.asList(
+ Arrays.asList(4,4),
+ Arrays.asList(5,5),
+ Arrays.asList(6,6));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,1,4,4),
+ Arrays.asList(2,2,5,5),
+ Arrays.asList(3,3,6,6));
+
+ JavaDStream stream1 = JavaTestUtils.attachTestInputStream(sc, inputData1, 2);
+ JavaDStream stream2 = JavaTestUtils.attachTestInputStream(sc, inputData2, 2);
+
+ JavaDStream unioned = stream1.union(stream2);
+ JavaTestUtils.attachTestOutputStream(unioned);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
/*
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDD's which occurs during windowing.