diff options
Diffstat (limited to 'examples/src/main')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala (renamed from examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala) | 23 |
1 files changed, 6 insertions, 17 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala
index db62246387..b662cb1162 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala
+++ b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -5,36 +5,31 @@ import spark.streaming.StreamingContext._
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCumulativeCountUpdateStateByKey <master> <hostname> <port>
+ * Usage: StatefulNetworkWordCount <master> <hostname> <port>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
  *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./run spark.streaming.examples.NetworkWordCumulativeCountUpdateStateByKey local[2] localhost 9999`
+ *    `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
  */
-object NetworkWordCumulativeCountUpdateStateByKey {
+object StatefulNetworkWordCount {
   private def className[A](a: A)(implicit m: Manifest[A]) = m.toString
 
   def main(args: Array[String]) {
     if (args.length < 3) {
-      System.err.println("Usage: NetworkWordCountUpdateStateByKey <master> <hostname> <port>\n" +
+      System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" +
         "In local mode, <master> should be 'local[n]' with n > 1")
       System.exit(1)
     }
 
     val updateFunc = (values: Seq[Int], state: Option[Int]) => {
       val currentCount = values.foldLeft(0)(_ + _)
-      //println("currentCount: " + currentCount)
 
       val previousCount = state.getOrElse(0)
-      //println("previousCount: " + previousCount)
 
-      val cumulative = Some(currentCount + previousCount)
-      //println("Cumulative: " + cumulative)
-
-      cumulative
+      Some(currentCount + previousCount)
     }
 
     // Create the context with a 10 second batch size
@@ -51,13 +46,7 @@ object NetworkWordCumulativeCountUpdateStateByKey {
     // Update the cumulative count using updateStateByKey
     // This will give a Dstream made of state (which is the cumulative count of the words)
     val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
-
-    stateDstream.foreach(rdd => {
-      rdd.foreach(rddVal => {
-        println("Current Count: " + rddVal)
-      })
-    })
-
+    stateDstream.print()
     ssc.start()
   }
 }