aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-17 22:37:56 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-17 22:37:56 -0800
commit12b72b3e73798a5a2cc6c745610e135b1d6825a6 (patch)
treeebd2b59a0ddd2b9cd3b9037b85520119277ab043 /examples/src
parentc46dd2de78ae0c13060d0a9d2dea110c655659f0 (diff)
downloadspark-12b72b3e73798a5a2cc6c745610e135b1d6825a6.tar.gz
spark-12b72b3e73798a5a2cc6c745610e135b1d6825a6.tar.bz2
spark-12b72b3e73798a5a2cc6c745610e135b1d6825a6.zip
NetworkWordCount example
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java62
-rw-r--r--examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala2
2 files changed, 63 insertions, 1 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
new file mode 100644
index 0000000000..4299febfd6
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
@@ -0,0 +1,62 @@
+package spark.streaming.examples;
+
+import com.google.common.collect.Lists;
+import scala.Tuple2;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+import spark.streaming.Duration;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: NetworkWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ */
+public class JavaNetworkWordCount {
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
+ "In local mode, <master> should be 'local[n]' with n > 1");
+ System.exit(1);
+ }
+
+ // Create the context with a 1 second batch size
+ JavaStreamingContext ssc = new JavaStreamingContext(
+ args[0], "NetworkWordCount", new Duration(1000));
+
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited test (eg. generated by 'nc')
+ JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split(" "));
+ }
+ });
+ JavaPairDStream<String, Integer> wordCounts = words.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ ssc.start();
+
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 43c01d5db2..32f7d57bea 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -22,7 +22,7 @@ object NetworkWordCount {
System.exit(1)
}
- // Create the context and set the batch size
+ // Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
// Create a NetworkInputDStream on target ip:port and count the