diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-01-14 17:22:03 -0700 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-01-14 17:22:03 -0700 |
commit | c203a292963a018bd9b84f02bb522fd191a110af (patch) | |
tree | bbdc51f3755eb8165d83a9bf7bd092c4b72231c7 | |
parent | 82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (diff) | |
download | spark-c203a292963a018bd9b84f02bb522fd191a110af.tar.gz spark-c203a292963a018bd9b84f02bb522fd191a110af.tar.bz2 spark-c203a292963a018bd9b84f02bb522fd191a110af.zip |
StateDStream changes to give updateStateByKey consistent behavior
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index a1ec2f5454..4e57968eed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -48,8 +48,16 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
             //logDebug("Generating state RDD for time " + validTime)
             return Some(stateRDD)
           }
-          case None => {    // If parent RDD does not exist, then return old state RDD
-            return Some(prevStateRDD)
+          case None => {    // If parent RDD does not exist
+
+            // Re-apply the update function to the old state RDD
+            val updateFuncLocal = updateFunc
+            val finalFunc = (iterator: Iterator[(K, S)]) => {
+              val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+              updateFuncLocal(i)
+            }
+            val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
+            return Some(stateRDD)
           }
         }
       }