diff options
Diffstat (limited to 'external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java')
-rw-r--r-- | external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 604d818ef1..d0fed303e6 100644 --- a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.apache.spark.Accumulator; import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.PairFunction; @@ -95,7 +96,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ while (in.hasNext()) { out = out + in.next().toUpperCase(); } - return Lists.newArrayList(out); + return Lists.newArrayList(out).iterator(); }); JavaTestUtils.attachTestOutputStream(mapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -351,7 +352,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)"))); + JavaDStream<String> flatMapped = stream.flatMap( + s -> Lists.newArrayList(s.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -360,8 +362,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ @Test public void testForeachRDD() { - final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0); - final Accumulator<Integer> accumEle = ssc.sc().accumulator(0); + final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0); + final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0); List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1,1,1), Arrays.asList(1,1,1)); @@ -375,7 +377,9 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ }); // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD((rdd, time) -> null); + stream.foreachRDD((rdd, time) -> { + return; + }); JavaTestUtils.runStreams(ssc, 2, 2); @@ -423,7 +427,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ for (String letter : s.split("(?!^)")) { out.add(new Tuple2<>(s.length(), letter)); } - return out; + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(flatMapped); @@ -541,7 +545,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Tuple2<String, Integer> next = in.next(); out.add(next.swap()); } - return out; + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(reversed); @@ -598,7 +602,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ for (Character s : in._1().toCharArray()) { out.add(new Tuple2<>(in._2(), s.toString())); } - return out; + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(flatMapped); @@ -871,7 +875,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream = wordsDstream.mapWithState( - StateSpec.<String, Integer, Boolean, Double> function((time, key, value, state) -> { + StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> { // Use all State's methods here state.exists(); state.get(); |