diff options
author | Sean Owen <sowen@cloudera.com> | 2016-01-26 11:55:28 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-01-26 11:55:28 +0000 |
commit | 649e9d0f5b2d5fc13f2dd5be675331510525927f (patch) | |
tree | cc500b373fda20ef42243c199ecfb6f381310abb /extras/kinesis-asl/src/main | |
parent | 5936bf9fa85ccf7f0216145356140161c2801682 (diff) | |
download | spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2 spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip |
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.
CC rxin pwendell for API change; tdas since it also touches streaming.
Author: Sean Owen <sowen@cloudera.com>
Closes #10413 from srowen/SPARK-3369.
Diffstat (limited to 'extras/kinesis-asl/src/main')
-rw-r--r-- | extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java index 06e0ff28af..64e044aa8e 100644 --- a/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java +++ b/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java @@ -16,7 +16,10 @@ */ package org.apache.spark.examples.streaming; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -38,7 +41,6 @@ import scala.Tuple2; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; -import com.google.common.collect.Lists; /** * Consumes messages from a Amazon Kinesis streams and does wordcount. @@ -154,8 +156,9 @@ public final class JavaKinesisWordCountASL { // needs to be public for access fr // Convert each line of Array[Byte] to String, and split into words JavaDStream<String> words = unionStreams.flatMap(new FlatMapFunction<byte[], String>() { @Override - public Iterable<String> call(byte[] line) { - return Lists.newArrayList(WORD_SEPARATOR.split(new String(line))); + public Iterator<String> call(byte[] line) { + String s = new String(line, StandardCharsets.UTF_8); + return Arrays.asList(WORD_SEPARATOR.split(s)).iterator(); } }); |