aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-02-12 13:57:57 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-02-12 14:02:32 -0800
commit3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef (patch)
treebee3c620d3df36ecf7ca27a2c9c38afbbc91f2dd /streaming/src/test/java
parentfd7e414bd0eab4f8d82e225d9981d2eba036e756 (diff)
downloadspark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.tar.gz
spark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.tar.bz2
spark-3f3e77f28b08fc1db110c3b14b2c90eaa6dca8ef.zip
STREAMING-50: Support transform workaround in JavaPairDStream
This ports a useful workaround (the `transform` function) to JavaPairDStream. It is necessary to do things like sorting which are not supported yet in the core streaming API.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java45
1 files changed, 45 insertions, 0 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 9bfcd83e4d..7b385f609d 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -11,6 +11,7 @@ import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
+import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
@@ -873,6 +874,50 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testPairTransform() {
+ List<List<Tuple2<Integer, Integer>>> inputData = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(2, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5),
+ new Tuple2<Integer, Integer>(1, 5)));
+
+ List<List<Tuple2<Integer, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)),
+ Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 5),
+ new Tuple2<Integer, Integer>(2, 5),
+ new Tuple2<Integer, Integer>(3, 5),
+ new Tuple2<Integer, Integer>(4, 5)));
+
+ JavaDStream<Tuple2<Integer, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ ssc, inputData, 1);
+ JavaPairDStream<Integer, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<Integer, Integer> sorted = pairStream.transform(
+ new Function<JavaPairRDD<Integer, Integer>, JavaPairRDD<Integer, Integer>>() {
+ @Override
+ public JavaPairRDD<Integer, Integer> call(JavaPairRDD<Integer, Integer> in) throws Exception {
+ return in.sortByKey();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(sorted);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
public void testMapValues() {
List<List<Tuple2<String, String>>> inputData = stringStringKVStream;