aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author seanm <sean.mcnamara@webtrends.com> 2013-01-14 17:22:03 -0700
committer seanm <sean.mcnamara@webtrends.com> 2013-01-14 17:22:03 -0700
commit c203a292963a018bd9b84f02bb522fd191a110af (patch)
tree bbdc51f3755eb8165d83a9bf7bd092c4b72231c7 /streaming
parent 82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (diff)
download spark-c203a292963a018bd9b84f02bb522fd191a110af.tar.gz
spark-c203a292963a018bd9b84f02bb522fd191a110af.tar.bz2
spark-c203a292963a018bd9b84f02bb522fd191a110af.zip
StateDStream changes to give updateStateByKey consistent behavior
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala 12
1 files changed, 10 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
index a1ec2f5454..4e57968eed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala
@@ -48,8 +48,16 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
//logDebug("Generating state RDD for time " + validTime)
return Some(stateRDD)
}
- case None => { // If parent RDD does not exist, then return old state RDD
- return Some(prevStateRDD)
+ case None => { // If parent RDD does not exist
+
+ // Re-apply the update function to the old state RDD
+ val updateFuncLocal = updateFunc
+ val finalFunc = (iterator: Iterator[(K, S)]) => {
+ val i = iterator.map(t => (t._1, Seq[V](), Option(t._2)))
+ updateFuncLocal(i)
+ }
+ val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning)
+ return Some(stateRDD)
}
}
}