aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorunknown <skumar@SKUMAR01.Trueffect.local>2013-04-30 23:01:32 -0600
committerunknown <skumar@SKUMAR01.Trueffect.local>2013-04-30 23:01:32 -0600
commit1d54401d7e41095d8cbeeefd42c9d39ee500cd9f (patch)
tree6ba0b82cefcf4442aaae2ba63c41998a89959b85 /examples
parent0dc1e2d60f89f07f54e0985d37cdcd32ad388f6a (diff)
downloadspark-1d54401d7e41095d8cbeeefd42c9d39ee500cd9f.tar.gz
spark-1d54401d7e41095d8cbeeefd42c9d39ee500cd9f.tar.bz2
spark-1d54401d7e41095d8cbeeefd42c9d39ee500cd9f.zip
Modified as per TD's suggestions
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala (renamed from examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala)23
1 files changed, 6 insertions, 17 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala
index db62246387..b662cb1162 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCumulativeCountUpdateStateByKey.scala
+++ b/examples/src/main/scala/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -5,36 +5,31 @@ import spark.streaming.StreamingContext._
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCumulativeCountUpdateStateByKey <master> <hostname> <port>
+ * Usage: StatefulNetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run spark.streaming.examples.NetworkWordCumulativeCountUpdateStateByKey local[2] localhost 9999`
+ * `$ ./run spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999`
*/
-object NetworkWordCumulativeCountUpdateStateByKey {
+object StatefulNetworkWordCount {
private def className[A](a: A)(implicit m: Manifest[A]) = m.toString
def main(args: Array[String]) {
if (args.length < 3) {
- System.err.println("Usage: NetworkWordCountUpdateStateByKey <master> <hostname> <port>\n" +
+ System.err.println("Usage: StatefulNetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
- //println("currentCount: " + currentCount)
val previousCount = state.getOrElse(0)
- //println("previousCount: " + previousCount)
- val cumulative = Some(currentCount + previousCount)
- //println("Cumulative: " + cumulative)
-
- cumulative
+ Some(currentCount + previousCount)
}
// Create the context with a 10 second batch size
@@ -51,13 +46,7 @@ object NetworkWordCumulativeCountUpdateStateByKey {
// Update the cumulative count using updateStateByKey
// This will give a Dstream made of state (which is the cumulative count of the words)
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
-
- stateDstream.foreach(rdd => {
- rdd.foreach(rddVal => {
- println("Current Count: " + rddVal)
- })
- })
-
+ stateDstream.print()
ssc.start()
}
}