diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-17 22:37:56 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-17 22:37:56 -0800 |
commit | 12b72b3e73798a5a2cc6c745610e135b1d6825a6 (patch) | |
tree | ebd2b59a0ddd2b9cd3b9037b85520119277ab043 /examples/src | |
parent | c46dd2de78ae0c13060d0a9d2dea110c655659f0 (diff) | |
download | spark-12b72b3e73798a5a2cc6c745610e135b1d6825a6.tar.gz spark-12b72b3e73798a5a2cc6c745610e135b1d6825a6.tar.bz2 spark-12b72b3e73798a5a2cc6c745610e135b1d6825a6.zip |
NetworkWordCount example
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java | 62 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala | 2 |
2 files changed, 63 insertions, 1 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java new file mode 100644 index 0000000000..4299febfd6 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java @@ -0,0 +1,62 @@ +package spark.streaming.examples; + +import com.google.common.collect.Lists; +import scala.Tuple2; +import spark.api.java.function.FlatMapFunction; +import spark.api.java.function.Function2; +import spark.api.java.function.PairFunction; +import spark.streaming.Duration; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; +import spark.streaming.api.java.JavaStreamingContext; + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: NetworkWordCount <master> <hostname> <port> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` + */ +public class JavaNetworkWordCount { + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + + "In local mode, <master> should be 'local[n]' with n > 1"); + System.exit(1); + } + + // Create the context with a 1 second batch size + JavaStreamingContext ssc = new JavaStreamingContext( + args[0], "NetworkWordCount", new Duration(1000)); + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg. generated by 'nc') + JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2])); + JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(x.split(" ")); + } + }); + JavaPairDStream<String, Integer> wordCounts = words.map( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, 1); + } + }).reduceByKey(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); + + wordCounts.print(); + ssc.start(); + + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala index 43c01d5db2..32f7d57bea 100644 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala @@ -22,7 +22,7 @@ object NetworkWordCount { System.exit(1) } - // Create the context and set the batch size + // Create the context with a 1 second batch size val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) // Create a NetworkInputDStream on target ip:port and count the |