about | summary | refs | log | tree | commit | diff
path: root/examples
diff options
context:
space:
mode:
author Sean Owen <sowen@cloudera.com> 2016-01-26 11:55:28 +0000
committer Sean Owen <sowen@cloudera.com> 2016-01-26 11:55:28 +0000
commit 649e9d0f5b2d5fc13f2dd5be675331510525927f (patch)
tree cc500b373fda20ef42243c199ecfb6f381310abb /examples
parent 5936bf9fa85ccf7f0216145356140161c2801682 (diff)
download spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.gz
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.tar.bz2
spark-649e9d0f5b2d5fc13f2dd5be675331510525927f.zip
[SPARK-3369][CORE][STREAMING] Java mapPartitions Iterator->Iterable is inconsistent with Scala's Iterator->Iterator
Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.

CC rxin pwendell for API change; tdas since it also touches streaming.

Author: Sean Owen <sowen@cloudera.com>

Closes #10413 from srowen/SPARK-3369.
Diffstat (limited to 'examples')
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaPageRank.java | 18
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/JavaWordCount.java | 5
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java | 5
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java | 7
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java | 6
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java | 9
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java | 11
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java | 6
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java | 8
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java | 5
-rw-r--r-- examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java | 5
11 files changed, 44 insertions, 41 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index a5db8accdf..635fb6a373 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,10 @@
package org.apache.spark.examples;
-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.regex.Pattern;
import scala.Tuple2;
@@ -32,11 +35,6 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-import java.util.regex.Pattern;
-
/**
* Computes the PageRank of URLs from an input file. Input file should
* be in format of:
@@ -108,13 +106,13 @@ public final class JavaPageRank {
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
- public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+ public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
int urlCount = Iterables.size(s._1);
- List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+ List<Tuple2<String, Double>> results = new ArrayList<>();
for (String n : s._1) {
- results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
+ results.add(new Tuple2<>(n, s._2() / urlCount));
}
- return results;
+ return results.iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 9a6a944f7e..d746a3d2b6 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -27,6 +27,7 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -46,8 +47,8 @@ public final class JavaWordCount {
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) {
- return Arrays.asList(SPACE.split(s));
+ public Iterator<String> call(String s) {
+ return Arrays.asList(SPACE.split(s)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 62e563380a..cf774667f6 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
+import java.util.Iterator;
import scala.Tuple2;
@@ -120,8 +121,8 @@ public class JavaActorWordCount {
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String s) {
- return Arrays.asList(s.split("\\s+"));
+ public Iterator<String> call(String s) {
+ return Arrays.asList(s.split("\\s+")).iterator();
}
}).mapToPair(new PairFunction<String, String, Integer>() {
@Override
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 4b50fbf59f..3d668adcf8 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -17,7 +17,6 @@
package org.apache.spark.examples.streaming;
-import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import org.apache.spark.SparkConf;
@@ -37,6 +36,8 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
/**
@@ -74,8 +75,8 @@ public class JavaCustomReceiver extends Receiver<String> {
new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index f9a5e7f69f..5107500a12 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -20,11 +20,11 @@ package org.apache.spark.examples.streaming;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
import scala.Tuple2;
-import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
@@ -87,8 +87,8 @@ public final class JavaDirectKafkaWordCount {
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 337f8ffb5b..0df4cb40a9 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -17,20 +17,19 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
-
import scala.Tuple2;
-import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -88,8 +87,8 @@ public final class JavaKafkaWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 3e9f0f4b8f..b82b319acb 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -17,8 +17,11 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
import scala.Tuple2;
-import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -31,8 +34,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import java.util.regex.Pattern;
-
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
@@ -67,8 +68,8 @@ public final class JavaNetworkWordCount {
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index bc963a02be..bc8cbcdef7 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -21,11 +21,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import scala.Tuple2;
-import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.spark.Accumulator;
@@ -138,8 +138,8 @@ public final class JavaRecoverableNetworkWordCount {
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 084f68a8be..f0228f5e63 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -17,10 +17,10 @@
package org.apache.spark.examples.streaming;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;
-import com.google.common.collect.Lists;
-
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
@@ -72,8 +72,8 @@ public final class JavaSqlNetworkWordCount {
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index f52cc7c205..6beab90f08 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -73,8 +74,8 @@ public class JavaStatefulNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> call(String x) {
- return Arrays.asList(SPACE.split(x));
+ public Iterator<String> call(String x) {
+ return Arrays.asList(SPACE.split(x)).iterator();
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
index d869768026..f0ae9a99ba 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java
@@ -34,6 +34,7 @@ import scala.Tuple2;
import twitter4j.Status;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
/**
@@ -70,8 +71,8 @@ public class JavaTwitterHashTagJoinSentiments {
JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
@Override
- public Iterable<String> call(Status s) {
- return Arrays.asList(s.getText().split(" "));
+ public Iterator<String> call(Status s) {
+ return Arrays.asList(s.getText().split(" ")).iterator();
}
});