aboutsummaryrefslogtreecommitdiff
path: root/external/java8-tests/src/test/java/org/apache/spark
diff options
context:
space:
mode:
Diffstat (limited to 'external/java8-tests/src/test/java/org/apache/spark')
-rw-r--r--external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java10
-rw-r--r--external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java18
2 files changed, 15 insertions, 13 deletions
diff --git a/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index c0b58e713f..6ac5ca9cf5 100644
--- a/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/external/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -188,7 +188,7 @@ public class Java8APISuite implements Serializable {
public void flatMap() {
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
"The quick brown fox jumps over the lazy dog."));
- JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
+ JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
Assert.assertEquals("Hello", words.first());
Assert.assertEquals(11, words.count());
@@ -198,7 +198,7 @@ public class Java8APISuite implements Serializable {
for (String word : s.split(" ")) {
pairs2.add(new Tuple2<>(word, word));
}
- return pairs2;
+ return pairs2.iterator();
});
Assert.assertEquals(new Tuple2<>("Hello", "Hello"), pairs.first());
@@ -209,7 +209,7 @@ public class Java8APISuite implements Serializable {
for (String word : s.split(" ")) {
lengths.add((double) word.length());
}
- return lengths;
+ return lengths.iterator();
});
Assert.assertEquals(5.0, doubles.first(), 0.01);
@@ -227,7 +227,7 @@ public class Java8APISuite implements Serializable {
// Regression test for SPARK-668:
JavaPairRDD<String, Integer> swapped =
- pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
+ pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()).iterator());
swapped.collect();
// There was never a bug here, but it's worth testing:
@@ -242,7 +242,7 @@ public class Java8APISuite implements Serializable {
while (iter.hasNext()) {
sum += iter.next();
}
- return Collections.singletonList(sum);
+ return Collections.singletonList(sum).iterator();
});
Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
diff --git a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index 604d818ef1..67bc64a444 100644
--- a/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/external/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -29,6 +29,7 @@ import org.junit.Test;
import org.apache.spark.Accumulator;
import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
@@ -95,7 +96,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
while (in.hasNext()) {
out = out + in.next().toUpperCase();
}
- return Lists.newArrayList(out);
+ return Lists.newArrayList(out).iterator();
});
JavaTestUtils.attachTestOutputStream(mapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -351,7 +352,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Arrays.asList("a", "t", "h", "l", "e", "t", "i", "c", "s"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(s -> Lists.newArrayList(s.split("(?!^)")));
+ JavaDStream<String> flatMapped = stream.flatMap(
+ s -> Lists.newArrayList(s.split("(?!^)")).iterator());
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -360,8 +362,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
@Test
public void testForeachRDD() {
- final Accumulator<Integer> accumRdd = ssc.sc().accumulator(0);
- final Accumulator<Integer> accumEle = ssc.sc().accumulator(0);
+ final Accumulator<Integer> accumRdd = ssc.sparkContext().accumulator(0);
+ final Accumulator<Integer> accumEle = ssc.sparkContext().accumulator(0);
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,1,1),
Arrays.asList(1,1,1));
@@ -375,7 +377,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
});
// This is a test to make sure foreachRDD(VoidFunction2) can be called from Java
- stream.foreachRDD((rdd, time) -> null);
+ stream.foreachRDD((rdd, time) -> { return; });
JavaTestUtils.runStreams(ssc, 2, 2);
@@ -423,7 +425,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
for (String letter : s.split("(?!^)")) {
out.add(new Tuple2<>(s.length(), letter));
}
- return out;
+ return out.iterator();
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -541,7 +543,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
}
- return out;
+ return out.iterator();
});
JavaTestUtils.attachTestOutputStream(reversed);
@@ -598,7 +600,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
for (Character s : in._1().toCharArray()) {
out.add(new Tuple2<>(in._2(), s.toString()));
}
- return out;
+ return out.iterator();
});
JavaTestUtils.attachTestOutputStream(flatMapped);