diff options
Diffstat (limited to 'examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java')
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java | 25 |
1 files changed, 3 insertions, 22 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index 7a8fe99f48..b217672def 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -18,15 +18,11 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.Iterator; import java.util.regex.Pattern; import scala.Tuple2; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.StorageLevels; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; @@ -66,24 +62,9 @@ public final class JavaNetworkWordCount { // Replication necessary in distributed scenario for fault tolerance. JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; - } - }); + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); wordCounts.print(); ssc.start(); |