aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorBryan Cutler <bjcutler@us.ibm.com>2015-12-10 14:21:15 -0800
committerDavies Liu <davies.liu@gmail.com>2015-12-10 14:21:15 -0800
commit6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a (patch)
treea3b24c4b3c8682e6a4dfb659f600d7474636b9d3 /streaming
parent4a46b8859d3314b5b45a67cdc5c81fecb6e9e78c (diff)
downloadspark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.gz
spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.tar.bz2
spark-6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a.zip
[SPARK-11713] [PYSPARK] [STREAMING] Initial RDD updateStateByKey for PySpark
Adding ability to define an initial state RDD for use with updateStateByKey PySpark. Added unit test and changed stateful_network_wordcount example to use initial RDD. Author: Bryan Cutler <bjcutler@us.ibm.com> Closes #10082 from BryanCutler/initial-rdd-updateStateByKey-SPARK-11713.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala14
1 files changed, 12 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 994309ddd0..056248ccc7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -264,9 +264,19 @@ private[python] class PythonTransformed2DStream(
*/
private[python] class PythonStateDStream(
parent: DStream[Array[Byte]],
- reduceFunc: PythonTransformFunction)
+ reduceFunc: PythonTransformFunction,
+ initialRDD: Option[RDD[Array[Byte]]])
extends PythonDStream(parent, reduceFunc) {
+ def this(
+ parent: DStream[Array[Byte]],
+ reduceFunc: PythonTransformFunction) = this(parent, reduceFunc, None)
+
+ def this(
+ parent: DStream[Array[Byte]],
+ reduceFunc: PythonTransformFunction,
+ initialRDD: JavaRDD[Array[Byte]]) = this(parent, reduceFunc, Some(initialRDD.rdd))
+
super.persist(StorageLevel.MEMORY_ONLY)
override val mustCheckpoint = true
@@ -274,7 +284,7 @@ private[python] class PythonStateDStream(
val lastState = getOrCompute(validTime - slideDuration)
val rdd = parent.getOrCompute(validTime)
if (rdd.isDefined) {
- func(lastState, rdd, validTime)
+ func(lastState.orElse(initialRDD), rdd, validTime)
} else {
lastState
}