about | summary | refs | log | tree | commit | diff
path: root/streaming
diff options
context:
space:
mode:
author Matei Zaharia <matei@eecs.berkeley.edu> 2013-02-11 19:19:05 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu> 2013-02-11 19:19:05 -0800
commit fd7e414bd0eab4f8d82e225d9981d2eba036e756 (patch)
tree 2cc4b83593a44e8180ea8ae574f571e06a2fa15c /streaming
parent bfeed4725df47b578cb0e735e1ffbc836acde673 (diff)
parent d09c36065ca040044530a50f0392c92866b6d301 (diff)
download spark-fd7e414bd0eab4f8d82e225d9981d2eba036e756.tar.gz
spark-fd7e414bd0eab4f8d82e225d9981d2eba036e756.tar.bz2
spark-fd7e414bd0eab4f8d82e225d9981d2eba036e756.zip
Merge pull request #464 from pwendell/java-type-fix
SPARK-694: All references to [K, V] in JavaDStreamLike should be changed to [K2, V2]
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala 12
-rw-r--r-- streaming/src/test/java/spark/streaming/JavaAPISuite.java 141
2 files changed, 144 insertions, 9 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index b93cb7865a..ec546c8190 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -59,8 +59,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/** Return a new DStream by applying a function to all elements of this DStream. */
- def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -78,10 +78,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
+ def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -100,8 +100,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
- def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
- : JavaPairDStream[K, V] = {
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
+ : JavaPairDStream[K2, V2] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
}
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 79d6093429..9bfcd83e4d 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -507,6 +507,141 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 1)));
@Test
+ public void testPairMap() { // map(PairFunction): swaps each (String, Integer) pair into an (Integer, String) pair
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.map(
+ new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+ @Override
+ public Tuple2<Integer, String> call(Tuple2<String, Integer> in) throws Exception {
+ return in.swap();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMapPartitions() { // mapPartitions(PairFlatMapFunction): swaps every pair of each partition iterator
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "california"),
+ new Tuple2<Integer, String>(3, "california"),
+ new Tuple2<Integer, String>(4, "new york"),
+ new Tuple2<Integer, String>(1, "new york")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(5, "california"),
+ new Tuple2<Integer, String>(3, "new york"),
+ new Tuple2<Integer, String>(1, "new york")));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> reversed = pairStream.mapPartitions(
+ new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) throws Exception {
+ LinkedList<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ while (in.hasNext()) {
+ Tuple2<String, Integer> next = in.next();
+ out.add(next.swap());
+ }
+ return out;
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(reversed);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairMap2() { // map(Function): projects each (String, Integer) pair to its Integer value
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1, 3, 4, 1),
+ Arrays.asList(5, 5, 3, 1));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Integer> values = pairStream.map(
+ new Function<Tuple2<String, Integer>, Integer>() {
+ @Override
+ public Integer call(Tuple2<String, Integer> in) throws Exception {
+ return in._2();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(values);
+ List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairToPairFlatMapWithChangingTypes() { // flatMap: (String, Integer) -> one (Integer, String) pair per character of the key
+ List<List<Tuple2<String, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("hi", 1),
+ new Tuple2<String, Integer>("ho", 2)));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(1, "h"),
+ new Tuple2<Integer, String>(1, "i"),
+ new Tuple2<Integer, String>(2, "h"),
+ new Tuple2<Integer, String>(2, "o")));
+
+ JavaDStream<Tuple2<String, Integer>> stream =
+ JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaPairDStream<Integer, String> flatMapped = pairStream.flatMap(
+ new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) throws Exception {
+ List<Tuple2<Integer, String>> out = new LinkedList<Tuple2<Integer, String>>();
+ for (Character s : in._1().toCharArray()) {
+ out.add(new Tuple2<Integer, String>(in._2(), s.toString()));
+ }
+ return out;
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testPairGroupByKey() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
@@ -570,7 +705,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
new Function<Integer, Integer>() {
- @Override
+ @Override
public Integer call(Integer i) throws Exception {
return i;
}
@@ -668,7 +803,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){
+ new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
int out = 0;
@@ -680,7 +815,7 @@ public class JavaAPISuite implements Serializable {
}
return Optional.of(out);
}
- });
+ });
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);