diff options
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index 09491fe300..d46c7107c7 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -39,7 +39,6 @@ import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; - /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second starting with initial value of word count. @@ -65,17 +64,17 @@ public class JavaStatefulNetworkWordCount { StreamingExamples.setStreamingLogLevels(); // Update the cumulative count function - final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new - Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { - @Override - public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { - Integer newSum = state.or(0); - for (Integer value : values) { - newSum += value; - } - return Optional.of(newSum); - } - }; + final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = + new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { + @Override + public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { + Integer newSum = state.or(0); + for (Integer value : values) { + newSum += value; + } + return Optional.of(newSum); + } + }; // Create the context with a 1 second batch size SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); @@ -97,12 +96,13 @@ public class JavaStatefulNetworkWordCount { } }); - JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(new PairFunction<String, String, Integer>() { - @Override - public Tuple2<String, Integer> call(String s) { - return new Tuple2<String, Integer>(s, 1); - } - }); + JavaPairDStream<String, Integer> wordsDstream = words.mapToPair( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) { + return new Tuple2<String, Integer>(s, 1); + } + }); // This will give a Dstream made of state (which is the cumulative count of the words) JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction, |