author    Sean Owen <sowen@cloudera.com>    2016-01-26 11:55:28 +0000
committer Sean Owen <sowen@cloudera.com>    2016-01-26 11:55:28 +0000
commit    649e9d0f5b2d5fc13f2dd5be675331510525927f (patch)
tree      cc500b373fda20ef42243c199ecfb6f381310abb
parent    5936bf9fa85ccf7f0216145356140161c2801682 (diff)
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.

CC rxin pwendell for API change; tdas since it also touches streaming.

Author: Sean Owen <sowen@cloudera.com>

Closes #10413 from srowen/SPARK-3369.
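For readers migrating code against this change, the sketch below is not part of the patch: the class name and sample data are invented for illustration. It shows the new contract after this commit — a Java flat-map function returns an Iterator rather than an Iterable, so a backing collection must be wrapped with .iterator() before returning:

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public final class FlatMapMigrationExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("FlatMapMigrationExample").setMaster("local"));
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("the quick brown fox"));

    // Before this patch, call() returned Iterable<String>; after it,
    // call() must return Iterator<String>, matching Scala's
    // flatMap(T => TraversableOnce[U]) semantics.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String line) {
        // Wrap the backing collection in an iterator instead of
        // returning the collection itself.
        return Arrays.asList(line.split(" ")).iterator();
      }
    });

    System.out.println(words.collect());
    sc.stop();
  }
}

The same pattern applies to mapPartitions, flatMapToPair, and the other flat-map variants touched below; on the Scala side, DStream.flatMap now takes T => TraversableOnce[U], so an Iterator-producing function is accepted directly.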
 core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java           |  2
 core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java     |  3
 core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java           |  3
 core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java          |  3
 core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java     |  2
 core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java     |  2
 core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java       |  3
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala                      | 20
 core/src/test/java/org/apache/spark/JavaAPISuite.java                                | 24
 docs/streaming-programming-guide.md                                                  |  4
 examples/src/main/java/org/apache/spark/examples/JavaPageRank.java                   | 18
 examples/src/main/java/org/apache/spark/examples/JavaWordCount.java                  |  5
 examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java   |  5
 examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java   |  7
 examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java |  6
 examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java   |  9
 examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java | 11
 examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java |  6
 examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java |  8
 examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java |  5
 examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java |  5
 extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java                 |  2
 extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java |  9
 project/MimaExcludes.scala                                                           | 31
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                           |  4
 sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java               | 25
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala   | 10
 streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala            |  2
 streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala  |  2
 streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                 | 20
 30 files changed, 146 insertions(+), 110 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 279639af5d..07aebb75e8 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -25,5 +25,5 @@ import java.util.Iterator;
* Datasets.
*/
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
- Iterable<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
+ Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 57fd0a7a80..576087b6f4 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that returns zero or more records of type Double from each input record.
*/
public interface DoubleFlatMapFunction<T> extends Serializable {
- public Iterable<Double> call(T t) throws Exception;
+ Iterator<Double> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index ef0d182412..2d8ea6d1a5 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that returns zero or more output records from each input record.
*/
public interface FlatMapFunction<T, R> extends Serializable {
- Iterable<R> call(T t) throws Exception;
+ Iterator<R> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index 14a98a38ef..fc97b63f82 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that takes two inputs and returns zero or more output records.
*/
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
- Iterable<R> call(T1 t1, T2 t2) throws Exception;
+ Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index d7a80e7b12..bae574ab57 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
* A function that returns zero or more output records from each grouping key and its values.
*/
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
- Iterable<R> call(K key, Iterator<V> values) throws Exception;
+ Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index 6cb569ce0c..cf9945a215 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
* Base interface for function used in Dataset's mapPartitions.
*/
public interface MapPartitionsFunction<T, U> extends Serializable {
- Iterable<U> call(Iterator<T> input) throws Exception;
+ Iterator<U> call(Iterator<T> input) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 691ef2eceb..51eed2e67b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -18,6 +18,7 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
import scala.Tuple2;
@@ -26,5 +27,5 @@ import scala.Tuple2;
* key-value pairs are represented as scala.Tuple2 objects.
*/
public interface PairFlatMapFunction<T, K, V> extends Serializable {
- public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+ Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0f8d13cf5c..7340defabf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
- def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
- def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
@@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
@@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
- (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala
+ (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
}
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 44d5cac7c2..8117ad9e60 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -880,8 +880,8 @@ public class JavaAPISuite implements Serializable {
"The quick brown fox jumps over the lazy dog."));
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(x.split(" "));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
}
});
Assert.assertEquals("Hello", words.first());
@@ -890,12 +890,12 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
new PairFlatMapFunction<String, String, String>() {
@Override
- public Iterable<Tuple2<String, String>> call(String s) {
+ public Iterator<Tuple2<String, String>> call(String s) {
List<Tuple2<String, String>> pairs = new LinkedList<>();
for (String word : s.split(" ")) {
pairs.add(new Tuple2<>(word, word));
}
- return pairs;
+ return pairs.iterator();
}
}
);
@@ -904,12 +904,12 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
@Override
- public Iterable<Double> call(String s) {
+ public Iterator<Double> call(String s) {
List<Double> lengths = new LinkedList<>();
for (String word : s.split(" ")) {
lengths.add((double) word.length());
}
- return lengths;
+ return lengths.iterator();
}
});
Assert.assertEquals(5.0, doubles.first(), 0.01);
@@ -930,8 +930,8 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
- public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
- return Collections.singletonList(item.swap());
+ public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
+ return Collections.singletonList(item.swap()).iterator();
}
});
swapped.collect();
@@ -951,12 +951,12 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> partitionSums = rdd.mapPartitions(
new FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
- public Iterable<Integer> call(Iterator<Integer> iter) {
+ public Iterator<Integer> call(Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
sum += iter.next();
}
- return Collections.singletonList(sum);
+ return Collections.singletonList(sum).iterator();
}
});
Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
@@ -1367,8 +1367,8 @@ public class JavaAPISuite implements Serializable {
FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
@Override
- public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
- return Arrays.asList(Iterators.size(i), Iterators.size(s));
+ public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+ return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
}
};
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 93c34efb66..7e681b67cf 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -165,8 +165,8 @@ space into words.
// Split each line into words
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
- @Override public Iterable<String> call(String x) {
- return Arrays.asList(x.split(" "));
+ @Override public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
}
});
{% endhighlight %}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index a5db8accdf..635fb6a373 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,10 @@
package org.apache.spark.examples;
-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.regex.Pattern;
import scala.Tuple2;
@@ -32,11 +35,6 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-import java.util.regex.Pattern;
-
/**
* Computes the PageRank of URLs from an input file. Input file should
* be in format of:
@@ -108,13 +106,13 @@ public final class JavaPageRank {
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
- public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+ public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
int urlCount = Iterables.size(s._1);
- List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+ List<Tuple2<String, Double>> results = new ArrayList<>();
for (String n : s._1) {
- results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
+ results.add(new Tuple2<>(n, s._2() / urlCount));
}
- return results;
+ return results.iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 9a6a944f7e..d746a3d2b6 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -27,6 +27,7 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -46,8 +47,8 @@ public final class JavaWordCount {
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) {
- return Arrays.asList(SPACE.split(s));
+ public Iterator<String> call(String s) {
+ return Arrays.asList(SPACE.split(s)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 62e563380a..cf774667f6 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
+import java.util.Iterator;
import scala.Tuple2;
@@ -120,8 +121,8 @@ public class JavaActorWordCount {
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) {
- return Arrays.asList(s.split("\\s+"));
+ public Iterator<String> call(String s) {
+ return Arrays.asList(s.split("\\s+")).iterator();
}
}).mapToPair(new PairFunction<String, String, Integer>() {
@Override
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 4b50fbf59f..3d668adcf8 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -17,7 +17,6 @@
package org.apache.spark.examples.streaming;
-import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import org.apache.spark.SparkConf;
@@ -37,6 +36,8 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
/**
@@ -74,8 +75,8 @@ public class JavaCustomReceiver extends Receiver<String> {
new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index f9a5e7f69f..5107500a12 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -20,11 +20,11 @@ package org.apache.spark.examples.streaming;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
import scala.Tuple2;
-import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
@@ -87,8 +87,8 @@ public final class JavaDirectKafkaWordCount {
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 337f8ffb5b..0df4cb40a9 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -17,20 +17,19 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
-
import scala.Tuple2;
-import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -88,8 +87,8 @@ public final class JavaKafkaWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 3e9f0f4b8f..b82b319acb 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -17,8 +17,11 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
import scala.Tuple2;
-import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -31,8 +34,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import java.util.regex.Pattern;
-
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
@@ -67,8 +68,8 @@ public final class JavaNetworkWordCount {
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bc963a02be..bc8cbcdef7 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,11 +21,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import scala.Tuple2;
-import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.spark.Accumulator;
@@ -138,8 +138,8 @@ public final class JavaRecoverableNetworkWordCount {
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 084f68a8be..f0228f5e63 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -17,10 +17,10 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
-import com.google.common.collect.Lists;
-
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
@@ -72,8 +72,8 @@ public final class JavaSqlNetworkWordCount {
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index f52cc7c205..6beab90f08 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -73,8 +74,8 @@ public class JavaStatefulNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
index d869768026..f0ae9a99ba 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
@@ -34,6 +34,7 @@ import scala.Tuple2;
import twitter4j.Status;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
/**
@@ -70,8 +71,8 @@ public class JavaTwitterHashTagJoinSentiments {
JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
@Override
- public Iterable<String> call(Status s) {
- return Arrays.asList(s.getText().split(" "));
+ public Iterator<String> call(Status s) {
+ return Arrays.asList(s.getText().split(" ")).iterator();
}
});
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index 27d494ce35..c0b58e713f 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -294,7 +294,7 @@ public class Java8APISuite implements Serializable {
sizeS += 1;
s.next();
}
- return Arrays.asList(sizeI, sizeS);
+ return Arrays.asList(sizeI, sizeS).iterator();
};
JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index 06e0ff28af..64e044aa8e 100644
--- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -16,7 +16,10 @@
*/
package org.apache.spark.examples.streaming;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -38,7 +41,6 @@ import scala.Tuple2;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-import com.google.common.collect.Lists;
/**
* Consumes messages from a Amazon Kinesis streams and does wordcount.
@@ -154,8 +156,9 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr
// Convert each line of Array[Byte] to String, and split into words
JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() {
@Override
- public Iterable<String> call(byte[] line) {
- return Lists.newArrayList(WORD_SEPARATOR.split(new String(line)));
+ public Iterator<String> call(byte[] line) {
+ String s = new String(line, StandardCharsets.UTF_8);
+ return Arrays.asList(WORD_SEPARATOR.split(s)).iterator();
}
});
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 501456b043..643bee6969 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -61,6 +61,37 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
) ++
Seq(
+ // SPARK-3369 Fix Iterable/Iterator in Java API
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.DoubleFlatMapFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction2.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.FlatMapFunction2.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.PairFlatMapFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.PairFlatMapFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.CoGroupFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.CoGroupFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.MapPartitionsFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.MapPartitionsFunction.call"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.function.FlatMapGroupsFunction.call"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.function.FlatMapGroupsFunction.call")
+ ) ++
+ Seq(
// SPARK-4819 replace Guava Optional
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getCheckpointDir"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getSparkHome"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index bd99c39957..f182270a08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -346,7 +346,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
- val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).iterator.asScala
+ val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala
mapPartitions(func)(encoder)
}
@@ -366,7 +366,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
- val func: (T) => Iterable[U] = x => f.call(x).asScala
+ val func: (T) => Iterator[U] = x => f.call(x).asScala
flatMap(func)(encoder)
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 3c0f25a5dc..a6fb62c17d 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -111,24 +111,24 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
@Override
- public Iterable<String> call(Iterator<String> it) throws Exception {
- List<String> ls = new LinkedList<String>();
+ public Iterator<String> call(Iterator<String> it) {
+ List<String> ls = new LinkedList<>();
while (it.hasNext()) {
- ls.add(it.next().toUpperCase());
+ ls.add(it.next().toUpperCase(Locale.ENGLISH));
}
- return ls;
+ return ls.iterator();
}
}, Encoders.STRING());
Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());
Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) throws Exception {
- List<String> ls = new LinkedList<String>();
+ public Iterator<String> call(String s) {
+ List<String> ls = new LinkedList<>();
for (char c : s.toCharArray()) {
ls.add(String.valueOf(c));
}
- return ls;
+ return ls.iterator();
}
}, Encoders.STRING());
Assert.assertEquals(
@@ -192,12 +192,12 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> flatMapped = grouped.flatMapGroups(
new FlatMapGroupsFunction<Integer, String, String>() {
@Override
- public Iterable<String> call(Integer key, Iterator<String> values) throws Exception {
+ public Iterator<String> call(Integer key, Iterator<String> values) {
StringBuilder sb = new StringBuilder(key.toString());
while (values.hasNext()) {
sb.append(values.next());
}
- return Collections.singletonList(sb.toString());
+ return Collections.singletonList(sb.toString()).iterator();
}
},
Encoders.STRING());
@@ -228,10 +228,7 @@ public class JavaDatasetSuite implements Serializable {
grouped2,
new CoGroupFunction<Integer, String, Integer, String>() {
@Override
- public Iterable<String> call(
- Integer key,
- Iterator<String> left,
- Iterator<Integer> right) throws Exception {
+ public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) {
StringBuilder sb = new StringBuilder(key.toString());
while (left.hasNext()) {
sb.append(left.next());
@@ -240,7 +237,7 @@ public class JavaDatasetSuite implements Serializable {
while (right.hasNext()) {
sb.append(right.next());
}
- return Collections.singletonList(sb.toString());
+ return Collections.singletonList(sb.toString()).iterator();
}
},
Encoders.STRING());
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a791a474c6..f10de485d0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -166,8 +166,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
new JavaDStream(dstream.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -176,8 +175,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* and then flattening the results
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- import scala.collection.JavaConverters._
- def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = fakeClassTag
new JavaPairDStream(dstream.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -189,7 +187,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDStream(dstream.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -202,7 +200,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2])
: JavaPairDStream[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaPairDStream(dstream.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1dfb4e7abc..db79eeab9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -550,7 +550,7 @@ abstract class DStream[T: ClassTag] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
- def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
+ def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 96a444a7ba..d60a617978 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming.{Duration, Time}
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
- flatMapFunc: T => Traversable[U]
+ flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index 4dbcef2934..806cea24ca 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -271,12 +271,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> mapped = stream.mapPartitions(
new FlatMapFunction<Iterator<String>, String>() {
@Override
- public Iterable<String> call(Iterator<String> in) {
+ public Iterator<String> call(Iterator<String> in) {
StringBuilder out = new StringBuilder();
while (in.hasNext()) {
out.append(in.next().toUpperCase(Locale.ENGLISH));
}
- return Arrays.asList(out.toString());
+ return Arrays.asList(out.toString()).iterator();
}
});
JavaTestUtils.attachTestOutputStream(mapped);
@@ -759,8 +759,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(x.split("(?!^)"));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split("(?!^)")).iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -846,12 +846,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = stream.flatMapToPair(
new PairFlatMapFunction<String, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(String in) {
+ public Iterator<Tuple2<Integer, String>> call(String in) {
List<Tuple2<Integer, String>> out = new ArrayList<>();
for (String letter: in.split("(?!^)")) {
out.add(new Tuple2<>(in.length(), letter));
}
- return out;
+ return out.iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
@@ -1019,13 +1019,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> reversed = pairStream.mapPartitionsToPair(
new PairFlatMapFunction<Iterator<Tuple2<String, Integer>>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
+ public Iterator<Tuple2<Integer, String>> call(Iterator<Tuple2<String, Integer>> in) {
List<Tuple2<Integer, String>> out = new LinkedList<>();
while (in.hasNext()) {
Tuple2<String, Integer> next = in.next();
out.add(next.swap());
}
- return out;
+ return out.iterator();
}
});
@@ -1089,12 +1089,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaPairDStream<Integer, String> flatMapped = pairStream.flatMapToPair(
new PairFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
- public Iterable<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
+ public Iterator<Tuple2<Integer, String>> call(Tuple2<String, Integer> in) {
List<Tuple2<Integer, String>> out = new LinkedList<>();
for (Character s : in._1().toCharArray()) {
out.add(new Tuple2<>(in._2(), s.toString()));
}
- return out;
+ return out.iterator();
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);