From c65988bdc1b75e88e6df77df0b84fc3a34c5b028 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 11 Feb 2013 09:51:57 -0800 Subject: Fix for MapPartitions --- .../spark/streaming/api/java/JavaDStreamLike.scala | 4 +- .../test/java/spark/streaming/JavaAPISuite.java | 67 +++++++++++++++++----- 2 files changed, 54 insertions(+), 17 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 9cc263930e..ec546c8190 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -100,8 +100,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition * of the RDD. */ - def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]) - : JavaPairDStream[K, V] = { + def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]) + : JavaPairDStream[K2, V2] = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) } diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index ec4e5ae18b..67d82d546f 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -507,7 +507,7 @@ public class JavaAPISuite implements Serializable { new Tuple2("new york", 1))); @Test - public void testPairMap() { // Maps pair -> pair + public void testPairMap() { // Maps pair -> pair of different type List>> inputData = stringIntKVStream; List>> expected = Arrays.asList( @@ -538,6 +538,43 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, result); } + @Test + public void testPairMapPartitions() { // Maps pair -> pair of different type + List>> inputData = stringIntKVStream; + + List>> expected = Arrays.asList( + Arrays.asList( + new Tuple2(1, "california"), + new Tuple2(3, "california"), + new Tuple2(4, "new york"), + new Tuple2(1, "new york")), + Arrays.asList( + new Tuple2(5, "california"), + new Tuple2(5, "california"), + new Tuple2(3, "new york"), + new Tuple2(1, "new york"))); + + JavaDStream> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream reversed = pairStream.mapPartitions( + new PairFlatMapFunction>, Integer, String>() { + @Override + public Iterable> call(Iterator> in) throws Exception { + LinkedList> out = new LinkedList>(); + while (in.hasNext()) { + Tuple2 next = in.next(); + out.add(new Tuple2(next._2(), next._1())); + } + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + @Test public void testPairMap2() { // Maps pair -> single List>> inputData = stringIntKVStream; @@ -588,16 +625,16 @@ public class JavaAPISuite implements Serializable { JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream flatMapped = pairStream.flatMap( - new PairFlatMapFunction, Integer, String>() { - @Override - public Iterable> call(Tuple2 in) throws Exception { - List> out = new LinkedList>(); - for (Character s: in._1().toCharArray()) { - out.add(new Tuple2(in._2(), s.toString())); - } - return out; - } - }); + new PairFlatMapFunction, Integer, String>() { + @Override + public Iterable> call(Tuple2 in) throws Exception { + List> out = new LinkedList>(); + for (Character s : in._1().toCharArray()) { + out.add(new Tuple2(in._2(), s.toString())); + } + return out; + } + }); JavaTestUtils.attachTestOutputStream(flatMapped); List>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -668,7 +705,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream combined = pairStream.combineByKey( new Function() { - @Override + @Override public Integer call(Integer i) throws Exception { return i; } @@ -766,19 +803,19 @@ public class JavaAPISuite implements Serializable { JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream updated = pairStream.updateStateByKey( - new Function2, Optional, Optional>(){ + new Function2, Optional, Optional>() { @Override public Optional call(List values, Optional state) { int out = 0; if (state.isPresent()) { out = out + state.get(); } - for (Integer v: values) { + for (Integer v : values) { out = out + v; } return Optional.of(out); } - }); + }); JavaTestUtils.attachTestOutputStream(updated); List>> result = JavaTestUtils.runStreams(ssc, 3, 3); -- cgit v1.2.3