author | Soumitra Kumar <kumar.soumitra@gmail.com> | 2014-11-12 12:25:31 -0800
---|---|---
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-11-12 12:25:31 -0800
commit | 36ddeb7bf83ac5a1af9d3db07ad4c380777e4d1a |
tree | 08841ec5510053f2788c3f4597ffef25767455fe | /examples/src
parent | 4b736dbab3e177e5265439d37063bb501657d830 |
[SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
I have added a sample, StatefulNetworkWordCountWithInitial, inspired by StatefulNetworkWordCount (see the usage sketch after the commit list below).
Please let me know if any changes are required.
Author: Soumitra Kumar <kumar.soumitra@gmail.com>
Closes #2665 from soumitrak/master and squashes the following commits:
ee8980b [Soumitra Kumar] Fixed copy/paste issue.
304f636 [Soumitra Kumar] Added simpler version of updateStateByKey API with initialRDD and test.
9781135 [Soumitra Kumar] Fixed test, and renamed variable.
3da51a2 [Soumitra Kumar] Adding updateStateByKey with initialRDD API to JavaPairDStream.
2f78f7e [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
d4fdd18 [Soumitra Kumar] Renamed variable and moved method.
d0ce2cd [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
31399a4 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
4efa58b [Soumitra Kumar] [SPARK-3660][STREAMING] Initial RDD for updateStateByKey transformation
8f40ca0 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
dde4271 [Soumitra Kumar] Merge remote-tracking branch 'upstream/master'
fdd7db3 [Soumitra Kumar] Adding support for an initial value for state updates (SPARK-3660: Initial RDD for updateStateByKey transformation)
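The feature itself is small: `updateStateByKey` can now be seeded with pre-existing per-key state. Below is a minimal end-to-end sketch of the simpler overload mentioned in commit 304f636 above, assuming a `(updateFunc, partitioner, initialRDD)` parameter order; the object name, host/port, and seeded pairs are illustrative, not taken from the patch:

```scala
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object SeededWordCountSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("SeededWordCountSketch"), Seconds(1))
    ssc.checkpoint(".") // updateStateByKey requires a checkpoint directory

    // Per-key state that exists before the first batch arrives (illustrative values).
    val initialRDD = ssc.sparkContext.parallelize(Seq(("hello", 1), ("world", 1)))

    // Fold each batch's counts into the running total for the key.
    val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
      (values, state) => Some(values.sum + state.getOrElse(0))

    val stateDstream = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      // Simpler overload: per-key update function, partitioner, seed state.
      .updateStateByKey[Int](updateFunc,
        new HashPartitioner(ssc.sparkContext.defaultParallelism), initialRDD)

    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Compared with the four-argument overload used in the diff below, this form takes the per-key function directly and, per the commit message, is the "simpler version" of the API; the exact overload set should be checked against PairDStreamFunctions in this commit.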
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala | 13 |
1 file changed, 11 insertions, 2 deletions
```diff
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index a4d159bf38..514252b89e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -18,12 +18,13 @@
 package org.apache.spark.examples.streaming
 
 import org.apache.spark.SparkConf
+import org.apache.spark.HashPartitioner
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 
 /**
  * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every
- * second.
+ * second starting with initial value of word count.
  * Usage: StatefulNetworkWordCount <hostname> <port>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
  *   data.
@@ -51,11 +52,18 @@ object StatefulNetworkWordCount {
       Some(currentCount + previousCount)
     }
 
+    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
+      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
+    }
+
     val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(sparkConf, Seconds(1))
     ssc.checkpoint(".")
 
+    // Initial RDD input to updateStateByKey
+    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
+
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited test (eg. generated by 'nc')
     val lines = ssc.socketTextStream(args(0), args(1).toInt)
@@ -64,7 +72,8 @@ object StatefulNetworkWordCount {
 
     // Update the cumulative count using updateStateByKey
     // This will give a Dstream made of state (which is the cumulative count of the words)
-    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
+    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
+      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
     stateDstream.print()
     ssc.start()
     ssc.awaitTermination()
```
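The one subtle piece of the patch is `newUpdateFunc`, which adapts the per-key `updateFunc` to the iterator form expected by the four-argument `updateStateByKey` overload. That adaptation is plain Scala and can be sanity-checked without a cluster; the batch triples below are made up for illustration:

```scala
object UpdateFuncAdaptationSketch extends App {
  // Per-key update: fold the batch's counts into the previous state.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Same shape as newUpdateFunc in the patch: adapt the per-key function
  // to the Iterator[(key, newValues, previousState)] form; keys whose
  // update returns None are dropped by the flatMap.
  val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) =>
    iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))

  // "hello" was seeded with state 1 by the initial RDD and appears twice
  // in this batch; "spark" is a brand-new key with no prior state.
  val batch = Iterator(("hello", Seq(1, 1), Some(1)), ("spark", Seq(1), None))
  println(newUpdateFunc(batch).toList) // List((hello,3), (spark,1))
}
```

As with the original example, feed the program from a Netcat server (e.g. `nc -lk 9999`): because the initial RDD seeds `("hello", 1)`, the first "hello" arriving on the socket prints with a cumulative count of 2 rather than 1.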