aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-10-22 23:35:51 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-10-22 23:35:51 -0700
commit72d2e1dd777696640f64aaf92fecab64c6387df0 (patch)
treee39435e82233d4d2a8e8fd4eb10dda3683e10b48 /streaming/src/test/java
parent06664987990debcb4439a9dc26e1859508c601f5 (diff)
downloadspark-72d2e1dd777696640f64aaf92fecab64c6387df0.tar.gz
spark-72d2e1dd777696640f64aaf92fecab64c6387df0.tar.bz2
spark-72d2e1dd777696640f64aaf92fecab64c6387df0.zip
Fixed bug in Java transformWith, added more Java testcases for transform and transformWith, added missing variations of Java join and cogroup, updated various Scala and Java API docs.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java219
1 files changed, 204 insertions, 15 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 9f885f07f2..16622a3459 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -320,17 +320,20 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(9,10,11));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> transformed =
- stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
- @Override
- public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
- return in.map(new Function<Integer, Integer>() {
- @Override
- public Integer call(Integer i) throws Exception {
- return i + 2;
- }
- });
- }});
+ JavaDStream<Integer> transformed = stream.transform(
+ new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return in.map(new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i + 2;
+ }
+ });
+ }
+ }
+ );
+
JavaTestUtils.attachTestOutputStream(transformed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -338,6 +341,84 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testVariousTransform() {
+ // tests whether all variations of transform can be called from Java
+
+ List<List<Integer>> inputData = Arrays.asList(Arrays.asList(1));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData, 1));
+
+ JavaDStream<Integer> transformed1 = stream.transform(
+ new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> transformed2 = stream.transform(
+ new Function2<JavaRDD<Integer>, Time, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, Integer> transformed3 = stream.transform(
+ new Function<JavaRDD<Integer>, JavaPairRDD<String, Integer>>() {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, Integer> transformed4 = stream.transform(
+ new Function2<JavaRDD<Integer>, Time, JavaPairRDD<String, Integer>>() {
+ @Override public JavaPairRDD<String, Integer> call(JavaRDD<Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> pairTransformed1 = pairStream.transform(
+ new Function<JavaPairRDD<String, Integer>, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Integer> pairTransformed2 = pairStream.transform(
+ new Function2<JavaPairRDD<String, Integer>, Time, JavaRDD<Integer>>() {
+ @Override public JavaRDD<Integer> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, String> pairTransformed3 = pairStream.transform(
+ new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, String>>() {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<String, String> pairTransformed4 = pairStream.transform(
+ new Function2<JavaPairRDD<String, Integer>, Time, JavaPairRDD<String, String>>() {
+ @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, Integer> in, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ }
+
+ @Test
public void testTransformWith() {
List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
@@ -374,10 +455,18 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.transformWith(
pairStream2,
- new Function3<JavaPairRDD<String, String>, JavaPairRDD<String, String>, Time, JavaPairRDD<String, Tuple2<String, String>>>() {
- @Override
- public JavaPairRDD<String, Tuple2<String, String>> call(JavaPairRDD<String, String> stringStringJavaPairRDD, JavaPairRDD<String, String> stringStringJavaPairRDD2, Time time) throws Exception {
- return stringStringJavaPairRDD.join(stringStringJavaPairRDD2);
+ new Function3 <
+ JavaPairRDD<String, String>,
+ JavaPairRDD<String, String>,
+ Time,
+ JavaPairRDD<String, Tuple2<String, String>>
+ >() {
+ @Override public JavaPairRDD<String, Tuple2<String, String>> call(
+ JavaPairRDD<String, String> rdd1,
+ JavaPairRDD<String, String> rdd2,
+ Time time
+ ) throws Exception {
+ return rdd1.join(rdd2);
}
}
);
@@ -389,6 +478,106 @@ public class JavaAPISuite implements Serializable {
}
+ @Test
+ public void testVariousTransformWith() {
+ // tests whether all variations of transformWith can be called from Java
+
+ List<List<Integer>> inputData1 = Arrays.asList(Arrays.asList(1));
+ List<List<String>> inputData2 = Arrays.asList(Arrays.asList("x"));
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 1);
+ JavaDStream<String> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 1);
+
+ List<List<Tuple2<String, Integer>>> pairInputData1 =
+ Arrays.asList(Arrays.asList(new Tuple2<String, Integer>("x", 1)));
+ List<List<Tuple2<Double, Character>>> pairInputData2 =
+ Arrays.asList(Arrays.asList(new Tuple2<Double, Character>(1.0, 'x')));
+ JavaPairDStream<String, Integer> pairStream1 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData1, 1));
+ JavaPairDStream<Double, Character> pairStream2 = JavaPairDStream.fromJavaDStream(
+ JavaTestUtils.attachTestInputStream(ssc, pairInputData2, 1));
+
+ JavaDStream<Double> transformed1 = stream1.transformWith(
+ stream2,
+ new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> transformed2 = stream1.transformWith(
+ pairStream1,
+ new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> transformed3 = stream1.transformWith(
+ stream2,
+ new Function3<JavaRDD<Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> transformed4 = stream1.transformWith(
+ pairStream1,
+ new Function3<JavaRDD<Integer>, JavaPairRDD<String, Integer>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaRDD<Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> pairTransformed1 = pairStream1.transformWith(
+ stream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaDStream<Double> pairTransformed2_ = pairStream1.transformWith(
+ pairStream1,
+ new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>, Time, JavaRDD<Double>>() {
+ @Override
+ public JavaRDD<Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<String, Integer> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+ JavaPairDStream<Double, Double> pairTransformed3 = pairStream1.transformWith(
+ stream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaRDD<String>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaRDD<String> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+
+
+ JavaPairDStream<Double, Double> pairTransformed4 = pairStream1.transformWith(
+ pairStream2,
+ new Function3<JavaPairRDD<String, Integer>, JavaPairRDD<Double, Character>, Time, JavaPairRDD<Double, Double>>() {
+ @Override
+ public JavaPairRDD<Double, Double> call(JavaPairRDD<String, Integer> rdd1, JavaPairRDD<Double, Character> rdd2, Time time) throws Exception {
+ return null;
+ }
+ }
+ );
+ }
+
@Test
public void testFlatMap() {
List<List<String>> inputData = Arrays.asList(