From 0dc1e2d60f89f07f54e0985d37cdcd32ad388f6a Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 22 Apr 2013 09:22:45 -0600 Subject: Examaple of cumulative counting using updateStateByKey --- ...etworkWordCumulativeCountUpdateStateByKey.scala | 63 ++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala (limited to 'examples') diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala new file mode 100644 index 0000000000..db62246387 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala @@ -0,0 +1,63 @@ +package spark.streaming.examples + +import spark.streaming._ +import spark.streaming.StreamingContext._ + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: NetworkWordCumulativeCountUpdateStateByKey <master> <hostname> <port> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run spark.streaming.examples.NetworkWordCumulativeCountUpdateStateByKey local[2] localhost 9999` + */ +object NetworkWordCumulativeCountUpdateStateByKey { + private def className[A](a: A)(implicit m: Manifest[A]) = m.toString + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: NetworkWordCountUpdateStateByKey <master> <hostname> <port>\n" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + val currentCount = values.foldLeft(0)(_ + _) + //println("currentCount: " + currentCount) + + val previousCount = state.getOrElse(0) + //println("previousCount: " + previousCount) + + val cumulative = Some(currentCount + previousCount) + //println("Cumulative: " + cumulative) + + cumulative + } + + // Create the context with a 10 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(10), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + ssc.checkpoint(".") + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg.
generated by 'nc') + val lines = ssc.socketTextStream(args(1), args(2).toInt) + val words = lines.flatMap(_.split(" ")) + val wordDstream = words.map(x => (x, 1)) + + // Update the cumulative count using updateStateByKey + // This will give a Dstream made of state (which is the cumulative count of the words) + val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) + + stateDstream.foreach(rdd => { + rdd.foreach(rddVal => { + println("Current Count: " + rddVal) + }) + }) + + ssc.start() + } +} -- cgit v1.2.3 From 1d54401d7e41095d8cbeeefd42c9d39ee500cd9f Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 30 Apr 2013 23:01:32 -0600 Subject: Modified as per TD's suggestions --- ...etworkWordCumulativeCountUpdateStateByKey.scala | 63 ---------------------- .../examples/StatefulNetworkWordCount.scala | 52 ++++++++++++++++++ 2 files changed, 52 insertions(+), 63 deletions(-) delete mode 100644 examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala create mode 100644 examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala (limited to 'examples') diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala deleted file mode 100644 index db62246387..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala +++ /dev/null @@ -1,63 +0,0 @@ -package spark.streaming.examples - -import spark.streaming._ -import spark.streaming.StreamingContext._ - -/** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - * Usage: NetworkWordCumulativeCountUpdateStateByKey <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- * - * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` - * and then run the example - * `$ ./run spark.streaming.examples.NetworkWordCumulativeCountUpdateStateByKey local[2] localhost 9999` - */ -object NetworkWordCumulativeCountUpdateStateByKey { - private def className[A](a: A)(implicit m: Manifest[A]) = m.toString - - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: NetworkWordCountUpdateStateByKey <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1") - System.exit(1) - } - - val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.foldLeft(0)(_ + _) - //println("currentCount: " + currentCount) - - val previousCount = state.getOrElse(0) - //println("previousCount: " + previousCount) - - val cumulative = Some(currentCount + previousCount) - //println("Cumulative: " + cumulative) - - cumulative - } - - // Create the context with a 10 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(10), - System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - ssc.checkpoint(".") - - // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg.
generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt) - val words = lines.flatMap(_.split(" ")) - val wordDstream = words.map(x => (x, 1)) - - // Update the cumulative count using updateStateByKey - // This will give a Dstream made of state (which is the cumulative count of the words) - val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) - - stateDstream.foreach(rdd => { - rdd.foreach(rddVal => { - println("Current Count: " + rddVal) - }) - }) - - ssc.start() - } -} diff --git a/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala new file mode 100644 index 0000000000..b662cb1162 --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -0,0 +1,52 @@ +package spark.streaming.examples + +import spark.streaming._ +import spark.streaming.StreamingContext._ + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: StatefulNetworkWordCount <master> <hostname> <port> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` + */ +object StatefulNetworkWordCount { + private def className[A](a: A)(implicit m: Manifest[A]) = m.toString + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" + + "In local mode, <master> should be 'local[n]' with n > 1") + System.exit(1) + } + + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + val currentCount = values.foldLeft(0)(_ + _) + + val previousCount = state.getOrElse(0) + + Some(currentCount + previousCount) + } + + // Create the context with a 10 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(10), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + ssc.checkpoint(".") + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg.
generated by 'nc') + val lines = ssc.socketTextStream(args(1), args(2).toInt) + val words = lines.flatMap(_.split(" ")) + val wordDstream = words.map(x => (x, 1)) + + // Update the cumulative count using updateStateByKey + // This will give a Dstream made of state (which is the cumulative count of the words) + val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) + stateDstream.print() + ssc.start() + } +} -- cgit v1.2.3 From cbf6a5ee1e7d290d04a0c5dac78d360266d415a4 Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 6 May 2013 08:05:45 -0600 Subject: Removed unused code, clarified intent of the program, batch size to 1 second --- .../scala/spark/streaming/examples/StatefulNetworkWordCount.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'examples') diff --git a/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala index b662cb1162..51c3c9f9b4 100644 --- a/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -4,7 +4,7 @@ import spark.streaming._ import spark.streaming.StreamingContext._ /** - * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every second. * Usage: StatefulNetworkWordCount <master> <hostname> <port> * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
@@ -15,8 +15,6 @@ import spark.streaming.StreamingContext._ * `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` */ object StatefulNetworkWordCount { - private def className[A](a: A)(implicit m: Manifest[A]) = m.toString - def main(args: Array[String]) { if (args.length < 3) { System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" + @@ -32,8 +30,8 @@ object StatefulNetworkWordCount { Some(currentCount + previousCount) } - // Create the context with a 10 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(10), + // Create the context with a 1 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", Seconds(1), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) ssc.checkpoint(".") -- cgit v1.2.3