aboutsummaryrefslogtreecommitdiff
path: root/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java')
-rw-r--r--external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java22
1 files changed, 13 insertions, 9 deletions
diff --git a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 604d818ef1..d0fed303e6 100644
--- a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -29,6 +29,7 @@ import org.junit.Test;
import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
@@ -95,7 +96,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
while (in.hasNext()) {
out = out + in.next().toUpperCase();
}
- return Lists.newArrayList(out);
+ return Lists.newArrayList(out).iterator();
});
JavaTestUtils.attachTestOutputStream(mapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -351,7 +352,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
+ JavaDStream<String> flatMapped = stream.flatMap(
+ s -> Lists.newArrayList(s.split("(?!^)")).iterator());
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -360,8 +362,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
@Test
public void testForeachRDD() {
- final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
- final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,1,1),
Arrays.asList(1,1,1));
@@ -375,7 +377,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
});
// This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
- stream.foreachRDD((rdd, time) -> null);
+ stream.foreachRDD((rdd, time) -> {
+ return;
+ });
JavaTestUtils.runStreams(ssc, 2, 2);
@@ -423,7 +427,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
for (String letter : s.split("(?!^)")) {
out.add(new Tuple2<>(s.length(), letter));
}
- return out;
+ return out.iterator();
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -541,7 +545,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
}
- return out;
+ return out.iterator();
});
JavaTestUtils.attachTestOutputStream(reversed);
@@ -598,7 +602,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
for (Character s : in._1().toCharArray()) {
out.add(new Tuple2<>(in._2(), s.toString()));
}
- return out;
+ return out.iterator();
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -871,7 +875,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> {
+ StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> {
// Use all State's methods here
state.exists();
state.get();