aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-02-05 19:02:46 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-02-11 10:03:37 -0800
commitf0b68c623c116540470e06967c1554855d16a500 (patch)
tree284bf52faff6dbc5bad9f85fc5a5f6659b1620b4
parentb1d809913b42d8eaf8bc0cc8b4f754c896c6c0b9 (diff)
downloadspark-f0b68c623c116540470e06967c1554855d16a500.tar.gz
spark-f0b68c623c116540470e06967c1554855d16a500.tar.bz2
spark-f0b68c623c116540470e06967c1554855d16a500.zip
Initial cut at replacing K, V in Java files
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java24
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala4
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java56
3 files changed, 82 insertions, 2 deletions
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 934e4c2f67..9ffe7c5f99 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}
+
+ @Test
+ public void mapOnPairRDD() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+ JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+ new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+ return new Tuple2<Integer, Integer>(in._2(), in._1());
+ }
+ });
+ Assert.assertEquals(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(0, 2),
+ new Tuple2<Integer, Integer>(1, 3),
+ new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+
+ }
}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865a..39fe0d0ccc 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -59,8 +59,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 79d6093429..26ac82b71a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -507,6 +507,62 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 1)));
@Test
+ public void testPairMap() { // Maps pair -> pair
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.map(
+ new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+ @Override
+ public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+ return new Tuple2(in._2(), in._1());
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMap2() { // Maps pair -> single
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1, 3, 4, 1),
+ Arrays.asList(5, 5, 3, 1));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> reversed = pairStream.map(
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;