From f0b68c623c116540470e06967c1554855d16a500 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Tue, 5 Feb 2013 19:02:46 -0800
Subject: Initial cut at replacing K, V in Java files

---
 .../test/java/spark/streaming/JavaAPISuite.java    | 56 ++++++++++++++++++++++
 1 file changed, 56 insertions(+)

(limited to 'streaming/src/test/java')

diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 79d6093429..26ac82b71a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -506,6 +506,62 @@ public class JavaAPISuite implements Serializable {
         new Tuple2<String, Integer>("new york", 3),
         new Tuple2<String, Integer>("new york", 1)));

+  @Test
+  public void testPairMap() { // Maps pair -> pair
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+      Arrays.asList(
+        new Tuple2<Integer, String>(1, "california"),
+        new Tuple2<Integer, String>(3, "california"),
+        new Tuple2<Integer, String>(4, "new york"),
+        new Tuple2<Integer, String>(1, "new york")),
+      Arrays.asList(
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(5, "california"),
+        new Tuple2<Integer, String>(3, "new york"),
+        new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.map(
+        new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+          @Override
+          public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+            return new Tuple2<Integer, String>(in._2(), in._1());
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testPairMap2() { // Maps pair -> single
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Integer>> expected = Arrays.asList(
+        Arrays.asList(1, 3, 4, 1),
+        Arrays.asList(5, 5, 3, 1));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaDStream<Integer> reversed = pairStream.map(
+        new Function<Tuple2<String, Integer>, Integer>() {
+          @Override
+          public Integer call(Tuple2<String, Integer> in) throws Exception {
+            return in._2();
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
   @Test
   public void testPairGroupByKey() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
--
cgit v1.2.3

From 314d87a038d84c4ae9a6471ea19a5431153ea604 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 11 Feb 2013 09:20:37 -0800
Subject: Indentation fix

---
 .../src/test/java/spark/streaming/JavaAPISuite.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

(limited to 'streaming/src/test/java')

diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 26ac82b71a..4cf9d115ae 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -511,16 +511,16 @@ public class JavaAPISuite implements Serializable {
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;

     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
-      Arrays.asList(
-        new Tuple2<Integer, String>(1, "california"),
-        new Tuple2<Integer, String>(3, "california"),
-        new Tuple2<Integer, String>(4, "new york"),
-        new Tuple2<Integer, String>(1, "new york")),
-      Arrays.asList(
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(5, "california"),
-        new Tuple2<Integer, String>(3, "new york"),
-        new Tuple2<Integer, String>(1, "new york")));
+        Arrays.asList(
+            new Tuple2<Integer, String>(1, "california"),
+            new Tuple2<Integer, String>(3, "california"),
+            new Tuple2<Integer, String>(4, "new york"),
+            new Tuple2<Integer, String>(1, "new york")),
+        Arrays.asList(
+            new Tuple2<Integer, String>(5, "california"),
+            new Tuple2<Integer, String>(5, "california"),
+            new Tuple2<Integer, String>(3, "new york"),
+            new Tuple2<Integer, String>(1, "new york")));

     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
--
cgit v1.2.3

From 20cf77054536acd9c064d6e7ffedce23a87fb6a5 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 11 Feb 2013 09:21:06 -0800
Subject: Fix for flatmap

---
 .../spark/streaming/api/java/JavaDStreamLike.scala |  4 +--
 .../test/java/spark/streaming/JavaAPISuite.java    | 42 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

(limited to 'streaming/src/test/java')

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 39fe0d0ccc..9cc263930e 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -78,10 +78,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
    * Return a new DStream by applying a function to all elements of this DStream,
    * and then flattening the results
    */
-  def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
+  def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
     import scala.collection.JavaConverters._
     def fn = (x: T) => f.apply(x).asScala
-    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+    def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
     new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
   }

diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 4cf9d115ae..ec4e5ae18b 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -562,6 +562,48 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, result);
   }

+  @Test
+  public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair
+    List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<String, Integer>("hi", 1),
+            new Tuple2<String, Integer>("ho", 2)),
+        Arrays.asList(
+            new Tuple2<String, Integer>("hi", 1),
+            new Tuple2<String, Integer>("ho", 2)));
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<Integer, String>(1, "h"),
+            new Tuple2<Integer, String>(1, "i"),
+            new Tuple2<Integer, String>(2, "h"),
+            new Tuple2<Integer, String>(2, "o")),
+        Arrays.asList(
+            new Tuple2<Integer, String>(1, "h"),
+            new Tuple2<Integer, String>(1, "i"),
+            new Tuple2<Integer, String>(2, "h"),
+            new Tuple2<Integer, String>(2, "o")));
+
+    JavaDStream<Tuple2<String, Integer>> stream =
+        JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+        new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
+          @Override
+          public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
+            List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+            for (Character s: in._1().toCharArray()) {
+              out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+            }
+            return out;
+          }
+        });
+    JavaTestUtils.attachTestOutputStream(flatMapped);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
   @Test
   public void testPairGroupByKey() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
--
cgit v1.2.3
From c65988bdc1b75e88e6df77df0b84fc3a34c5b028 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 11 Feb 2013 09:51:57 -0800
Subject: Fix for MapPartitions

---
 .../spark/streaming/api/java/JavaDStreamLike.scala |  4 +-
 .../test/java/spark/streaming/JavaAPISuite.java    | 67 +++++++++++++++++-----
 2 files changed, 54 insertions(+), 17 deletions(-)

(limited to 'streaming/src/test/java')

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 9cc263930e..ec546c8190 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -100,8 +100,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
    * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
    * of the RDD.
    */
-  def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
-  : JavaPairDStream[K, V] = {
+  def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+  : JavaPairDStream[K2, V2] = {
     def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
     new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
   }
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index ec4e5ae18b..67d82d546f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -507,7 +507,7 @@ public class JavaAPISuite implements Serializable {
         new Tuple2<String, Integer>("new york", 1)));

   @Test
-  public void testPairMap() { // Maps pair -> pair
+  public void testPairMap() { // Maps pair -> pair of different type
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;

     List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
@@ -538,6 +538,43 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, result);
   }

+  @Test
+  public void testPairMapPartitions() { // Maps pair -> pair of different type
+    List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+    List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<Integer, String>(1, "california"),
+            new Tuple2<Integer, String>(3, "california"),
+            new Tuple2<Integer, String>(4, "new york"),
+            new Tuple2<Integer, String>(1, "new york")),
+        Arrays.asList(
+            new Tuple2<Integer, String>(5, "california"),
+            new Tuple2<Integer, String>(5, "california"),
+            new Tuple2<Integer, String>(3, "new york"),
+            new Tuple2<Integer, String>(1, "new york")));
+
+    JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+    JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+    JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+        new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
+          @Override
+          public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
+            LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+            while (in.hasNext()) {
+              Tuple2<String, Integer> next = in.next();
+              out.add(new Tuple2<Integer, String>(next._2(), next._1()));
+            }
+            return out;
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(reversed);
+    List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
   @Test
   public void testPairMap2() { // Maps pair -> single
     List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
@@ -588,16 +625,16 @@ public class JavaAPISuite implements Serializable {
         JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
     JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
-        new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
-          @Override
-          public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
-            List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
-            for (Character s: in._1().toCharArray()) {
-              out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
-            }
-            return out;
-          }
-        });
+      new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
+        @Override
+        public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
+          List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+          for (Character s : in._1().toCharArray()) {
+            out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+          }
+          return out;
+        }
+      });
     JavaTestUtils.attachTestOutputStream(flatMapped);
     List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

@@ -668,7 +705,7 @@ public class JavaAPISuite implements Serializable {

     JavaPairDStream<String, Integer> combined = pairStream.combineByKey(
         new Function<Integer, Integer>() {
-        @Override
+          @Override
           public Integer call(Integer i) throws Exception {
             return i;
           }
@@ -766,19 +803,19 @@ public class JavaAPISuite implements Serializable {
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

     JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
-        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){
+        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
           @Override
           public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
             int out = 0;
             if (state.isPresent()) {
               out = out + state.get();
             }
-            for (Integer v: values) {
+            for (Integer v : values) {
               out = out + v;
             }
             return Optional.of(out);
           }
-          });
+        });

     JavaTestUtils.attachTestOutputStream(updated);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
--
cgit v1.2.3
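The Scala changes in this patch and the previous one only rename the method-level type parameters of mapPartitions and flatMap from K, V to K2, V2, making explicit that the key and value types of the resulting pair stream are chosen by the caller, independently of the element type of the source stream; the new tests pin that behaviour down for pair-to-pair transformations. A minimal illustrative sketch of the kind of call these signatures support, a plain DStream of lines flat-mapped into a pair DStream, written in the style of the suite above: the test name and input data are hypothetical, not part of the patch series, and ssc, JavaTestUtils, and the imports are assumed to come from the suite.

  @Test
  public void testLinesToPairs() { // hypothetical example, not from the patches
    // Two batches of raw lines.
    List<List<String>> inputData = Arrays.asList(
        Arrays.asList("hello world"),
        Arrays.asList("hello spark"));

    JavaDStream<String> lines = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);

    // flatMap with a PairFlatMapFunction picks its own output key/value types
    // (here String and Integer), independent of the String elements of `lines`.
    JavaPairDStream<String, Integer> pairs = lines.flatMap(
        new PairFlatMapFunction<String, String, Integer>() {
          @Override
          public Iterable<Tuple2<String, Integer>> call(String line) throws Exception {
            List<Tuple2<String, Integer>> out = new LinkedList<Tuple2<String, Integer>>();
            for (String word : line.split(" ")) {
              out.add(new Tuple2<String, Integer>(word, 1));
            }
            return out;
          }
        });

    JavaTestUtils.attachTestOutputStream(pairs);
    List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);

    Assert.assertEquals(
        Arrays.asList(
            Arrays.asList(new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("world", 1)),
            Arrays.asList(new Tuple2<String, Integer>("hello", 1), new Tuple2<String, Integer>("spark", 1))),
        result);
  }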
From 04786d07391c4052d6dc42ff0828a79a37bbbfdf Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 11 Feb 2013 10:05:49 -0800
Subject: small fix

---
 streaming/src/test/java/spark/streaming/JavaAPISuite.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

(limited to 'streaming/src/test/java')

diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 67d82d546f..551d4f15e4 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -810,12 +810,12 @@ public class JavaAPISuite implements Serializable {
             if (state.isPresent()) {
               out = out + state.get();
             }
-            for (Integer v : values) {
+            for (Integer v: values) {
              out = out + v;
            }
            return Optional.of(out);
          }
-        });
+    });

     JavaTestUtils.attachTestOutputStream(updated);
     List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
--
cgit v1.2.3

From d09c36065ca040044530a50f0392c92866b6d301 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 11 Feb 2013 10:45:45 -0800
Subject: Using tuple swap()

---
 streaming/src/test/java/spark/streaming/JavaAPISuite.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

(limited to 'streaming/src/test/java')

diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 551d4f15e4..9bfcd83e4d 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -528,7 +528,7 @@ public class JavaAPISuite implements Serializable {
         new PairFunction<Tuple2<String, Integer>, Integer, String>() {
           @Override
           public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
-            return new Tuple2<Integer, String>(in._2(), in._1());
+            return in.swap();
           }
         });

@@ -563,7 +563,7 @@ public class JavaAPISuite implements Serializable {
             LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
             while (in.hasNext()) {
               Tuple2<String, Integer> next = in.next();
-              out.add(new Tuple2<Integer, String>(next._2(), next._1()));
+              out.add(next.swap());
             }
             return out;
           }
--
cgit v1.2.3
From 3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Tue, 12 Feb 2013 13:57:57 -0800
Subject: STREAMING-50: Support transform workaround in JavaPairDStream

This ports a useful workaround (the `transform` function) to JavaPairDStream.
It is necessary to do things like sorting which are not supported yet in the
core streaming API.
---
 .../spark/streaming/api/java/JavaPairDStream.scala | 34 +++++++++++++++-
 .../test/java/spark/streaming/JavaAPISuite.java    | 45 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 2 deletions(-)

(limited to 'streaming/src/test/java')

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ef10c091ca..eb2495e3ac 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -8,11 +8,11 @@ import scala.collection.JavaConversions._
 import spark.streaming._
 import spark.streaming.StreamingContext._
 import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import spark.Partitioner
+import spark.{RDD, Partitioner}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.conf.Configuration
-import spark.api.java.JavaPairRDD
+import spark.api.java.{JavaRDD, JavaPairRDD}
 import spark.storage.StorageLevel
 import com.google.common.base.Optional

@@ -81,6 +81,36 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
     dstream.union(that.dstream)

+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of this DStream.
+   */
+  def transform[K2, V2](transformFunc: JFunction[JavaPairRDD[K, V], JavaPairRDD[K2, V2]]):
+  JavaPairDStream[K2, V2] = {
+    implicit val cmk: ClassManifest[K2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+    implicit val cmv: ClassManifest[V2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+    def scalaTransform (in: RDD[(K, V)]): RDD[(K2, V2)] =
+      transformFunc.call(new JavaPairRDD[K, V](in)).rdd
+    dstream.transform(scalaTransform(_))
+  }
+
+  /**
+   * Return a new DStream in which each RDD is generated by applying a function
+   * on each RDD of this DStream.
+   */
+  def transform[K2, V2](transformFunc: JFunction2[JavaPairRDD[K, V], Time, JavaPairRDD[K2, V2]]):
+  JavaPairDStream[K2, V2] = {
+    implicit val cmk: ClassManifest[K2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
+    implicit val cmv: ClassManifest[V2] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
+    def scalaTransform (in: RDD[(K, V)], time: Time): RDD[(K2, V2)] =
+      transformFunc.call(new JavaPairRDD[K, V](in), time).rdd
+    dstream.transform(scalaTransform(_, _))
+  }
+
   // =======================================================================
   // Methods only for PairDStream's
   // =======================================================================
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 9bfcd83e4d..7b385f609d 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -11,6 +11,7 @@ import org.junit.Before;
 import org.junit.Test;
 import scala.Tuple2;
 import spark.HashPartitioner;
+import spark.api.java.JavaPairRDD;
 import spark.api.java.JavaRDD;
 import spark.api.java.JavaSparkContext;
 import spark.api.java.function.*;
@@ -872,6 +873,50 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, result);
   }

+  @Test
+  public void testPairTransform() {
+    List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<Integer, Integer>(3, 5),
+            new Tuple2<Integer, Integer>(1, 5),
+            new Tuple2<Integer, Integer>(4, 5),
+            new Tuple2<Integer, Integer>(2, 5)),
+        Arrays.asList(
+            new Tuple2<Integer, Integer>(2, 5),
+            new Tuple2<Integer, Integer>(3, 5),
+            new Tuple2<Integer, Integer>(4, 5),
+            new Tuple2<Integer, Integer>(1, 5)));
+
+    List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+        Arrays.asList(
+            new Tuple2<Integer, Integer>(1, 5),
+            new Tuple2<Integer, Integer>(2, 5),
+            new Tuple2<Integer, Integer>(3, 5),
+            new Tuple2<Integer, Integer>(4, 5)),
+        Arrays.asList(
+            new Tuple2<Integer, Integer>(1, 5),
+            new Tuple2<Integer, Integer>(2, 5),
+            new Tuple2<Integer, Integer>(3, 5),
+            new Tuple2<Integer, Integer>(4, 5)));
+
+    JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+        ssc, inputData, 1);
+    JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+    JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+        new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
+          @Override
+          public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+            return in.sortByKey();
+          }
+        });
+
+    JavaTestUtils.attachTestOutputStream(sorted);
+    List<List<Tuple2<Integer, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected, result);
+  }
+
   @Test
   public void testMapValues() {
     List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
--
cgit v1.2.3
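Taken together, the last patch gives Java callers the same per-batch escape hatch that the Scala API's transform provides: any JavaPairRDD-to-JavaPairRDD operation, such as sortByKey, can be applied to every RDD of the stream. A brief usage sketch outside the test harness follows; `counts` is an assumed, pre-existing JavaPairDStream<String, Integer>, not something defined in these patches, while transform(...) and JavaPairRDD.sortByKey() are the APIs exercised by testPairTransform above.

  // Sort each batch of (word, count) pairs by key using the transform() added above.
  // `counts` is a hypothetical JavaPairDStream<String, Integer> built elsewhere.
  JavaPairDStream<String, Integer> sortedCounts = counts.transform(
      new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
        @Override
        public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
          return rdd.sortByKey();
        }
      });

The second overload added in JavaPairDStream.scala additionally passes the batch Time to the function, for transformations that need to know which interval they are processing.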