diff options
author | Sean Owen <sowen@cloudera.com> | 2016-01-26 11:55:28 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-01-26 11:55:28 +0000 |
commit | 649e9d0f5b2d5fc13f2dd5be675331510525927f (patch) | |
tree | cc500b373fda20ef42243c199ecfb6f381310abb /extras | |
parent | 5936bf9fa85ccf7f0216145356140161c2801682 (diff) | |
download | spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2 spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip |
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.
CC rxin pwendell for API change; tdas since it also touches streaming.
Author: Sean Owen <sowen@cloudera.com>
Closes #10413 from srowen/SPARK-3369.
Diffstat (limited to 'extras')
-rw-r--r-- | extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java | 2 | ||||
-rw-r--r-- | extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java | 9 |
2 files changed, 7 insertions, 4 deletions
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java index 27d494ce35..c0b58e713f 100644 --- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java +++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java @@ -294,7 +294,7 @@ public class Java8APISuite implements Serializable { sizeS += 1; s.next(); } - return Arrays.asList(sizeI, sizeS); + return Arrays.asList(sizeI, sizeS).iterator(); }; JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn); Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString()); diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index 06e0ff28af..64e044aa8e 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -16,7 +16,10 @@ */ package org.apache.spark.examples.streaming; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -38,7 +41,6 @@ import scala.Tuple2; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.google.common.collect.Lists; /** * Consumes messages from a Amazon Kinesis streams and does wordcount. @@ -154,8 +156,9 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr // Convert each line of Array[Byte] to String, and split into words JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() { @Override - public Iterable<String> call(byte[] line) { - return Lists.newArrayList(WORD_SEPARATOR.split(new String(line))); + public Iterator<String> call(byte[] line) { + String s = new String(line, StandardCharsets.UTF_8); + return Arrays.asList(WORD_SEPARATOR.split(s)).iterator(); } }); |