diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-29 17:57:53 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-06-29 17:57:53 -0700 |
commit | ccfe953a4db25c920157554a2cd820f8afb41ca3 (patch) | |
tree | 983dee966a3fc61553490636b7299a351a3ca621 /examples | |
parent | 5cfcd3c336cc13e9fd448ae122216e4b583b77b4 (diff) | |
parent | cbf6a5ee1e7d290d04a0c5dac78d360266d415a4 (diff) | |
download | spark-ccfe953a4db25c920157554a2cd820f8afb41ca3.tar.gz spark-ccfe953a4db25c920157554a2cd820f8afb41ca3.tar.bz2 spark-ccfe953a4db25c920157554a2cd820f8afb41ca3.zip |
Merge pull request #577 from skumargithub/master
Example of cumulative counting using updateStateByKey
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala new file mode 100644 index 0000000000..51c3c9f9b4 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -0,0 +1,50 @@ +package spark.streaming.examples + +import spark.streaming._ +import spark.streaming.StreamingContext._ + +/** + * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: StatefulNetworkWordCount <master> <hostname> <port> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` + */ +object StatefulNetworkWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + val currentCount = values.foldLeft(0)(_ + _) + + val previousCount = state.getOrElse(0) + + Some(currentCount + previousCount) + } + + // Create the context with a 1 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + ssc.checkpoint(".") + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg. generated by 'nc') + val lines = ssc.socketTextStream(args(1), args(2).toInt) + val words = lines.flatMap(_.split(" ")) + val wordDstream = words.map(x => (x, 1)) + + // Update the cumulative count using updateStateByKey + // This will give a Dstream made of state (which is the cumulative count of the words) + val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) + stateDstream.print() + ssc.start() + } +} |