From 649e9d0f5b2d5fc13f2dd5be675331510525927f Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 26 Jan 2016 11:55:28 +0000 Subject: [SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable. CC rxin pwendell for API change; tdas since it also touches streaming. Author: Sean Owen Closes #10413 from srowen/SPARK-3369. --- .../spark/api/java/function/CoGroupFunction.java | 2 +- .../api/java/function/DoubleFlatMapFunction.java | 3 ++- .../spark/api/java/function/FlatMapFunction.java | 3 ++- .../spark/api/java/function/FlatMapFunction2.java | 3 ++- .../api/java/function/FlatMapGroupsFunction.java | 2 +- .../api/java/function/MapPartitionsFunction.java | 2 +- .../api/java/function/PairFlatMapFunction.java | 3 ++- .../org/apache/spark/api/java/JavaRDDLike.scala | 20 +++++++++--------- .../test/java/org/apache/spark/JavaAPISuite.java | 24 +++++++++++----------- 9 files changed, 33 insertions(+), 29 deletions(-) (limited to 'core') diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java index 279639af5d..07aebb75e8 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java @@ -25,5 +25,5 @@ import java.util.Iterator; * Datasets. */ public interface CoGroupFunction extends Serializable { - Iterable call(K key, Iterator left, Iterator right) throws Exception; + Iterator call(K key, Iterator left, Iterator right) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java index 57fd0a7a80..576087b6f4 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java @@ -18,10 +18,11 @@ package org.apache.spark.api.java.function; import java.io.Serializable; +import java.util.Iterator; /** * A function that returns zero or more records of type Double from each input record. */ public interface DoubleFlatMapFunction extends Serializable { - public Iterable call(T t) throws Exception; + Iterator call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java index ef0d182412..2d8ea6d1a5 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java @@ -18,10 +18,11 @@ package org.apache.spark.api.java.function; import java.io.Serializable; +import java.util.Iterator; /** * A function that returns zero or more output records from each input record. */ public interface FlatMapFunction extends Serializable { - Iterable call(T t) throws Exception; + Iterator call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java index 14a98a38ef..fc97b63f82 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java @@ -18,10 +18,11 @@ package org.apache.spark.api.java.function; import java.io.Serializable; +import java.util.Iterator; /** * A function that takes two inputs and returns zero or more output records. */ public interface FlatMapFunction2 extends Serializable { - Iterable call(T1 t1, T2 t2) throws Exception; + Iterator call(T1 t1, T2 t2) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java index d7a80e7b12..bae574ab57 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java @@ -24,5 +24,5 @@ import java.util.Iterator; * A function that returns zero or more output records from each grouping key and its values. */ public interface FlatMapGroupsFunction extends Serializable { - Iterable call(K key, Iterator values) throws Exception; + Iterator call(K key, Iterator values) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java index 6cb569ce0c..cf9945a215 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java @@ -24,5 +24,5 @@ import java.util.Iterator; * Base interface for function used in Dataset's mapPartitions. */ public interface MapPartitionsFunction extends Serializable { - Iterable call(Iterator input) throws Exception; + Iterator call(Iterator input) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java index 691ef2eceb..51eed2e67b 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java @@ -18,6 +18,7 @@ package org.apache.spark.api.java.function; import java.io.Serializable; +import java.util.Iterator; import scala.Tuple2; @@ -26,5 +27,5 @@ import scala.Tuple2; * key-value pairs are represented as scala.Tuple2 objects. */ public interface PairFlatMapFunction extends Serializable { - public Iterable> call(T t) throws Exception; + Iterator> call(T t) throws Exception; } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 0f8d13cf5c..7340defabf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * RDD, and then flattening the results. */ def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = { - def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U]) } @@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * RDD, and then flattening the results. */ def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { - def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue())) } @@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * RDD, and then flattening the results. */ def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { - def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala + def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]] JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2]) } @@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U]) } @@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaRDD.fromRDD( rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U]) @@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue())) } @@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2]) } @@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning) .map(x => x.doubleValue())) @@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { - (x: Iterator[T]) => f.call(x.asJava).iterator().asScala + (x: Iterator[T]) => f.call(x.asJava).asScala } JavaPairRDD.fromRDD( rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2]) @@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { other: JavaRDDLike[U, _], f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = { def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { - (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala + (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala } JavaRDD.fromRDD( rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V]) diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 44d5cac7c2..8117ad9e60 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -880,8 +880,8 @@ public class JavaAPISuite implements Serializable { "The quick brown fox jumps over the lazy dog.")); JavaRDD words = rdd.flatMap(new FlatMapFunction() { @Override - public Iterable call(String x) { - return Arrays.asList(x.split(" ")); + public Iterator call(String x) { + return Arrays.asList(x.split(" ")).iterator(); } }); Assert.assertEquals("Hello", words.first()); @@ -890,12 +890,12 @@ public class JavaAPISuite implements Serializable { JavaPairRDD pairsRDD = rdd.flatMapToPair( new PairFlatMapFunction() { @Override - public Iterable> call(String s) { + public Iterator> call(String s) { List> pairs = new LinkedList<>(); for (String word : s.split(" ")) { pairs.add(new Tuple2<>(word, word)); } - return pairs; + return pairs.iterator(); } } ); @@ -904,12 +904,12 @@ public class JavaAPISuite implements Serializable { JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction() { @Override - public Iterable call(String s) { + public Iterator call(String s) { List lengths = new LinkedList<>(); for (String word : s.split(" ")) { lengths.add((double) word.length()); } - return lengths; + return lengths.iterator(); } }); Assert.assertEquals(5.0, doubles.first(), 0.01); @@ -930,8 +930,8 @@ public class JavaAPISuite implements Serializable { JavaPairRDD swapped = pairRDD.flatMapToPair( new PairFlatMapFunction, String, Integer>() { @Override - public Iterable> call(Tuple2 item) { - return Collections.singletonList(item.swap()); + public Iterator> call(Tuple2 item) { + return Collections.singletonList(item.swap()).iterator(); } }); swapped.collect(); @@ -951,12 +951,12 @@ public class JavaAPISuite implements Serializable { JavaRDD partitionSums = rdd.mapPartitions( new FlatMapFunction, Integer>() { @Override - public Iterable call(Iterator iter) { + public Iterator call(Iterator iter) { int sum = 0; while (iter.hasNext()) { sum += iter.next(); } - return Collections.singletonList(sum); + return Collections.singletonList(sum).iterator(); } }); Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); @@ -1367,8 +1367,8 @@ public class JavaAPISuite implements Serializable { FlatMapFunction2, Iterator, Integer> sizesFn = new FlatMapFunction2, Iterator, Integer>() { @Override - public Iterable call(Iterator i, Iterator s) { - return Arrays.asList(Iterators.size(i), Iterators.size(s)); + public Iterator call(Iterator i, Iterator s) { + return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator(); } }; -- cgit v1.2.3