aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-17 09:04:56 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-17 18:41:58 -0800
commit61b877c688d6c9a4e9c4d8f22ca0cadae29895bb (patch)
treea1d9d368d7020078b113c5de9b4335f3a5f76fcd /streaming/src
parentd5570c7968baba1c1fe86c68dc1c388fae23907b (diff)
downloadspark-61b877c688d6c9a4e9c4d8f22ca0cadae29895bb.tar.gz
spark-61b877c688d6c9a4e9c4d8f22ca0cadae29895bb.tar.bz2
spark-61b877c688d6c9a4e9c4d8f22ca0cadae29895bb.zip
Adding flatMap
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala29
-rw-r--r--streaming/src/test/scala/spark/streaming/JavaAPISuite.java81
2 files changed, 106 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 32df665a98..b93cb7865a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -65,6 +65,27 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: T) => f.apply(x).asScala
+ new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: T) => f.apply(x).asScala
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+ }
+
+ /**
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
@@ -151,4 +172,12 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
transformFunc.call(new JavaRDD[T](in), time).rdd
dstream.transform(scalaTransform(_, _))
}
+
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which generated RDD will be checkpointed
+ */
+ def checkpoint(interval: Duration) = {
+ dstream.checkpoint(interval)
+ }
} \ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 549fb5b733..41fd9f99ff 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -11,10 +11,7 @@ import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaRDD;
-import spark.api.java.function.FlatMapFunction;
-import spark.api.java.function.Function;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
+import spark.api.java.function.*;
import spark.storage.StorageLevel;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
@@ -309,6 +306,82 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("go", "giants"),
+ Arrays.asList("boo", "dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("g","o","g","i","a","n","t","s"),
+ Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
+ Arrays.asList("a","t","h","l","e","t","i","c","s"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split("(?!^)"));
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<String>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testPairFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(6, "g"),
+ new Tuple2<Integer, String>(6, "i"),
+ new Tuple2<Integer, String>(6, "a"),
+ new Tuple2<Integer, String>(6, "n"),
+ new Tuple2<Integer, String>(6, "t"),
+ new Tuple2<Integer, String>(6, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "o"),
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "g"),
+ new Tuple2<Integer, String>(7, "e"),
+ new Tuple2<Integer, String>(7, "r"),
+ new Tuple2<Integer, String>(7, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(9, "a"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "h"),
+ new Tuple2<Integer, String>(9, "l"),
+ new Tuple2<Integer, String>(9, "e"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "i"),
+ new Tuple2<Integer, String>(9, "c"),
+ new Tuple2<Integer, String>(9, "s")));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
+ }
+ return out;
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testUnion() {
List<List<Integer>> inputData1 = Arrays.asList(
Arrays.asList(1,1),