aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-01-26 11:55:28 +0000
committerSean Owen <sowen@cloudera.com>2016-01-26 11:55:28 +0000
commit649e9d0f5b2d5fc13f2dd5be675331510525927f (patch)
treecc500b373fda20ef42243c199ecfb6f381310abb /core
parent5936bf9fa85ccf7f0216145356140161c2801682 (diff)
downloadspark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable. CC rxin pwendell for API change; tdas since it also touches streaming. Author: Sean Owen <sowen@cloudera.com> Closes #10413 from srowen/SPARK-3369.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java2
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java3
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java3
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java3
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java2
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java2
-rw-r--r--core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java3
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala20
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java24
9 files changed, 33 insertions, 29 deletions
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 279639af5d..07aebb75e8 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -25,5 +25,5 @@ import java.util.Iterator;
* Datasets.
*/
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
- Iterable<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
+ Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 57fd0a7a80..576087b6f4 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that returns zero or more records of type Double from each input record.
*/
public interface DoubleFlatMapFunction<T> extends Serializable {
- public Iterable<Double> call(T t) throws Exception;
+ Iterator<Double> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index ef0d182412..2d8ea6d1a5 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that returns zero or more output records from each input record.
*/
public interface FlatMapFunction<T, R> extends Serializable {
- Iterable<R> call(T t) throws Exception;
+ Iterator<R> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index 14a98a38ef..fc97b63f82 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
/**
* A function that takes two inputs and returns zero or more output records.
*/
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
- Iterable<R> call(T1 t1, T2 t2) throws Exception;
+ Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index d7a80e7b12..bae574ab57 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
* A function that returns zero or more output records from each grouping key and its values.
*/
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
- Iterable<R> call(K key, Iterator<V> values) throws Exception;
+ Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index 6cb569ce0c..cf9945a215 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -24,5 +24,5 @@ import java.util.Iterator;
* Base interface for function used in Dataset's mapPartitions.
*/
public interface MapPartitionsFunction<T, U> extends Serializable {
- Iterable<U> call(Iterator<T> input) throws Exception;
+ Iterator<U> call(Iterator<T> input) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 691ef2eceb..51eed2e67b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -18,6 +18,7 @@
package org.apache.spark.api.java.function;
import java.io.Serializable;
+import java.util.Iterator;
import scala.Tuple2;
@@ -26,5 +27,5 @@ import scala.Tuple2;
* key-value pairs are represented as scala.Tuple2 objects.
*/
public interface PairFlatMapFunction<T, K, V> extends Serializable {
- public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+ Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 0f8d13cf5c..7340defabf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
- def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
- def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+ def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
@@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
- (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+ (x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
@@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
- (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala
+ (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
}
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 44d5cac7c2..8117ad9e60 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -880,8 +880,8 @@ public class JavaAPISuite implements Serializable {
"The quick brown fox jumps over the lazy dog."));
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(x.split(" "));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
}
});
Assert.assertEquals("Hello", words.first());
@@ -890,12 +890,12 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
new PairFlatMapFunction<String, String, String>() {
@Override
- public Iterable<Tuple2<String, String>> call(String s) {
+ public Iterator<Tuple2<String, String>> call(String s) {
List<Tuple2<String, String>> pairs = new LinkedList<>();
for (String word : s.split(" ")) {
pairs.add(new Tuple2<>(word, word));
}
- return pairs;
+ return pairs.iterator();
}
}
);
@@ -904,12 +904,12 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
@Override
- public Iterable<Double> call(String s) {
+ public Iterator<Double> call(String s) {
List<Double> lengths = new LinkedList<>();
for (String word : s.split(" ")) {
lengths.add((double) word.length());
}
- return lengths;
+ return lengths.iterator();
}
});
Assert.assertEquals(5.0, doubles.first(), 0.01);
@@ -930,8 +930,8 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
- public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
- return Collections.singletonList(item.swap());
+ public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
+ return Collections.singletonList(item.swap()).iterator();
}
});
swapped.collect();
@@ -951,12 +951,12 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> partitionSums = rdd.mapPartitions(
new FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
- public Iterable<Integer> call(Iterator<Integer> iter) {
+ public Iterator<Integer> call(Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
sum += iter.next();
}
- return Collections.singletonList(sum);
+ return Collections.singletonList(sum).iterator();
}
});
Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
@@ -1367,8 +1367,8 @@ public class JavaAPISuite implements Serializable {
FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
@Override
- public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
- return Arrays.asList(Iterators.size(i), Iterators.size(s));
+ public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+ return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
}
};