aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorunknown <skumar@SKUMAR01.Trueffect.local>2013-04-22 09:22:45 -0600
committerunknown <skumar@SKUMAR01.Trueffect.local>2013-04-22 09:22:45 -0600
commit0dc1e2d60f89f07f54e0985d37cdcd32ad388f6a (patch)
tree38428f24a7d997abfbc8d56e95569f0abbaae471 /examples
parent17e076de800ea0d4c55f2bd657348641f6f9c55b (diff)
downloadspark-0dc1e2d60f89f07f54e0985d37cdcd32ad388f6a.tar.gz
spark-0dc1e2d60f89f07f54e0985d37cdcd32ad388f6a.tar.bz2
spark-0dc1e2d60f89f07f54e0985d37cdcd32ad388f6a.zip
Example of cumulative counting using updateStateByKey
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala63
1 files changed, 63 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala
new file mode 100644
index 0000000000..db62246387
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala
@@ -0,0 +1,63 @@
+package spark.streaming.examples
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every 10 seconds.
+ * Usage: NetworkWordCumulativeCountUpdateStateByKey <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.NetworkWordCumulativeCountUpdateStateByKey local[2] localhost 9999`
+ */
+object NetworkWordCumulativeCountUpdateStateByKey {
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      // Print usage (program name must match the object name above) and exit.
+      System.err.println("Usage: NetworkWordCumulativeCountUpdateStateByKey <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    // State update function: the new state for a key is the sum of this
+    // batch's counts for that key plus the previously stored total (0 if
+    // the key has no prior state). Returning Some(...) keeps the key alive.
+    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+      val currentCount = values.sum
+      val previousCount = state.getOrElse(0)
+      Some(currentCount + previousCount)
+    }
+
+    // Create the context with a 10 second batch size
+    val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(10),
+      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+    // updateStateByKey requires a checkpoint directory for its state.
+    ssc.checkpoint(".")
+
+    // Create a NetworkInputDStream on target ip:port and count the
+    // words in input stream of \n delimited text (eg. generated by 'nc')
+    val lines = ssc.socketTextStream(args(1), args(2).toInt)
+    // Split each line into words and pair each word with a count of 1.
+    val words = lines.flatMap(_.split(" "))
+    val wordDstream = words.map(x => (x, 1))
+
+    // Update the cumulative count using updateStateByKey.
+    // This gives a DStream of state (the cumulative count of each word).
+    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
+
+    // Print every (word, cumulativeCount) pair of each batch on the driver.
+    stateDstream.foreach(rdd => {
+      rdd.foreach(rddVal => {
+        println("Current Count: " + rddVal)
+      })
+    })
+
+    // Start the streaming computation.
+    ssc.start()
+  }
+}