Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java  115
1 file changed, 115 insertions(+), 0 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
new file mode 100644
index 0000000000..09491fe300
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.StorageLevels;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network
+ * every second, starting from an initial state of word counts.
+ * Usage: JavaStatefulNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to in
+ * order to receive data.
+ * <p/>
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example
+ * org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999`
+ */
+public class JavaStatefulNetworkWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Update function: for each key, add the counts from the current batch to the running total
+    final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
+        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
+          @Override
+          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+            Integer newSum = state.or(0);
+            for (Integer value : values) {
+              newSum += value;
+            }
+            return Optional.of(newSum);
+          }
+        };
+
+    // Create the context with a 1 second batch size
+    SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount");
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
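+    // updateStateByKey is a stateful transformation, so a checkpoint directory is required;
+    // the current directory is used here only to keep the example self-contained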
+    ssc.checkpoint(".");
+
+    // Initial RDD input to updateStateByKey
+    List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1),
+        new Tuple2<String, Integer>("world", 1));
+    JavaPairRDD<String, Integer> initialRDD = ssc.sc().parallelizePairs(tuples);
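+    // These seed counts become part of the state from the very first batch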
+
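+    // Read '\n' delimited text from the socket; the received data is stored serialized and
+    // replicated (MEMORY_AND_DISK_SER_2) so it can survive the loss of a single copy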
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
+        args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2);
+
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+
+    JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
+        new PairFunction<String, String, Integer>() {
+          @Override
+          public Tuple2<String, Integer> call(String s) {
+            return new Tuple2<String, Integer>(s, 1);
+          }
+        });
+
+    // This will give a DStream made of state (which is the cumulative count of the words)
+    JavaPairDStream<String, Integer> stateDstream = wordsDstream.updateStateByKey(updateFunction,
+        new HashPartitioner(ssc.sc().defaultParallelism()), initialRDD);
+
+    stateDstream.print();
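+    // Start the streaming computation and block until it terminates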
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}