diff options
Diffstat (limited to 'external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java')
-rw-r--r-- | external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java index 604d818ef1..67bc64a444 100644 --- a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java +++ b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.apache.spark.Accumulator; import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.PairFunction; @@ -95,7 +96,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ while (in.hasNext()) { out = out + in.next().toUpperCase(); } - return Lists.newArrayList(out); + return Lists.newArrayList(out).iterator(); }); JavaTestUtils.attachTestOutputStream(mapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -351,7 +352,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s")); JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)"))); + JavaDStream<String> flatMapped = stream.flatMap( + s -> Lists.newArrayList(s.split("(?!^)")).iterator()); JavaTestUtils.attachTestOutputStream(flatMapped); List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -360,8 +362,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ @Test public void testForeachRDD() { - final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0); - final Accumulator<Integer> accumEle = ssc.sc().accumulator(0); + final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0); + final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0); List<List<Integer>> inputData = Arrays.asList( Arrays.asList(1,1,1), Arrays.asList(1,1,1)); @@ -375,7 +377,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ }); // This is a test to make sure foreachRDD(VoidFunction2) can be called from Java - stream.foreachRDD((rdd, time) -> null); + stream.foreachRDD((rdd, time) -> { return; }); JavaTestUtils.runStreams(ssc, 2, 2); @@ -423,7 +425,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ for (String letter : s.split("(?!^)")) { out.add(new Tuple2<>(s.length(), letter)); } - return out; + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(flatMapped); @@ -541,7 +543,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ Tuple2<String, Integer> next = in.next(); out.add(next.swap()); } - return out; + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(reversed); @@ -598,7 +600,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ for (Character s : in._1().toCharArray()) { out.add(new Tuple2<>(in._2(), s.toString())); } - return out; + return out.iterator(); }); JavaTestUtils.attachTestOutputStream(flatMapped); |