diff options
author    Sean Owen <sowen@cloudera.com>  2016-01-26 11:55:28 +0000
committer Sean Owen <sowen@cloudera.com>  2016-01-26 11:55:28 +0000
commit   649e9d0f5b2d5fc13f2dd5be675331510525927f (patch)
tree     cc500b373fda20ef42243c199ecfb6f381310abb /sql/core
parent   5936bf9fa85ccf7f0216145356140161c2801682 (diff)
download spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz
         spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2
         spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.
CC rxin pwendell for API change; tdas since it also touches streaming.
Author: Sean Owen <sowen@cloudera.com>
Closes #10413 from srowen/SPARK-3369.
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala             |  4
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java | 25
2 files changed, 13 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index bd99c39957..f182270a08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -346,7 +346,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
-    val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).iterator.asScala
+    val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala
     mapPartitions(func)(encoder)
   }
@@ -366,7 +366,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
-    val func: (T) => Iterable[U] = x => f.call(x).asScala
+    val func: (T) => Iterator[U] = x => f.call(x).asScala
     flatMap(func)(encoder)
   }
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 3c0f25a5dc..a6fb62c17d 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -111,24 +111,24 @@ public class JavaDatasetSuite implements Serializable {
     Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
       @Override
-      public Iterable<String> call(Iterator<String> it) throws Exception {
-        List<String> ls = new LinkedList<String>();
+      public Iterator<String> call(Iterator<String> it) {
+        List<String> ls = new LinkedList<>();
         while (it.hasNext()) {
-          ls.add(it.next().toUpperCase());
+          ls.add(it.next().toUpperCase(Locale.ENGLISH));
         }
-        return ls;
+        return ls.iterator();
       }
     }, Encoders.STRING());
     Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());

     Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String s) throws Exception {
-        List<String> ls = new LinkedList<String>();
+      public Iterator<String> call(String s) {
+        List<String> ls = new LinkedList<>();
         for (char c : s.toCharArray()) {
           ls.add(String.valueOf(c));
         }
-        return ls;
+        return ls.iterator();
       }
     }, Encoders.STRING());
     Assert.assertEquals(
@@ -192,12 +192,12 @@ public class JavaDatasetSuite implements Serializable {
     Dataset<String> flatMapped = grouped.flatMapGroups(
       new FlatMapGroupsFunction<Integer, String, String>() {
         @Override
-        public Iterable<String> call(Integer key, Iterator<String> values) throws Exception {
+        public Iterator<String> call(Integer key, Iterator<String> values) {
           StringBuilder sb = new StringBuilder(key.toString());
           while (values.hasNext()) {
             sb.append(values.next());
           }
-          return Collections.singletonList(sb.toString());
+          return Collections.singletonList(sb.toString()).iterator();
         }
       },
       Encoders.STRING());
@@ -228,10 +228,7 @@ public class JavaDatasetSuite implements Serializable {
       grouped2,
       new CoGroupFunction<Integer, String, Integer, String>() {
         @Override
-        public Iterable<String> call(
-            Integer key,
-            Iterator<String> left,
-            Iterator<Integer> right) throws Exception {
+        public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) {
           StringBuilder sb = new StringBuilder(key.toString());
           while (left.hasNext()) {
             sb.append(left.next());
@@ -240,7 +237,7 @@ public class JavaDatasetSuite implements Serializable {
           while (right.hasNext()) {
             sb.append(right.next());
           }
-          return Collections.singletonList(sb.toString());
+          return Collections.singletonList(sb.toString()).iterator();
         }
       },
       Encoders.STRING());