about summary refs log tree commit diff
path: root/sql/core/src/test/java
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-01-26 11:55:28 +0000
committerSean Owen <sowen@cloudera.com>2016-01-26 11:55:28 +0000
commit649e9d0f5b2d5fc13f2dd5be675331510525927f (patch)
treecc500b373fda20ef42243c199ecfb6f381310abb /sql/core/src/test/java
parent5936bf9fa85ccf7f0216145356140161c2801682 (diff)
downloadspark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable. CC rxin pwendell for API change; tdas since it also touches streaming. Author: Sean Owen <sowen@cloudera.com> Closes #10413 from srowen/SPARK-3369.
Diffstat (limited to 'sql/core/src/test/java')
-rw-r--r--sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java25
1 file changed, 11 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 3c0f25a5dc..a6fb62c17d 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -111,24 +111,24 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> parMapped = ds.mapPartitions(new MapPartitionsFunction<String, String>() {
@Override
- public Iterable<String> call(Iterator<String> it) throws Exception {
- List<String> ls = new LinkedList<String>();
+ public Iterator<String> call(Iterator<String> it) {
+ List<String> ls = new LinkedList<>();
while (it.hasNext()) {
- ls.add(it.next().toUpperCase());
+ ls.add(it.next().toUpperCase(Locale.ENGLISH));
}
- return ls;
+ return ls.iterator();
}
}, Encoders.STRING());
Assert.assertEquals(Arrays.asList("HELLO", "WORLD"), parMapped.collectAsList());
Dataset<String> flatMapped = ds.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) throws Exception {
- List<String> ls = new LinkedList<String>();
+ public Iterator<String> call(String s) {
+ List<String> ls = new LinkedList<>();
for (char c : s.toCharArray()) {
ls.add(String.valueOf(c));
}
- return ls;
+ return ls.iterator();
}
}, Encoders.STRING());
Assert.assertEquals(
@@ -192,12 +192,12 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> flatMapped = grouped.flatMapGroups(
new FlatMapGroupsFunction<Integer, String, String>() {
@Override
- public Iterable<String> call(Integer key, Iterator<String> values) throws Exception {
+ public Iterator<String> call(Integer key, Iterator<String> values) {
StringBuilder sb = new StringBuilder(key.toString());
while (values.hasNext()) {
sb.append(values.next());
}
- return Collections.singletonList(sb.toString());
+ return Collections.singletonList(sb.toString()).iterator();
}
},
Encoders.STRING());
@@ -228,10 +228,7 @@ public class JavaDatasetSuite implements Serializable {
grouped2,
new CoGroupFunction<Integer, String, Integer, String>() {
@Override
- public Iterable<String> call(
- Integer key,
- Iterator<String> left,
- Iterator<Integer> right) throws Exception {
+ public Iterator<String> call(Integer key, Iterator<String> left, Iterator<Integer> right) {
StringBuilder sb = new StringBuilder(key.toString());
while (left.hasNext()) {
sb.append(left.next());
@@ -240,7 +237,7 @@ public class JavaDatasetSuite implements Serializable {
while (right.hasNext()) {
sb.append(right.next());
}
- return Collections.singletonList(sb.toString());
+ return Collections.singletonList(sb.toString()).iterator();
}
},
Encoders.STRING());