diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-02-11 09:21:06 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-02-11 10:03:37 -0800 |
commit | 20cf77054536acd9c064d6e7ffedce23a87fb6a5 (patch) | |
tree | 962aba41f08bcb6d23f3a1f8a9063ddf2179abf6 /streaming/src | |
parent | 314d87a038d84c4ae9a6471ea19a5431153ea604 (diff) | |
download | spark-20cf77054536acd9c064d6e7ffedce23a87fb6a5.tar.gz spark-20cf77054536acd9c064d6e7ffedce23a87fb6a5.tar.bz2 spark-20cf77054536acd9c064d6e7ffedce23a87fb6a5.zip |
Fix for flatmap
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 4 | ||||
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 42 |
2 files changed, 44 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 39fe0d0ccc..9cc263930e 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -78,10 +78,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ - def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = { + def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) } diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 4cf9d115ae..ec4e5ae18b 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -563,6 +563,48 @@ public class JavaAPISuite implements Serializable { } @Test + public void testPairToPairFlatMapWithChangingTypes() { // Maps pair -> pair + List<List<Tuple2<String, Integer>>> inputData = Arrays.asList( + Arrays.asList( + new Tuple2<String, Integer>("hi", 1), + new Tuple2<String, Integer>("ho", 2)), + Arrays.asList( + new Tuple2<String, Integer>("hi", 1), + new Tuple2<String, Integer>("ho", 2))); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "h"), + new Tuple2<Integer, String>(1, "i"), + new Tuple2<Integer, String>(2, "h"), + new Tuple2<Integer, String>(2, "o")), + Arrays.asList( + new Tuple2<Integer, String>(1, "h"), + new Tuple2<Integer, String>(1, "i"), + new Tuple2<Integer, String>(2, "h"), + new Tuple2<Integer, String>(2, "o"))); + + JavaDStream<Tuple2<String, Integer>> stream = + JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap( + new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception { + List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>(); + for (Character s: in._1().toCharArray()) { + out.add(new Tuple2<Integer, String>(in._2(), s.toString())); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test public void testPairGroupByKey() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; |