diff options
author | unknown <skumar@SKUMAR01.Trueffect.local> | 2013-04-30 23:01:32 -0600 |
---|---|---|
committer | unknown <skumar@SKUMAR01.Trueffect.local> | 2013-04-30 23:01:32 -0600 |
commit | 1d54401d7e41095d8cbeeefd42c9d39ee500cd9f (patch) | |
tree | 6ba0b82cefcf4442aaae2ba63c41998a89959b85 /examples/src/main | |
parent | 0dc1e2d60f89f07f54e0985d37cdcd32ad388f6a (diff) | |
download | spark-1d54401d7e41095d8cbeeefd42c9d39ee500cd9f.tar.gz spark-1d54401d7e41095d8cbeeefd42c9d39ee500cd9f.tar.bz2 spark-1d54401d7e41095d8cbeeefd42c9d39ee500cd9f.zip |
Modified as per TD's suggestions
Diffstat (limited to 'examples/src/main')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala (renamed from examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala) | 23 |
1 files changed, 6 insertions, 17 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala index db62246387..b662cb1162 100644 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala +++ b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -5,36 +5,31 @@ import spark.streaming.StreamingContext._ /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - * Usage: NetworkWordCumulativeCountUpdateStateByKey <master> <hostname> <port> + * Usage: StatefulNetworkWordCount <master> <hostname> <port> * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./run spark.streaming.examples.NetworkWordCumulativeCountUpdateStateByKey local[2] localhost 9999` + * `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` */ -object NetworkWordCumulativeCountUpdateStateByKey { +object StatefulNetworkWordCount { private def className[A](a: A)(implicit m: Manifest[A]) = m.toString def main(args: Array[String]) { if (args.length < 3) { - System.err.println("Usage: NetworkWordCountUpdateStateByKey <master> <hostname> <port>\n" + + System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" + "In local mode, <master> should be 'local[n]' with n > 1") System.exit(1) } val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) - //println("currentCount: " + currentCount) val previousCount = state.getOrElse(0) - //println("previousCount: " + previousCount) - val cumulative = Some(currentCount + previousCount) - //println("Cumulative: " + cumulative) - - cumulative + Some(currentCount + previousCount) } // Create the context with a 10 second batch size @@ -51,13 +46,7 @@ object NetworkWordCumulativeCountUpdateStateByKey { // Update the cumulative count using updateStateByKey // This will give a Dstream made of state (which is the cumulative count of the words) val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) - - stateDstream.foreach(rdd => { - rdd.foreach(rddVal => { - println("Current Count: " + rddVal) - }) - }) - + stateDstream.print() ssc.start() } } |