diff options
Diffstat (limited to 'examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java')
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java | 91 |
1 files changed, 31 insertions, 60 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index acbc345243..45a876decf 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -18,10 +18,8 @@ package org.apache.spark.examples.streaming; import java.io.File; -import java.io.IOException; import java.nio.charset.Charset; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; @@ -30,12 +28,10 @@ import scala.Tuple2; import com.google.common.io.Files; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; -import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; @@ -120,7 +116,7 @@ public final class JavaRecoverableNetworkWordCount { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint System.out.println("Creating new context"); - final File outputFile = new File(outputPath); + File outputFile = new File(outputPath); if (outputFile.exists()) { outputFile.delete(); } @@ -132,52 +128,31 @@ public final class JavaRecoverableNetworkWordCount { // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port); - JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> call(String x) { - return Arrays.asList(SPACE.split(x)).iterator(); - } - }); - JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<>(s, 1); - } - }).reduceByKey(new Function2<Integer, Integer, Integer>() { - @Override - public Integer call(Integer i1, Integer i2) { - return i1 + i2; + JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); + JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) + .reduceByKey((i1, i2) -> i1 + i2); + + wordCounts.foreachRDD((rdd, time) -> { + // Get or register the blacklist Broadcast + Broadcast<List<String>> blacklist = + JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); + // Get or register the droppedWordsCounter Accumulator + LongAccumulator droppedWordsCounter = + JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); + // Use blacklist to drop words and use droppedWordsCounter to count them + String counts = rdd.filter(wordCount -> { + if (blacklist.value().contains(wordCount._1())) { + droppedWordsCounter.add(wordCount._2()); + return false; + } else { + return true; } - }); - - wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() { - @Override - public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException { - // Get or register the blacklist Broadcast - final Broadcast<List<String>> blacklist = - JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context())); - // Get or register the droppedWordsCounter Accumulator - final LongAccumulator droppedWordsCounter = - JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context())); - // Use blacklist to drop words and use droppedWordsCounter to count them - String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { - @Override - public Boolean call(Tuple2<String, Integer> wordCount) { - if (blacklist.value().contains(wordCount._1())) { - droppedWordsCounter.add(wordCount._2()); - return false; - } else { - return true; - } - } - }).collect().toString(); - String output = "Counts at time " + time + " " + counts; - System.out.println(output); - System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); - System.out.println("Appending to " + outputFile.getAbsolutePath()); - Files.append(output + "\n", outputFile, Charset.defaultCharset()); - } + }).collect().toString(); + String output = "Counts at time " + time + " " + counts; + System.out.println(output); + System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally"); + System.out.println("Appending to " + outputFile.getAbsolutePath()); + Files.append(output + "\n", outputFile, Charset.defaultCharset()); }); return ssc; @@ -198,19 +173,15 @@ public final class JavaRecoverableNetworkWordCount { System.exit(1); } - final String ip = args[0]; - final int port = Integer.parseInt(args[1]); - final String checkpointDirectory = args[2]; - final String outputPath = args[3]; + String ip = args[0]; + int port = Integer.parseInt(args[1]); + String checkpointDirectory = args[2]; + String outputPath = args[3]; // Function to create JavaStreamingContext without any output operations // (used to detect the new context) - Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() { - @Override - public JavaStreamingContext call() { - return createContext(ip, port, checkpointDirectory, outputPath); - } - }; + Function0<JavaStreamingContext> createContextFunc = + () -> createContext(ip, port, checkpointDirectory, outputPath); JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc); |