diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-02-05 19:02:46 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-02-11 10:03:37 -0800 |
commit | f0b68c623c116540470e06967c1554855d16a500 (patch) | |
tree | 284bf52faff6dbc5bad9f85fc5a5f6659b1620b4 | |
parent | b1d809913b42d8eaf8bc0cc8b4f754c896c6c0b9 (diff) | |
download | spark-f0b68c623c116540470e06967c1554855d16a500.tar.gz spark-f0b68c623c116540470e06967c1554855d16a500.tar.bz2 spark-f0b68c623c116540470e06967c1554855d16a500.zip |
Initial cut at replacing K, V in Java files
3 files changed, 82 insertions, 2 deletions
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 934e4c2f67..9ffe7c5f99 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect()); } + + @Test + public void mapOnPairRDD() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4)); + JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i, i % 2); + } + }); + JavaPairRDD<Integer, Integer> rdd3 = rdd2.map( + new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception { + return new Tuple2<Integer, Integer>(in._2(), in._1()); + } + }); + Assert.assertEquals(Arrays.asList( + new Tuple2<Integer, Integer>(1, 1), + new Tuple2<Integer, Integer>(0, 2), + new Tuple2<Integer, Integer>(1, 3), + new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); + + } } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index b93cb7865a..39fe0d0ccc 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -59,8 +59,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** Return a new DStream by applying a function to all elements of this DStream. */ - def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = { - def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = { + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) } diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 79d6093429..26ac82b71a 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -507,6 +507,62 @@ public class JavaAPISuite implements Serializable { new Tuple2<String, Integer>("new york", 1))); @Test + public void testPairMap() { // Maps pair -> pair + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(1, "california"), + new Tuple2<Integer, String>(3, "california"), + new Tuple2<Integer, String>(4, "new york"), + new Tuple2<Integer, String>(1, "new york")), + Arrays.asList( + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(5, "california"), + new Tuple2<Integer, String>(3, "new york"), + new Tuple2<Integer, String>(1, "new york"))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaPairDStream<Integer, String> reversed = pairStream.map( + new PairFunction<Tuple2<String, Integer>, Integer, String>() { + @Override + public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception { + return new Tuple2(in._2(), in._1()); + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testPairMap2() { // Maps pair -> single + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1, 3, 4, 1), + Arrays.asList(5, 5, 3, 1)); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + JavaDStream<Integer> reversed = pairStream.map( + new Function<Tuple2<String, Integer>, Integer>() { + @Override + public Integer call(Tuple2<String, Integer> in) throws Exception { + return in._2(); + } + }); + + JavaTestUtils.attachTestOutputStream(reversed); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test public void testPairGroupByKey() { List<List<Tuple2<String, String>>> inputData = stringStringKVStream; |