author     Soumitra Kumar <kumar.soumitra@gmail.com>      2014-11-12 12:25:31 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-11-12 12:25:31 -0800
commit     36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a (patch)
tree       08841ec5510053f2788c3f4597ffef25767455fe /examples/src
parent     4b736dbab3e177e5265439d37063bb501657d830 (diff)
[SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
SPARK-3660: Initial RDD for updateStateByKey transformation

I have added a sample, StatefulNetworkWordCountWithInitial, inspired by StatefulNetworkWordCount. Please let me know if any changes are required.

Author: Soumitra Kumar <kumar.soumitra@gmail.com>

Closes #2665 from soumitrak/master and squashes the following commits:

ee8980b [Soumitra Kumar] Fixed copy/paste issue.
304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test.
9781135 [Soumitra Kumar] Fixed test, and renamed variable.
3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream.
2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
d4fdd18 [Soumitra Kumar] Renamed variable and moved method.
d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
fdd7db3 [Soumitra Kumar] Adding support of initial value for state update. SPARK-3660 : Initial RDD for updateStateByKey transformation
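In short, this change lets updateStateByKey begin from pre-existing per-key state instead of empty state. A minimal sketch of the seeding semantics in plain Scala, outside of Spark (the function body mirrors the example's updateFunc in the diff below; the sample calls are illustrative only):

    // Pairwise update: this batch's values summed onto any previous count.
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))

    updateFunc(Seq(1), Some(1))  // key seeded with 1, seen once  => Some(2)
    updateFunc(Seq(1), None)     // unseeded key, seen once       => Some(1)

So with an initial RDD of (("hello", 1), ("world", 1)), the first batch containing "hello" reports a cumulative count of 2 rather than 1.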
Diffstat (limited to 'examples/src')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala  13
1 file changed, 11 insertions(+), 2 deletions(-)
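The overload that accepts an initial RDD expects an iterator-based update function, so the diff below wraps the example's pairwise updateFunc. A minimal standalone sketch of that adaptation, written with a pattern match in place of the diff's tuple accessors (a reading aid, not the committed code):

    // Each element carries (key, new values in this batch, previous state);
    // flatMap drops any key whose update function returns None.
    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) =>
      iterator.flatMap { case (word, values, state) =>
        updateFunc(values, state).map(sum => (word, sum))
      }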
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index a4d159bf38..514252b89e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -18,12 +18,13 @@
package org.apache.spark.examples.streaming
import org.apache.spark.SparkConf
+import org.apache.spark.HashPartitioner
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
/**
* Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
- * second.
+ * second, starting from an initial value for the word counts.
* Usage: StatefulNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
* data.
@@ -51,11 +52,18 @@ object StatefulNetworkWordCount {
Some(currentCount + previousCount)
}
+ val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
+ iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+ }
+
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
+ // Initial RDD input to updateStateByKey
+ val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
+
// Create a NetworkInputDStream on target ip:port and count the
// words in the input stream of '\n' delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
@@ -64,7 +72,8 @@ object StatefulNetworkWordCount {
// Update the cumulative count using updateStateByKey
// This will give a DStream made of state (which is the cumulative count of the words)
- val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
+ val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
+ new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
stateDstream.print()
ssc.start()
ssc.awaitTermination()
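To try the example, the usual pattern for Spark's streaming examples applies: start a Netcat server with nc -lk <port> in one terminal, then run StatefulNetworkWordCount against that host and port (the launch command itself is outside this diff). A plain-Scala model of one batch with the seeded state above, assumed for illustration given the input line "hello world foo":

    val initial = Map("hello" -> 1, "world" -> 1)    // mirrors initialRDD
    val batch   = Seq("hello", "world", "foo")       // words in one batch
    val state   = batch.groupBy(identity).map { case (w, ws) =>
      w -> (ws.size + initial.getOrElse(w, 0))
    }
    // state == Map(hello -> 2, world -> 2, foo -> 1)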