Diffstat (limited to 'examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java | 91
1 file changed, 31 insertions(+), 60 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index acbc345243..45a876decf 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -18,10 +18,8 @@
package org.apache.spark.examples.streaming;
import java.io.File;
-import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -30,12 +28,10 @@ import scala.Tuple2;
import com.google.common.io.Files;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -120,7 +116,7 @@ public final class JavaRecoverableNetworkWordCount {
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
System.out.println("Creating new context");
- final File outputFile = new File(outputPath);
+ File outputFile = new File(outputPath);
if (outputFile.exists()) {
outputFile.delete();
}
@@ -132,52 +128,31 @@ public final class JavaRecoverableNetworkWordCount {
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(SPACE.split(x)).iterator();
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override
- public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
+ JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
+ JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+ .reduceByKey((i1, i2) -> i1 + i2);
+
+ wordCounts.foreachRDD((rdd, time) -> {
+ // Get or register the blacklist Broadcast
+ Broadcast<List<String>> blacklist =
+ JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+ // Get or register the droppedWordsCounter Accumulator
+ LongAccumulator droppedWordsCounter =
+ JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+ // Use blacklist to drop words and use droppedWordsCounter to count them
+ String counts = rdd.filter(wordCount -> {
+ if (blacklist.value().contains(wordCount._1())) {
+ droppedWordsCounter.add(wordCount._2());
+ return false;
+ } else {
+ return true;
}
- });
-
- wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
- @Override
- public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
- // Get or register the blacklist Broadcast
- final Broadcast<List<String>> blacklist =
- JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
- // Get or register the droppedWordsCounter Accumulator
- final LongAccumulator droppedWordsCounter =
- JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
- // Use blacklist to drop words and use droppedWordsCounter to count them
- String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
- @Override
- public Boolean call(Tuple2<String, Integer> wordCount) {
- if (blacklist.value().contains(wordCount._1())) {
- droppedWordsCounter.add(wordCount._2());
- return false;
- } else {
- return true;
- }
- }
- }).collect().toString();
- String output = "Counts at time " + time + " " + counts;
- System.out.println(output);
- System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
- System.out.println("Appending to " + outputFile.getAbsolutePath());
- Files.append(output + "\n", outputFile, Charset.defaultCharset());
- }
+ }).collect().toString();
+ String output = "Counts at time " + time + " " + counts;
+ System.out.println(output);
+ System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
+ System.out.println("Appending to " + outputFile.getAbsolutePath());
+ Files.append(output + "\n", outputFile, Charset.defaultCharset());
});
return ssc;
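
For reference, each lambda introduced in the hunk above targets one of Spark's Java functional interfaces, the same single-abstract-method types the removed anonymous classes implemented. A minimal sketch (class and variable names here are illustrative and not part of this file) of the explicit interface types the compiler infers:

import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

public class LambdaTargetsSketch {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    // Each lambda in the rewritten pipeline is simply an instance of one of
    // Spark's single-abstract-method interfaces.
    FlatMapFunction<String, String> splitLine =
        x -> Arrays.asList(SPACE.split(x)).iterator();
    PairFunction<String, String, Integer> toPair = s -> new Tuple2<>(s, 1);
    Function2<Integer, Integer, Integer> sum = (i1, i2) -> i1 + i2;

    // Calling them directly, outside any DStream, shows they behave like
    // ordinary functions.
    Iterator<String> words = splitLine.call("to be or not to be");
    while (words.hasNext()) {
      Tuple2<String, Integer> pair = toPair.call(words.next());
      System.out.println(pair._1() + " -> " + sum.call(pair._2(), 1));
    }
  }
}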
@@ -198,19 +173,15 @@ public final class JavaRecoverableNetworkWordCount {
System.exit(1);
}
- final String ip = args[0];
- final int port = Integer.parseInt(args[1]);
- final String checkpointDirectory = args[2];
- final String outputPath = args[3];
+ String ip = args[0];
+ int port = Integer.parseInt(args[1]);
+ String checkpointDirectory = args[2];
+ String outputPath = args[3];
// Function to create JavaStreamingContext without any output operations
// (used to detect the new context)
- Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
- @Override
- public JavaStreamingContext call() {
- return createContext(ip, port, checkpointDirectory, outputPath);
- }
- };
+ Function0<JavaStreamingContext> createContextFunc =
+ () -> createContext(ip, port, checkpointDirectory, outputPath);
JavaStreamingContext ssc =
JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
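
To show where the new createContextFunc lambda fits, here is a minimal driver sketch of the getOrCreate recovery pattern the example relies on. The checkpoint path, host, and port below are placeholders and do not come from this file; the Spark master is assumed to be supplied via spark-submit.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointRecoverySketch {
  public static void main(String[] args) throws Exception {
    String checkpointDirectory = "/tmp/checkpoint";  // placeholder path

    // The factory lambda is only invoked when no usable checkpoint exists;
    // on a restart after failure the context (and the DStream graph defined
    // inside it) is deserialized from the checkpoint instead.
    Function0<JavaStreamingContext> createContextFunc = () -> {
      // Master URL is expected from spark-submit, so it is not set here.
      SparkConf conf = new SparkConf().setAppName("CheckpointRecoverySketch");
      JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
      ssc.checkpoint(checkpointDirectory);
      // All stream definitions and output operations belong here so they can
      // be recovered along with the context.
      ssc.socketTextStream("localhost", 9999).print();
      return ssc;
    };

    JavaStreamingContext ssc =
        JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
    ssc.start();
    ssc.awaitTermination();
  }
}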